001/* 002 * (C) Copyright 2015-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.blob; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.util.Calendar; 024import java.util.Deque; 025import java.util.LinkedList; 026import java.util.List; 027import java.util.Set; 028import java.util.function.Consumer; 029import java.util.function.Supplier; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.ecm.core.api.Blob; 034import org.nuxeo.ecm.core.api.DocumentModel; 035import org.nuxeo.ecm.core.api.DocumentSecurityException; 036import org.nuxeo.ecm.core.api.NuxeoException; 037import org.nuxeo.ecm.core.api.model.PropertyNotFoundException; 038import org.nuxeo.ecm.core.blob.BlobDispatcher.BlobDispatch; 039import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 040import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 041import org.nuxeo.ecm.core.model.Document; 042import org.nuxeo.ecm.core.model.Document.BlobAccessor; 043import org.nuxeo.ecm.core.model.Repository; 044import org.nuxeo.ecm.core.repository.RepositoryService; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.model.ComponentContext; 047import org.nuxeo.runtime.model.ComponentInstance; 048import org.nuxeo.runtime.model.DefaultComponent; 049import org.nuxeo.runtime.transaction.TransactionHelper; 050 051/** 052 * Implementation of the service managing {@link Blob}s associated to a {@link Document} or a repository. 053 * 054 * @since 9.2 055 */ 056public class DocumentBlobManagerComponent extends DefaultComponent implements DocumentBlobManager { 057 058 private static final Log log = LogFactory.getLog(DocumentBlobManagerComponent.class); 059 060 protected static final String XP = "configuration"; 061 062 protected static final BlobDispatcher DEFAULT_BLOB_DISPATCHER = new DefaultBlobDispatcher(); 063 064 protected static final int BINARY_GC_TX_TIMEOUT_SEC = 86_400; // 1 day 065 066 // in these low-level APIs we deal with unprefixed xpaths, so not file:content 067 protected static final String MAIN_BLOB_XPATH = "content"; 068 069 protected Deque<BlobDispatcherDescriptor> blobDispatcherDescriptorsRegistry = new LinkedList<>(); 070 071 @Override 072 public void deactivate(ComponentContext context) { 073 blobDispatcherDescriptorsRegistry.clear(); 074 } 075 076 @Override 077 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 078 if (XP.equals(extensionPoint)) { 079 if (contribution instanceof BlobDispatcherDescriptor) { 080 registerBlobDispatcher((BlobDispatcherDescriptor) contribution); 081 } else { 082 throw new NuxeoException("Invalid descriptor: " + contribution.getClass()); 083 } 084 } else { 085 throw new NuxeoException("Invalid extension point: " + extensionPoint); 086 } 087 } 088 089 @Override 090 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 091 if (XP.equals(extensionPoint)) { 092 if (contribution instanceof BlobDispatcherDescriptor) { 093 unregisterBlobDispatcher((BlobDispatcherDescriptor) contribution); 094 } 095 } 096 } 097 098 protected void registerBlobDispatcher(BlobDispatcherDescriptor descr) { 099 blobDispatcherDescriptorsRegistry.add(descr); 100 } 101 102 protected void unregisterBlobDispatcher(BlobDispatcherDescriptor descr) { 103 blobDispatcherDescriptorsRegistry.remove(descr); 104 } 105 106 protected BlobDispatcher getBlobDispatcher() { 107 BlobDispatcherDescriptor descr = blobDispatcherDescriptorsRegistry.peekLast(); 108 if (descr == null) { 109 return DEFAULT_BLOB_DISPATCHER; 110 } 111 return descr.getBlobDispatcher(); 112 } 113 114 protected BlobProvider getBlobProvider(String providerId) { 115 return Framework.getService(BlobManager.class).getBlobProvider(providerId); 116 } 117 118 protected DocumentBlobProvider getDocumentBlobProvider(Blob blob) { 119 BlobProvider blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blob); 120 if (blobProvider instanceof DocumentBlobProvider) { 121 return (DocumentBlobProvider) blobProvider; 122 } 123 return null; 124 } 125 126 /** 127 * {@inheritDoc} 128 * <p> 129 * The {@link BlobInfo} (coming from the database) contains the blob key, which may or may not be prefixed by a blob 130 * provider id. 131 */ 132 @Override 133 public Blob readBlob(BlobInfo blobInfo, Document doc, String xpath) throws IOException { 134 return readBlob(blobInfo, doc, xpath, doc.getRepositoryName()); 135 } 136 137 // helper used while deprecated signature below is kept 138 protected Blob readBlob(BlobInfo blobInfo, Document doc, String xpath, String repositoryName) throws IOException { 139 String key = blobInfo.key; 140 if (key == null) { 141 return null; 142 } 143 BlobProvider blobProvider = getBlobProvider(key, repositoryName); 144 if (blobProvider == null) { 145 throw new NuxeoException("No registered blob provider for key: " + key); 146 } 147 return blobProvider.readBlob(new BlobInfoContext(blobInfo, doc, xpath)); 148 } 149 150 /** 151 * {@inheritDoc} 152 * <p> 153 * The {@link BlobInfo} (coming from the database) contains the blob key, which may or may not be prefixed by a blob 154 * provider id. 155 * 156 * @deprecated since 11.1, use {@link #readBlob(BlobInfo, Document, String)} instead 157 */ 158 @Deprecated 159 @Override 160 public Blob readBlob(BlobInfo blobInfo, String repositoryName) throws IOException { 161 return readBlob(blobInfo, null, null, repositoryName); 162 } 163 164 protected BlobProvider getBlobProvider(String key, String repositoryName) { 165 int colon = key.indexOf(':'); 166 String providerId; 167 if (colon < 0) { 168 // no prefix, use the blob dispatcher to find the blob provider id 169 providerId = getBlobDispatcher().getBlobProvider(repositoryName); 170 } else { 171 // use the prefix as blob provider id 172 providerId = key.substring(0, colon); 173 } 174 return getBlobProvider(providerId); 175 } 176 177 /** 178 * {@inheritDoc} 179 * <p> 180 * If the blob is managed and already uses the provider that's expected for this blob and document, there is no need 181 * to recompute a key. Otherwise, go through the blob provider. 182 */ 183 @Override 184 public String writeBlob(Blob blob, Document doc, String xpath) throws IOException { 185 if (blob == null) { 186 if (MAIN_BLOB_XPATH.equals(xpath) && doc.isUnderRetentionOrLegalHold()) { 187 throw new DocumentSecurityException( 188 "Cannot delete blob from document " + doc.getUUID() + ", it is under retention / hold"); 189 } 190 return null; 191 } 192 193 BlobDispatcher blobDispatcher = getBlobDispatcher(); 194 BlobDispatch dispatch = null; 195 if (blob instanceof ManagedBlob) { 196 ManagedBlob managedBlob = (ManagedBlob) blob; 197 String currentProviderId = managedBlob.getProviderId(); 198 // is the blob non-transient, so that reusing the key is an option? 199 if (!getBlobProvider(currentProviderId).isTransient()) { 200 // is it something we don't have to dispatch? 201 if (!blobDispatcher.getBlobProviderIds().contains(currentProviderId)) { 202 // not something we have to dispatch, reuse the key 203 return managedBlob.getKey(); 204 } 205 dispatch = blobDispatcher.getBlobProvider(doc, blob, xpath); 206 if (dispatch.providerId.equals(currentProviderId)) { 207 // same provider, just reuse the key 208 return managedBlob.getKey(); 209 } 210 } 211 } 212 if (dispatch == null) { 213 dispatch = blobDispatcher.getBlobProvider(doc, blob, xpath); 214 } 215 BlobProvider blobProvider = getBlobProvider(dispatch.providerId); 216 if (blobProvider == null) { 217 throw new NuxeoException("No registered blob provider with id: " + dispatch.providerId); 218 } 219 if (MAIN_BLOB_XPATH.equals(xpath) && blobProvider.isRecordMode() && doc.isUnderRetentionOrLegalHold()) { 220 throw new DocumentSecurityException( 221 "Cannot change blob from document " + doc.getUUID() + ", it is under retention / hold"); 222 } 223 String key = blobProvider.writeBlob(new BlobContext(blob, doc, xpath)); 224 if (dispatch.addPrefix) { 225 key = dispatch.providerId + ':' + key; 226 } 227 return key; 228 } 229 230 @Override 231 public InputStream getConvertedStream(Blob blob, String mimeType, DocumentModel doc) throws IOException { 232 DocumentBlobProvider blobProvider = getDocumentBlobProvider(blob); 233 if (blobProvider == null) { 234 return null; 235 } 236 return blobProvider.getConvertedStream((ManagedBlob) blob, mimeType, doc); 237 } 238 239 protected void freezeVersion(BlobAccessor accessor, Document doc) { 240 Blob blob = accessor.getBlob(); 241 DocumentBlobProvider blobProvider = getDocumentBlobProvider(blob); 242 if (blobProvider == null) { 243 return; 244 } 245 try { 246 Blob newBlob = blobProvider.freezeVersion((ManagedBlob) blob, doc); 247 if (newBlob != null) { 248 accessor.setBlob(newBlob); 249 } 250 } catch (IOException e) { 251 throw new NuxeoException(e); 252 } 253 } 254 255 @Override 256 public void freezeVersion(Document doc) { 257 // finds all blobs, then ask their providers if there's anything to do on check in 258 doc.visitBlobs(accessor -> freezeVersion(accessor, doc)); 259 } 260 261 @Override 262 public void notifyChanges(Document doc, Set<String> xpaths) { 263 getBlobDispatcher().notifyChanges(doc, xpaths); 264 } 265 266 @Override 267 public void notifyMakeRecord(Document doc) { 268 getBlobDispatcher().notifyMakeRecord(doc); 269 } 270 271 @Override 272 public void notifyAfterCopy(Document doc) { 273 getBlobDispatcher().notifyAfterCopy(doc); 274 } 275 276 @Override 277 public void notifyBeforeRemove(Document doc) { 278 getBlobDispatcher().notifyBeforeRemove(doc); 279 } 280 281 @Override 282 public void notifySetRetainUntil(Document doc, Calendar retainUntil) { 283 updateBlob(doc, context -> context.withUpdateRetainUntil(retainUntil)); 284 } 285 286 @Override 287 public void notifySetLegalHold(Document doc, boolean hold) { 288 updateBlob(doc, context -> context.withUpdateLegalHold(hold)); 289 } 290 291 public void updateBlob(Document doc, Consumer<BlobUpdateContext> contextFiller) { 292 ManagedBlob blob = getMainBlob(doc); 293 if (blob == null) { 294 return; 295 } 296 BlobProvider blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blob); 297 if (blobProvider == null) { 298 log.error("No blob provider found for blob: " + blob.getKey()); 299 return; 300 } 301 try { 302 String key = stripBlobKeyPrefix(blob.getKey()); 303 BlobUpdateContext blobUpdateContext = new BlobUpdateContext(key); 304 contextFiller.accept(blobUpdateContext); 305 blobProvider.updateBlob(blobUpdateContext); 306 } catch (IOException e) { 307 throw new NuxeoException(e); 308 } 309 } 310 311 protected ManagedBlob getMainBlob(Document doc) { 312 Blob blob; 313 try { 314 blob = (Blob) doc.getValue(MAIN_BLOB_XPATH); 315 } catch (PropertyNotFoundException | ClassCastException e) { 316 // not a standard file schema 317 return null; 318 } 319 if (blob == null) { 320 // no blob in this document 321 return null; 322 } 323 if (blob instanceof ManagedBlob) { 324 return (ManagedBlob) blob; 325 } 326 log.error("Blob is not managed: " + blob); 327 return null; 328 } 329 330 // TODO restore to version also changes the blob 331 332 protected String stripBlobKeyPrefix(String key) { 333 int colon = key.indexOf(':'); 334 if (colon >= 0) { 335 key = key.substring(colon + 1); 336 } 337 return key; 338 } 339 340 // find which GCs to use 341 // only GC the binary managers to which we dispatch blobs 342 protected List<BinaryGarbageCollector> getGarbageCollectors() { 343 List<BinaryGarbageCollector> gcs = new LinkedList<>(); 344 for (String providerId : getBlobDispatcher().getBlobProviderIds()) { 345 BlobProvider blobProvider = getBlobProvider(providerId); 346 BinaryGarbageCollector gc = blobProvider.getBinaryGarbageCollector(); 347 if (gc != null) { 348 gcs.add(gc); 349 } 350 } 351 return gcs; 352 } 353 354 @Override 355 public BinaryManagerStatus garbageCollectBinaries(boolean delete) { 356 // do the GC in a long-running transaction to avoid timeouts 357 return runInTransaction(() -> { 358 List<BinaryGarbageCollector> gcs = getGarbageCollectors(); 359 // start gc 360 long start = System.currentTimeMillis(); 361 for (BinaryGarbageCollector gc : gcs) { 362 gc.start(); 363 } 364 // in all repositories, mark referenced binaries 365 // the marking itself will call back into the appropriate gc's mark method 366 RepositoryService repositoryService = Framework.getService(RepositoryService.class); 367 for (String repositoryName : repositoryService.getRepositoryNames()) { 368 Repository repository = repositoryService.getRepository(repositoryName); 369 repository.markReferencedBinaries(); 370 } 371 // stop gc 372 BinaryManagerStatus globalStatus = new BinaryManagerStatus(); 373 for (BinaryGarbageCollector gc : gcs) { 374 gc.stop(delete); 375 BinaryManagerStatus status = gc.getStatus(); 376 globalStatus.numBinaries += status.numBinaries; 377 globalStatus.sizeBinaries += status.sizeBinaries; 378 globalStatus.numBinariesGC += status.numBinariesGC; 379 globalStatus.sizeBinariesGC += status.sizeBinariesGC; 380 } 381 globalStatus.gcDuration = System.currentTimeMillis() - start; 382 return globalStatus; 383 }, BINARY_GC_TX_TIMEOUT_SEC); 384 } 385 386 /** 387 * Runs the given {@link Supplier} in a transaction with the given {@code timeout}. 388 * 389 * @since 11.1 390 */ 391 protected <R> R runInTransaction(Supplier<R> supplier, int timeout) { 392 if (TransactionHelper.isTransactionMarkedRollback()) { 393 throw new NuxeoException("Cannot run supplier when current transaction is marked rollback."); 394 } 395 boolean txActive = TransactionHelper.isTransactionActive(); 396 boolean txStarted = false; 397 try { 398 if (txActive) { 399 TransactionHelper.commitOrRollbackTransaction(); 400 } 401 txStarted = TransactionHelper.startTransaction(timeout); 402 return supplier.get(); 403 } finally { 404 if (txStarted) { 405 TransactionHelper.commitOrRollbackTransaction(); 406 } 407 if (txActive) { 408 // go back to default transaction timeout 409 TransactionHelper.startTransaction(); 410 } 411 } 412 } 413 414 @Override 415 public void markReferencedBinary(String key, String repositoryName) { 416 BlobProvider blobProvider = getBlobProvider(key, repositoryName); 417 BinaryGarbageCollector gc = blobProvider.getBinaryGarbageCollector(); 418 if (gc != null) { 419 key = stripBlobKeyPrefix(key); 420 gc.mark(key); 421 } else { 422 log.error("Unknown binary manager for key: " + key); 423 } 424 } 425 426 @Override 427 public boolean isBinariesGarbageCollectionInProgress() { 428 for (BinaryGarbageCollector gc : getGarbageCollectors()) { 429 if (gc.isInProgress()) { 430 return true; 431 } 432 } 433 return false; 434 } 435 436}