001/* 002 * (C) Copyright 2015-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.blob; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.util.Deque; 024import java.util.LinkedList; 025import java.util.List; 026import java.util.Set; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.nuxeo.ecm.core.api.Blob; 031import org.nuxeo.ecm.core.api.DocumentModel; 032import org.nuxeo.ecm.core.api.NuxeoException; 033import org.nuxeo.ecm.core.blob.BlobDispatcher.BlobDispatch; 034import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 035import org.nuxeo.ecm.core.blob.binary.BinaryManager; 036import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus; 037import org.nuxeo.ecm.core.model.Document; 038import org.nuxeo.ecm.core.model.Document.BlobAccessor; 039import org.nuxeo.ecm.core.model.Repository; 040import org.nuxeo.ecm.core.repository.RepositoryService; 041import org.nuxeo.runtime.api.Framework; 042import org.nuxeo.runtime.model.ComponentContext; 043import org.nuxeo.runtime.model.ComponentInstance; 044import org.nuxeo.runtime.model.DefaultComponent; 045 046/** 047 * Implementation of the service managing {@link Blob}s associated to a {@link Document} or a repository. 048 * 049 * @since 9.2 050 */ 051public class DocumentBlobManagerComponent extends DefaultComponent implements DocumentBlobManager { 052 053 private static final Log log = LogFactory.getLog(DocumentBlobManagerComponent.class); 054 055 protected static final String XP = "configuration"; 056 057 protected static BlobDispatcher DEFAULT_BLOB_DISPATCHER = new DefaultBlobDispatcher(); 058 059 protected Deque<BlobDispatcherDescriptor> blobDispatcherDescriptorsRegistry = new LinkedList<>(); 060 061 @Override 062 public void deactivate(ComponentContext context) { 063 blobDispatcherDescriptorsRegistry.clear(); 064 } 065 066 @Override 067 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 068 if (XP.equals(extensionPoint)) { 069 if (contribution instanceof BlobDispatcherDescriptor) { 070 registerBlobDispatcher((BlobDispatcherDescriptor) contribution); 071 } else { 072 throw new NuxeoException("Invalid descriptor: " + contribution.getClass()); 073 } 074 } else { 075 throw new NuxeoException("Invalid extension point: " + extensionPoint); 076 } 077 } 078 079 @Override 080 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 081 if (XP.equals(extensionPoint)) { 082 if (contribution instanceof BlobDispatcherDescriptor) { 083 unregisterBlobDispatcher((BlobDispatcherDescriptor) contribution); 084 } 085 } 086 } 087 088 protected void registerBlobDispatcher(BlobDispatcherDescriptor descr) { 089 blobDispatcherDescriptorsRegistry.add(descr); 090 } 091 092 protected void unregisterBlobDispatcher(BlobDispatcherDescriptor descr) { 093 blobDispatcherDescriptorsRegistry.remove(descr); 094 } 095 096 protected BlobDispatcher getBlobDispatcher() { 097 BlobDispatcherDescriptor descr = blobDispatcherDescriptorsRegistry.peekLast(); 098 if (descr == null) { 099 return DEFAULT_BLOB_DISPATCHER; 100 } 101 return descr.getBlobDispatcher(); 102 } 103 104 protected BlobProvider getBlobProvider(String providerId) { 105 return Framework.getService(BlobManager.class).getBlobProvider(providerId); 106 } 107 108 protected DocumentBlobProvider getDocumentBlobProvider(Blob blob) { 109 BlobProvider blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blob); 110 if (blobProvider instanceof DocumentBlobProvider) { 111 return (DocumentBlobProvider) blobProvider; 112 } 113 return null; 114 } 115 116 /** 117 * {@inheritDoc} 118 * <p> 119 * The {@link BlobInfo} (coming from the database) contains the blob key, which may or may not be prefixed by a blob 120 * provider id. 121 */ 122 @Override 123 public Blob readBlob(BlobInfo blobInfo, String repositoryName) throws IOException { 124 String key = blobInfo.key; 125 if (key == null) { 126 return null; 127 } 128 BlobProvider blobProvider = getBlobProvider(key, repositoryName); 129 if (blobProvider == null) { 130 throw new NuxeoException("No registered blob provider for key: " + key); 131 } 132 return blobProvider.readBlob(blobInfo); 133 } 134 135 protected BlobProvider getBlobProvider(String key, String repositoryName) { 136 int colon = key.indexOf(':'); 137 String providerId; 138 if (colon < 0) { 139 // no prefix, use the blob dispatcher to find the blob provider id 140 providerId = getBlobDispatcher().getBlobProvider(repositoryName); 141 } else { 142 // use the prefix as blob provider id 143 providerId = key.substring(0, colon); 144 } 145 return getBlobProvider(providerId); 146 } 147 148 /** 149 * {@inheritDoc} 150 * <p> 151 * If the blob is managed and already uses the provider that's expected for this blob and document, there is no need 152 * to recompute a key. Otherwise, go through the blob provider. 153 */ 154 @Override 155 public String writeBlob(Blob blob, Document doc, String xpath) throws IOException { 156 BlobDispatcher blobDispatcher = getBlobDispatcher(); 157 BlobDispatch dispatch = null; 158 if (blob instanceof ManagedBlob) { 159 ManagedBlob managedBlob = (ManagedBlob) blob; 160 String currentProviderId = managedBlob.getProviderId(); 161 // is it something we don't have to dispatch? 162 if (!blobDispatcher.getBlobProviderIds().contains(currentProviderId)) { 163 // not something we have to dispatch, reuse the key 164 return managedBlob.getKey(); 165 } 166 dispatch = blobDispatcher.getBlobProvider(doc, blob, xpath); 167 if (dispatch.providerId.equals(currentProviderId)) { 168 // same provider, just reuse the key 169 return managedBlob.getKey(); 170 } 171 } 172 if (dispatch == null) { 173 dispatch = blobDispatcher.getBlobProvider(doc, blob, xpath); 174 } 175 BlobProvider blobProvider = getBlobProvider(dispatch.providerId); 176 if (blobProvider == null) { 177 throw new NuxeoException("No registered blob provider with id: " + dispatch.providerId); 178 } 179 String key = blobProvider.writeBlob(blob); 180 if (dispatch.addPrefix) { 181 key = dispatch.providerId + ':' + key; 182 } 183 return key; 184 } 185 186 @Override 187 public InputStream getConvertedStream(Blob blob, String mimeType, DocumentModel doc) throws IOException { 188 DocumentBlobProvider blobProvider = getDocumentBlobProvider(blob); 189 if (blobProvider == null) { 190 return null; 191 } 192 return blobProvider.getConvertedStream((ManagedBlob) blob, mimeType, doc); 193 } 194 195 protected void freezeVersion(BlobAccessor accessor, Document doc) { 196 Blob blob = accessor.getBlob(); 197 DocumentBlobProvider blobProvider = getDocumentBlobProvider(blob); 198 if (blobProvider == null) { 199 return; 200 } 201 try { 202 Blob newBlob = blobProvider.freezeVersion((ManagedBlob) blob, doc); 203 if (newBlob != null) { 204 accessor.setBlob(newBlob); 205 } 206 } catch (IOException e) { 207 throw new RuntimeException(e); 208 } 209 } 210 211 @Override 212 public void freezeVersion(Document doc) { 213 // finds all blobs, then ask their providers if there's anything to do on check in 214 doc.visitBlobs(accessor -> freezeVersion(accessor, doc)); 215 } 216 217 @Override 218 public void notifyChanges(Document doc, Set<String> xpaths) { 219 getBlobDispatcher().notifyChanges(doc, xpaths); 220 } 221 222 // find which GCs to use 223 // only GC the binary managers to which we dispatch blobs 224 protected List<BinaryGarbageCollector> getGarbageCollectors() { 225 List<BinaryGarbageCollector> gcs = new LinkedList<>(); 226 for (String providerId : getBlobDispatcher().getBlobProviderIds()) { 227 BlobProvider blobProvider = getBlobProvider(providerId); 228 BinaryManager binaryManager = blobProvider.getBinaryManager(); 229 if (binaryManager != null) { 230 gcs.add(binaryManager.getGarbageCollector()); 231 } 232 } 233 return gcs; 234 } 235 236 @Override 237 public BinaryManagerStatus garbageCollectBinaries(boolean delete) { 238 List<BinaryGarbageCollector> gcs = getGarbageCollectors(); 239 // start gc 240 long start = System.currentTimeMillis(); 241 for (BinaryGarbageCollector gc : gcs) { 242 gc.start(); 243 } 244 // in all repositories, mark referenced binaries 245 // the marking itself will call back into the appropriate gc's mark method 246 RepositoryService repositoryService = Framework.getService(RepositoryService.class); 247 for (String repositoryName : repositoryService.getRepositoryNames()) { 248 Repository repository = repositoryService.getRepository(repositoryName); 249 repository.markReferencedBinaries(); 250 } 251 // stop gc 252 BinaryManagerStatus globalStatus = new BinaryManagerStatus(); 253 for (BinaryGarbageCollector gc : gcs) { 254 gc.stop(delete); 255 BinaryManagerStatus status = gc.getStatus(); 256 globalStatus.numBinaries += status.numBinaries; 257 globalStatus.sizeBinaries += status.sizeBinaries; 258 globalStatus.numBinariesGC += status.numBinariesGC; 259 globalStatus.sizeBinariesGC += status.sizeBinariesGC; 260 } 261 globalStatus.gcDuration = System.currentTimeMillis() - start; 262 return globalStatus; 263 } 264 265 @Override 266 public void markReferencedBinary(String key, String repositoryName) { 267 BlobProvider blobProvider = getBlobProvider(key, repositoryName); 268 BinaryManager binaryManager = blobProvider.getBinaryManager(); 269 if (binaryManager != null) { 270 int colon = key.indexOf(':'); 271 if (colon > 0) { 272 // if the key is in the "providerId:digest" format, keep only the real digest 273 key = key.substring(colon + 1); 274 } 275 binaryManager.getGarbageCollector().mark(key); 276 } else { 277 log.error("Unknown binary manager for key: " + key); 278 } 279 } 280 281 @Override 282 public boolean isBinariesGarbageCollectionInProgress() { 283 for (BinaryGarbageCollector gc : getGarbageCollectors()) { 284 if (gc.isInProgress()) { 285 return true; 286 } 287 } 288 return false; 289 } 290 291}