001/*
002 * (C) Copyright 2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     matic
018 */
019package org.nuxeo.ecm.core.management.jtajca.internal;
020
021import java.lang.reflect.Field;
022import java.lang.reflect.InvocationHandler;
023import java.lang.reflect.Method;
024import java.lang.reflect.Proxy;
025
026import javax.management.ObjectInstance;
027import javax.resource.spi.ManagedConnection;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.apache.geronimo.connector.outbound.AbstractConnectionManager;
032import org.apache.geronimo.connector.outbound.AbstractConnectionManager.Interceptors;
033import org.apache.geronimo.connector.outbound.ConnectionInfo;
034import org.apache.geronimo.connector.outbound.ConnectionInterceptor;
035import org.apache.log4j.MDC;
036import org.nuxeo.ecm.core.api.NuxeoException;
037import org.nuxeo.ecm.core.management.jtajca.ConnectionPoolMonitor;
038import org.nuxeo.ecm.core.storage.sql.Mapper;
039import org.nuxeo.ecm.core.storage.sql.Mapper.Identification;
040import org.nuxeo.ecm.core.storage.sql.SessionImpl;
041import org.nuxeo.ecm.core.storage.sql.SoftRefCachingMapper;
042import org.nuxeo.ecm.core.storage.sql.ra.ManagedConnectionImpl;
043import org.nuxeo.runtime.metrics.MetricsService;
044import org.tranql.connector.AbstractManagedConnection;
045
046import com.codahale.metrics.JmxAttributeGauge;
047import com.codahale.metrics.MetricRegistry;
048import com.codahale.metrics.SharedMetricRegistries;
049
050/**
051 * @author matic
052 */
053public class DefaultConnectionPoolMonitor implements ConnectionPoolMonitor {
054
055    private static final Log log = LogFactory.getLog(DefaultConnectionPoolMonitor.class);
056
057    // @since 5.7.2
058    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
059
060    protected final String name;
061
062    protected AbstractConnectionManager cm;
063
064    protected DefaultConnectionPoolMonitor(String mame, AbstractConnectionManager cm) {
065        name = mame;
066        this.cm = enhanceConnectionManager(cm);
067    }
068
069    protected static Field field(Class<?> clazz, String name) {
070        try {
071            Field field = clazz.getDeclaredField(name);
072            field.setAccessible(true);
073            return field;
074        } catch (ReflectiveOperationException e) {
075            throw new RuntimeException("Cannot get access to " + clazz + "#" + name + " field");
076        }
077    }
078
079    @SuppressWarnings("unchecked")
080    protected static <T> T fetch(Field field, Object object) {
081        try {
082            return (T) field.get(object);
083        } catch (ReflectiveOperationException e) {
084            throw new RuntimeException("Cannot get access to field content", e);
085        }
086    }
087
088    protected static void save(Field field, Object object, Object value) {
089        try {
090            field.set(object, value);
091        } catch (ReflectiveOperationException e) {
092            throw new RuntimeException("Cannot set field content", e);
093        }
094    }
095
096    protected AbstractConnectionManager enhanceConnectionManager(AbstractConnectionManager cm) {
097        if (!log.isTraceEnabled()) {
098            return cm;
099        }
100        Field field = field(AbstractConnectionManager.class, "interceptors");
101        Interceptors interceptors = fetch(field, cm);
102        interceptors = enhanceInterceptors(interceptors);
103        save(field, cm, interceptors);
104        return cm;
105    }
106
107    protected Interceptors enhanceInterceptors(Interceptors interceptors) {
108        Field field = field(interceptors.getClass(), "stack");
109        ConnectionInterceptor stack = fetch(field, interceptors);
110        save(field, interceptors, enhanceStack(stack));
111        return interceptors;
112    }
113
114    protected ConnectionInterceptor enhanceStack(ConnectionInterceptor stack) {
115        try {
116            Field field = field(stack.getClass(), "next");
117            ConnectionInterceptor next = fetch(field, stack);
118            save(field, stack, enhanceStack(next));
119        } catch (RuntimeException e) {;
120        }
121        return (ConnectionInterceptor) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
122                new Class[] { ConnectionInterceptor.class }, new StackHandler(stack));
123    }
124
125    protected class StackHandler implements InvocationHandler {
126
127        protected final ConnectionInterceptor stack;
128
129        public StackHandler(ConnectionInterceptor stack) {
130            this.stack = stack;
131        }
132
133        protected void traceInvoke(Method m, Object[] args) {
134            Throwable stackTrace = null;
135            if (ConnectionInterceptor.class.isAssignableFrom(m.getDeclaringClass())) {
136                stackTrace = new Throwable("debug stack trace");
137            }
138            log.trace("invoked " + stack.getClass().getSimpleName() + "." + m.getName(), stackTrace);
139        }
140
141        @Override
142        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
143            try {
144                return method.invoke(stack, args);
145            } finally {
146                String name = method.getName();
147                traceInvoke(method, args);
148                if (args != null && args.length > 0) {
149                    ConnectionInfo info = (ConnectionInfo) args[0];
150                    ManagedConnection connection = info.getManagedConnectionInfo().getManagedConnection();
151                    IdProvider midProvider = guessProvider(connection);
152                    if (name.startsWith("get")) {
153                        MDC.put(midProvider.key(), midProvider.id(connection));
154                    } else if (name.startsWith("return")) {
155                        MDC.remove(midProvider.key());
156                    }
157                }
158            }
159        }
160
161        protected IdProvider midProvider;
162
163        protected IdProvider guessProvider(ManagedConnection connection) {
164            if (midProvider != null) {
165                return midProvider;
166            }
167            if (connection instanceof ManagedConnectionImpl) {
168                return new IdProvider() {
169
170                    @Override
171                    public String key() {
172                        return "vcs";
173                    }
174
175                    @Override
176                    public Object id(ManagedConnection connection) {
177                        return mapperId(connection);
178                    }
179
180                };
181            }
182            if (connection instanceof AbstractManagedConnection) {
183                return new IdProvider() {
184
185                    @Override
186                    public String key() {
187                        return "db";
188                    }
189
190                    @Override
191                    public Object id(ManagedConnection connection) {
192                        return ((AbstractManagedConnection) connection).getPhysicalConnection();
193                    }
194
195                };
196            }
197            throw new IllegalArgumentException("unknown connection type of " + connection.getClass());
198        }
199    }
200
201    interface IdProvider {
202        String key();
203
204        Object id(ManagedConnection connection);
205    }
206
207    private static final Field SESSION_FIELD = field(ManagedConnectionImpl.class, "session");
208
209    private static final Field WRAPPED_FIELD = field(SoftRefCachingMapper.class, "mapper");
210
211    protected Identification mapperId(ManagedConnection connection) {
212        SessionImpl session = fetch(SESSION_FIELD, connection);
213        Mapper mapper = session.getMapper();
214        if (mapper instanceof SoftRefCachingMapper) {
215            mapper = fetch(WRAPPED_FIELD, mapper);
216        }
217        try {
218            return mapper.getIdentification();
219        } catch (NuxeoException e) {
220            log.error("Cannot fetch mapper identification", e);
221            return null;
222        }
223    }
224
225    protected ObjectInstance self;
226
227    @Override
228    public void install() {
229        self = DefaultMonitorComponent.bind(this, name);
230        registry.register(MetricRegistry.name("nuxeo", "repositories", name, "connections", "count"),
231                new JmxAttributeGauge(self.getObjectName(), "ConnectionCount"));
232        registry.register(MetricRegistry.name("nuxeo", "repositories", name, "connections", "idle"),
233                new JmxAttributeGauge(self.getObjectName(), "IdleConnectionCount"));
234    }
235
236    @Override
237    public void uninstall() {
238        DefaultMonitorComponent.unbind(self);
239        registry.remove(MetricRegistry.name("nuxeo", "repositories", name, "connections", "count"));
240        registry.remove(MetricRegistry.name("nuxeo", "repositories", name, "connections", "idle"));
241        self = null;
242    }
243
244    @Override
245    public int getConnectionCount() {
246        return cm.getConnectionCount();
247    }
248
249    @Override
250    public int getIdleConnectionCount() {
251        return cm.getIdleConnectionCount();
252    }
253
254    @Override
255    public int getBlockingTimeoutMilliseconds() {
256        return cm.getBlockingTimeoutMilliseconds();
257    }
258
259    @Override
260    public int getIdleTimeoutMinutes() {
261        return cm.getIdleTimeoutMinutes();
262    }
263
264    @Override
265    public int getPartitionCount() {
266        return cm.getPartitionCount();
267    }
268
269    @Override
270    public int getPartitionMaxSize() {
271        return cm.getPartitionMaxSize();
272    }
273
274    @Override
275    public void setPartitionMaxSize(int maxSize) throws InterruptedException {
276        cm.setPartitionMaxSize(maxSize);
277    }
278
279    @Override
280    public int getPartitionMinSize() {
281        return cm.getPartitionMinSize();
282    }
283
284    @Override
285    public void setPartitionMinSize(int minSize) {
286        cm.setPartitionMinSize(minSize);
287    }
288
289    @Override
290    public void setBlockingTimeoutMilliseconds(int timeoutMilliseconds) {
291        cm.setBlockingTimeoutMilliseconds(timeoutMilliseconds);
292    }
293
294    @Override
295    public void setIdleTimeoutMinutes(int idleTimeoutMinutes) {
296        cm.setIdleTimeoutMinutes(idleTimeoutMinutes);
297    }
298
299    /**
300     * @since 5.8
301     */
302    public void handleNewConnectionManager(AbstractConnectionManager cm) {
303        this.cm = enhanceConnectionManager(cm);
304    }
305
306}