001/*
002 * (C) Copyright 2012-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 */
017package org.nuxeo.connect.tools.report.client;
018
019import java.io.File;
020import java.io.IOException;
021import java.io.InputStream;
022import java.net.InetSocketAddress;
023import java.net.MalformedURLException;
024import java.net.ServerSocket;
025import java.net.URL;
026import java.net.URLClassLoader;
027import java.nio.file.Files;
028import java.nio.file.Path;
029import java.nio.file.Paths;
030import java.util.Iterator;
031import java.util.concurrent.ExecutionException;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.Future;
035import java.util.concurrent.ThreadFactory;
036
037import javax.json.Json;
038import javax.json.JsonObjectBuilder;
039import javax.json.stream.JsonGenerator;
040import javax.json.stream.JsonParser;
041import javax.json.stream.JsonParsingException;
042import javax.management.JMX;
043import javax.management.MBeanServerConnection;
044import javax.management.MalformedObjectNameException;
045import javax.management.ObjectName;
046import javax.management.remote.JMXConnectorFactory;
047import javax.management.remote.JMXServiceURL;
048
049import org.apache.commons.logging.LogFactory;
050import org.nuxeo.connect.tools.report.ReportServer;
051
052import com.sun.tools.attach.AgentInitializationException;
053import com.sun.tools.attach.AgentLoadException;
054import com.sun.tools.attach.AttachNotSupportedException;
055import com.sun.tools.attach.VirtualMachine;
056import com.sun.tools.attach.VirtualMachineDescriptor;
057
058/**
059 *
060 * @since 8.3
061 */
062public class ReportConnector {
063
064    static final ObjectName NAME = name();
065
066    static ObjectName name() {
067        try {
068            return new ObjectName("org.nuxeo:type=service,name=connect-report");
069        } catch (MalformedObjectNameException cause) {
070            throw new AssertionError("Cannot name report", cause);
071        }
072    }
073
074    public static ReportConnector of() {
075        return new ReportConnector();
076    }
077
078    public JsonGenerator feed(JsonGenerator generator) throws IOException, InterruptedException, ExecutionException {
079        class Feeder implements Consumer {
080            StreamFeeder feeder = new StreamFeeder();
081
082            @Override
083            public void consume(JsonParser stream) {
084                feeder.feed(generator, stream);
085            }
086        }
087        connect(new Feeder());
088        return generator;
089    }
090
091    public JsonObjectBuilder feed(JsonObjectBuilder builder) throws IOException, InterruptedException, ExecutionException {
092        class Feeder implements Consumer {
093            ObjectFeeder feeder = new ObjectFeeder();
094
095            @Override
096            public void consume(JsonParser stream) {
097                feeder.feed(builder, stream);
098            }
099        }
100        connect(new Feeder());
101        return builder;
102    }
103
104    static class Discovery implements Iterable<ReportServer> {
105        @Override
106        public Iterator<ReportServer> iterator() {
107            return new Iterator<ReportServer>() {
108
109                final Iterator<VirtualMachineDescriptor> source = VirtualMachine.list().iterator();
110
111                ReportServer next = fetchNext();
112
113                ReportServer fetchNext() {
114                    if (!source.hasNext()) {
115                        return null;
116                    }
117                    while (source.hasNext()) {
118                        VirtualMachineDescriptor pid = source.next();
119                        try {
120                            MBeanServerConnection connection = new Management(pid).connect();
121                            if (!connection.isRegistered(NAME)) {
122                                continue;
123                            }
124                            return JMX.newMXBeanProxy(connection, NAME, ReportServer.class);
125                        } catch (IOException cause) {
126                            LogFactory.getLog(Discovery.class).error("Cannot connect to " + pid, cause);
127                        }
128                    }
129                    return null;
130                }
131
132                class Management {
133
134                    Management(VirtualMachineDescriptor anIdentifier) {
135                        pid = anIdentifier;
136                    }
137
138                    final VirtualMachineDescriptor pid;
139
140                    MBeanServerConnection connect() throws IOException {
141                        VirtualMachine vm;
142                        try {
143                            vm = pid.provider().attachVirtualMachine(pid);
144                        } catch (AttachNotSupportedException cause) {
145                            throw new IOException("Cannot attach to " + pid, cause);
146                        }
147                        try {
148                            return connect(lookup(vm));
149                        } finally {
150                            vm.detach();
151                        }
152                    }
153
154                    JMXServiceURL lookup(VirtualMachine vm) throws IOException {
155                        JMXServiceURL url = lookupRemote(vm);
156                        if (url != null) {
157                            return url;
158                        }
159                        return lookupAgent(vm);
160                    }
161
162                    JMXServiceURL lookupRemote(VirtualMachine vm) throws IOException {
163                        boolean isRemote =
164                                Boolean.valueOf(vm.getSystemProperties().getProperty("com.sun.management.jmxremote", "false")).booleanValue();
165                        if (!isRemote) {
166                            return null;
167                        }
168                        int port = Integer.valueOf(vm.getSystemProperties().getProperty("com.sun.management.jmxremote.port", "1089")).intValue();
169                        return new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://localhost:%d/jmxrmi", port));
170                    }
171
172                    JMXServiceURL lookupAgent(VirtualMachine vm) throws IOException {
173                        String address = vm.getAgentProperties().getProperty("com.sun.management.jmxremote.localConnectorAddress");
174                        if (address != null) {
175                            return new JMXServiceURL(address);
176                        }
177                        startAgent(vm);
178                        return lookupAgent(vm);
179                    }
180
181                    void startAgent(VirtualMachine vm) throws IOException {
182                        String home = vm.getSystemProperties().getProperty("java.home");
183
184                        // Normally in
185                        // ${java.home}/jre/lib/management-agent.jar but
186                        // might
187                        // be in ${java.home}/lib in build environments.
188
189                        String agent = home + File.separator + "jre" + File.separator +
190                                "lib" + File.separator + "management-agent.jar";
191                        File f = new File(agent);
192                        if (!f.exists()) {
193                            agent = home + File.separator + "lib" + File.separator +
194                                    "management-agent.jar";
195                            f = new File(agent);
196                            if (!f.exists()) {
197                                throw new IOException("Management agent not found");
198                            }
199                        }
200
201                        agent = f.getCanonicalPath();
202                        try {
203                            vm.loadAgent(agent, "com.sun.management.jmxremote");
204                        } catch (AgentLoadException x) {
205                            IOException ioe = new IOException(x.getMessage());
206                            ioe.initCause(x);
207                            throw ioe;
208                        } catch (AgentInitializationException x) {
209                            IOException ioe = new IOException(x.getMessage());
210                            ioe.initCause(x);
211                            throw ioe;
212                        }
213                    }
214
215                    MBeanServerConnection connect(JMXServiceURL url) throws IOException {
216                        return JMXConnectorFactory.connect(url).getMBeanServerConnection();
217                    }
218                }
219
220                @Override
221                public boolean hasNext() {
222                    return next != null;
223                }
224
225                @Override
226                public ReportServer next() {
227                    try {
228                        return next;
229                    } finally {
230                        next = fetchNext();
231                    }
232                }
233
234            };
235        }
236    }
237
238    interface Consumer {
239        void consume(JsonParser stream);
240    }
241
242    <A> void connect(Consumer consumer) throws IOException, InterruptedException, ExecutionException {
243        ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
244
245            @Override
246            public Thread newThread(Runnable target) {
247                Thread thread = new Thread(target, "connect-report");
248                thread.setDaemon(true);
249                return thread;
250            }
251        });
252        try {
253            for (ReportServer server : new Discovery()) {
254                try (ServerSocket callback = new ServerSocket(0)) {
255                    final Future<?> consumed = executor.submit(new Runnable() {
256
257                        @Override
258                        public void run() {
259
260                            String name = Thread.currentThread().getName();
261                            Thread.currentThread().setName("connect-report-consumer-" + server);
262                            try (InputStream source = callback.accept().getInputStream()) {
263                                consumer.consume(Json.createParser(source));
264                            } catch (IOException | JsonParsingException cause) {
265                                throw new AssertionError("Cannot consume connect report", cause);
266                            } finally {
267                                Thread.currentThread().setName(name);
268                            }
269                            LogFactory.getLog(ReportConnector.class).info("Consumed " + server);
270                        }
271                    });
272                    final Future<?> served = executor.submit(new Runnable() {
273
274                        @Override
275                        public void run() {
276                            String name = Thread.currentThread().getName();
277                            Thread.currentThread().setName("connect-report-server-" + server);
278                            InetSocketAddress address = (InetSocketAddress) callback.getLocalSocketAddress();
279                            try {
280                                server.run(address.getHostName(), address.getPort());
281                            } catch (IOException cause) {
282                                throw new AssertionError("Cannot run connect report", cause);
283                            } finally {
284                                Thread.currentThread().setName(name);
285                            }
286                        }
287
288                    });
289                    ExecutionException consumerError = null;
290                    try {
291                        consumed.get();
292                    } catch (ExecutionException cause) {
293                        consumerError = cause;
294                    }
295                    try {
296                        served.get();
297                    } catch (ExecutionException cause) {
298                        if (consumerError != null) {
299                            consumerError.addSuppressed(cause);
300                            throw consumerError;
301                        }
302                        throw cause;
303                    }
304                }
305            }
306        } finally {
307            executor.shutdownNow();
308        }
309    }
310
311    public Iterable<ReportServer> discover() {
312        class ToolsRunner {
313
314            @SuppressWarnings("unchecked")
315            Iterable<ReportServer> discover() {
316                try {
317                    ReportConnector.class.getClassLoader().loadClass("com.sun.tools.attach.VirtualMachine");
318                } catch (ClassNotFoundException cause) {
319                    class Loader extends URLClassLoader {
320                        Loader(Path path) {
321                            super(new URL[] { fileof(path) });
322                        }
323
324                        @Override
325                        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
326                            if (name.equals(Discovery.class.getName())) {
327                                return findClass(name);
328                            }
329                            return super.loadClass(name, resolve);
330                        }
331                    }
332                    ClassLoader previous = Thread.currentThread().getContextClassLoader();
333                    ClassLoader loader = new Loader(findTools());
334                    Thread.currentThread().setContextClassLoader(loader);
335                    try {
336                        return (Iterable<ReportServer>) loader.loadClass(Discovery.class.getName()).newInstance();
337                    } catch (ReflectiveOperationException cause1) {
338                        throw new AssertionError("Cannot discover servers", cause1);
339                    } finally {
340                        Thread.currentThread().setContextClassLoader(previous);
341                    }
342                }
343                return new Discovery();
344            }
345
346            URL fileof(Path path) {
347                try {
348                    return new URL("file://".concat(path.toString()));
349                } catch (MalformedURLException cause) {
350                    throw new AssertionError("Cannot create url for " + path, cause);
351                }
352            }
353
354            Path findTools() {
355                Path home = Paths.get(System.getProperty("java.home"));
356                for (Path path : new Path[] {
357                        Paths.get("../lib/tools.jar"),
358                        Paths.get("../Classes/classes.jar")
359                }) {
360                    Path tools = home.resolve(path);
361                    if (Files.exists(tools)) {
362                        return tools;
363                    }
364                }
365                throw new AssertionError("Cannot find tools in system");
366            }
367
368        }
369        try {
370            return new ToolsRunner().discover();
371        } catch (Exception cause) {
372            throw new AssertionError("Cannot discover servers", cause);
373        }
374    }
375
376}