/*
 * (C) Copyright 2012-2013 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.nuxeo.connect.tools.report.client;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

import javax.json.Json;
import javax.json.JsonObjectBuilder;
import javax.json.stream.JsonGenerator;
import javax.json.stream.JsonParser;
import javax.json.stream.JsonParsingException;
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.commons.logging.LogFactory;
import org.nuxeo.connect.tools.report.ReportServer;

import com.sun.tools.attach.AgentInitializationException;
053import com.sun.tools.attach.AgentLoadException; 054import com.sun.tools.attach.AttachNotSupportedException; 055import com.sun.tools.attach.VirtualMachine; 056import com.sun.tools.attach.VirtualMachineDescriptor; 057 058/** 059 * 060 * @since 8.3 061 */ 062public class ReportConnector { 063 064 static final ObjectName NAME = name(); 065 066 static ObjectName name() { 067 try { 068 return new ObjectName("org.nuxeo:type=service,name=connect-report"); 069 } catch (MalformedObjectNameException cause) { 070 throw new AssertionError("Cannot name report", cause); 071 } 072 } 073 074 public static ReportConnector of() { 075 return new ReportConnector(); 076 } 077 078 public JsonGenerator feed(JsonGenerator generator) throws IOException, InterruptedException, ExecutionException { 079 class Feeder implements Consumer { 080 StreamFeeder feeder = new StreamFeeder(); 081 082 @Override 083 public void consume(JsonParser stream) { 084 feeder.feed(generator, stream); 085 } 086 } 087 connect(new Feeder()); 088 return generator; 089 } 090 091 public JsonObjectBuilder feed(JsonObjectBuilder builder) throws IOException, InterruptedException, ExecutionException { 092 class Feeder implements Consumer { 093 ObjectFeeder feeder = new ObjectFeeder(); 094 095 @Override 096 public void consume(JsonParser stream) { 097 feeder.feed(builder, stream); 098 } 099 } 100 connect(new Feeder()); 101 return builder; 102 } 103 104 static class Discovery implements Iterable<ReportServer> { 105 @Override 106 public Iterator<ReportServer> iterator() { 107 return new Iterator<ReportServer>() { 108 109 final Iterator<VirtualMachineDescriptor> source = VirtualMachine.list().iterator(); 110 111 ReportServer next = fetchNext(); 112 113 ReportServer fetchNext() { 114 if (!source.hasNext()) { 115 return null; 116 } 117 while (source.hasNext()) { 118 VirtualMachineDescriptor pid = source.next(); 119 try { 120 MBeanServerConnection connection = new Management(pid).connect(); 121 if 
(!connection.isRegistered(NAME)) { 122 continue; 123 } 124 return JMX.newMXBeanProxy(connection, NAME, ReportServer.class); 125 } catch (IOException cause) { 126 LogFactory.getLog(Discovery.class).error("Cannot connect to " + pid, cause); 127 } 128 } 129 return null; 130 } 131 132 class Management { 133 134 Management(VirtualMachineDescriptor anIdentifier) { 135 pid = anIdentifier; 136 } 137 138 final VirtualMachineDescriptor pid; 139 140 MBeanServerConnection connect() throws IOException { 141 VirtualMachine vm; 142 try { 143 vm = pid.provider().attachVirtualMachine(pid); 144 } catch (AttachNotSupportedException cause) { 145 throw new IOException("Cannot attach to " + pid, cause); 146 } 147 try { 148 return connect(lookup(vm)); 149 } finally { 150 vm.detach(); 151 } 152 } 153 154 JMXServiceURL lookup(VirtualMachine vm) throws IOException { 155 JMXServiceURL url = lookupRemote(vm); 156 if (url != null) { 157 return url; 158 } 159 return lookupAgent(vm); 160 } 161 162 JMXServiceURL lookupRemote(VirtualMachine vm) throws IOException { 163 boolean isRemote = 164 Boolean.valueOf(vm.getSystemProperties().getProperty("com.sun.management.jmxremote", "false")).booleanValue(); 165 if (!isRemote) { 166 return null; 167 } 168 int port = Integer.valueOf(vm.getSystemProperties().getProperty("com.sun.management.jmxremote.port", "1089")).intValue(); 169 return new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://localhost:%d/jmxrmi", port)); 170 } 171 172 JMXServiceURL lookupAgent(VirtualMachine vm) throws IOException { 173 String address = vm.getAgentProperties().getProperty("com.sun.management.jmxremote.localConnectorAddress"); 174 if (address != null) { 175 return new JMXServiceURL(address); 176 } 177 startAgent(vm); 178 return lookupAgent(vm); 179 } 180 181 void startAgent(VirtualMachine vm) throws IOException { 182 String home = vm.getSystemProperties().getProperty("java.home"); 183 184 // Normally in 185 // ${java.home}/jre/lib/management-agent.jar but 186 // 
might 187 // be in ${java.home}/lib in build environments. 188 189 String agent = home + File.separator + "jre" + File.separator + 190 "lib" + File.separator + "management-agent.jar"; 191 File f = new File(agent); 192 if (!f.exists()) { 193 agent = home + File.separator + "lib" + File.separator + 194 "management-agent.jar"; 195 f = new File(agent); 196 if (!f.exists()) { 197 throw new IOException("Management agent not found"); 198 } 199 } 200 201 agent = f.getCanonicalPath(); 202 try { 203 vm.loadAgent(agent, "com.sun.management.jmxremote"); 204 } catch (AgentLoadException x) { 205 IOException ioe = new IOException(x.getMessage()); 206 ioe.initCause(x); 207 throw ioe; 208 } catch (AgentInitializationException x) { 209 IOException ioe = new IOException(x.getMessage()); 210 ioe.initCause(x); 211 throw ioe; 212 } 213 } 214 215 MBeanServerConnection connect(JMXServiceURL url) throws IOException { 216 return JMXConnectorFactory.connect(url).getMBeanServerConnection(); 217 } 218 } 219 220 @Override 221 public boolean hasNext() { 222 return next != null; 223 } 224 225 @Override 226 public ReportServer next() { 227 try { 228 return next; 229 } finally { 230 next = fetchNext(); 231 } 232 } 233 234 }; 235 } 236 } 237 238 interface Consumer { 239 void consume(JsonParser stream); 240 } 241 242 <A> void connect(Consumer consumer) throws IOException, InterruptedException, ExecutionException { 243 ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() { 244 245 @Override 246 public Thread newThread(Runnable target) { 247 Thread thread = new Thread(target, "connect-report"); 248 thread.setDaemon(true); 249 return thread; 250 } 251 }); 252 try { 253 for (ReportServer server : new Discovery()) { 254 try (ServerSocket callback = new ServerSocket(0)) { 255 final Future<?> consumed = executor.submit(new Runnable() { 256 257 @Override 258 public void run() { 259 260 String name = Thread.currentThread().getName(); 261 
Thread.currentThread().setName("connect-report-consumer-" + server); 262 try (InputStream source = callback.accept().getInputStream()) { 263 consumer.consume(Json.createParser(source)); 264 } catch (IOException | JsonParsingException cause) { 265 throw new AssertionError("Cannot consume connect report", cause); 266 } finally { 267 Thread.currentThread().setName(name); 268 } 269 LogFactory.getLog(ReportConnector.class).info("Consumed " + server); 270 } 271 }); 272 final Future<?> served = executor.submit(new Runnable() { 273 274 @Override 275 public void run() { 276 String name = Thread.currentThread().getName(); 277 Thread.currentThread().setName("connect-report-server-" + server); 278 InetSocketAddress address = (InetSocketAddress) callback.getLocalSocketAddress(); 279 try { 280 server.run(address.getHostName(), address.getPort()); 281 } catch (IOException cause) { 282 throw new AssertionError("Cannot run connect report", cause); 283 } finally { 284 Thread.currentThread().setName(name); 285 } 286 } 287 288 }); 289 ExecutionException consumerError = null; 290 try { 291 consumed.get(); 292 } catch (ExecutionException cause) { 293 consumerError = cause; 294 } 295 try { 296 served.get(); 297 } catch (ExecutionException cause) { 298 if (consumerError != null) { 299 consumerError.addSuppressed(cause); 300 throw consumerError; 301 } 302 throw cause; 303 } 304 } 305 } 306 } finally { 307 executor.shutdownNow(); 308 } 309 } 310 311 public Iterable<ReportServer> discover() { 312 class ToolsRunner { 313 314 @SuppressWarnings("unchecked") 315 Iterable<ReportServer> discover() { 316 try { 317 ReportConnector.class.getClassLoader().loadClass("com.sun.tools.attach.VirtualMachine"); 318 } catch (ClassNotFoundException cause) { 319 class Loader extends URLClassLoader { 320 Loader(Path path) { 321 super(new URL[] { fileof(path) }); 322 } 323 324 @Override 325 protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { 326 if 
(name.equals(Discovery.class.getName())) { 327 return findClass(name); 328 } 329 return super.loadClass(name, resolve); 330 } 331 } 332 ClassLoader previous = Thread.currentThread().getContextClassLoader(); 333 ClassLoader loader = new Loader(findTools()); 334 Thread.currentThread().setContextClassLoader(loader); 335 try { 336 return (Iterable<ReportServer>) loader.loadClass(Discovery.class.getName()).newInstance(); 337 } catch (ReflectiveOperationException cause1) { 338 throw new AssertionError("Cannot discover servers", cause1); 339 } finally { 340 Thread.currentThread().setContextClassLoader(previous); 341 } 342 } 343 return new Discovery(); 344 } 345 346 URL fileof(Path path) { 347 try { 348 return new URL("file://".concat(path.toString())); 349 } catch (MalformedURLException cause) { 350 throw new AssertionError("Cannot create url for " + path, cause); 351 } 352 } 353 354 Path findTools() { 355 Path home = Paths.get(System.getProperty("java.home")); 356 for (Path path : new Path[] { 357 Paths.get("../lib/tools.jar"), 358 Paths.get("../Classes/classes.jar") 359 }) { 360 Path tools = home.resolve(path); 361 if (Files.exists(tools)) { 362 return tools; 363 } 364 } 365 throw new AssertionError("Cannot find tools in system"); 366 } 367 368 } 369 try { 370 return new ToolsRunner().discover(); 371 } catch (Exception cause) { 372 throw new AssertionError("Cannot discover servers", cause); 373 } 374 } 375 376}