/*
 * Copyright (c) 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     Florent Guillaume
 */

package org.nuxeo.ecm.core.storage.sql;

import java.io.Serializable;
import java.util.Calendar;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.naming.Reference;
import javax.resource.ResourceException;
import javax.resource.cci.ConnectionSpec;
import javax.resource.cci.RecordFactory;
import javax.resource.cci.ResourceAdapterMetaData;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.model.LockManager;
import org.nuxeo.ecm.core.storage.DefaultFulltextParser;
import org.nuxeo.ecm.core.storage.FulltextParser;
import org.nuxeo.ecm.core.storage.lock.LockManagerService;
import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCBackend;
import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCClusterInvalidator;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;

/**
 * {@link Repository} implementation, to be extended by backend-specific initialization code.
 * <p>
 * The repository hands out {@link SessionImpl} connections, lazily initializes its {@link Model} on the first
 * connection, and publishes per-repository counters and cache-size gauges in the shared metrics registry.
 *
 * @see RepositoryBackend
 */
public class RepositoryImpl implements Repository {

    private static final long serialVersionUID = 1L;

    private static final Log log = LogFactory.getLog(RepositoryImpl.class);

    private static final Random RANDOM = new Random();

    protected final RepositoryDescriptor repositoryDescriptor;

    protected final Class<? extends FulltextParser> fulltextParserClass;

    private final RepositoryBackend backend;

    /** Sessions handed out by {@link #getConnection()} and not yet closed. */
    private final Collection<SessionImpl> sessions;

    /**
     * Names of the cache gauges registered by this instance, remembered so that {@link #close()} unregisters exactly
     * what was registered.
     */
    private final Collection<String> cacheGaugeNames = new CopyOnWriteArrayList<>();

    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    protected final Counter repositoryUp;

    protected final Counter sessionCount;

    private LockManager lockManager;

    /**
     * @since 7.4 : used to know if the LockManager was provided by this repository or externally
     */
    protected boolean selfRegisteredLockManager = false;

    /** Propagator of invalidations to all mappers' caches. */
    protected final InvalidationsPropagator invalidationsPropagator;

    /** Repository model; {@code null} until the first connection and again after {@link #close()}. */
    private Model model;

    /**
     * Transient id for this repository assigned by the server on first connection. This is not persisted.
     */
    public String repositoryId;

    /**
     * Creates the repository: resolves the fulltext parser class, instantiates the backend and registers the
     * repository metrics. The model itself is only built on the first {@link #getConnection()}.
     *
     * @param repositoryDescriptor the repository configuration
     * @throws NuxeoException if the configured fulltext parser class cannot be loaded or does not implement
     *             {@link FulltextParser}, or if the backend cannot be instantiated
     */
    public RepositoryImpl(RepositoryDescriptor repositoryDescriptor) {
        this.repositoryDescriptor = repositoryDescriptor;
        sessions = new CopyOnWriteArrayList<>();
        invalidationsPropagator = new InvalidationsPropagator();

        String className = repositoryDescriptor.fulltextParser;
        if (StringUtils.isBlank(className)) {
            className = DefaultFulltextParser.class.getName();
        }
        Class<?> klass;
        try {
            klass = Thread.currentThread().getContextClassLoader().loadClass(className);
        } catch (ClassNotFoundException e) {
            throw new NuxeoException("Unknown fulltext parser class: " + className, e);
        }
        if (!FulltextParser.class.isAssignableFrom(klass)) {
            throw new NuxeoException("Invalid fulltext parser class: " + className);
        }
        // checked just above, so asSubclass cannot fail; it avoids an unchecked cast
        fulltextParserClass = klass.asSubclass(FulltextParser.class);

        backend = createBackend();
        repositoryUp = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name,
                "instance-up"));
        repositoryUp.inc();
        sessionCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name,
                "sessions"));
        createMetricsGauges();
    }

    /**
     * Registers the cache gauges of this repository ({@code size}, {@code pristines}, {@code selections} and
     * {@code mappers}) under {@code nuxeo.repositories.<name>.caches}. A gauge already registered under the same
     * name is replaced.
     */
    protected void createMetricsGauges() {
        registerCacheGauge("size", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return getCacheSize();
            }
        });
        registerCacheGauge("pristines", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return getCachePristineSize();
            }
        });
        registerCacheGauge("selections", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return getCacheSelectionSize();
            }
        });
        registerCacheGauge("mappers", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return getCacheMapperSize();
            }
        });
    }

    /** Registers one cache gauge, replacing any gauge of the same name, and remembers its name for removal. */
    private void registerCacheGauge(String kind, Gauge<Long> gauge) {
        String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", kind);
        registry.remove(gaugeName);
        registry.register(gaugeName, gauge);
        cacheGaugeNames.add(gaugeName);
    }

    /** Unregisters every cache gauge previously registered through {@link #registerCacheGauge}. */
    private void removeMetricsGauges() {
        for (String gaugeName : cacheGaugeNames) {
            registry.remove(gaugeName);
        }
        cacheGaugeNames.clear();
    }

    /**
     * Instantiates and initializes the backend configured in the descriptor, defaulting to {@link JDBCBackend}.
     *
     * @return the initialized backend
     * @throws NuxeoException if the backend class cannot be instantiated
     */
    protected RepositoryBackend createBackend() {
        Class<? extends RepositoryBackend> backendClass = repositoryDescriptor.backendClass;
        if (backendClass == null) {
            backendClass = JDBCBackend.class;
        }
        try {
            RepositoryBackend backend = backendClass.newInstance();
            backend.initialize(this);
            return backend;
        } catch (ReflectiveOperationException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Wraps a mapper in a caching mapper when mapper caching is enabled.
     *
     * @param model the repository model
     * @param mapper the mapper to wrap
     * @return the caching mapper, or {@code mapper} itself when caching is disabled
     * @throws NuxeoException if the caching mapper class cannot be instantiated
     */
    protected Mapper createCachingMapper(Model model, Mapper mapper) {
        try {
            Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass();
            if (cachingMapperClass == null) {
                return mapper;
            }
            CachingMapper cachingMapper = cachingMapperClass.newInstance();
            cachingMapper.initialize(getName(), model, mapper, invalidationsPropagator,
                    repositoryDescriptor.cachingMapperProperties);
            return cachingMapper;
        } catch (ReflectiveOperationException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Gets the caching mapper class to use.
     *
     * @return the configured class, {@link SoftRefCachingMapper} when none is configured, or {@code null} when
     *         mapper caching is disabled
     */
    protected Class<? extends CachingMapper> getCachingMapperClass() {
        if (!repositoryDescriptor.getCachingMapperEnabled()) {
            return null;
        }
        Class<? extends CachingMapper> cachingMapperClass = repositoryDescriptor.cachingMapperClass;
        if (cachingMapperClass == null) {
            // default cache
            cachingMapperClass = SoftRefCachingMapper.class;
        }
        return cachingMapperClass;
    }

    public RepositoryDescriptor getRepositoryDescriptor() {
        return repositoryDescriptor;
    }

    /**
     * Gets the lock manager.
     *
     * @return the lock manager, or {@code null} if the repository has not been initialized yet
     */
    public LockManager getLockManager() {
        return lockManager;
    }

    /**
     * Gets the model.
     *
     * @return the model, or {@code null} if the repository has not been initialized yet or has been closed
     */
    public Model getModel() {
        return model;
    }

    public InvalidationsPropagator getInvalidationsPropagator() {
        return invalidationsPropagator;
    }

    public Class<? extends FulltextParser> getFulltextParserClass() {
        return fulltextParserClass;
    }

    /*
     * ----- javax.resource.cci.ConnectionFactory -----
     */

    /**
     * Gets a new connection.
     *
     * @param connectionSpec the parameters to use to connect (unused)
     * @return the session
     */
    @Override
    public SessionImpl getConnection(ConnectionSpec connectionSpec) {
        return getConnection();
    }

    /**
     * Gets a new connection, initializing the repository first if this has not been done yet.
     *
     * @return the session
     * @throws IllegalStateException if the runtime is shutting down
     */
    @Override
    public synchronized SessionImpl getConnection() {
        if (Framework.getRuntime().isShuttingDown()) {
            throw new IllegalStateException("Cannot open connection, runtime is shutting down");
        }
        if (model == null) {
            initRepository();
        }
        // the resolver needs the session, which needs the mapper, which needs the resolver:
        // create the resolver empty and bind the session once it exists
        SessionPathResolver pathResolver = new SessionPathResolver();
        Mapper mapper = newMapper(pathResolver, true);
        SessionImpl session = newSession(model, mapper);
        pathResolver.setSession(session);
        sessions.add(session);
        sessionCount.inc();
        return session;
    }

    /**
     * Creates a new mapper.
     *
     * @param pathResolver the path resolver (for regular mappers)
     * @param useInvalidations whether this mapper participates in invalidation propagation (false for lock manager /
     *            cluster invalidator)
     * @return the new mapper.
     * @since 7.4
     */
    public Mapper newMapper(PathResolver pathResolver, boolean useInvalidations) {
        return backend.newMapper(model, pathResolver, useInvalidations);
    }

    /**
     * Builds the model through the backend, then sets up the lock manager and, when clustering is enabled, the
     * cluster invalidator.
     */
    protected void initRepository() {
        log.debug("Initializing");
        ModelSetup modelSetup = new ModelSetup();
        modelSetup.repositoryDescriptor = repositoryDescriptor;
        backend.initializeModelSetup(modelSetup);
        model = new Model(modelSetup);
        backend.initializeModel(model);
        initLockManager();

        // create the cluster invalidator
        if (repositoryDescriptor.getClusteringEnabled()) {
            initClusterInvalidator();
        }

        // log once which mapper cache is being used
        Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass();
        if (cachingMapperClass == null) {
            log.warn("VCS Mapper cache is disabled.");
        } else {
            log.info("VCS Mapper cache using: " + cachingMapperClass.getName());
        }
    }

    /**
     * Gets the name under which the lock manager of this repository is looked up and registered.
     *
     * @return the lock manager name, currently the repository name
     */
    protected String getLockManagerName() {
        // TODO configure in repo descriptor
        return getName();
    }

    /**
     * Looks up the lock manager for this repository in the {@link LockManagerService}; when none is registered,
     * creates a {@link VCSLockManager}, registers it and remembers that this repository owns the registration.
     */
    protected void initLockManager() {
        String lockManagerName = getLockManagerName();
        LockManagerService lockManagerService = Framework.getService(LockManagerService.class);
        lockManager = lockManagerService.getLockManager(lockManagerName);
        if (lockManager == null) {
            // no descriptor
            // default to a VCSLockManager
            lockManager = new VCSLockManager(lockManagerName);
            lockManagerService.registerLockManager(lockManagerName, lockManager);
            selfRegisteredLockManager = true;
        } else {
            selfRegisteredLockManager = false;
        }
        log.info("Repository " + getName() + " using lock manager " + lockManager);
    }

    /**
     * Creates the cluster invalidator and hands it to the backend. When no cluster node id is configured, a random
     * one is used and a warning is logged.
     */
    protected void initClusterInvalidator() {
        String nodeId = repositoryDescriptor.getClusterNodeId();
        if (StringUtils.isBlank(nodeId)) {
            // need a smallish int because of SQL Server legacy node ids
            nodeId = String.valueOf(RANDOM.nextInt(32768));
            log.warn("Missing cluster node id configuration, please define it explicitly (usually through repository.clustering.id). "
                    + "Using random cluster node id instead: " + nodeId);
        } else {
            nodeId = nodeId.trim();
        }
        ClusterInvalidator clusterInvalidator = createClusterInvalidator();
        clusterInvalidator.initialize(nodeId, this);
        backend.setClusterInvalidator(clusterInvalidator);
    }

    /**
     * Instantiates the cluster invalidator configured in the descriptor, defaulting to
     * {@link JDBCClusterInvalidator}.
     *
     * @return the new, not yet initialized, cluster invalidator
     * @throws NuxeoException if the class cannot be instantiated
     */
    protected ClusterInvalidator createClusterInvalidator() {
        Class<? extends ClusterInvalidator> klass = repositoryDescriptor.clusterInvalidatorClass;
        if (klass == null) {
            klass = JDBCClusterInvalidator.class;
        }
        try {
            return klass.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Creates a session on top of the given mapper, wrapped in a caching mapper when caching is enabled.
     *
     * @param model the repository model
     * @param mapper the raw mapper
     * @return the new session
     */
    protected SessionImpl newSession(Model model, Mapper mapper) {
        mapper = createCachingMapper(model, mapper);
        return new SessionImpl(this, model, mapper);
    }

    /**
     * {@link PathResolver} that resolves paths through a session. The session must be set with
     * {@link #setSession} before {@link #getIdForPath} is called.
     */
    public static class SessionPathResolver implements PathResolver {

        private Session session;

        protected void setSession(Session session) {
            this.session = session;
        }

        /**
         * Resolves a path to a node id.
         *
         * @param path the path
         * @return the id of the node at that path, or {@code null} if the session finds no node
         */
        @Override
        public Serializable getIdForPath(String path) {
            Node node = session.getNodeByPath(path, null);
            return node == null ? null : node.getId();
        }
    }

    /*
     * -----
     */

    @Override
    public ResourceAdapterMetaData getMetaData() {
        throw new UnsupportedOperationException();
    }

    @Override
    public RecordFactory getRecordFactory() {
        throw new UnsupportedOperationException();
    }

    /*
     * ----- javax.resource.Referenceable -----
     */

    private Reference reference;

    @Override
    public void setReference(Reference reference) {
        this.reference = reference;
    }

    @Override
    public Reference getReference() {
        return reference;
    }

    /*
     * ----- Repository -----
     */

    /**
     * Closes all sessions, shuts down the backend, unregisters the metrics gauges of this repository and, if this
     * repository registered its own lock manager, unregisters it.
     */
    @Override
    public synchronized void close() {
        closeAllSessions();
        model = null;
        backend.shutdown();

        // unregister the gauges under the names they were actually registered with, otherwise the shared
        // registry keeps polling (and referencing) this closed repository
        removeMetricsGauges();
        // older metric names, still removed in case something registered them
        registry.remove(MetricRegistry.name(RepositoryImpl.class, getName(), "cache-size"));
        registry.remove(MetricRegistry.name(PersistenceContext.class, getName(), "cache-size"));
        registry.remove(MetricRegistry.name(SelectionContext.class, getName(), "cache-size"));

        if (selfRegisteredLockManager) {
            LockManagerService lms = Framework.getService(LockManagerService.class);
            if (lms != null) {
                lms.unregisterLockManager(getLockManagerName());
            }
        }
    }

    /**
     * Closes every live session, empties the session list, resets the session counter to zero and closes the lock
     * manager if there is one.
     */
    protected synchronized void closeAllSessions() {
        for (SessionImpl session : sessions) {
            if (!session.isLive()) {
                continue;
            }
            session.closeSession();
        }
        sessions.clear();
        sessionCount.dec(sessionCount.getCount());
        if (lockManager != null) {
            lockManager.closeLockManager();
        }
    }

    /*
     * ----- RepositoryManagement -----
     */

    @Override
    public String getName() {
        return repositoryDescriptor.name;
    }

    @Override
    public int getActiveSessionsCount() {
        return sessions.size();
    }

    /**
     * Clears the caches of all sessions and of the lock manager.
     *
     * @return the sum of the values returned by each session's {@code clearCaches()}
     */
    @Override
    public int clearCaches() {
        int n = 0;
        for (SessionImpl session : sessions) {
            n += session.clearCaches();
        }
        if (lockManager != null) {
            lockManager.clearLockManagerCaches();
        }
        return n;
    }

    /** Sums {@code getCacheSize()} over all open sessions. */
    @Override
    public long getCacheSize() {
        long size = 0;
        for (SessionImpl session : sessions) {
            size += session.getCacheSize();
        }
        return size;
    }

    /** Sums {@code getCacheMapperSize()} over all open sessions. */
    public long getCacheMapperSize() {
        long size = 0;
        for (SessionImpl session : sessions) {
            size += session.getCacheMapperSize();
        }
        return size;
    }

    /** Sums {@code getCachePristineSize()} over all open sessions. */
    @Override
    public long getCachePristineSize() {
        long size = 0;
        for (SessionImpl session : sessions) {
            size += session.getCachePristineSize();
        }
        return size;
    }

    /** Sums {@code getCacheSelectionSize()} over all open sessions. */
    @Override
    public long getCacheSelectionSize() {
        long size = 0;
        for (SessionImpl session : sessions) {
            size += session.getCacheSelectionSize();
        }
        return size;
    }

    @Override
    public void processClusterInvalidationsNext() {
        // TODO pass through or something
    }

    /**
     * Opens a dedicated connection, asks it to mark the referenced binaries, and closes it.
     */
    @Override
    public void markReferencedBinaries() {
        try {
            SessionImpl conn = getConnection();
            try {
                conn.markReferencedBinaries();
            } finally {
                conn.close();
            }
        } catch (ResourceException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Cleans up soft-deleted documents through a dedicated connection.
     *
     * @param max passed through to the session
     * @param beforeTime passed through to the session
     * @return the value returned by the session, or {@code 0} when soft delete is not enabled
     */
    @Override
    public int cleanupDeletedDocuments(int max, Calendar beforeTime) {
        if (!repositoryDescriptor.getSoftDeleteEnabled()) {
            return 0;
        }
        try {
            SessionImpl conn = getConnection();
            try {
                return conn.cleanupDeletedDocuments(max, beforeTime);
            } finally {
                conn.close();
            }
        } catch (ResourceException e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * ----- -----
     */

    // callback by session at close time
    protected void closeSession(SessionImpl session) {
        sessions.remove(session);
        sessionCount.dec();
    }

}