001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Florent Guillaume 016 */ 017package org.nuxeo.ecm.core.blob; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.net.URI; 022import java.util.Collections; 023import java.util.Deque; 024import java.util.HashMap; 025import java.util.LinkedList; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029 030import javax.servlet.http.HttpServletRequest; 031 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.nuxeo.ecm.core.api.Blob; 035import org.nuxeo.ecm.core.api.DocumentModel; 036import org.nuxeo.ecm.core.api.NuxeoException; 037import org.nuxeo.ecm.core.blob.BlobDispatcher.BlobDispatch; 038import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider; 039import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 040import org.nuxeo.ecm.core.blob.binary.BinaryManager; 041import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 042import org.nuxeo.ecm.core.model.Document; 043import org.nuxeo.ecm.core.model.Document.BlobAccessor; 044import org.nuxeo.ecm.core.model.Repository; 045import org.nuxeo.ecm.core.repository.RepositoryService; 046import org.nuxeo.runtime.api.Framework; 047import org.nuxeo.runtime.model.ComponentContext; 048import org.nuxeo.runtime.model.ComponentInstance; 049import org.nuxeo.runtime.model.DefaultComponent; 050import org.nuxeo.runtime.model.SimpleContributionRegistry; 051 052/** 053 * Implementation of the service managing the storage and retrieval of {@link Blob}s, through internally-registered 054 * {@link BlobProvider}s. 055 * 056 * @since 7.2 057 */ 058public class BlobManagerComponent extends DefaultComponent implements BlobManager { 059 060 private static final Log log = LogFactory.getLog(BlobManagerComponent.class); 061 062 protected static final String XP = "configuration"; 063 064 protected static BlobDispatcher DEFAULT_BLOB_DISPATCHER = new DefaultBlobDispatcher(); 065 066 protected Deque<BlobDispatcherDescriptor> blobDispatcherDescriptorsRegistry = new LinkedList<>(); 067 068 protected BlobProviderDescriptorRegistry blobProviderDescriptorsRegistry = new BlobProviderDescriptorRegistry(); 069 070 protected Map<String, BlobProvider> blobProviders = new HashMap<>(); 071 072 protected static class BlobProviderDescriptorRegistry extends SimpleContributionRegistry<BlobProviderDescriptor> { 073 074 @Override 075 public String getContributionId(BlobProviderDescriptor contrib) { 076 return contrib.name; 077 } 078 079 @Override 080 public BlobProviderDescriptor clone(BlobProviderDescriptor orig) { 081 return new BlobProviderDescriptor(orig); 082 } 083 084 @Override 085 public void merge(BlobProviderDescriptor src, BlobProviderDescriptor dst) { 086 dst.merge(src); 087 } 088 089 @Override 090 public boolean isSupportingMerge() { 091 return true; 092 } 093 094 public void clear() { 095 currentContribs.clear(); 096 } 097 098 public BlobProviderDescriptor getBlobProviderDescriptor(String id) { 099 return getCurrentContribution(id); 100 } 101 } 102 103 @Override 104 public void deactivate(ComponentContext context) { 105 blobDispatcherDescriptorsRegistry.clear(); 106 blobProviderDescriptorsRegistry.clear(); 107 // close each blob provider 108 for (BlobProvider blobProvider : blobProviders.values()) { 109 blobProvider.close(); 110 } 111 blobProviders.clear(); 112 } 113 114 @Override 115 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 116 if (XP.equals(extensionPoint)) { 117 if (contribution instanceof BlobDispatcherDescriptor) { 118 registerBlobDispatcher((BlobDispatcherDescriptor) contribution); 119 } else if (contribution instanceof BlobProviderDescriptor) { 120 registerBlobProvider((BlobProviderDescriptor) contribution); 121 } else { 122 throw new NuxeoException("Invalid descriptor: " + contribution.getClass()); 123 } 124 } else { 125 throw new NuxeoException("Invalid extension point: " + extensionPoint); 126 } 127 } 128 129 @Override 130 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 131 if (XP.equals(extensionPoint)) { 132 if (contribution instanceof BlobDispatcherDescriptor) { 133 unregisterBlobDispatcher((BlobDispatcherDescriptor) contribution); 134 } else if (contribution instanceof BlobProviderDescriptor) { 135 unregisterBlobProvider((BlobProviderDescriptor) contribution); 136 } 137 } 138 } 139 140 protected void registerBlobDispatcher(BlobDispatcherDescriptor descr) { 141 blobDispatcherDescriptorsRegistry.add(descr); 142 } 143 144 protected void unregisterBlobDispatcher(BlobDispatcherDescriptor descr) { 145 blobDispatcherDescriptorsRegistry.remove(descr); 146 } 147 148 protected BlobDispatcher getBlobDispatcher() { 149 BlobDispatcherDescriptor descr = blobDispatcherDescriptorsRegistry.peekLast(); 150 if (descr == null) { 151 return DEFAULT_BLOB_DISPATCHER; 152 } 153 return descr.getBlobDispatcher(); 154 } 155 156 // public for tests 157 public void registerBlobProvider(BlobProviderDescriptor descr) { 158 closeOldBlobProvider(descr.name); 159 blobProviderDescriptorsRegistry.addContribution(descr); 160 // lookup now to have immediate feedback on eror 161 getBlobProvider(descr.name); 162 } 163 164 // public for tests 165 public void unregisterBlobProvider(BlobProviderDescriptor descr) { 166 closeOldBlobProvider(descr.name); 167 blobProviderDescriptorsRegistry.removeContribution(descr); 168 } 169 170 /** 171 * We're about to change something about a contributed blob provider. Close the old one. 172 */ 173 protected synchronized void closeOldBlobProvider(String id) { 174 BlobProvider blobProvider = blobProviders.remove(id); 175 if (blobProvider != null) { 176 blobProvider.close(); 177 } 178 } 179 180 @Override 181 public synchronized BlobProvider getBlobProvider(String providerId) { 182 BlobProvider blobProvider = blobProviders.get(providerId); 183 if (blobProvider == null) { 184 BlobProviderDescriptor descr = blobProviderDescriptorsRegistry.getBlobProviderDescriptor(providerId); 185 if (descr == null) { 186 return null; 187 } 188 Class<?> klass = descr.klass; 189 Map<String, String> properties = descr.properties; 190 try { 191 if (BlobProvider.class.isAssignableFrom(klass)) { 192 @SuppressWarnings("unchecked") 193 Class<? extends BlobProvider> blobProviderClass = (Class<? extends BlobProvider>) klass; 194 blobProvider = blobProviderClass.newInstance(); 195 } else if (BinaryManager.class.isAssignableFrom(klass)) { 196 @SuppressWarnings("unchecked") 197 Class<? extends BinaryManager> binaryManagerClass = (Class<? extends BinaryManager>) klass; 198 BinaryManager binaryManager = binaryManagerClass.newInstance(); 199 blobProvider = new BinaryBlobProvider(binaryManager); 200 } else { 201 throw new RuntimeException("Unknown class for blob provider: " + klass); 202 } 203 } catch (ReflectiveOperationException e) { 204 throw new RuntimeException(e); 205 } 206 try { 207 blobProvider.initialize(providerId, properties); 208 } catch (IOException e) { 209 throw new RuntimeException(e); 210 } 211 blobProviders.put(providerId, blobProvider); 212 } 213 return blobProvider; 214 } 215 216 /** 217 * {@inheritDoc} 218 * <p> 219 * The {@link BlobInfo} (coming from the database) contains the blob key, which may or may not be prefixed by a blob 220 * provider id. 221 */ 222 @Override 223 public Blob readBlob(BlobInfo blobInfo, String repositoryName) throws IOException { 224 String key = blobInfo.key; 225 if (key == null) { 226 return null; 227 } 228 BlobProvider blobProvider = getBlobProvider(key, repositoryName); 229 if (blobProvider == null) { 230 throw new NuxeoException("No registered blob provider for key: " + key); 231 } 232 return blobProvider.readBlob(blobInfo); 233 } 234 235 protected BlobProvider getBlobProvider(String key, String repositoryName) { 236 int colon = key.indexOf(':'); 237 String providerId; 238 if (colon < 0) { 239 // no prefix, use the blob dispatcher to find the blob provider id 240 providerId = getBlobDispatcher().getBlobProvider(repositoryName); 241 } else { 242 // use the prefix as blob provider id 243 providerId = key.substring(0, colon); 244 } 245 return getBlobProvider(providerId); 246 } 247 248 /** 249 * {@inheritDoc} 250 * <p> 251 * If the blob is managed and already uses the provider that's expected for this blob and document, there is no need 252 * to recompute a key. Otherwise, go through the blob provider. 253 */ 254 @Override 255 public String writeBlob(Blob blob, Document doc) throws IOException { 256 BlobDispatcher blobDispatcher = getBlobDispatcher(); 257 BlobDispatch dispatch = null; 258 if (blob instanceof ManagedBlob) { 259 ManagedBlob managedBlob = (ManagedBlob) blob; 260 String currentProviderId = managedBlob.getProviderId(); 261 // is it something we don't have to dispatch? 262 if (!blobDispatcher.getBlobProviderIds().contains(currentProviderId)) { 263 // not something we have to dispatch, reuse the key 264 return managedBlob.getKey(); 265 } 266 dispatch = blobDispatcher.getBlobProvider(doc, blob); 267 if (dispatch.providerId.equals(currentProviderId)) { 268 // same provider, just reuse the key 269 return managedBlob.getKey(); 270 } 271 } 272 if (dispatch == null) { 273 dispatch = blobDispatcher.getBlobProvider(doc, blob); 274 } 275 BlobProvider blobProvider = getBlobProvider(dispatch.providerId); 276 if (blobProvider == null) { 277 throw new NuxeoException("No registered blob provider with id: " + dispatch.providerId); 278 } 279 String key = blobProvider.writeBlob(blob, doc); 280 if (dispatch.addPrefix) { 281 key = dispatch.providerId + ':' + key; 282 } 283 return key; 284 } 285 286 @Override 287 public BlobProvider getBlobProvider(Blob blob) { 288 if (!(blob instanceof ManagedBlob)) { 289 return null; 290 } 291 ManagedBlob managedBlob = (ManagedBlob) blob; 292 return getBlobProvider(managedBlob.getProviderId()); 293 } 294 295 @Override 296 public InputStream getStream(Blob blob) throws IOException { 297 BlobProvider blobProvider = getBlobProvider(blob); 298 if (blobProvider == null) { 299 return null; 300 } 301 return blobProvider.getStream((ManagedBlob) blob); 302 } 303 304 @Override 305 public InputStream getThumbnail(Blob blob) throws IOException { 306 BlobProvider blobProvider = getBlobProvider(blob); 307 if (blobProvider == null) { 308 return null; 309 } 310 return blobProvider.getThumbnail((ManagedBlob) blob); 311 } 312 313 @Override 314 public URI getURI(Blob blob, UsageHint hint, HttpServletRequest servletRequest) throws IOException { 315 BlobProvider blobProvider = getBlobProvider(blob); 316 if (blobProvider == null) { 317 return null; 318 } 319 return blobProvider.getURI((ManagedBlob) blob, hint, servletRequest); 320 } 321 322 @Override 323 public Map<String, URI> getAvailableConversions(Blob blob, UsageHint hint) throws IOException { 324 BlobProvider blobProvider = getBlobProvider(blob); 325 if (blobProvider == null) { 326 return Collections.emptyMap(); 327 } 328 return blobProvider.getAvailableConversions((ManagedBlob) blob, hint); 329 } 330 331 @Override 332 public InputStream getConvertedStream(Blob blob, String mimeType, DocumentModel doc) throws IOException { 333 BlobProvider blobProvider = getBlobProvider(blob); 334 if (blobProvider == null) { 335 return null; 336 } 337 return blobProvider.getConvertedStream((ManagedBlob) blob, mimeType, doc); 338 } 339 340 protected void freezeVersion(BlobAccessor accessor, Document doc) { 341 Blob blob = accessor.getBlob(); 342 BlobProvider blobProvider = getBlobProvider(blob); 343 if (blobProvider == null) { 344 return; 345 } 346 try { 347 Blob newBlob = blobProvider.freezeVersion((ManagedBlob) blob, doc); 348 if (newBlob != null) { 349 accessor.setBlob(newBlob); 350 } 351 } catch (IOException e) { 352 throw new RuntimeException(e); 353 } 354 } 355 356 @Override 357 public Map<String, BlobProvider> getBlobProviders() { 358 return blobProviders; 359 } 360 361 @Override 362 public void freezeVersion(Document doc) { 363 // finds all blobs, then ask their providers if there's anything to do on check in 364 doc.visitBlobs(accessor -> freezeVersion(accessor, doc)); 365 } 366 367 @Override 368 public void notifyChanges(Document doc, Set<String> xpaths) { 369 getBlobDispatcher().notifyChanges(doc, xpaths); 370 } 371 372 // find which GCs to use 373 // only GC the binary managers to which we dispatch blobs 374 protected List<BinaryGarbageCollector> getGarbageCollectors() { 375 List<BinaryGarbageCollector> gcs = new LinkedList<>(); 376 for (String providerId : getBlobDispatcher().getBlobProviderIds()) { 377 BlobProvider blobProvider = getBlobProvider(providerId); 378 BinaryManager binaryManager = blobProvider.getBinaryManager(); 379 if (binaryManager != null) { 380 gcs.add(binaryManager.getGarbageCollector()); 381 } 382 } 383 return gcs; 384 } 385 386 @Override 387 public BinaryManagerStatus garbageCollectBinaries(boolean delete) { 388 List<BinaryGarbageCollector> gcs = getGarbageCollectors(); 389 // start gc 390 long start = System.currentTimeMillis(); 391 for (BinaryGarbageCollector gc : gcs) { 392 gc.start(); 393 } 394 // in all repositories, mark referenced binaries 395 // the marking itself will call back into the appropriate gc's mark method 396 RepositoryService repositoryService = Framework.getService(RepositoryService.class); 397 for (String repositoryName : repositoryService.getRepositoryNames()) { 398 Repository repository = repositoryService.getRepository(repositoryName); 399 repository.markReferencedBinaries(); 400 } 401 // stop gc 402 BinaryManagerStatus globalStatus = new BinaryManagerStatus(); 403 for (BinaryGarbageCollector gc : gcs) { 404 gc.stop(delete); 405 BinaryManagerStatus status = gc.getStatus(); 406 globalStatus.numBinaries += status.numBinaries; 407 globalStatus.sizeBinaries += status.sizeBinaries; 408 globalStatus.numBinariesGC += status.numBinariesGC; 409 globalStatus.sizeBinariesGC += status.sizeBinariesGC; 410 } 411 globalStatus.gcDuration = System.currentTimeMillis() - start; 412 return globalStatus; 413 } 414 415 @Override 416 public void markReferencedBinary(String key, String repositoryName) { 417 BlobProvider blobProvider = getBlobProvider(key, repositoryName); 418 BinaryManager binaryManager = blobProvider.getBinaryManager(); 419 if (binaryManager != null) { 420 int colon = key.indexOf(':'); 421 if (colon > 0) { 422 // if the key is in the "providerId:digest" format, keep only the real digest 423 key = key.substring(colon + 1); 424 } 425 binaryManager.getGarbageCollector().mark(key); 426 } else { 427 log.error("Unknown binary manager for key: " + key); 428 } 429 } 430 431 @Override 432 public boolean isBinariesGarbageCollectionInProgress() { 433 for (BinaryGarbageCollector gc : getGarbageCollectors()) { 434 if (gc.isInProgress()) { 435 return true; 436 } 437 } 438 return false; 439 } 440 441}