001/* 002 * (C) Copyright 2014-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.dbs; 020 021import static java.lang.Boolean.FALSE; 022 023import java.lang.reflect.InvocationHandler; 024import java.lang.reflect.Method; 025import java.lang.reflect.Proxy; 026import java.util.ArrayDeque; 027import java.util.Collection; 028import java.util.Deque; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.concurrent.ConcurrentHashMap; 035 036import javax.naming.NamingException; 037import javax.resource.spi.ConnectionManager; 038import javax.transaction.RollbackException; 039import javax.transaction.Status; 040import javax.transaction.Synchronization; 041import javax.transaction.SystemException; 042import javax.transaction.Transaction; 043 044import org.apache.commons.lang3.StringUtils; 045import org.apache.commons.logging.Log; 046import org.apache.commons.logging.LogFactory; 047import org.nuxeo.common.utils.ExceptionUtils; 048import org.nuxeo.ecm.core.api.NuxeoException; 049import org.nuxeo.ecm.core.api.repository.FulltextConfiguration; 050import org.nuxeo.ecm.core.api.security.ACE; 051import org.nuxeo.ecm.core.api.security.SecurityConstants; 052import org.nuxeo.ecm.core.api.security.impl.ACLImpl; 053import org.nuxeo.ecm.core.api.security.impl.ACPImpl; 054import org.nuxeo.ecm.core.blob.BlobManager; 055import org.nuxeo.ecm.core.model.Document; 056import org.nuxeo.ecm.core.model.LockManager; 057import org.nuxeo.ecm.core.model.Session; 058import org.nuxeo.ecm.core.schema.DocumentType; 059import org.nuxeo.ecm.core.schema.SchemaManager; 060import org.nuxeo.ecm.core.schema.TypeConstants; 061import org.nuxeo.ecm.core.schema.types.ComplexType; 062import org.nuxeo.ecm.core.schema.types.CompositeType; 063import org.nuxeo.ecm.core.schema.types.Field; 064import org.nuxeo.ecm.core.schema.types.ListType; 065import org.nuxeo.ecm.core.schema.types.Schema; 066import org.nuxeo.ecm.core.schema.types.Type; 067import org.nuxeo.ecm.core.storage.FulltextConfigurationFactory; 068import org.nuxeo.ecm.core.storage.FulltextDescriptor; 069import org.nuxeo.ecm.core.storage.lock.LockManagerService; 070import org.nuxeo.ecm.core.storage.sql.ra.ConnectionFactoryImpl; 071import org.nuxeo.runtime.api.Framework; 072import org.nuxeo.runtime.jtajca.NuxeoContainer; 073import org.nuxeo.runtime.transaction.TransactionHelper; 074 075/** 076 * Provides sharing behavior for repository sessions and other basic functions. 077 * 078 * @since 5.9.4 079 */ 080public abstract class DBSRepositoryBase implements DBSRepository { 081 082 private static final Log log = LogFactory.getLog(DBSRepositoryBase.class); 083 084 public static final String TYPE_ROOT = "Root"; 085 086 // change to have deterministic pseudo-UUID generation for debugging 087 protected final boolean DEBUG_UUIDS = false; 088 089 private static final String UUID_ZERO = "00000000-0000-0000-0000-000000000000"; 090 091 private static final String UUID_ZERO_DEBUG = "UUID_0"; 092 093 /** 094 * Type of id to used for documents. 095 * 096 * @since 8.3 097 */ 098 public enum IdType { 099 /** Random UUID stored in a string. */ 100 varchar, 101 /** Random UUID stored as a native UUID type. */ 102 uuid, 103 /** Integer sequence maintained by the database. */ 104 sequence, 105 } 106 107 /** @since 8.3 */ 108 protected IdType idType; 109 110 protected final String repositoryName; 111 112 protected final FulltextConfiguration fulltextConfiguration; 113 114 protected final BlobManager blobManager; 115 116 protected LockManager lockManager; 117 118 protected final ConnectionManager cm; 119 120 protected final boolean changeTokenEnabled; 121 122 /** 123 * @since 7.4 : used to know if the LockManager was provided by this repository or externally 124 */ 125 protected boolean selfRegisteredLockManager = false; 126 127 public DBSRepositoryBase(ConnectionManager cm, String repositoryName, DBSRepositoryDescriptor descriptor) { 128 this.repositoryName = repositoryName; 129 String idt = descriptor.idType; 130 List<IdType> allowed = getAllowedIdTypes(); 131 if (StringUtils.isBlank(idt)) { 132 idt = allowed.get(0).name(); 133 } 134 try { 135 idType = IdType.valueOf(idt); 136 if (!allowed.contains(idType)) { 137 throw new IllegalArgumentException(); 138 } 139 } catch (IllegalArgumentException e) { 140 throw new NuxeoException("Unknown id type: " + idt + ", allowed: " + allowed); 141 } 142 FulltextDescriptor fulltextDescriptor = descriptor.getFulltextDescriptor(); 143 if (fulltextDescriptor.getFulltextDisabled()) { 144 fulltextConfiguration = null; 145 } else { 146 fulltextConfiguration = FulltextConfigurationFactory.make(fulltextDescriptor); 147 } 148 this.cm = cm; 149 changeTokenEnabled = descriptor.isChangeTokenEnabled(); 150 blobManager = Framework.getService(BlobManager.class); 151 initBlobsPaths(); 152 initLockManager(); 153 } 154 155 /** Gets the allowed id types for this DBS repository. The first one is the default. */ 156 public abstract List<IdType> getAllowedIdTypes(); 157 158 @Override 159 public void shutdown() { 160 try { 161 NuxeoContainer.disposeConnectionManager(cm); 162 } catch (RuntimeException e) { 163 LogFactory.getLog(ConnectionFactoryImpl.class) 164 .warn("cannot dispose connection manager of " + repositoryName); 165 } 166 if (selfRegisteredLockManager) { 167 LockManagerService lms = Framework.getService(LockManagerService.class); 168 if (lms != null) { 169 lms.unregisterLockManager(getLockManagerName()); 170 } 171 } 172 } 173 174 @Override 175 public String getName() { 176 return repositoryName; 177 } 178 179 @Override 180 public FulltextConfiguration getFulltextConfiguration() { 181 return fulltextConfiguration; 182 } 183 184 protected String getLockManagerName() { 185 // TODO configure in repo descriptor 186 return getName(); 187 } 188 189 protected void initLockManager() { 190 String lockManagerName = getLockManagerName(); 191 LockManagerService lockManagerService = Framework.getService(LockManagerService.class); 192 lockManager = lockManagerService.getLockManager(lockManagerName); 193 if (lockManager == null) { 194 // no descriptor, use DBS repository intrinsic lock manager 195 lockManager = this; 196 log.info("Repository " + repositoryName + " using own lock manager"); 197 lockManagerService.registerLockManager(lockManagerName, lockManager); 198 selfRegisteredLockManager = true; 199 } else { 200 selfRegisteredLockManager = false; 201 log.info("Repository " + repositoryName + " using lock manager " + lockManager); 202 } 203 } 204 205 @Override 206 public LockManager getLockManager() { 207 return lockManager; 208 } 209 210 protected abstract void initBlobsPaths(); 211 212 /** Finds the paths for all blobs in all document types. */ 213 protected static abstract class BlobFinder { 214 215 protected final Set<String> schemaDone = new HashSet<>(); 216 217 protected final Deque<String> path = new ArrayDeque<>(); 218 219 public void visit() { 220 SchemaManager schemaManager = Framework.getService(SchemaManager.class); 221 // document types 222 for (DocumentType docType : schemaManager.getDocumentTypes()) { 223 visitSchemas(docType.getSchemas()); 224 } 225 // mixins 226 for (CompositeType type : schemaManager.getFacets()) { 227 visitSchemas(type.getSchemas()); 228 } 229 } 230 231 protected void visitSchemas(Collection<Schema> schemas) { 232 for (Schema schema : schemas) { 233 if (schemaDone.add(schema.getName())) { 234 visitComplexType(schema); 235 } 236 } 237 } 238 239 protected void visitComplexType(ComplexType complexType) { 240 if (TypeConstants.isContentType(complexType)) { 241 recordBlobPath(); 242 return; 243 } 244 for (Field field : complexType.getFields()) { 245 visitField(field); 246 } 247 } 248 249 /** Records a blob path, stored in the {@link #path} field. */ 250 protected abstract void recordBlobPath(); 251 252 protected void visitField(Field field) { 253 Type type = field.getType(); 254 if (type.isSimpleType()) { 255 // scalar 256 // assume no bare binary exists 257 } else if (type.isComplexType()) { 258 // complex property 259 String name = field.getName().getPrefixedName(); 260 path.addLast(name); 261 visitComplexType((ComplexType) type); 262 path.removeLast(); 263 } else { 264 // array or list 265 Type fieldType = ((ListType) type).getFieldType(); 266 if (fieldType.isSimpleType()) { 267 // array 268 // assume no array of bare binaries exist 269 } else { 270 // complex list 271 String name = field.getName().getPrefixedName(); 272 path.addLast(name); 273 visitComplexType((ComplexType) fieldType); 274 path.removeLast(); 275 } 276 } 277 } 278 } 279 280 /** 281 * Initializes the root and its ACP. 282 */ 283 public void initRoot() { 284 Session session = getSession(); 285 Document root = session.importDocument(getRootId(), null, "", TYPE_ROOT, new HashMap<>()); 286 ACLImpl acl = new ACLImpl(); 287 acl.add(new ACE(SecurityConstants.ADMINISTRATORS, SecurityConstants.EVERYTHING, true)); 288 acl.add(new ACE(SecurityConstants.ADMINISTRATOR, SecurityConstants.EVERYTHING, true)); 289 acl.add(new ACE(SecurityConstants.MEMBERS, SecurityConstants.READ, true)); 290 ACPImpl acp = new ACPImpl(); 291 acp.addACL(acl); 292 session.setACP(root, acp, true); 293 session.save(); 294 session.close(); 295 if (TransactionHelper.isTransactionActive()) { 296 TransactionHelper.commitOrRollbackTransaction(); 297 TransactionHelper.startTransaction(); 298 } 299 } 300 301 @Override 302 public String getRootId() { 303 if (DEBUG_UUIDS) { 304 return UUID_ZERO_DEBUG; 305 } 306 switch (idType) { 307 case varchar: 308 case uuid: 309 return UUID_ZERO; 310 case sequence: 311 return "0"; 312 default: 313 throw new UnsupportedOperationException(); 314 } 315 } 316 317 @Override 318 public BlobManager getBlobManager() { 319 return blobManager; 320 } 321 322 @Override 323 public boolean isFulltextDisabled() { 324 return fulltextConfiguration == null; 325 } 326 327 @Override 328 public boolean isFulltextSearchDisabled() { 329 return isFulltextDisabled() || fulltextConfiguration.fulltextSearchDisabled; 330 } 331 332 @Override 333 public boolean isChangeTokenEnabled() { 334 return changeTokenEnabled; 335 } 336 337 @Override 338 public int getActiveSessionsCount() { 339 return transactionContexts.size(); 340 } 341 342 @Override 343 public Session getSession() { 344 return getSession(this); 345 } 346 347 protected Session getSession(DBSRepository repository) { 348 Transaction transaction; 349 try { 350 transaction = TransactionHelper.lookupTransactionManager().getTransaction(); 351 if (transaction == null) { 352 throw new NuxeoException("Missing transaction"); 353 } 354 int status = transaction.getStatus(); 355 if (status != Status.STATUS_ACTIVE && status != Status.STATUS_MARKED_ROLLBACK) { 356 throw new NuxeoException("Transaction in invalid state: " + status); 357 } 358 } catch (SystemException | NamingException e) { 359 throw new NuxeoException("Failed to get transaction", e); 360 } 361 TransactionContext context = transactionContexts.get(transaction); 362 if (context == null) { 363 context = new TransactionContext(transaction, newSession(repository)); 364 context.init(); 365 } 366 return context.newSession(); 367 } 368 369 protected DBSSession newSession(DBSRepository repository) { 370 return new DBSSession(repository); 371 } 372 373 public Map<Transaction, TransactionContext> transactionContexts = new ConcurrentHashMap<>(); 374 375 /** 376 * Context maintained during a transaction, holding the base session used, and all session proxy handles that have 377 * been returned to callers. 378 */ 379 public class TransactionContext implements Synchronization { 380 381 protected final Transaction transaction; 382 383 protected final DBSSession baseSession; 384 385 protected final Set<Session> proxies; 386 387 public TransactionContext(Transaction transaction, DBSSession baseSession) { 388 this.transaction = transaction; 389 this.baseSession = baseSession; 390 proxies = new HashSet<>(); 391 } 392 393 public void init() { 394 transactionContexts.put(transaction, this); 395 begin(); 396 // make sure it's closed (with handles) at transaction end 397 try { 398 transaction.registerSynchronization(this); 399 } catch (RollbackException | SystemException e) { 400 throw new RuntimeException(e); 401 } 402 } 403 404 public Session newSession() { 405 ClassLoader cl = getClass().getClassLoader(); 406 DBSSessionInvoker invoker = new DBSSessionInvoker(this); 407 Session proxy = (Session) Proxy.newProxyInstance(cl, new Class[] { Session.class }, invoker); 408 add(proxy); 409 return proxy; 410 } 411 412 public void add(Session proxy) { 413 proxies.add(proxy); 414 } 415 416 public boolean remove(Object proxy) { 417 return proxies.remove(proxy); 418 } 419 420 public void begin() { 421 baseSession.begin(); 422 } 423 424 @Override 425 public void beforeCompletion() { 426 } 427 428 @Override 429 public void afterCompletion(int status) { 430 if (status == Status.STATUS_COMMITTED) { 431 baseSession.commit(); 432 } else if (status == Status.STATUS_ROLLEDBACK) { 433 baseSession.rollback(); 434 } else { 435 log.error("Unexpected afterCompletion status: " + status); 436 } 437 baseSession.close(); 438 removeTransaction(); 439 } 440 441 protected void removeTransaction() { 442 for (Session proxy : proxies.toArray(new Session[0])) { 443 proxy.close(); // so that users of the session proxy see it's not live anymore 444 } 445 transactionContexts.remove(transaction); 446 } 447 } 448 449 /** 450 * An indirection to a base {@link DBSSession} intercepting {@code close()} to not close the base session until the 451 * transaction itself is closed. 452 */ 453 public static class DBSSessionInvoker implements InvocationHandler { 454 455 private static final String METHOD_HASHCODE = "hashCode"; 456 457 private static final String METHOD_EQUALS = "equals"; 458 459 private static final String METHOD_CLOSE = "close"; 460 461 private static final String METHOD_ISLIVE = "isLive"; 462 463 protected final TransactionContext context; 464 465 protected boolean closed; 466 467 public DBSSessionInvoker(TransactionContext context) { 468 this.context = context; 469 } 470 471 @Override 472 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 473 String methodName = method.getName(); 474 if (methodName.equals(METHOD_HASHCODE)) { 475 return doHashCode(); 476 } 477 if (methodName.equals(METHOD_EQUALS)) { 478 return doEquals(args); 479 } 480 if (methodName.equals(METHOD_CLOSE)) { 481 return doClose(proxy); 482 } 483 if (methodName.equals(METHOD_ISLIVE)) { 484 return doIsLive(); 485 } 486 487 if (closed) { 488 throw new NuxeoException("Cannot use closed connection handle"); 489 } 490 491 try { 492 return method.invoke(context.baseSession, args); 493 } catch (ReflectiveOperationException e) { 494 throw ExceptionUtils.unwrapInvoke(e); 495 } 496 } 497 498 protected Integer doHashCode() { 499 return Integer.valueOf(hashCode()); 500 } 501 502 protected Boolean doEquals(Object[] args) { 503 if (args.length != 1 || args[0] == null) { 504 return FALSE; 505 } 506 Object other = args[0]; 507 if (!(Proxy.isProxyClass(other.getClass()))) { 508 return FALSE; 509 } 510 InvocationHandler otherInvoker = Proxy.getInvocationHandler(other); 511 return Boolean.valueOf(equals(otherInvoker)); 512 } 513 514 protected Object doClose(Object proxy) { 515 closed = true; 516 context.remove(proxy); 517 return null; 518 } 519 520 protected Boolean doIsLive() { 521 if (closed) { 522 return FALSE; 523 } else { 524 return Boolean.valueOf(context.baseSession.isLive()); 525 } 526 } 527 } 528 529}