001/*
002 * (C) Copyright 2011 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     matic
016 */
017package org.nuxeo.ecm.core.management.jtajca.internal;
018
019import java.lang.reflect.Field;
020import java.lang.reflect.InvocationHandler;
021import java.lang.reflect.Method;
022import java.lang.reflect.Proxy;
023
024import javax.management.ObjectInstance;
025import javax.resource.spi.ManagedConnection;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.apache.geronimo.connector.outbound.AbstractConnectionManager;
030import org.apache.geronimo.connector.outbound.AbstractConnectionManager.Interceptors;
031import org.apache.geronimo.connector.outbound.ConnectionInfo;
032import org.apache.geronimo.connector.outbound.ConnectionInterceptor;
033import org.apache.log4j.MDC;
034import org.nuxeo.ecm.core.api.NuxeoException;
035import org.nuxeo.ecm.core.management.jtajca.ConnectionPoolMonitor;
036import org.nuxeo.ecm.core.storage.sql.Mapper;
037import org.nuxeo.ecm.core.storage.sql.Mapper.Identification;
038import org.nuxeo.ecm.core.storage.sql.SessionImpl;
039import org.nuxeo.ecm.core.storage.sql.SoftRefCachingMapper;
040import org.nuxeo.ecm.core.storage.sql.ra.ManagedConnectionImpl;
041import org.nuxeo.runtime.metrics.MetricsService;
042import org.tranql.connector.AbstractManagedConnection;
043
044import com.codahale.metrics.JmxAttributeGauge;
045import com.codahale.metrics.MetricRegistry;
046import com.codahale.metrics.SharedMetricRegistries;
047
048/**
049 * @author matic
050 */
051public class DefaultConnectionPoolMonitor implements ConnectionPoolMonitor {
052
053    private static final Log log = LogFactory.getLog(DefaultConnectionPoolMonitor.class);
054
055    // @since 5.7.2
056    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
057
058    protected final String name;
059
060    protected AbstractConnectionManager cm;
061
062    protected DefaultConnectionPoolMonitor(String mame, AbstractConnectionManager cm) {
063        name = mame;
064        this.cm = enhanceConnectionManager(cm);
065    }
066
067    protected static Field field(Class<?> clazz, String name) {
068        try {
069            Field field = clazz.getDeclaredField(name);
070            field.setAccessible(true);
071            return field;
072        } catch (ReflectiveOperationException e) {
073            throw new RuntimeException("Cannot get access to " + clazz + "#" + name + " field");
074        }
075    }
076
077    @SuppressWarnings("unchecked")
078    protected static <T> T fetch(Field field, Object object) {
079        try {
080            return (T) field.get(object);
081        } catch (ReflectiveOperationException e) {
082            throw new RuntimeException("Cannot get access to field content", e);
083        }
084    }
085
086    protected static void save(Field field, Object object, Object value) {
087        try {
088            field.set(object, value);
089        } catch (ReflectiveOperationException e) {
090            throw new RuntimeException("Cannot set field content", e);
091        }
092    }
093
094    protected AbstractConnectionManager enhanceConnectionManager(AbstractConnectionManager cm) {
095        if (!log.isTraceEnabled()) {
096            return cm;
097        }
098        Field field = field(AbstractConnectionManager.class, "interceptors");
099        Interceptors interceptors = fetch(field, cm);
100        interceptors = enhanceInterceptors(interceptors);
101        save(field, cm, interceptors);
102        return cm;
103    }
104
105    protected Interceptors enhanceInterceptors(Interceptors interceptors) {
106        Field field = field(interceptors.getClass(), "stack");
107        ConnectionInterceptor stack = fetch(field, interceptors);
108        save(field, interceptors, enhanceStack(stack));
109        return interceptors;
110    }
111
112    protected ConnectionInterceptor enhanceStack(ConnectionInterceptor stack) {
113        try {
114            Field field = field(stack.getClass(), "next");
115            ConnectionInterceptor next = fetch(field, stack);
116            save(field, stack, enhanceStack(next));
117        } catch (RuntimeException e) {;
118        }
119        return (ConnectionInterceptor) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
120                new Class[] { ConnectionInterceptor.class }, new StackHandler(stack));
121    }
122
123    protected class StackHandler implements InvocationHandler {
124
125        protected final ConnectionInterceptor stack;
126
127        public StackHandler(ConnectionInterceptor stack) {
128            this.stack = stack;
129        }
130
131        protected void traceInvoke(Method m, Object[] args) {
132            Throwable stackTrace = null;
133            if (ConnectionInterceptor.class.isAssignableFrom(m.getDeclaringClass())) {
134                stackTrace = new Throwable("debug stack trace");
135            }
136            log.trace("invoked " + stack.getClass().getSimpleName() + "." + m.getName(), stackTrace);
137        }
138
139        @Override
140        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
141            try {
142                return method.invoke(stack, args);
143            } finally {
144                String name = method.getName();
145                traceInvoke(method, args);
146                if (args != null && args.length > 0) {
147                    ConnectionInfo info = (ConnectionInfo) args[0];
148                    ManagedConnection connection = info.getManagedConnectionInfo().getManagedConnection();
149                    IdProvider midProvider = guessProvider(connection);
150                    if (name.startsWith("get")) {
151                        MDC.put(midProvider.key(), midProvider.id(connection));
152                    } else if (name.startsWith("return")) {
153                        MDC.remove(midProvider.key());
154                    }
155                }
156            }
157        }
158
159        protected IdProvider midProvider;
160
161        protected IdProvider guessProvider(ManagedConnection connection) {
162            if (midProvider != null) {
163                return midProvider;
164            }
165            if (connection instanceof ManagedConnectionImpl) {
166                return new IdProvider() {
167
168                    @Override
169                    public String key() {
170                        return "vcs";
171                    }
172
173                    @Override
174                    public Object id(ManagedConnection connection) {
175                        return mapperId(connection);
176                    }
177
178                };
179            }
180            if (connection instanceof AbstractManagedConnection) {
181                return new IdProvider() {
182
183                    @Override
184                    public String key() {
185                        return "db";
186                    }
187
188                    @Override
189                    public Object id(ManagedConnection connection) {
190                        return ((AbstractManagedConnection) connection).getPhysicalConnection();
191                    }
192
193                };
194            }
195            throw new IllegalArgumentException("unknown connection type of " + connection.getClass());
196        }
197    }
198
199    interface IdProvider {
200        String key();
201
202        Object id(ManagedConnection connection);
203    }
204
205    private static final Field SESSION_FIELD = field(ManagedConnectionImpl.class, "session");
206
207    private static final Field WRAPPED_FIELD = field(SoftRefCachingMapper.class, "mapper");
208
209    protected Identification mapperId(ManagedConnection connection) {
210        SessionImpl session = fetch(SESSION_FIELD, connection);
211        Mapper mapper = session.getMapper();
212        if (mapper instanceof SoftRefCachingMapper) {
213            mapper = fetch(WRAPPED_FIELD, mapper);
214        }
215        try {
216            return mapper.getIdentification();
217        } catch (NuxeoException e) {
218            log.error("Cannot fetch mapper identification", e);
219            return null;
220        }
221    }
222
223    protected ObjectInstance self;
224
225    @Override
226    public void install() {
227        self = DefaultMonitorComponent.bind(this, name);
228        registry.register(MetricRegistry.name("nuxeo", "repositories", name, "connections", "count"),
229                new JmxAttributeGauge(self.getObjectName(), "ConnectionCount"));
230        registry.register(MetricRegistry.name("nuxeo", "repositories", name, "connections", "idle"),
231                new JmxAttributeGauge(self.getObjectName(), "IdleConnectionCount"));
232    }
233
234    @Override
235    public void uninstall() {
236        DefaultMonitorComponent.unbind(self);
237        registry.remove(MetricRegistry.name("nuxeo", "repositories", name, "connections", "count"));
238        registry.remove(MetricRegistry.name("nuxeo", "repositories", name, "connections", "idle"));
239        self = null;
240    }
241
242    @Override
243    public int getConnectionCount() {
244        return cm.getConnectionCount();
245    }
246
247    @Override
248    public int getIdleConnectionCount() {
249        return cm.getIdleConnectionCount();
250    }
251
252    @Override
253    public int getBlockingTimeoutMilliseconds() {
254        return cm.getBlockingTimeoutMilliseconds();
255    }
256
257    @Override
258    public int getIdleTimeoutMinutes() {
259        return cm.getIdleTimeoutMinutes();
260    }
261
262    @Override
263    public int getPartitionCount() {
264        return cm.getPartitionCount();
265    }
266
267    @Override
268    public int getPartitionMaxSize() {
269        return cm.getPartitionMaxSize();
270    }
271
272    @Override
273    public void setPartitionMaxSize(int maxSize) throws InterruptedException {
274        cm.setPartitionMaxSize(maxSize);
275    }
276
277    @Override
278    public int getPartitionMinSize() {
279        return cm.getPartitionMinSize();
280    }
281
282    @Override
283    public void setPartitionMinSize(int minSize) {
284        cm.setPartitionMinSize(minSize);
285    }
286
287    @Override
288    public void setBlockingTimeoutMilliseconds(int timeoutMilliseconds) {
289        cm.setBlockingTimeoutMilliseconds(timeoutMilliseconds);
290    }
291
292    @Override
293    public void setIdleTimeoutMinutes(int idleTimeoutMinutes) {
294        cm.setIdleTimeoutMinutes(idleTimeoutMinutes);
295    }
296
297    /**
298     * @since 5.8
299     */
300    public void handleNewConnectionManager(AbstractConnectionManager cm) {
301        this.cm = enhanceConnectionManager(cm);
302    }
303
304}