001/* 002 * (C) Copyright 2006-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019 020package org.nuxeo.ecm.core.storage.sql; 021 022import java.io.Serializable; 023import java.security.SecureRandom; 024import java.util.Calendar; 025import java.util.Collection; 026import java.util.Random; 027import java.util.concurrent.CopyOnWriteArrayList; 028 029import javax.naming.Reference; 030import javax.resource.ResourceException; 031import javax.resource.cci.ConnectionSpec; 032import javax.resource.cci.RecordFactory; 033import javax.resource.cci.ResourceAdapterMetaData; 034 035import org.apache.commons.lang3.StringUtils; 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.nuxeo.ecm.core.api.NuxeoException; 039import org.nuxeo.ecm.core.api.repository.FulltextConfiguration; 040import org.nuxeo.ecm.core.model.LockManager; 041import org.nuxeo.ecm.core.storage.lock.LockManagerService; 042import org.nuxeo.ecm.core.storage.sql.Session.PathResolver; 043import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCBackend; 044import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCClusterInvalidator; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.metrics.MetricsService; 047 048import com.codahale.metrics.Counter; 049import com.codahale.metrics.Gauge; 050import com.codahale.metrics.MetricRegistry; 051import com.codahale.metrics.SharedMetricRegistries; 052 053/** 054 * {@link Repository} implementation, to be extended by backend-specific initialization code. 055 * 056 * @see RepositoryBackend 057 */ 058public class RepositoryImpl implements Repository { 059 060 private static final long serialVersionUID = 1L; 061 062 private static final Log log = LogFactory.getLog(RepositoryImpl.class); 063 064 private static final Random RANDOM = new SecureRandom(); 065 066 protected final RepositoryDescriptor repositoryDescriptor; 067 068 private RepositoryBackend backend; 069 070 private final Collection<SessionImpl> sessions; 071 072 protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 073 074 protected final Counter repositoryUp; 075 076 protected final Counter sessionCount; 077 078 private LockManager lockManager; 079 080 /** 081 * @since 7.4 : used to know if the LockManager was provided by this repository or externally 082 */ 083 protected boolean selfRegisteredLockManager = false; 084 085 /** Propagator of invalidations to all mappers' caches. */ 086 protected final InvalidationsPropagator invalidationsPropagator; 087 088 private Model model; 089 090 /** 091 * Transient id for this repository assigned by the server on first connection. This is not persisted. 092 */ 093 public String repositoryId; 094 095 public RepositoryImpl(RepositoryDescriptor repositoryDescriptor) { 096 this.repositoryDescriptor = repositoryDescriptor; 097 sessions = new CopyOnWriteArrayList<>(); 098 invalidationsPropagator = new InvalidationsPropagator(); 099 100 repositoryUp = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, 101 "instance-up")); 102 repositoryUp.inc(); 103 sessionCount = registry.counter(MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, 104 "sessions")); 105 createMetricsGauges(); 106 107 initRepository(); 108 } 109 110 protected void createMetricsGauges() { 111 String gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "size"); 112 registry.remove(gaugeName); 113 registry.register(gaugeName, new Gauge<Long>() { 114 @Override 115 public Long getValue() { 116 return getCacheSize(); 117 } 118 }); 119 gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "pristines"); 120 registry.remove(gaugeName); 121 registry.register(gaugeName, new Gauge<Long>() { 122 @Override 123 public Long getValue() { 124 return getCachePristineSize(); 125 } 126 }); 127 gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "selections"); 128 registry.remove(gaugeName); 129 registry.register(gaugeName, new Gauge<Long>() { 130 @Override 131 public Long getValue() { 132 return getCacheSelectionSize(); 133 } 134 }); 135 gaugeName = MetricRegistry.name("nuxeo", "repositories", repositoryDescriptor.name, "caches", "mappers"); 136 registry.remove(gaugeName); 137 registry.register(gaugeName, new Gauge<Long>() { 138 @Override 139 public Long getValue() { 140 return getCacheMapperSize(); 141 } 142 }); 143 } 144 145 protected RepositoryBackend createBackend() { 146 Class<? extends RepositoryBackend> backendClass = repositoryDescriptor.backendClass; 147 if (backendClass == null) { 148 backendClass = JDBCBackend.class; 149 } 150 try { 151 RepositoryBackend backend = backendClass.newInstance(); 152 return backend; 153 } catch (ReflectiveOperationException e) { 154 throw new NuxeoException(e); 155 } 156 } 157 158 protected Mapper createCachingMapper(Model model, Mapper mapper) { 159 try { 160 Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass(); 161 if (cachingMapperClass == null) { 162 return mapper; 163 } 164 CachingMapper cachingMapper = cachingMapperClass.newInstance(); 165 cachingMapper.initialize(getName(), model, mapper, invalidationsPropagator, 166 repositoryDescriptor.cachingMapperProperties); 167 return cachingMapper; 168 } catch (ReflectiveOperationException e) { 169 throw new NuxeoException(e); 170 } 171 } 172 173 protected Class<? extends CachingMapper> getCachingMapperClass() { 174 if (!repositoryDescriptor.getCachingMapperEnabled()) { 175 return null; 176 } 177 Class<? extends CachingMapper> cachingMapperClass = repositoryDescriptor.cachingMapperClass; 178 if (cachingMapperClass == null) { 179 // default cache 180 cachingMapperClass = SoftRefCachingMapper.class; 181 } 182 return cachingMapperClass; 183 } 184 185 public RepositoryDescriptor getRepositoryDescriptor() { 186 return repositoryDescriptor; 187 } 188 189 public LockManager getLockManager() { 190 return lockManager; 191 } 192 193 public Model getModel() { 194 return model; 195 } 196 197 public InvalidationsPropagator getInvalidationsPropagator() { 198 return invalidationsPropagator; 199 } 200 201 public boolean isChangeTokenEnabled() { 202 return repositoryDescriptor.isChangeTokenEnabled(); 203 } 204 205 /* 206 * ----- javax.resource.cci.ConnectionFactory ----- 207 */ 208 209 /** 210 * Gets a new connection. 211 * 212 * @param connectionSpec the parameters to use to connect (unused) 213 * @return the session 214 */ 215 @Override 216 public SessionImpl getConnection(ConnectionSpec connectionSpec) { 217 return getConnection(); 218 } 219 220 /** 221 * Gets a new connection. 222 * 223 * @return the session 224 */ 225 @Override 226 public synchronized SessionImpl getConnection() { 227 if (Framework.getRuntime().isShuttingDown()) { 228 throw new IllegalStateException("Cannot open connection, runtime is shutting down"); 229 } 230 SessionPathResolver pathResolver = new SessionPathResolver(); 231 Mapper mapper = newMapper(pathResolver, true); 232 SessionImpl session = newSession(model, mapper); 233 pathResolver.setSession(session); 234 sessions.add(session); 235 sessionCount.inc(); 236 return session; 237 } 238 239 /** 240 * Creates a new mapper. 241 * 242 * @param pathResolver the path resolver (for regular mappers) 243 * @param useInvalidations whether this mapper participates in invalidation propagation (false for lock manager / 244 * cluster invalidator) 245 * @return the new mapper. 246 * @since 7.4 247 */ 248 public Mapper newMapper(PathResolver pathResolver, boolean useInvalidations) { 249 return backend.newMapper(pathResolver, useInvalidations); 250 } 251 252 protected void initRepository() { 253 log.debug("Initializing"); 254 backend = createBackend(); 255 model = backend.initialize(this); 256 initLockManager(); 257 258 // create the cluster invalidator 259 if (repositoryDescriptor.getClusteringEnabled()) { 260 initClusterInvalidator(); 261 } 262 263 // log once which mapper cache is being used 264 Class<? extends CachingMapper> cachingMapperClass = getCachingMapperClass(); 265 if (cachingMapperClass == null) { 266 log.warn("VCS Mapper cache is disabled."); 267 } else { 268 log.info("VCS Mapper cache using: " + cachingMapperClass.getName()); 269 } 270 271 initRootNode(); 272 } 273 274 protected void initRootNode() { 275 try { 276 // access a session once so that SessionImpl.computeRootNode can create the root node 277 getConnection().close(); 278 } catch (ResourceException e) { 279 throw new RuntimeException(e); 280 } 281 } 282 283 protected String getLockManagerName() { 284 // TODO configure in repo descriptor 285 return getName(); 286 } 287 288 protected void initLockManager() { 289 String lockManagerName = getLockManagerName(); 290 LockManagerService lockManagerService = Framework.getService(LockManagerService.class); 291 lockManager = lockManagerService.getLockManager(lockManagerName); 292 if (lockManager == null) { 293 // no descriptor 294 // default to a VCSLockManager 295 lockManager = new VCSLockManager(this); 296 lockManagerService.registerLockManager(lockManagerName, lockManager); 297 selfRegisteredLockManager = true; 298 } else { 299 selfRegisteredLockManager = false; 300 } 301 log.info("Repository " + getName() + " using lock manager " + lockManager); 302 } 303 304 protected void initClusterInvalidator() { 305 String nodeId = repositoryDescriptor.getClusterNodeId(); 306 if (StringUtils.isBlank(nodeId)) { 307 // need a smallish int because of SQL Server legacy node ids 308 nodeId = String.valueOf(RANDOM.nextInt(32768)); 309 log.warn("Missing cluster node id configuration, please define it explicitly (usually through repository.clustering.id). " 310 + "Using random cluster node id instead: " + nodeId); 311 } else { 312 nodeId = nodeId.trim(); 313 } 314 ClusterInvalidator clusterInvalidator = createClusterInvalidator(); 315 clusterInvalidator.initialize(nodeId, this); 316 backend.setClusterInvalidator(clusterInvalidator); 317 } 318 319 protected ClusterInvalidator createClusterInvalidator() { 320 Class<? extends ClusterInvalidator> klass = repositoryDescriptor.clusterInvalidatorClass; 321 if (klass == null) { 322 klass = JDBCClusterInvalidator.class; 323 } 324 try { 325 return klass.newInstance(); 326 } catch (ReflectiveOperationException e) { 327 throw new NuxeoException(e); 328 } 329 } 330 331 protected SessionImpl newSession(Model model, Mapper mapper) { 332 mapper = createCachingMapper(model, mapper); 333 return new SessionImpl(this, model, mapper); 334 } 335 336 public static class SessionPathResolver implements PathResolver { 337 338 private Session session; 339 340 protected void setSession(Session session) { 341 this.session = session; 342 } 343 344 @Override 345 public Serializable getIdForPath(String path) { 346 Node node = session.getNodeByPath(path, null); 347 return node == null ? null : node.getId(); 348 } 349 } 350 351 /* 352 * ----- 353 */ 354 355 @Override 356 public ResourceAdapterMetaData getMetaData() { 357 throw new UnsupportedOperationException(); 358 } 359 360 @Override 361 public RecordFactory getRecordFactory() { 362 throw new UnsupportedOperationException(); 363 } 364 365 /* 366 * ----- javax.resource.Referenceable ----- 367 */ 368 369 private Reference reference; 370 371 @Override 372 public void setReference(Reference reference) { 373 this.reference = reference; 374 } 375 376 @Override 377 public Reference getReference() { 378 return reference; 379 } 380 381 /* 382 * ----- Repository ----- 383 */ 384 385 @Override 386 public synchronized void close() { 387 closeAllSessions(); 388 model = null; 389 backend.shutdown(); 390 391 registry.remove(MetricRegistry.name(RepositoryImpl.class, getName(), "cache-size")); 392 registry.remove(MetricRegistry.name(PersistenceContext.class, getName(), "cache-size")); 393 registry.remove(MetricRegistry.name(SelectionContext.class, getName(), "cache-size")); 394 395 if (selfRegisteredLockManager) { 396 LockManagerService lms = Framework.getService(LockManagerService.class); 397 if (lms != null) { 398 lms.unregisterLockManager(getLockManagerName()); 399 } 400 } 401 } 402 403 protected synchronized void closeAllSessions() { 404 for (SessionImpl session : sessions) { 405 if (!session.isLive()) { 406 continue; 407 } 408 session.closeSession(); 409 } 410 sessions.clear(); 411 sessionCount.dec(sessionCount.getCount()); 412 if (lockManager != null) { 413 lockManager.closeLockManager(); 414 } 415 } 416 417 /* 418 * ----- RepositoryManagement ----- 419 */ 420 421 @Override 422 public String getName() { 423 return repositoryDescriptor.name; 424 } 425 426 @Override 427 public int getActiveSessionsCount() { 428 return sessions.size(); 429 } 430 431 @Override 432 public int clearCaches() { 433 int n = 0; 434 for (SessionImpl session : sessions) { 435 n += session.clearCaches(); 436 } 437 if (lockManager != null) { 438 lockManager.clearLockManagerCaches(); 439 } 440 return n; 441 } 442 443 @Override 444 public long getCacheSize() { 445 long size = 0; 446 for (SessionImpl session : sessions) { 447 size += session.getCacheSize(); 448 } 449 return size; 450 } 451 452 public long getCacheMapperSize() { 453 long size = 0; 454 for (SessionImpl session : sessions) { 455 size += session.getCacheMapperSize(); 456 } 457 return size; 458 } 459 460 @Override 461 public long getCachePristineSize() { 462 long size = 0; 463 for (SessionImpl session : sessions) { 464 size += session.getCachePristineSize(); 465 } 466 return size; 467 } 468 469 @Override 470 public long getCacheSelectionSize() { 471 long size = 0; 472 for (SessionImpl session : sessions) { 473 size += session.getCacheSelectionSize(); 474 } 475 return size; 476 } 477 478 @Override 479 public void processClusterInvalidationsNext() { 480 // TODO pass through or something 481 } 482 483 @Override 484 public void markReferencedBinaries() { 485 try { 486 SessionImpl conn = getConnection(); 487 try { 488 conn.markReferencedBinaries(); 489 } finally { 490 conn.close(); 491 } 492 } catch (ResourceException e) { 493 throw new RuntimeException(e); 494 } 495 } 496 497 @Override 498 public int cleanupDeletedDocuments(int max, Calendar beforeTime) { 499 if (!repositoryDescriptor.getSoftDeleteEnabled()) { 500 return 0; 501 } 502 try { 503 SessionImpl conn = getConnection(); 504 try { 505 return conn.cleanupDeletedDocuments(max, beforeTime); 506 } finally { 507 conn.close(); 508 } 509 } catch (ResourceException e) { 510 throw new RuntimeException(e); 511 } 512 } 513 514 @Override 515 public FulltextConfiguration getFulltextConfiguration() { 516 return model.getFulltextConfiguration(); 517 } 518 519 /* 520 * ----- ----- 521 */ 522 523 // callback by session at close time 524 protected void closeSession(SessionImpl session) { 525 sessions.remove(session); 526 sessionCount.dec(); 527 } 528 529}