001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019 020package org.nuxeo.ecm.core.storage.sql; 021 022import java.io.Serializable; 023import java.util.Calendar; 024import java.util.Collection; 025import java.util.Random; 026import java.util.concurrent.CopyOnWriteArrayList; 027 028import javax.naming.Reference; 029import javax.resource.ResourceException; 030import javax.resource.cci.ConnectionSpec; 031import javax.resource.cci.RecordFactory; 032import javax.resource.cci.ResourceAdapterMetaData; 033 034import org.apache.commons.lang.StringUtils; 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.ecm.core.api.NuxeoException; 038import org.nuxeo.ecm.core.model.LockManager; 039import org.nuxeo.ecm.core.storage.DefaultFulltextParser; 040import org.nuxeo.ecm.core.storage.FulltextParser; 041import org.nuxeo.ecm.core.storage.lock.LockManagerService; 042import org.nuxeo.ecm.core.storage.sql.Session.PathResolver; 043import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCBackend; 044import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCClusterInvalidator; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.metrics.MetricsService; 047 048import com.codahale.metrics.Counter; 049import com.codahale.metrics.Gauge; 050import com.codahale.metrics.MetricRegistry; 051import com.codahale.metrics.SharedMetricRegistries; 052 053/** 054 * {@link Repository} implementation, to be extended by backend-specific initialization code. 055 * 056 * @see RepositoryBackend 057 */ 058public class RepositoryImpl implements Repository { 059 060 private static final long serialVersionUID = 1L; 061 062 private static final Log log = LogFactory.getLog(RepositoryImpl.class); 063 064 private static final Random RANDOM = new Random(); 065 066 protected final RepositoryDescriptor repositoryDescriptor; 067 068 protected final Class<? extends FulltextParser> fulltextParserClass; 069 070 private RepositoryBackend backend; 071 072 private final Collection<SessionImpl> sessions; 073 074 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 075 076 protected final Counter repositoryUp; 077 078 protected final Counter sessionCount; 079 080 private LockManager lockManager; 081 082 /** 083 * @since 7.4 : used to know if the LockManager was provided by this repository or externally 084 */ 085 protected boolean selfRegisteredLockManager = false; 086 087 /** Propagator of invalidations to all mappers' caches. */ 088 protected final InvalidationsPropagator invalidationsPropagator; 089 090 private Model model; 091 092 /** 093 * Transient id for this repository assigned by the server on first connection. This is not persisted. 094 */ 095 public String repositoryId; 096 097 public RepositoryImpl(RepositoryDescriptor repositoryDescriptor) { 098 this.repositoryDescriptor = repositoryDescriptor; 099 sessions = new CopyOnWriteArrayList<SessionImpl>(); 100 invalidationsPropagator = new InvalidationsPropagator(); 101 102 String className = repositoryDescriptor.getFulltextDescriptor().getFulltextParser(); 103 if (StringUtils.isBlank(className)) { 104 className = DefaultFulltextParser.class.getName(); 105 } 106 Class<?> klass; 107 try { 108 klass = Thread.currentThread().getContextClassLoader().loadClass(className); 109 } catch (ClassNotFoundException e) { 110 throw new NuxeoException("Unknown fulltext parser class: " + className, e); 111 } 112 if (!FulltextParser.class.isAssignableFrom(klass)) { 113 throw new NuxeoException("Invalid fulltext parser class: " + className); 114 } 115 fulltextParserClass = (Class<? extends FulltextParser>) klass; 116 117 repositoryUp = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, 118 "instance-up")); 119 repositoryUp.inc(); 120 sessionCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, 121 "sessions")); 122 createMetricsGauges(); 123 124 initRepository(); 125 } 126 127 protected void createMetricsGauges() { 128 String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "size"); 129 registry.remove(gaugeName); 130 registry.register(gaugeName, new Gauge<Long>() { 131 @Override 132 public Long getValue() { 133 return getCacheSize(); 134 } 135 }); 136 gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "pristines"); 137 registry.remove(gaugeName); 138 registry.register(gaugeName, new Gauge<Long>() { 139 @Override 140 public Long getValue() { 141 return getCachePristineSize(); 142 } 143 }); 144 gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "selections"); 145 registry.remove(gaugeName); 146 registry.register(gaugeName, new Gauge<Long>() { 147 @Override 148 public Long getValue() { 149 return getCacheSelectionSize(); 150 } 151 }); 152 gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "mappers"); 153 registry.remove(gaugeName); 154 registry.register(gaugeName, new Gauge<Long>() { 155 @Override 156 public Long getValue() { 157 return getCacheMapperSize(); 158 } 159 }); 160 } 161 162 protected RepositoryBackend createBackend() { 163 Class<? extends RepositoryBackend> backendClass = repositoryDescriptor.backendClass; 164 if (backendClass == null) { 165 backendClass = JDBCBackend.class; 166 } 167 try { 168 RepositoryBackend backend = backendClass.newInstance(); 169 return backend; 170 } catch (ReflectiveOperationException e) { 171 throw new NuxeoException(e); 172 } 173 } 174 175 protected Mapper createCachingMapper(Model model, Mapper mapper) { 176 try { 177 Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass(); 178 if (cachingMapperClass == null) { 179 return mapper; 180 } 181 CachingMapper cachingMapper = cachingMapperClass.newInstance(); 182 cachingMapper.initialize(getName(), model, mapper, invalidationsPropagator, 183 repositoryDescriptor.cachingMapperProperties); 184 return cachingMapper; 185 } catch (ReflectiveOperationException e) { 186 throw new NuxeoException(e); 187 } 188 } 189 190 protected Class<? extends CachingMapper> getCachingMapperClass() { 191 if (!repositoryDescriptor.getCachingMapperEnabled()) { 192 return null; 193 } 194 Class<? extends CachingMapper> cachingMapperClass = repositoryDescriptor.cachingMapperClass; 195 if (cachingMapperClass == null) { 196 // default cache 197 cachingMapperClass = SoftRefCachingMapper.class; 198 } 199 return cachingMapperClass; 200 } 201 202 public RepositoryDescriptor getRepositoryDescriptor() { 203 return repositoryDescriptor; 204 } 205 206 public LockManager getLockManager() { 207 return lockManager; 208 } 209 210 public Model getModel() { 211 return model; 212 } 213 214 public InvalidationsPropagator getInvalidationsPropagator() { 215 return invalidationsPropagator; 216 } 217 218 public Class<? extends FulltextParser> getFulltextParserClass() { 219 return fulltextParserClass; 220 } 221 222 public boolean isChangeTokenEnabled() { 223 return repositoryDescriptor.isChangeTokenEnabled(); 224 } 225 226 /* 227 * ----- javax.resource.cci.ConnectionFactory ----- 228 */ 229 230 /** 231 * Gets a new connection. 232 * 233 * @param connectionSpec the parameters to use to connect (unused) 234 * @return the session 235 */ 236 @Override 237 public SessionImpl getConnection(ConnectionSpec connectionSpec) { 238 return getConnection(); 239 } 240 241 /** 242 * Gets a new connection. 243 * 244 * @return the session 245 */ 246 @Override 247 public synchronized SessionImpl getConnection() { 248 if (Framework.getRuntime().isShuttingDown()) { 249 throw new IllegalStateException("Cannot open connection, runtime is shutting down"); 250 } 251 SessionPathResolver pathResolver = new SessionPathResolver(); 252 Mapper mapper = newMapper(pathResolver, true); 253 SessionImpl session = newSession(model, mapper); 254 pathResolver.setSession(session); 255 sessions.add(session); 256 sessionCount.inc(); 257 return session; 258 } 259 260 /** 261 * Creates a new mapper. 262 * 263 * @param pathResolver the path resolver (for regular mappers) 264 * @param useInvalidations whether this mapper participates in invalidation propagation (false for lock manager / 265 * cluster invalidator) 266 * @return the new mapper. 267 * @since 7.4 268 */ 269 public Mapper newMapper(PathResolver pathResolver, boolean useInvalidations) { 270 return backend.newMapper(pathResolver, useInvalidations); 271 } 272 273 protected void initRepository() { 274 log.debug("Initializing"); 275 backend = createBackend(); 276 model = backend.initialize(this); 277 initLockManager(); 278 279 // create the cluster invalidator 280 if (repositoryDescriptor.getClusteringEnabled()) { 281 initClusterInvalidator(); 282 } 283 284 // log once which mapper cache is being used 285 Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass(); 286 if (cachingMapperClass == null) { 287 log.warn("VCS Mapper cache is disabled."); 288 } else { 289 log.info("VCS Mapper cache using: " + cachingMapperClass.getName()); 290 } 291 292 initRootNode(); 293 } 294 295 protected void initRootNode() { 296 try { 297 // access a session once so that SessionImpl.computeRootNode can create the root node 298 getConnection().close(); 299 } catch (ResourceException e) { 300 throw new RuntimeException(e); 301 } 302 } 303 304 protected String getLockManagerName() { 305 // TODO configure in repo descriptor 306 return getName(); 307 } 308 309 protected void initLockManager() { 310 String lockManagerName = getLockManagerName(); 311 LockManagerService lockManagerService = Framework.getService(LockManagerService.class); 312 lockManager = lockManagerService.getLockManager(lockManagerName); 313 if (lockManager == null) { 314 // no descriptor 315 // default to a VCSLockManager 316 lockManager = new VCSLockManager(this); 317 lockManagerService.registerLockManager(lockManagerName, lockManager); 318 selfRegisteredLockManager = true; 319 } else { 320 selfRegisteredLockManager = false; 321 } 322 log.info("Repository " + getName() + " using lock manager " + lockManager); 323 } 324 325 protected void initClusterInvalidator() { 326 String nodeId = repositoryDescriptor.getClusterNodeId(); 327 if (StringUtils.isBlank(nodeId)) { 328 // need a smallish int because of SQL Server legacy node ids 329 nodeId = String.valueOf(RANDOM.nextInt(32768)); 330 log.warn("Missing cluster node id configuration, please define it explicitly (usually through repository.clustering.id). " 331 + "Using random cluster node id instead: " + nodeId); 332 } else { 333 nodeId = nodeId.trim(); 334 } 335 ClusterInvalidator clusterInvalidator = createClusterInvalidator(); 336 clusterInvalidator.initialize(nodeId, this); 337 backend.setClusterInvalidator(clusterInvalidator); 338 } 339 340 protected ClusterInvalidator createClusterInvalidator() { 341 Class<? extends ClusterInvalidator> klass = repositoryDescriptor.clusterInvalidatorClass; 342 if (klass == null) { 343 klass = JDBCClusterInvalidator.class; 344 } 345 try { 346 return klass.newInstance(); 347 } catch (ReflectiveOperationException e) { 348 throw new NuxeoException(e); 349 } 350 } 351 352 protected SessionImpl newSession(Model model, Mapper mapper) { 353 mapper = createCachingMapper(model, mapper); 354 return new SessionImpl(this, model, mapper); 355 } 356 357 public static class SessionPathResolver implements PathResolver { 358 359 private Session session; 360 361 protected void setSession(Session session) { 362 this.session = session; 363 } 364 365 @Override 366 public Serializable getIdForPath(String path) { 367 Node node = session.getNodeByPath(path, null); 368 return node == null ? null : node.getId(); 369 } 370 } 371 372 /* 373 * ----- 374 */ 375 376 @Override 377 public ResourceAdapterMetaData getMetaData() { 378 throw new UnsupportedOperationException(); 379 } 380 381 @Override 382 public RecordFactory getRecordFactory() { 383 throw new UnsupportedOperationException(); 384 } 385 386 /* 387 * ----- javax.resource.Referenceable ----- 388 */ 389 390 private Reference reference; 391 392 @Override 393 public void setReference(Reference reference) { 394 this.reference = reference; 395 } 396 397 @Override 398 public Reference getReference() { 399 return reference; 400 } 401 402 /* 403 * ----- Repository ----- 404 */ 405 406 @Override 407 public synchronized void close() { 408 closeAllSessions(); 409 model = null; 410 backend.shutdown(); 411 412 registry.remove(MetricRegistry.name(RepositoryImpl.class, getName(), "cache-size")); 413 registry.remove(MetricRegistry.name(PersistenceContext.class, getName(), "cache-size")); 414 registry.remove(MetricRegistry.name(SelectionContext.class, getName(), "cache-size")); 415 416 if (selfRegisteredLockManager) { 417 LockManagerService lms = Framework.getService(LockManagerService.class); 418 if (lms != null) { 419 lms.unregisterLockManager(getLockManagerName()); 420 } 421 } 422 } 423 424 protected synchronized void closeAllSessions() { 425 for (SessionImpl session : sessions) { 426 if (!session.isLive()) { 427 continue; 428 } 429 session.closeSession(); 430 } 431 sessions.clear(); 432 sessionCount.dec(sessionCount.getCount()); 433 if (lockManager != null) { 434 lockManager.closeLockManager(); 435 } 436 } 437 438 /* 439 * ----- RepositoryManagement ----- 440 */ 441 442 @Override 443 public String getName() { 444 return repositoryDescriptor.name; 445 } 446 447 @Override 448 public int getActiveSessionsCount() { 449 return sessions.size(); 450 } 451 452 @Override 453 public int clearCaches() { 454 int n = 0; 455 for (SessionImpl session : sessions) { 456 n += session.clearCaches(); 457 } 458 if (lockManager != null) { 459 lockManager.clearLockManagerCaches(); 460 } 461 return n; 462 } 463 464 @Override 465 public long getCacheSize() { 466 long size = 0; 467 for (SessionImpl session : sessions) { 468 size += session.getCacheSize(); 469 } 470 return size; 471 } 472 473 public long getCacheMapperSize() { 474 long size = 0; 475 for (SessionImpl session : sessions) { 476 size += session.getCacheMapperSize(); 477 } 478 return size; 479 } 480 481 @Override 482 public long getCachePristineSize() { 483 long size = 0; 484 for (SessionImpl session : sessions) { 485 size += session.getCachePristineSize(); 486 } 487 return size; 488 } 489 490 @Override 491 public long getCacheSelectionSize() { 492 long size = 0; 493 for (SessionImpl session : sessions) { 494 size += session.getCacheSelectionSize(); 495 } 496 return size; 497 } 498 499 @Override 500 public void processClusterInvalidationsNext() { 501 // TODO pass through or something 502 } 503 504 @Override 505 public void markReferencedBinaries() { 506 try { 507 SessionImpl conn = getConnection(); 508 try { 509 conn.markReferencedBinaries(); 510 } finally { 511 conn.close(); 512 } 513 } catch (ResourceException e) { 514 throw new RuntimeException(e); 515 } 516 } 517 518 @Override 519 public int cleanupDeletedDocuments(int max, Calendar beforeTime) { 520 if (!repositoryDescriptor.getSoftDeleteEnabled()) { 521 return 0; 522 } 523 try { 524 SessionImpl conn = getConnection(); 525 try { 526 return conn.cleanupDeletedDocuments(max, beforeTime); 527 } finally { 528 conn.close(); 529 } 530 } catch (ResourceException e) { 531 throw new RuntimeException(e); 532 } 533 } 534 535 /* 536 * ----- ----- 537 */ 538 539 // callback by session at close time 540 protected void closeSession(SessionImpl session) { 541 sessions.remove(session); 542 sessionCount.dec(); 543 } 544 545}