/*
 * (C) Copyright 2015-2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.blob;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.blob.BlobDispatcher.BlobDispatch;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.blob.binary.BinaryManager;
import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
import org.nuxeo.ecm.core.model.Document;
import org.nuxeo.ecm.core.model.Document.BlobAccessor;
import org.nuxeo.ecm.core.model.Repository;
import org.nuxeo.ecm.core.repository.RepositoryService;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.DefaultComponent;

/**
 * Implementation of the service managing {@link Blob}s associated to a {@link Document} or a repository.
 *
 * @since 9.2
 */
public class DocumentBlobManagerComponent extends DefaultComponent implements DocumentBlobManager {

    private static final Log log = LogFactory.getLog(DocumentBlobManagerComponent.class);

    /** Name of the only extension point accepted by this component. */
    protected static final String XP = "configuration";

    /** Dispatcher used when no {@link BlobDispatcherDescriptor} has been contributed. */
    protected static BlobDispatcher DEFAULT_BLOB_DISPATCHER = new DefaultBlobDispatcher();

    /** Contributed descriptors, in registration order; the last one is the one in effect. */
    protected Deque<BlobDispatcherDescriptor> blobDispatcherDescriptorsRegistry = new LinkedList<>();

    /**
     * Forgets all contributed blob dispatcher descriptors.
     */
    @Override
    public void deactivate(ComponentContext context) {
        blobDispatcherDescriptorsRegistry.clear();
    }

    /**
     * Registers a {@link BlobDispatcherDescriptor} contributed to the {@value #XP} extension point.
     *
     * @throws NuxeoException if the extension point or the contribution type is not the expected one
     */
    @Override
    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        if (!XP.equals(extensionPoint)) {
            throw new NuxeoException("Invalid extension point: " + extensionPoint);
        }
        if (!(contribution instanceof BlobDispatcherDescriptor)) {
            throw new NuxeoException("Invalid descriptor: " + contribution.getClass());
        }
        registerBlobDispatcher((BlobDispatcherDescriptor) contribution);
    }

    /**
     * Unregisters a previously contributed {@link BlobDispatcherDescriptor}. Contributions for another extension
     * point or of another type are ignored.
     */
    @Override
    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        if (XP.equals(extensionPoint) && contribution instanceof BlobDispatcherDescriptor) {
            unregisterBlobDispatcher((BlobDispatcherDescriptor) contribution);
        }
    }

    /**
     * Appends the descriptor to the registry, making it the one in effect.
     */
    protected void registerBlobDispatcher(BlobDispatcherDescriptor descr) {
        blobDispatcherDescriptorsRegistry.add(descr);
    }

    /**
     * Removes the descriptor from the registry.
     */
    protected void unregisterBlobDispatcher(BlobDispatcherDescriptor descr) {
        blobDispatcherDescriptorsRegistry.remove(descr);
    }

    /**
     * Gets the blob dispatcher of the last registered descriptor.
     *
     * @return the contributed blob dispatcher, or {@link #DEFAULT_BLOB_DISPATCHER} if nothing is registered
     */
    protected BlobDispatcher getBlobDispatcher() {
        BlobDispatcherDescriptor descr = blobDispatcherDescriptorsRegistry.peekLast();
        if (descr == null) {
            return DEFAULT_BLOB_DISPATCHER;
        }
        return descr.getBlobDispatcher();
    }

    /**
     * Looks up a blob provider by id through the {@link BlobManager} service.
     *
     * @return the blob provider as returned by the {@link BlobManager}; callers in this class treat {@code null} as
     *         "not registered"
     */
    protected BlobProvider getBlobProvider(String providerId) {
        return Framework.getService(BlobManager.class).getBlobProvider(providerId);
    }

    /**
     * Gets the blob provider of the given blob, if it is a {@link DocumentBlobProvider}.
     *
     * @return the document blob provider, or {@code null} if the blob's provider is absent or of another type
     */
    protected DocumentBlobProvider getDocumentBlobProvider(Blob blob) {
        BlobProvider blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blob);
        if (blobProvider instanceof DocumentBlobProvider) {
            return (DocumentBlobProvider) blobProvider;
        }
        return null;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The {@link BlobInfo} (coming from the database) contains the blob key, which may or may not be prefixed by a blob
     * provider id.
     *
     * @return the blob, or {@code null} if the blob info has no key
     * @throws NuxeoException if no blob provider is registered for the key
     */
    @Override
    public Blob readBlob(BlobInfo blobInfo, String repositoryName) throws IOException {
        String key = blobInfo.key;
        if (key == null) {
            return null;
        }
        BlobProvider blobProvider = getBlobProvider(key, repositoryName);
        if (blobProvider == null) {
            throw new NuxeoException("No registered blob provider for key: " + key);
        }
        return blobProvider.readBlob(blobInfo);
    }

    /**
     * Finds the blob provider for a blob key: the key's prefix (text before the first {@code ':'}) is used as the
     * provider id when present, otherwise the blob dispatcher is asked for the provider id of the repository.
     *
     * @return the blob provider, as returned by {@link #getBlobProvider(String)}
     */
    protected BlobProvider getBlobProvider(String key, String repositoryName) {
        int colon = key.indexOf(':');
        String providerId;
        if (colon < 0) {
            // no prefix, use the blob dispatcher to find the blob provider id
            providerId = getBlobDispatcher().getBlobProvider(repositoryName);
        } else {
            // use the prefix as blob provider id
            providerId = key.substring(0, colon);
        }
        return getBlobProvider(providerId);
    }

    /**
     * {@inheritDoc}
     * <p>
     * If the blob is managed and already uses the provider that's expected for this blob and document, there is no need
     * to recompute a key. Otherwise, go through the blob provider.
     *
     * @throws NuxeoException if the blob's current provider, or the provider chosen by the dispatcher, is not
     *             registered
     */
    @Override
    public String writeBlob(Blob blob, Document doc, String xpath) throws IOException {
        BlobDispatcher blobDispatcher = getBlobDispatcher();
        BlobDispatch dispatch = null;
        if (blob instanceof ManagedBlob) {
            ManagedBlob managedBlob = (ManagedBlob) blob;
            String currentProviderId = managedBlob.getProviderId();
            BlobProvider currentProvider = getBlobProvider(currentProviderId);
            if (currentProvider == null) {
                // fail with a clear message instead of a NullPointerException
                throw new NuxeoException("No registered blob provider with id: " + currentProviderId);
            }
            // is the blob non-transient, so that reusing the key is an option?
            if (!currentProvider.isTransient()) {
                // is it something we don't have to dispatch?
                if (!blobDispatcher.getBlobProviderIds().contains(currentProviderId)) {
                    // not something we have to dispatch, reuse the key
                    return managedBlob.getKey();
                }
                dispatch = blobDispatcher.getBlobProvider(doc, blob, xpath);
                if (dispatch.providerId.equals(currentProviderId)) {
                    // same provider, just reuse the key
                    return managedBlob.getKey();
                }
            }
        }
        if (dispatch == null) {
            dispatch = blobDispatcher.getBlobProvider(doc, blob, xpath);
        }
        BlobProvider blobProvider = getBlobProvider(dispatch.providerId);
        if (blobProvider == null) {
            throw new NuxeoException("No registered blob provider with id: " + dispatch.providerId);
        }
        String key = blobProvider.writeBlob(blob);
        if (dispatch.addPrefix) {
            key = dispatch.providerId + ':' + key;
        }
        return key;
    }

    /**
     * {@inheritDoc}
     *
     * @return the converted stream provided by the blob's {@link DocumentBlobProvider}, or {@code null} if the blob
     *         has no such provider
     */
    @Override
    public InputStream getConvertedStream(Blob blob, String mimeType, DocumentModel doc) throws IOException {
        DocumentBlobProvider blobProvider = getDocumentBlobProvider(blob);
        if (blobProvider == null) {
            return null;
        }
        return blobProvider.getConvertedStream((ManagedBlob) blob, mimeType, doc);
    }

    /**
     * Asks the blob's {@link DocumentBlobProvider}, if any, to freeze the blob held by the accessor, and stores the
     * replacement blob in the accessor when the provider returns one.
     *
     * @throws NuxeoException if the provider fails with an {@link IOException}
     */
    protected void freezeVersion(BlobAccessor accessor, Document doc) {
        Blob blob = accessor.getBlob();
        DocumentBlobProvider blobProvider = getDocumentBlobProvider(blob);
        if (blobProvider == null) {
            return;
        }
        try {
            Blob newBlob = blobProvider.freezeVersion((ManagedBlob) blob, doc);
            if (newBlob != null) {
                accessor.setBlob(newBlob);
            }
        } catch (IOException e) {
            // NuxeoException is a RuntimeException, consistent with the other failures raised by this class
            throw new NuxeoException(e);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void freezeVersion(Document doc) {
        // finds all blobs, then ask their providers if there's anything to do on check in
        doc.visitBlobs(accessor -> freezeVersion(accessor, doc));
    }

    /**
     * {@inheritDoc}
     * <p>
     * Delegates to the current blob dispatcher.
     */
    @Override
    public void notifyChanges(Document doc, Set<String> xpaths) {
        getBlobDispatcher().notifyChanges(doc, xpaths);
    }

    /**
     * Finds which garbage collectors to use: only the binary managers of the blob providers to which the current
     * dispatcher dispatches blobs are considered.
     *
     * @return the garbage collectors, one per dispatched blob provider that has a binary manager
     * @throws NuxeoException if the dispatcher references a blob provider id that is not registered
     */
    protected List<BinaryGarbageCollector> getGarbageCollectors() {
        List<BinaryGarbageCollector> gcs = new ArrayList<>();
        for (String providerId : getBlobDispatcher().getBlobProviderIds()) {
            BlobProvider blobProvider = getBlobProvider(providerId);
            if (blobProvider == null) {
                // fail with a clear message instead of a NullPointerException
                throw new NuxeoException("No registered blob provider with id: " + providerId);
            }
            BinaryManager binaryManager = blobProvider.getBinaryManager();
            if (binaryManager != null) {
                gcs.add(binaryManager.getGarbageCollector());
            }
        }
        return gcs;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Starts every garbage collector, asks each repository to mark its referenced binaries, then stops every garbage
     * collector and sums their statuses.
     *
     * @return the aggregated status, whose duration covers the whole start/mark/stop sequence
     */
    @Override
    public BinaryManagerStatus garbageCollectBinaries(boolean delete) {
        List<BinaryGarbageCollector> gcs = getGarbageCollectors();
        // start gc
        long start = System.currentTimeMillis();
        for (BinaryGarbageCollector gc : gcs) {
            gc.start();
        }
        // in all repositories, mark referenced binaries
        // the marking itself will call back into the appropriate gc's mark method
        RepositoryService repositoryService = Framework.getService(RepositoryService.class);
        for (String repositoryName : repositoryService.getRepositoryNames()) {
            Repository repository = repositoryService.getRepository(repositoryName);
            repository.markReferencedBinaries();
        }
        // stop gc
        BinaryManagerStatus globalStatus = new BinaryManagerStatus();
        for (BinaryGarbageCollector gc : gcs) {
            gc.stop(delete);
            BinaryManagerStatus status = gc.getStatus();
            globalStatus.numBinaries += status.numBinaries;
            globalStatus.sizeBinaries += status.sizeBinaries;
            globalStatus.numBinariesGC += status.numBinariesGC;
            globalStatus.sizeBinariesGC += status.sizeBinariesGC;
        }
        globalStatus.gcDuration = System.currentTimeMillis() - start;
        return globalStatus;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The key is marked in the garbage collector of the binary manager of its blob provider, after removal of the
     * provider id prefix if there is one. If the blob provider has no binary manager, an error is logged.
     *
     * @throws NuxeoException if no blob provider is registered for the key
     */
    @Override
    public void markReferencedBinary(String key, String repositoryName) {
        BlobProvider blobProvider = getBlobProvider(key, repositoryName);
        if (blobProvider == null) {
            // fail with a clear message instead of a NullPointerException
            throw new NuxeoException("No registered blob provider for key: " + key);
        }
        BinaryManager binaryManager = blobProvider.getBinaryManager();
        if (binaryManager == null) {
            log.error("Unknown binary manager for key: " + key);
            return;
        }
        String digest = key;
        int colon = key.indexOf(':');
        if (colon > 0) {
            // if the key is in the "providerId:digest" format, keep only the real digest
            digest = key.substring(colon + 1);
        }
        binaryManager.getGarbageCollector().mark(digest);
    }

    /**
     * {@inheritDoc}
     *
     * @return {@code true} if at least one of the garbage collectors in use reports being in progress
     */
    @Override
    public boolean isBinariesGarbageCollectionInProgress() {
        return getGarbageCollectors().stream().anyMatch(BinaryGarbageCollector::isInProgress);
    }

}