001/*
002 * (C) Copyright 2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     matic
018 */
019package org.nuxeo.ecm.core.management.jtajca.internal;
020
021import java.lang.reflect.Field;
022import java.lang.reflect.InvocationHandler;
023import java.lang.reflect.Method;
024import java.lang.reflect.Proxy;
025
026import javax.management.ObjectInstance;
027import javax.resource.spi.ManagedConnection;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.apache.geronimo.connector.outbound.AbstractConnectionManager;
032import org.apache.geronimo.connector.outbound.AbstractConnectionManager.Interceptors;
033import org.apache.geronimo.connector.outbound.ConnectionInfo;
034import org.apache.geronimo.connector.outbound.ConnectionInterceptor;
035import org.apache.log4j.MDC;
036import org.nuxeo.ecm.core.api.NuxeoException;
037import org.nuxeo.ecm.core.management.jtajca.ConnectionPoolMonitor;
038import org.nuxeo.ecm.core.storage.sql.Mapper;
039import org.nuxeo.ecm.core.storage.sql.Mapper.Identification;
040import org.nuxeo.ecm.core.storage.sql.SessionImpl;
041import org.nuxeo.ecm.core.storage.sql.SoftRefCachingMapper;
042import org.nuxeo.ecm.core.storage.sql.ra.ManagedConnectionImpl;
043import org.nuxeo.runtime.metrics.MetricsService;
044import org.tranql.connector.AbstractManagedConnection;
045
046import com.codahale.metrics.JmxAttributeGauge;
047import com.codahale.metrics.MetricRegistry;
048import com.codahale.metrics.SharedMetricRegistries;
049
050/**
051 * @author matic
052 */
053public class DefaultConnectionPoolMonitor implements ConnectionPoolMonitor {
054
055    private static final Log log = LogFactory.getLog(DefaultConnectionPoolMonitor.class);
056
057    // @since 5.7.2
058    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
059
060    protected final String name;
061
062    protected AbstractConnectionManager cm;
063
064    protected DefaultConnectionPoolMonitor(String mame, AbstractConnectionManager cm) {
065        name = mame;
066        this.cm = enhanceConnectionManager(cm);
067    }
068
069    protected static Field field(Class<?> clazz, String name) {
070        try {
071            Field field = clazz.getDeclaredField(name);
072            field.setAccessible(true);
073            return field;
074        } catch (ReflectiveOperationException e) {
075            throw new RuntimeException("Cannot get access to " + clazz + "#" + name + " field");
076        }
077    }
078
079    @SuppressWarnings("unchecked")
080    protected static <T> T fetch(Field field, Object object) {
081        try {
082            return (T) field.get(object);
083        } catch (ReflectiveOperationException e) {
084            throw new RuntimeException("Cannot get access to field content", e);
085        }
086    }
087
088    protected static void save(Field field, Object object, Object value) {
089        try {
090            field.set(object, value);
091        } catch (ReflectiveOperationException e) {
092            throw new RuntimeException("Cannot set field content", e);
093        }
094    }
095
096    protected AbstractConnectionManager enhanceConnectionManager(AbstractConnectionManager cm) {
097        if (!log.isTraceEnabled()) {
098            return cm;
099        }
100        Field field = field(AbstractConnectionManager.class, "interceptors");
101        Interceptors interceptors = fetch(field, cm);
102        interceptors = enhanceInterceptors(interceptors);
103        save(field, cm, interceptors);
104        return cm;
105    }
106
107    protected Interceptors enhanceInterceptors(Interceptors interceptors) {
108        Field field = field(interceptors.getClass(), "stack");
109        ConnectionInterceptor stack = fetch(field, interceptors);
110        save(field, interceptors, enhanceStack(stack));
111        return interceptors;
112    }
113
114    protected ConnectionInterceptor enhanceStack(ConnectionInterceptor stack) {
115        try {
116            Field field = field(stack.getClass(), "next");
117            ConnectionInterceptor next = fetch(field, stack);
118            save(field, stack, enhanceStack(next));
119        } catch (RuntimeException e) {;
120        }
121        return (ConnectionInterceptor) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
122                new Class[] { ConnectionInterceptor.class }, new StackHandler(stack));
123    }
124
125    protected class StackHandler implements InvocationHandler {
126
127        protected final ConnectionInterceptor stack;
128
129        public StackHandler(ConnectionInterceptor stack) {
130            this.stack = stack;
131        }
132
133        protected void traceInvoke(Method m, Object[] args) {
134            Throwable stackTrace = null;
135            if (ConnectionInterceptor.class.isAssignableFrom(m.getDeclaringClass())) {
136                stackTrace = new Throwable("debug stack trace");
137            }
138            log.trace("invoked " + stack.getClass().getSimpleName() + "." + m.getName(), stackTrace);
139        }
140
141        IdProvider midProvider;
142
143        @Override
144        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
145            try {
146                return method.invoke(stack, args);
147            } finally {
148                String name = method.getName();
149                traceInvoke(method, args);
150                if (args != null && args.length > 0) {
151                    ConnectionInfo info = (ConnectionInfo) args[0];
152                    ManagedConnection connection = info.getManagedConnectionInfo().getManagedConnection();
153                    if (connection != null) {
154                        if (midProvider == null) {
155                            midProvider = guessProvider(connection);
156                        }
157                        if (name.startsWith("get")) {
158                            MDC.put(midProvider.key(), midProvider.id(connection));
159                        } else if (name.startsWith("return")) {
160                            MDC.remove(midProvider.key());
161                        }
162                    }
163                }
164            }
165        }
166
167        protected IdProvider guessProvider(ManagedConnection connection) {
168            if (connection instanceof ManagedConnectionImpl) {
169                return new IdProvider() {
170
171                    @Override
172                    public String key() {
173                        return "vcs";
174                    }
175
176                    @Override
177                    public Object id(ManagedConnection connection) {
178                        return mapperId(connection);
179                    }
180
181                };
182            }
183            if (connection instanceof AbstractManagedConnection) {
184                return new IdProvider() {
185
186                    @Override
187                    public String key() {
188                        return "db";
189                    }
190
191                    @Override
192                    public Object id(ManagedConnection connection) {
193                        return ((AbstractManagedConnection<?,?>) connection).getPhysicalConnection();
194                    }
195
196                };
197            }
198            throw new IllegalArgumentException("unknown connection type of " + connection.getClass());
199        }
200    }
201
202    interface IdProvider {
203        String key();
204
205        Object id(ManagedConnection connection);
206    }
207
208    private static final Field SESSION_FIELD = field(ManagedConnectionImpl.class, "session");
209
210    private static final Field WRAPPED_FIELD = field(SoftRefCachingMapper.class, "mapper");
211
212    protected Identification mapperId(ManagedConnection connection) {
213        SessionImpl session = fetch(SESSION_FIELD, connection);
214        Mapper mapper = session.getMapper();
215        if (mapper instanceof SoftRefCachingMapper) {
216            mapper = fetch(WRAPPED_FIELD, mapper);
217        }
218        try {
219            return mapper.getIdentification();
220        } catch (NuxeoException e) {
221            log.error("Cannot fetch mapper identification", e);
222            return null;
223        }
224    }
225
226    protected ObjectInstance self;
227
228    @Override
229    public void install() {
230        self = DefaultMonitorComponent.bind(this, name);
231        registry.register(MetricRegistry.name("nuxeo", "repositories", name, "connections", "count"),
232                new JmxAttributeGauge(self.getObjectName(), "ConnectionCount"));
233        registry.register(MetricRegistry.name("nuxeo", "repositories", name, "connections", "idle"),
234                new JmxAttributeGauge(self.getObjectName(), "IdleConnectionCount"));
235    }
236
237    @Override
238    public void uninstall() {
239        DefaultMonitorComponent.unbind(self);
240        registry.remove(MetricRegistry.name("nuxeo", "repositories", name, "connections", "count"));
241        registry.remove(MetricRegistry.name("nuxeo", "repositories", name, "connections", "idle"));
242        self = null;
243    }
244
245    @Override
246    public int getConnectionCount() {
247        return cm.getConnectionCount();
248    }
249
250    @Override
251    public int getIdleConnectionCount() {
252        return cm.getIdleConnectionCount();
253    }
254
255    @Override
256    public int getBlockingTimeoutMilliseconds() {
257        return cm.getBlockingTimeoutMilliseconds();
258    }
259
260    @Override
261    public int getIdleTimeoutMinutes() {
262        return cm.getIdleTimeoutMinutes();
263    }
264
265    @Override
266    public int getPartitionCount() {
267        return cm.getPartitionCount();
268    }
269
270    @Override
271    public int getPartitionMaxSize() {
272        return cm.getPartitionMaxSize();
273    }
274
275    @Override
276    public void setPartitionMaxSize(int maxSize) throws InterruptedException {
277        cm.setPartitionMaxSize(maxSize);
278    }
279
280    @Override
281    public int getPartitionMinSize() {
282        return cm.getPartitionMinSize();
283    }
284
285    @Override
286    public void setPartitionMinSize(int minSize) {
287        cm.setPartitionMinSize(minSize);
288    }
289
290    @Override
291    public void setBlockingTimeoutMilliseconds(int timeoutMilliseconds) {
292        cm.setBlockingTimeoutMilliseconds(timeoutMilliseconds);
293    }
294
295    @Override
296    public void setIdleTimeoutMinutes(int idleTimeoutMinutes) {
297        cm.setIdleTimeoutMinutes(idleTimeoutMinutes);
298    }
299
300    /**
301     * @since 5.8
302     */
303    public void handleNewConnectionManager(AbstractConnectionManager cm) {
304        this.cm = enhanceConnectionManager(cm);
305    }
306
307}