001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019 020package org.nuxeo.ecm.core.storage.sql; 021 022import java.io.Serializable; 023import java.util.Calendar; 024import java.util.Collection; 025import java.util.Random; 026import java.util.concurrent.CopyOnWriteArrayList; 027 028import javax.naming.Reference; 029import javax.resource.ResourceException; 030import javax.resource.cci.ConnectionSpec; 031import javax.resource.cci.RecordFactory; 032import javax.resource.cci.ResourceAdapterMetaData; 033 034import org.apache.commons.lang.StringUtils; 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.ecm.core.api.NuxeoException; 038import org.nuxeo.ecm.core.model.LockManager; 039import org.nuxeo.ecm.core.storage.DefaultFulltextParser; 040import org.nuxeo.ecm.core.storage.FulltextParser; 041import org.nuxeo.ecm.core.storage.lock.LockManagerService; 042import org.nuxeo.ecm.core.storage.sql.Session.PathResolver; 043import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCBackend; 044import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCClusterInvalidator; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.metrics.MetricsService; 047 048import com.codahale.metrics.Counter; 049import com.codahale.metrics.Gauge; 050import com.codahale.metrics.MetricRegistry; 051import com.codahale.metrics.SharedMetricRegistries; 052 053/** 054 * {@link Repository} implementation, to be extended by backend-specific initialization code. 055 * 056 * @see RepositoryBackend 057 */ 058public class RepositoryImpl implements Repository { 059 060 private static final long serialVersionUID = 1L; 061 062 private static final Log log = LogFactory.getLog(RepositoryImpl.class); 063 064 private static final Random RANDOM = new Random(); 065 066 protected final RepositoryDescriptor repositoryDescriptor; 067 068 protected final Class<? extends FulltextParser> fulltextParserClass; 069 070 private final RepositoryBackend backend; 071 072 private final Collection<SessionImpl> sessions; 073 074 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 075 076 protected final Counter repositoryUp; 077 078 protected final Counter sessionCount; 079 080 private LockManager lockManager; 081 082 /** 083 * @since 7.4 : used to know if the LockManager was provided by this repository or externally 084 */ 085 protected boolean selfRegisteredLockManager = false; 086 087 /** Propagator of invalidations to all mappers' caches. */ 088 protected final InvalidationsPropagator invalidationsPropagator; 089 090 private Model model; 091 092 /** 093 * Transient id for this repository assigned by the server on first connection. This is not persisted. 094 */ 095 public String repositoryId; 096 097 public RepositoryImpl(RepositoryDescriptor repositoryDescriptor) { 098 this.repositoryDescriptor = repositoryDescriptor; 099 sessions = new CopyOnWriteArrayList<SessionImpl>(); 100 invalidationsPropagator = new InvalidationsPropagator(); 101 102 String className = repositoryDescriptor.getFulltextDescriptor().getFulltextParser(); 103 if (StringUtils.isBlank(className)) { 104 className = DefaultFulltextParser.class.getName(); 105 } 106 Class<?> klass; 107 try { 108 klass = Thread.currentThread().getContextClassLoader().loadClass(className); 109 } catch (ClassNotFoundException e) { 110 throw new NuxeoException("Unknown fulltext parser class: " + className, e); 111 } 112 if (!FulltextParser.class.isAssignableFrom(klass)) { 113 throw new NuxeoException("Invalid fulltext parser class: " + className); 114 } 115 fulltextParserClass = (Class<? extends FulltextParser>) klass; 116 117 backend = createBackend(); 118 repositoryUp = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, 119 "instance-up")); 120 repositoryUp.inc(); 121 sessionCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, 122 "sessions")); 123 createMetricsGauges(); 124 } 125 126 protected void createMetricsGauges() { 127 String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "size"); 128 registry.remove(gaugeName); 129 registry.register(gaugeName, new Gauge<Long>() { 130 @Override 131 public Long getValue() { 132 return getCacheSize(); 133 } 134 }); 135 gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "pristines"); 136 registry.remove(gaugeName); 137 registry.register(gaugeName, new Gauge<Long>() { 138 @Override 139 public Long getValue() { 140 return getCachePristineSize(); 141 } 142 }); 143 gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "selections"); 144 registry.remove(gaugeName); 145 registry.register(gaugeName, new Gauge<Long>() { 146 @Override 147 public Long getValue() { 148 return getCacheSelectionSize(); 149 } 150 }); 151 gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "mappers"); 152 registry.remove(gaugeName); 153 registry.register(gaugeName, new Gauge<Long>() { 154 @Override 155 public Long getValue() { 156 return getCacheMapperSize(); 157 } 158 }); 159 } 160 161 protected RepositoryBackend createBackend() { 162 Class<? extends RepositoryBackend> backendClass = repositoryDescriptor.backendClass; 163 if (backendClass == null) { 164 backendClass = JDBCBackend.class; 165 } 166 try { 167 RepositoryBackend backend = backendClass.newInstance(); 168 backend.initialize(this); 169 return backend; 170 } catch (ReflectiveOperationException e) { 171 throw new NuxeoException(e); 172 } 173 } 174 175 protected Mapper createCachingMapper(Model model, Mapper mapper) { 176 try { 177 Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass(); 178 if (cachingMapperClass == null) { 179 return mapper; 180 } 181 CachingMapper cachingMapper = cachingMapperClass.newInstance(); 182 cachingMapper.initialize(getName(), model, mapper, invalidationsPropagator, 183 repositoryDescriptor.cachingMapperProperties); 184 return cachingMapper; 185 } catch (ReflectiveOperationException e) { 186 throw new NuxeoException(e); 187 } 188 } 189 190 protected Class<? extends CachingMapper> getCachingMapperClass() { 191 if (!repositoryDescriptor.getCachingMapperEnabled()) { 192 return null; 193 } 194 Class<? extends CachingMapper> cachingMapperClass = repositoryDescriptor.cachingMapperClass; 195 if (cachingMapperClass == null) { 196 // default cache 197 cachingMapperClass = SoftRefCachingMapper.class; 198 } 199 return cachingMapperClass; 200 } 201 202 public RepositoryDescriptor getRepositoryDescriptor() { 203 return repositoryDescriptor; 204 } 205 206 public LockManager getLockManager() { 207 return lockManager; 208 } 209 210 public Model getModel() { 211 return model; 212 } 213 214 public InvalidationsPropagator getInvalidationsPropagator() { 215 return invalidationsPropagator; 216 } 217 218 public Class<? extends FulltextParser> getFulltextParserClass() { 219 return fulltextParserClass; 220 } 221 222 public boolean isChangeTokenEnabled() { 223 return repositoryDescriptor.isChangeTokenEnabled(); 224 } 225 226 /* 227 * ----- javax.resource.cci.ConnectionFactory ----- 228 */ 229 230 /** 231 * Gets a new connection. 232 * 233 * @param connectionSpec the parameters to use to connect (unused) 234 * @return the session 235 */ 236 @Override 237 public SessionImpl getConnection(ConnectionSpec connectionSpec) { 238 return getConnection(); 239 } 240 241 /** 242 * Gets a new connection. 243 * 244 * @return the session 245 */ 246 @Override 247 public synchronized SessionImpl getConnection() { 248 if (Framework.getRuntime().isShuttingDown()) { 249 throw new IllegalStateException("Cannot open connection, runtime is shutting down"); 250 } 251 if (model == null) { 252 initRepository(); 253 } 254 SessionPathResolver pathResolver = new SessionPathResolver(); 255 Mapper mapper = newMapper(pathResolver, true); 256 SessionImpl session = newSession(model, mapper); 257 pathResolver.setSession(session); 258 sessions.add(session); 259 sessionCount.inc(); 260 return session; 261 } 262 263 /** 264 * Creates a new mapper. 265 * 266 * @param pathResolver the path resolver (for regular mappers) 267 * @param useInvalidations whether this mapper participates in invalidation propagation (false for lock manager / 268 * cluster invalidator) 269 * @return the new mapper. 270 * @since 7.4 271 */ 272 public Mapper newMapper(PathResolver pathResolver, boolean useInvalidations) { 273 return backend.newMapper(model, pathResolver, useInvalidations); 274 } 275 276 protected void initRepository() { 277 log.debug("Initializing"); 278 ModelSetup modelSetup = new ModelSetup(); 279 modelSetup.repositoryDescriptor = repositoryDescriptor; 280 backend.initializeModelSetup(modelSetup); 281 model = new Model(modelSetup); 282 backend.initializeModel(model); 283 initLockManager(); 284 285 // create the cluster invalidator 286 if (repositoryDescriptor.getClusteringEnabled()) { 287 initClusterInvalidator(); 288 } 289 290 // log once which mapper cache is being used 291 Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass(); 292 if (cachingMapperClass == null) { 293 log.warn("VCS Mapper cache is disabled."); 294 } else { 295 log.info("VCS Mapper cache using: " + cachingMapperClass.getName()); 296 } 297 } 298 299 protected String getLockManagerName() { 300 // TODO configure in repo descriptor 301 return getName(); 302 } 303 304 protected void initLockManager() { 305 String lockManagerName = getLockManagerName(); 306 LockManagerService lockManagerService = Framework.getService(LockManagerService.class); 307 lockManager = lockManagerService.getLockManager(lockManagerName); 308 if (lockManager == null) { 309 // no descriptor 310 // default to a VCSLockManager 311 lockManager = new VCSLockManager(lockManagerName); 312 lockManagerService.registerLockManager(lockManagerName, lockManager); 313 selfRegisteredLockManager = true; 314 } else { 315 selfRegisteredLockManager = false; 316 } 317 log.info("Repository " + getName() + " using lock manager " + lockManager); 318 } 319 320 protected void initClusterInvalidator() { 321 String nodeId = repositoryDescriptor.getClusterNodeId(); 322 if (StringUtils.isBlank(nodeId)) { 323 // need a smallish int because of SQL Server legacy node ids 324 nodeId = String.valueOf(RANDOM.nextInt(32768)); 325 log.warn("Missing cluster node id configuration, please define it explicitly (usually through repository.clustering.id). " 326 + "Using random cluster node id instead: " + nodeId); 327 } else { 328 nodeId = nodeId.trim(); 329 } 330 ClusterInvalidator clusterInvalidator = createClusterInvalidator(); 331 clusterInvalidator.initialize(nodeId, this); 332 backend.setClusterInvalidator(clusterInvalidator); 333 } 334 335 protected ClusterInvalidator createClusterInvalidator() { 336 Class<? extends ClusterInvalidator> klass = repositoryDescriptor.clusterInvalidatorClass; 337 if (klass == null) { 338 klass = JDBCClusterInvalidator.class; 339 } 340 try { 341 return klass.newInstance(); 342 } catch (ReflectiveOperationException e) { 343 throw new NuxeoException(e); 344 } 345 } 346 347 protected SessionImpl newSession(Model model, Mapper mapper) { 348 mapper = createCachingMapper(model, mapper); 349 return new SessionImpl(this, model, mapper); 350 } 351 352 public static class SessionPathResolver implements PathResolver { 353 354 private Session session; 355 356 protected void setSession(Session session) { 357 this.session = session; 358 } 359 360 @Override 361 public Serializable getIdForPath(String path) { 362 Node node = session.getNodeByPath(path, null); 363 return node == null ? null : node.getId(); 364 } 365 } 366 367 /* 368 * ----- 369 */ 370 371 @Override 372 public ResourceAdapterMetaData getMetaData() { 373 throw new UnsupportedOperationException(); 374 } 375 376 @Override 377 public RecordFactory getRecordFactory() { 378 throw new UnsupportedOperationException(); 379 } 380 381 /* 382 * ----- javax.resource.Referenceable ----- 383 */ 384 385 private Reference reference; 386 387 @Override 388 public void setReference(Reference reference) { 389 this.reference = reference; 390 } 391 392 @Override 393 public Reference getReference() { 394 return reference; 395 } 396 397 /* 398 * ----- Repository ----- 399 */ 400 401 @Override 402 public synchronized void close() { 403 closeAllSessions(); 404 model = null; 405 backend.shutdown(); 406 407 registry.remove(MetricRegistry.name(RepositoryImpl.class, getName(), "cache-size")); 408 registry.remove(MetricRegistry.name(PersistenceContext.class, getName(), "cache-size")); 409 registry.remove(MetricRegistry.name(SelectionContext.class, getName(), "cache-size")); 410 411 if (selfRegisteredLockManager) { 412 LockManagerService lms = Framework.getService(LockManagerService.class); 413 if (lms != null) { 414 lms.unregisterLockManager(getLockManagerName()); 415 } 416 } 417 } 418 419 protected synchronized void closeAllSessions() { 420 for (SessionImpl session : sessions) { 421 if (!session.isLive()) { 422 continue; 423 } 424 session.closeSession(); 425 } 426 sessions.clear(); 427 sessionCount.dec(sessionCount.getCount()); 428 if (lockManager != null) { 429 lockManager.closeLockManager(); 430 } 431 } 432 433 /* 434 * ----- RepositoryManagement ----- 435 */ 436 437 @Override 438 public String getName() { 439 return repositoryDescriptor.name; 440 } 441 442 @Override 443 public int getActiveSessionsCount() { 444 return sessions.size(); 445 } 446 447 @Override 448 public int clearCaches() { 449 int n = 0; 450 for (SessionImpl session : sessions) { 451 n += session.clearCaches(); 452 } 453 if (lockManager != null) { 454 lockManager.clearLockManagerCaches(); 455 } 456 return n; 457 } 458 459 @Override 460 public long getCacheSize() { 461 long size = 0; 462 for (SessionImpl session : sessions) { 463 size += session.getCacheSize(); 464 } 465 return size; 466 } 467 468 public long getCacheMapperSize() { 469 long size = 0; 470 for (SessionImpl session : sessions) { 471 size += session.getCacheMapperSize(); 472 } 473 return size; 474 } 475 476 @Override 477 public long getCachePristineSize() { 478 long size = 0; 479 for (SessionImpl session : sessions) { 480 size += session.getCachePristineSize(); 481 } 482 return size; 483 } 484 485 @Override 486 public long getCacheSelectionSize() { 487 long size = 0; 488 for (SessionImpl session : sessions) { 489 size += session.getCacheSelectionSize(); 490 } 491 return size; 492 } 493 494 @Override 495 public void processClusterInvalidationsNext() { 496 // TODO pass through or something 497 } 498 499 @Override 500 public void markReferencedBinaries() { 501 try { 502 SessionImpl conn = getConnection(); 503 try { 504 conn.markReferencedBinaries(); 505 } finally { 506 conn.close(); 507 } 508 } catch (ResourceException e) { 509 throw new RuntimeException(e); 510 } 511 } 512 513 @Override 514 public int cleanupDeletedDocuments(int max, Calendar beforeTime) { 515 if (!repositoryDescriptor.getSoftDeleteEnabled()) { 516 return 0; 517 } 518 try { 519 SessionImpl conn = getConnection(); 520 try { 521 return conn.cleanupDeletedDocuments(max, beforeTime); 522 } finally { 523 conn.close(); 524 } 525 } catch (ResourceException e) { 526 throw new RuntimeException(e); 527 } 528 } 529 530 /* 531 * ----- ----- 532 */ 533 534 // callback by session at close time 535 protected void closeSession(SessionImpl session) { 536 sessions.remove(session); 537 sessionCount.dec(); 538 } 539 540}