001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019 020package org.nuxeo.ecm.core.storage.sql; 021 022import java.io.Serializable; 023import java.util.Calendar; 024import java.util.Collection; 025import java.util.Random; 026import java.util.concurrent.CopyOnWriteArrayList; 027 028import javax.naming.Reference; 029import javax.resource.ResourceException; 030import javax.resource.cci.ConnectionSpec; 031import javax.resource.cci.RecordFactory; 032import javax.resource.cci.ResourceAdapterMetaData; 033 034import org.apache.commons.lang.StringUtils; 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.ecm.core.api.NuxeoException; 038import org.nuxeo.ecm.core.model.LockManager; 039import org.nuxeo.ecm.core.storage.DefaultFulltextParser; 040import org.nuxeo.ecm.core.storage.FulltextParser; 041import org.nuxeo.ecm.core.storage.lock.LockManagerService; 042import org.nuxeo.ecm.core.storage.sql.Session.PathResolver; 043import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCBackend; 044import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCClusterInvalidator; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.metrics.MetricsService; 047 048import com.codahale.metrics.Counter; 049import com.codahale.metrics.Gauge; 050import com.codahale.metrics.MetricRegistry; 051import com.codahale.metrics.SharedMetricRegistries; 052 053/** 054 * {@link Repository} implementation, to be extended by backend-specific initialization code. 055 * 056 * @see RepositoryBackend 057 */ 058public class RepositoryImpl implements Repository { 059 060 private static final long serialVersionUID = 1L; 061 062 private static final Log log = LogFactory.getLog(RepositoryImpl.class); 063 064 private static final Random RANDOM = new Random(); 065 066 protected final RepositoryDescriptor repositoryDescriptor; 067 068 protected final Class<? extends FulltextParser> fulltextParserClass; 069 070 private final RepositoryBackend backend; 071 072 private final Collection<SessionImpl> sessions; 073 074 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 075 076 protected final Counter repositoryUp; 077 078 protected final Counter sessionCount; 079 080 private LockManager lockManager; 081 082 /** 083 * @since 7.4 : used to know if the LockManager was provided by this repository or externally 084 */ 085 protected boolean selfRegisteredLockManager = false; 086 087 /** Propagator of invalidations to all mappers' caches. */ 088 protected final InvalidationsPropagator invalidationsPropagator; 089 090 private Model model; 091 092 /** 093 * Transient id for this repository assigned by the server on first connection. This is not persisted. 094 */ 095 public String repositoryId; 096 097 public RepositoryImpl(RepositoryDescriptor repositoryDescriptor) { 098 this.repositoryDescriptor = repositoryDescriptor; 099 sessions = new CopyOnWriteArrayList<SessionImpl>(); 100 invalidationsPropagator = new InvalidationsPropagator(); 101 102 String className = repositoryDescriptor.getFulltextDescriptor().getFulltextParser(); 103 if (StringUtils.isBlank(className)) { 104 className = DefaultFulltextParser.class.getName(); 105 } 106 Class<?> klass; 107 try { 108 klass = Thread.currentThread().getContextClassLoader().loadClass(className); 109 } catch (ClassNotFoundException e) { 110 throw new NuxeoException("Unknown fulltext parser class: " + className, e); 111 } 112 if (!FulltextParser.class.isAssignableFrom(klass)) { 113 throw new NuxeoException("Invalid fulltext parser class: " + className); 114 } 115 fulltextParserClass = (Class<? extends FulltextParser>) klass; 116 117 backend = createBackend(); 118 repositoryUp = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, 119 "instance-up")); 120 repositoryUp.inc(); 121 sessionCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, 122 "sessions")); 123 createMetricsGauges(); 124 } 125 126 protected void createMetricsGauges() { 127 String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "size"); 128 registry.remove(gaugeName); 129 registry.register(gaugeName, new Gauge<Long>() { 130 @Override 131 public Long getValue() { 132 return getCacheSize(); 133 } 134 }); 135 gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "pristines"); 136 registry.remove(gaugeName); 137 registry.register(gaugeName, new Gauge<Long>() { 138 @Override 139 public Long getValue() { 140 return getCachePristineSize(); 141 } 142 }); 143 gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "selections"); 144 registry.remove(gaugeName); 145 registry.register(gaugeName, new Gauge<Long>() { 146 @Override 147 public Long getValue() { 148 return getCacheSelectionSize(); 149 } 150 }); 151 gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "mappers"); 152 registry.remove(gaugeName); 153 registry.register(gaugeName, new Gauge<Long>() { 154 @Override 155 public Long getValue() { 156 return getCacheMapperSize(); 157 } 158 }); 159 } 160 161 protected RepositoryBackend createBackend() { 162 Class<? extends RepositoryBackend> backendClass = repositoryDescriptor.backendClass; 163 if (backendClass == null) { 164 backendClass = JDBCBackend.class; 165 } 166 try { 167 RepositoryBackend backend = backendClass.newInstance(); 168 backend.initialize(this); 169 return backend; 170 } catch (ReflectiveOperationException e) { 171 throw new NuxeoException(e); 172 } 173 } 174 175 protected Mapper createCachingMapper(Model model, Mapper mapper) { 176 try { 177 Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass(); 178 if (cachingMapperClass == null) { 179 return mapper; 180 } 181 CachingMapper cachingMapper = cachingMapperClass.newInstance(); 182 cachingMapper.initialize(getName(), model, mapper, invalidationsPropagator, 183 repositoryDescriptor.cachingMapperProperties); 184 return cachingMapper; 185 } catch (ReflectiveOperationException e) { 186 throw new NuxeoException(e); 187 } 188 } 189 190 protected Class<? extends CachingMapper> getCachingMapperClass() { 191 if (!repositoryDescriptor.getCachingMapperEnabled()) { 192 return null; 193 } 194 Class<? extends CachingMapper> cachingMapperClass = repositoryDescriptor.cachingMapperClass; 195 if (cachingMapperClass == null) { 196 // default cache 197 cachingMapperClass = SoftRefCachingMapper.class; 198 } 199 return cachingMapperClass; 200 } 201 202 public RepositoryDescriptor getRepositoryDescriptor() { 203 return repositoryDescriptor; 204 } 205 206 public LockManager getLockManager() { 207 return lockManager; 208 } 209 210 public Model getModel() { 211 return model; 212 } 213 214 public InvalidationsPropagator getInvalidationsPropagator() { 215 return invalidationsPropagator; 216 } 217 218 public Class<? extends FulltextParser> getFulltextParserClass() { 219 return fulltextParserClass; 220 } 221 222 /* 223 * ----- javax.resource.cci.ConnectionFactory ----- 224 */ 225 226 /** 227 * Gets a new connection. 228 * 229 * @param connectionSpec the parameters to use to connect (unused) 230 * @return the session 231 */ 232 @Override 233 public SessionImpl getConnection(ConnectionSpec connectionSpec) { 234 return getConnection(); 235 } 236 237 /** 238 * Gets a new connection. 239 * 240 * @return the session 241 */ 242 @Override 243 public synchronized SessionImpl getConnection() { 244 if (Framework.getRuntime().isShuttingDown()) { 245 throw new IllegalStateException("Cannot open connection, runtime is shutting down"); 246 } 247 if (model == null) { 248 initRepository(); 249 } 250 SessionPathResolver pathResolver = new SessionPathResolver(); 251 Mapper mapper = newMapper(pathResolver, true); 252 SessionImpl session = newSession(model, mapper); 253 pathResolver.setSession(session); 254 sessions.add(session); 255 sessionCount.inc(); 256 return session; 257 } 258 259 /** 260 * Creates a new mapper. 261 * 262 * @param pathResolver the path resolver (for regular mappers) 263 * @param useInvalidations whether this mapper participates in invalidation propagation (false for lock manager / 264 * cluster invalidator) 265 * @return the new mapper. 266 * @since 7.4 267 */ 268 public Mapper newMapper(PathResolver pathResolver, boolean useInvalidations) { 269 return backend.newMapper(model, pathResolver, useInvalidations); 270 } 271 272 protected void initRepository() { 273 log.debug("Initializing"); 274 ModelSetup modelSetup = new ModelSetup(); 275 modelSetup.repositoryDescriptor = repositoryDescriptor; 276 backend.initializeModelSetup(modelSetup); 277 model = new Model(modelSetup); 278 backend.initializeModel(model); 279 initLockManager(); 280 281 // create the cluster invalidator 282 if (repositoryDescriptor.getClusteringEnabled()) { 283 initClusterInvalidator(); 284 } 285 286 // log once which mapper cache is being used 287 Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass(); 288 if (cachingMapperClass == null) { 289 log.warn("VCS Mapper cache is disabled."); 290 } else { 291 log.info("VCS Mapper cache using: " + cachingMapperClass.getName()); 292 } 293 } 294 295 protected String getLockManagerName() { 296 // TODO configure in repo descriptor 297 return getName(); 298 } 299 300 protected void initLockManager() { 301 String lockManagerName = getLockManagerName(); 302 LockManagerService lockManagerService = Framework.getService(LockManagerService.class); 303 lockManager = lockManagerService.getLockManager(lockManagerName); 304 if (lockManager == null) { 305 // no descriptor 306 // default to a VCSLockManager 307 lockManager = new VCSLockManager(lockManagerName); 308 lockManagerService.registerLockManager(lockManagerName, lockManager); 309 selfRegisteredLockManager = true; 310 } else { 311 selfRegisteredLockManager = false; 312 } 313 log.info("Repository " + getName() + " using lock manager " + lockManager); 314 } 315 316 protected void initClusterInvalidator() { 317 String nodeId = repositoryDescriptor.getClusterNodeId(); 318 if (StringUtils.isBlank(nodeId)) { 319 // need a smallish int because of SQL Server legacy node ids 320 nodeId = String.valueOf(RANDOM.nextInt(32768)); 321 log.warn("Missing cluster node id configuration, please define it explicitly (usually through repository.clustering.id). " 322 + "Using random cluster node id instead: " + nodeId); 323 } else { 324 nodeId = nodeId.trim(); 325 } 326 ClusterInvalidator clusterInvalidator = createClusterInvalidator(); 327 clusterInvalidator.initialize(nodeId, this); 328 backend.setClusterInvalidator(clusterInvalidator); 329 } 330 331 protected ClusterInvalidator createClusterInvalidator() { 332 Class<? extends ClusterInvalidator> klass = repositoryDescriptor.clusterInvalidatorClass; 333 if (klass == null) { 334 klass = JDBCClusterInvalidator.class; 335 } 336 try { 337 return klass.newInstance(); 338 } catch (ReflectiveOperationException e) { 339 throw new NuxeoException(e); 340 } 341 } 342 343 protected SessionImpl newSession(Model model, Mapper mapper) { 344 mapper = createCachingMapper(model, mapper); 345 return new SessionImpl(this, model, mapper); 346 } 347 348 public static class SessionPathResolver implements PathResolver { 349 350 private Session session; 351 352 protected void setSession(Session session) { 353 this.session = session; 354 } 355 356 @Override 357 public Serializable getIdForPath(String path) { 358 Node node = session.getNodeByPath(path, null); 359 return node == null ? null : node.getId(); 360 } 361 } 362 363 /* 364 * ----- 365 */ 366 367 @Override 368 public ResourceAdapterMetaData getMetaData() { 369 throw new UnsupportedOperationException(); 370 } 371 372 @Override 373 public RecordFactory getRecordFactory() { 374 throw new UnsupportedOperationException(); 375 } 376 377 /* 378 * ----- javax.resource.Referenceable ----- 379 */ 380 381 private Reference reference; 382 383 @Override 384 public void setReference(Reference reference) { 385 this.reference = reference; 386 } 387 388 @Override 389 public Reference getReference() { 390 return reference; 391 } 392 393 /* 394 * ----- Repository ----- 395 */ 396 397 @Override 398 public synchronized void close() { 399 closeAllSessions(); 400 model = null; 401 backend.shutdown(); 402 403 registry.remove(MetricRegistry.name(RepositoryImpl.class, getName(), "cache-size")); 404 registry.remove(MetricRegistry.name(PersistenceContext.class, getName(), "cache-size")); 405 registry.remove(MetricRegistry.name(SelectionContext.class, getName(), "cache-size")); 406 407 if (selfRegisteredLockManager) { 408 LockManagerService lms = Framework.getService(LockManagerService.class); 409 if (lms != null) { 410 lms.unregisterLockManager(getLockManagerName()); 411 } 412 } 413 } 414 415 protected synchronized void closeAllSessions() { 416 for (SessionImpl session : sessions) { 417 if (!session.isLive()) { 418 continue; 419 } 420 session.closeSession(); 421 } 422 sessions.clear(); 423 sessionCount.dec(sessionCount.getCount()); 424 if (lockManager != null) { 425 lockManager.closeLockManager(); 426 } 427 } 428 429 /* 430 * ----- RepositoryManagement ----- 431 */ 432 433 @Override 434 public String getName() { 435 return repositoryDescriptor.name; 436 } 437 438 @Override 439 public int getActiveSessionsCount() { 440 return sessions.size(); 441 } 442 443 @Override 444 public int clearCaches() { 445 int n = 0; 446 for (SessionImpl session : sessions) { 447 n += session.clearCaches(); 448 } 449 if (lockManager != null) { 450 lockManager.clearLockManagerCaches(); 451 } 452 return n; 453 } 454 455 @Override 456 public long getCacheSize() { 457 long size = 0; 458 for (SessionImpl session : sessions) { 459 size += session.getCacheSize(); 460 } 461 return size; 462 } 463 464 public long getCacheMapperSize() { 465 long size = 0; 466 for (SessionImpl session : sessions) { 467 size += session.getCacheMapperSize(); 468 } 469 return size; 470 } 471 472 @Override 473 public long getCachePristineSize() { 474 long size = 0; 475 for (SessionImpl session : sessions) { 476 size += session.getCachePristineSize(); 477 } 478 return size; 479 } 480 481 @Override 482 public long getCacheSelectionSize() { 483 long size = 0; 484 for (SessionImpl session : sessions) { 485 size += session.getCacheSelectionSize(); 486 } 487 return size; 488 } 489 490 @Override 491 public void processClusterInvalidationsNext() { 492 // TODO pass through or something 493 } 494 495 @Override 496 public void markReferencedBinaries() { 497 try { 498 SessionImpl conn = getConnection(); 499 try { 500 conn.markReferencedBinaries(); 501 } finally { 502 conn.close(); 503 } 504 } catch (ResourceException e) { 505 throw new RuntimeException(e); 506 } 507 } 508 509 @Override 510 public int cleanupDeletedDocuments(int max, Calendar beforeTime) { 511 if (!repositoryDescriptor.getSoftDeleteEnabled()) { 512 return 0; 513 } 514 try { 515 SessionImpl conn = getConnection(); 516 try { 517 return conn.cleanupDeletedDocuments(max, beforeTime); 518 } finally { 519 conn.close(); 520 } 521 } catch (ResourceException e) { 522 throw new RuntimeException(e); 523 } 524 } 525 526 /* 527 * ----- ----- 528 */ 529 530 // callback by session at close time 531 protected void closeSession(SessionImpl session) { 532 sessions.remove(session); 533 sessionCount.dec(); 534 } 535 536}