001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.blob; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.net.URI; 024import java.util.Collections; 025import java.util.Deque; 026import java.util.HashMap; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031 032import javax.servlet.http.HttpServletRequest; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.ecm.core.api.Blob; 037import org.nuxeo.ecm.core.api.DocumentModel; 038import org.nuxeo.ecm.core.api.NuxeoException; 039import org.nuxeo.ecm.core.blob.BlobDispatcher.BlobDispatch; 040import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider; 041import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 042import org.nuxeo.ecm.core.blob.binary.BinaryManager; 043import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 044import org.nuxeo.ecm.core.model.Document; 045import org.nuxeo.ecm.core.model.Document.BlobAccessor; 046import org.nuxeo.ecm.core.model.Repository; 047import org.nuxeo.ecm.core.repository.RepositoryService; 048import org.nuxeo.runtime.api.Framework; 049import org.nuxeo.runtime.model.ComponentContext; 050import org.nuxeo.runtime.model.ComponentInstance; 051import org.nuxeo.runtime.model.DefaultComponent; 052import org.nuxeo.runtime.model.SimpleContributionRegistry; 053 054/** 055 * Implementation of the service managing the storage and retrieval of {@link Blob}s, through internally-registered 056 * {@link BlobProvider}s. 057 * 058 * @since 7.2 059 */ 060public class BlobManagerComponent extends DefaultComponent implements BlobManager { 061 062 private static final Log log = LogFactory.getLog(BlobManagerComponent.class); 063 064 protected static final String XP = "configuration"; 065 066 protected static BlobDispatcher DEFAULT_BLOB_DISPATCHER = new DefaultBlobDispatcher(); 067 068 protected Deque<BlobDispatcherDescriptor> blobDispatcherDescriptorsRegistry = new LinkedList<>(); 069 070 protected BlobProviderDescriptorRegistry blobProviderDescriptorsRegistry = new BlobProviderDescriptorRegistry(); 071 072 protected Map<String, BlobProvider> blobProviders = new HashMap<>(); 073 074 protected static class BlobProviderDescriptorRegistry extends SimpleContributionRegistry<BlobProviderDescriptor> { 075 076 @Override 077 public String getContributionId(BlobProviderDescriptor contrib) { 078 return contrib.name; 079 } 080 081 @Override 082 public BlobProviderDescriptor clone(BlobProviderDescriptor orig) { 083 return new BlobProviderDescriptor(orig); 084 } 085 086 @Override 087 public void merge(BlobProviderDescriptor src, BlobProviderDescriptor dst) { 088 dst.merge(src); 089 } 090 091 @Override 092 public boolean isSupportingMerge() { 093 return true; 094 } 095 096 public void clear() { 097 currentContribs.clear(); 098 } 099 100 public BlobProviderDescriptor getBlobProviderDescriptor(String id) { 101 return getCurrentContribution(id); 102 } 103 } 104 105 @Override 106 public void deactivate(ComponentContext context) { 107 blobDispatcherDescriptorsRegistry.clear(); 108 blobProviderDescriptorsRegistry.clear(); 109 // close each blob provider 110 for (BlobProvider blobProvider : blobProviders.values()) { 111 blobProvider.close(); 112 } 113 blobProviders.clear(); 114 } 115 116 @Override 117 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 118 if (XP.equals(extensionPoint)) { 119 if (contribution instanceof BlobDispatcherDescriptor) { 120 registerBlobDispatcher((BlobDispatcherDescriptor) contribution); 121 } else if (contribution instanceof BlobProviderDescriptor) { 122 registerBlobProvider((BlobProviderDescriptor) contribution); 123 } else { 124 throw new NuxeoException("Invalid descriptor: " + contribution.getClass()); 125 } 126 } else { 127 throw new NuxeoException("Invalid extension point: " + extensionPoint); 128 } 129 } 130 131 @Override 132 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 133 if (XP.equals(extensionPoint)) { 134 if (contribution instanceof BlobDispatcherDescriptor) { 135 unregisterBlobDispatcher((BlobDispatcherDescriptor) contribution); 136 } else if (contribution instanceof BlobProviderDescriptor) { 137 unregisterBlobProvider((BlobProviderDescriptor) contribution); 138 } 139 } 140 } 141 142 protected void registerBlobDispatcher(BlobDispatcherDescriptor descr) { 143 blobDispatcherDescriptorsRegistry.add(descr); 144 } 145 146 protected void unregisterBlobDispatcher(BlobDispatcherDescriptor descr) { 147 blobDispatcherDescriptorsRegistry.remove(descr); 148 } 149 150 protected BlobDispatcher getBlobDispatcher() { 151 BlobDispatcherDescriptor descr = blobDispatcherDescriptorsRegistry.peekLast(); 152 if (descr == null) { 153 return DEFAULT_BLOB_DISPATCHER; 154 } 155 return descr.getBlobDispatcher(); 156 } 157 158 // public for tests 159 public void registerBlobProvider(BlobProviderDescriptor descr) { 160 closeOldBlobProvider(descr.name); 161 blobProviderDescriptorsRegistry.addContribution(descr); 162 // lookup now to have immediate feedback on eror 163 getBlobProvider(descr.name); 164 } 165 166 // public for tests 167 public void unregisterBlobProvider(BlobProviderDescriptor descr) { 168 closeOldBlobProvider(descr.name); 169 blobProviderDescriptorsRegistry.removeContribution(descr); 170 } 171 172 /** 173 * We're about to change something about a contributed blob provider. Close the old one. 174 */ 175 protected synchronized void closeOldBlobProvider(String id) { 176 BlobProvider blobProvider = blobProviders.remove(id); 177 if (blobProvider != null) { 178 blobProvider.close(); 179 } 180 } 181 182 @Override 183 public synchronized BlobProvider getBlobProvider(String providerId) { 184 BlobProvider blobProvider = blobProviders.get(providerId); 185 if (blobProvider == null) { 186 BlobProviderDescriptor descr = blobProviderDescriptorsRegistry.getBlobProviderDescriptor(providerId); 187 if (descr == null) { 188 return null; 189 } 190 Class<?> klass = descr.klass; 191 Map<String, String> properties = descr.properties; 192 try { 193 if (BlobProvider.class.isAssignableFrom(klass)) { 194 @SuppressWarnings("unchecked") 195 Class<? extends BlobProvider> blobProviderClass = (Class<? extends BlobProvider>) klass; 196 blobProvider = blobProviderClass.newInstance(); 197 } else if (BinaryManager.class.isAssignableFrom(klass)) { 198 @SuppressWarnings("unchecked") 199 Class<? extends BinaryManager> binaryManagerClass = (Class<? extends BinaryManager>) klass; 200 BinaryManager binaryManager = binaryManagerClass.newInstance(); 201 blobProvider = new BinaryBlobProvider(binaryManager); 202 } else { 203 throw new RuntimeException("Unknown class for blob provider: " + klass); 204 } 205 } catch (ReflectiveOperationException e) { 206 throw new RuntimeException(e); 207 } 208 try { 209 blobProvider.initialize(providerId, properties); 210 } catch (IOException e) { 211 throw new RuntimeException(e); 212 } 213 blobProviders.put(providerId, blobProvider); 214 } 215 return blobProvider; 216 } 217 218 /** 219 * {@inheritDoc} 220 * <p> 221 * The {@link BlobInfo} (coming from the database) contains the blob key, which may or may not be prefixed by a blob 222 * provider id. 223 */ 224 @Override 225 public Blob readBlob(BlobInfo blobInfo, String repositoryName) throws IOException { 226 String key = blobInfo.key; 227 if (key == null) { 228 return null; 229 } 230 BlobProvider blobProvider = getBlobProvider(key, repositoryName); 231 if (blobProvider == null) { 232 throw new NuxeoException("No registered blob provider for key: " + key); 233 } 234 return blobProvider.readBlob(blobInfo); 235 } 236 237 protected BlobProvider getBlobProvider(String key, String repositoryName) { 238 int colon = key.indexOf(':'); 239 String providerId; 240 if (colon < 0) { 241 // no prefix, use the blob dispatcher to find the blob provider id 242 providerId = getBlobDispatcher().getBlobProvider(repositoryName); 243 } else { 244 // use the prefix as blob provider id 245 providerId = key.substring(0, colon); 246 } 247 return getBlobProvider(providerId); 248 } 249 250 /** 251 * {@inheritDoc} 252 * <p> 253 * If the blob is managed and already uses the provider that's expected for this blob and document, there is no need 254 * to recompute a key. Otherwise, go through the blob provider. 255 */ 256 @Override 257 public String writeBlob(Blob blob, Document doc, String xpath) throws IOException { 258 BlobDispatcher blobDispatcher = getBlobDispatcher(); 259 BlobDispatch dispatch = null; 260 if (blob instanceof ManagedBlob) { 261 ManagedBlob managedBlob = (ManagedBlob) blob; 262 String currentProviderId = managedBlob.getProviderId(); 263 // is it something we don't have to dispatch? 264 if (!blobDispatcher.getBlobProviderIds().contains(currentProviderId)) { 265 // not something we have to dispatch, reuse the key 266 return managedBlob.getKey(); 267 } 268 dispatch = blobDispatcher.getBlobProvider(doc, blob, xpath); 269 if (dispatch.providerId.equals(currentProviderId)) { 270 // same provider, just reuse the key 271 return managedBlob.getKey(); 272 } 273 } 274 if (dispatch == null) { 275 dispatch = blobDispatcher.getBlobProvider(doc, blob, xpath); 276 } 277 BlobProvider blobProvider = getBlobProvider(dispatch.providerId); 278 if (blobProvider == null) { 279 throw new NuxeoException("No registered blob provider with id: " + dispatch.providerId); 280 } 281 String key = blobProvider.writeBlob(blob, doc); 282 if (dispatch.addPrefix) { 283 key = dispatch.providerId + ':' + key; 284 } 285 return key; 286 } 287 288 @Override 289 public BlobProvider getBlobProvider(Blob blob) { 290 if (!(blob instanceof ManagedBlob)) { 291 return null; 292 } 293 ManagedBlob managedBlob = (ManagedBlob) blob; 294 return getBlobProvider(managedBlob.getProviderId()); 295 } 296 297 @Override 298 public InputStream getStream(Blob blob) throws IOException { 299 BlobProvider blobProvider = getBlobProvider(blob); 300 if (blobProvider == null) { 301 return null; 302 } 303 return blobProvider.getStream((ManagedBlob) blob); 304 } 305 306 @Override 307 public InputStream getThumbnail(Blob blob) throws IOException { 308 BlobProvider blobProvider = getBlobProvider(blob); 309 if (blobProvider == null) { 310 return null; 311 } 312 return blobProvider.getThumbnail((ManagedBlob) blob); 313 } 314 315 @Override 316 public URI getURI(Blob blob, UsageHint hint, HttpServletRequest servletRequest) throws IOException { 317 BlobProvider blobProvider = getBlobProvider(blob); 318 if (blobProvider == null) { 319 return null; 320 } 321 return blobProvider.getURI((ManagedBlob) blob, hint, servletRequest); 322 } 323 324 @Override 325 public Map<String, URI> getAvailableConversions(Blob blob, UsageHint hint) throws IOException { 326 BlobProvider blobProvider = getBlobProvider(blob); 327 if (blobProvider == null) { 328 return Collections.emptyMap(); 329 } 330 return blobProvider.getAvailableConversions((ManagedBlob) blob, hint); 331 } 332 333 @Override 334 public InputStream getConvertedStream(Blob blob, String mimeType, DocumentModel doc) throws IOException { 335 BlobProvider blobProvider = getBlobProvider(blob); 336 if (blobProvider == null) { 337 return null; 338 } 339 return blobProvider.getConvertedStream((ManagedBlob) blob, mimeType, doc); 340 } 341 342 protected void freezeVersion(BlobAccessor accessor, Document doc) { 343 Blob blob = accessor.getBlob(); 344 BlobProvider blobProvider = getBlobProvider(blob); 345 if (blobProvider == null) { 346 return; 347 } 348 try { 349 Blob newBlob = blobProvider.freezeVersion((ManagedBlob) blob, doc); 350 if (newBlob != null) { 351 accessor.setBlob(newBlob); 352 } 353 } catch (IOException e) { 354 throw new RuntimeException(e); 355 } 356 } 357 358 @Override 359 public Map<String, BlobProvider> getBlobProviders() { 360 return blobProviders; 361 } 362 363 @Override 364 public void freezeVersion(Document doc) { 365 // finds all blobs, then ask their providers if there's anything to do on check in 366 doc.visitBlobs(accessor -> freezeVersion(accessor, doc)); 367 } 368 369 @Override 370 public void notifyChanges(Document doc, Set<String> xpaths) { 371 getBlobDispatcher().notifyChanges(doc, xpaths); 372 } 373 374 // find which GCs to use 375 // only GC the binary managers to which we dispatch blobs 376 protected List<BinaryGarbageCollector> getGarbageCollectors() { 377 List<BinaryGarbageCollector> gcs = new LinkedList<>(); 378 for (String providerId : getBlobDispatcher().getBlobProviderIds()) { 379 BlobProvider blobProvider = getBlobProvider(providerId); 380 BinaryManager binaryManager = blobProvider.getBinaryManager(); 381 if (binaryManager != null) { 382 gcs.add(binaryManager.getGarbageCollector()); 383 } 384 } 385 return gcs; 386 } 387 388 @Override 389 public BinaryManagerStatus garbageCollectBinaries(boolean delete) { 390 List<BinaryGarbageCollector> gcs = getGarbageCollectors(); 391 // start gc 392 long start = System.currentTimeMillis(); 393 for (BinaryGarbageCollector gc : gcs) { 394 gc.start(); 395 } 396 // in all repositories, mark referenced binaries 397 // the marking itself will call back into the appropriate gc's mark method 398 RepositoryService repositoryService = Framework.getService(RepositoryService.class); 399 for (String repositoryName : repositoryService.getRepositoryNames()) { 400 Repository repository = repositoryService.getRepository(repositoryName); 401 repository.markReferencedBinaries(); 402 } 403 // stop gc 404 BinaryManagerStatus globalStatus = new BinaryManagerStatus(); 405 for (BinaryGarbageCollector gc : gcs) { 406 gc.stop(delete); 407 BinaryManagerStatus status = gc.getStatus(); 408 globalStatus.numBinaries += status.numBinaries; 409 globalStatus.sizeBinaries += status.sizeBinaries; 410 globalStatus.numBinariesGC += status.numBinariesGC; 411 globalStatus.sizeBinariesGC += status.sizeBinariesGC; 412 } 413 globalStatus.gcDuration = System.currentTimeMillis() - start; 414 return globalStatus; 415 } 416 417 @Override 418 public void markReferencedBinary(String key, String repositoryName) { 419 BlobProvider blobProvider = getBlobProvider(key, repositoryName); 420 BinaryManager binaryManager = blobProvider.getBinaryManager(); 421 if (binaryManager != null) { 422 int colon = key.indexOf(':'); 423 if (colon > 0) { 424 // if the key is in the "providerId:digest" format, keep only the real digest 425 key = key.substring(colon + 1); 426 } 427 binaryManager.getGarbageCollector().mark(key); 428 } else { 429 log.error("Unknown binary manager for key: " + key); 430 } 431 } 432 433 @Override 434 public boolean isBinariesGarbageCollectionInProgress() { 435 for (BinaryGarbageCollector gc : getGarbageCollectors()) { 436 if (gc.isInProgress()) { 437 return true; 438 } 439 } 440 return false; 441 } 442 443}