001/*
002 * (C) Copyright 2015-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.blob;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.util.Deque;
024import java.util.LinkedList;
025import java.util.List;
026import java.util.Set;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.ecm.core.api.Blob;
031import org.nuxeo.ecm.core.api.DocumentModel;
032import org.nuxeo.ecm.core.api.NuxeoException;
033import org.nuxeo.ecm.core.blob.BlobDispatcher.BlobDispatch;
034import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
035import org.nuxeo.ecm.core.blob.binary.BinaryManager;
036import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
037import org.nuxeo.ecm.core.model.Document;
038import org.nuxeo.ecm.core.model.Document.BlobAccessor;
039import org.nuxeo.ecm.core.model.Repository;
040import org.nuxeo.ecm.core.repository.RepositoryService;
041import org.nuxeo.runtime.api.Framework;
042import org.nuxeo.runtime.model.ComponentContext;
043import org.nuxeo.runtime.model.ComponentInstance;
044import org.nuxeo.runtime.model.DefaultComponent;
045
046/**
047 * Implementation of the service managing {@link Blob}s associated to a {@link Document} or a repository.
048 *
049 * @since 9.2
050 */
051public class DocumentBlobManagerComponent extends DefaultComponent implements DocumentBlobManager {
052
053    private static final Log log = LogFactory.getLog(DocumentBlobManagerComponent.class);
054
055    protected static final String XP = "configuration";
056
057    protected static BlobDispatcher DEFAULT_BLOB_DISPATCHER = new DefaultBlobDispatcher();
058
059    protected Deque<BlobDispatcherDescriptor> blobDispatcherDescriptorsRegistry = new LinkedList<>();
060
061    @Override
062    public void deactivate(ComponentContext context) {
063        blobDispatcherDescriptorsRegistry.clear();
064    }
065
066    @Override
067    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
068        if (XP.equals(extensionPoint)) {
069            if (contribution instanceof BlobDispatcherDescriptor) {
070                registerBlobDispatcher((BlobDispatcherDescriptor) contribution);
071            } else {
072                throw new NuxeoException("Invalid descriptor: " + contribution.getClass());
073            }
074        } else {
075            throw new NuxeoException("Invalid extension point: " + extensionPoint);
076        }
077    }
078
079    @Override
080    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
081        if (XP.equals(extensionPoint)) {
082            if (contribution instanceof BlobDispatcherDescriptor) {
083                unregisterBlobDispatcher((BlobDispatcherDescriptor) contribution);
084            }
085        }
086    }
087
088    protected void registerBlobDispatcher(BlobDispatcherDescriptor descr) {
089        blobDispatcherDescriptorsRegistry.add(descr);
090    }
091
092    protected void unregisterBlobDispatcher(BlobDispatcherDescriptor descr) {
093        blobDispatcherDescriptorsRegistry.remove(descr);
094    }
095
096    protected BlobDispatcher getBlobDispatcher() {
097        BlobDispatcherDescriptor descr = blobDispatcherDescriptorsRegistry.peekLast();
098        if (descr == null) {
099            return DEFAULT_BLOB_DISPATCHER;
100        }
101        return descr.getBlobDispatcher();
102    }
103
104    protected BlobProvider getBlobProvider(String providerId) {
105        return Framework.getService(BlobManager.class).getBlobProvider(providerId);
106    }
107
108    protected DocumentBlobProvider getDocumentBlobProvider(Blob blob) {
109        BlobProvider blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blob);
110        if (blobProvider instanceof DocumentBlobProvider) {
111            return (DocumentBlobProvider) blobProvider;
112        }
113        return null;
114    }
115
116    /**
117     * {@inheritDoc}
118     * <p>
119     * The {@link BlobInfo} (coming from the database) contains the blob key, which may or may not be prefixed by a blob
120     * provider id.
121     */
122    @Override
123    public Blob readBlob(BlobInfo blobInfo, String repositoryName) throws IOException {
124        String key = blobInfo.key;
125        if (key == null) {
126            return null;
127        }
128        BlobProvider blobProvider = getBlobProvider(key, repositoryName);
129        if (blobProvider == null) {
130            throw new NuxeoException("No registered blob provider for key: " + key);
131        }
132        return blobProvider.readBlob(blobInfo);
133    }
134
135    protected BlobProvider getBlobProvider(String key, String repositoryName) {
136        int colon = key.indexOf(':');
137        String providerId;
138        if (colon < 0) {
139            // no prefix, use the blob dispatcher to find the blob provider id
140            providerId = getBlobDispatcher().getBlobProvider(repositoryName);
141        } else {
142            // use the prefix as blob provider id
143            providerId = key.substring(0, colon);
144        }
145        return getBlobProvider(providerId);
146    }
147
148    /**
149     * {@inheritDoc}
150     * <p>
151     * If the blob is managed and already uses the provider that's expected for this blob and document, there is no need
152     * to recompute a key. Otherwise, go through the blob provider.
153     */
154    @Override
155    public String writeBlob(Blob blob, Document doc, String xpath) throws IOException {
156        BlobDispatcher blobDispatcher = getBlobDispatcher();
157        BlobDispatch dispatch = null;
158        if (blob instanceof ManagedBlob) {
159            ManagedBlob managedBlob = (ManagedBlob) blob;
160            String currentProviderId = managedBlob.getProviderId();
161            // is it something we don't have to dispatch?
162            if (!blobDispatcher.getBlobProviderIds().contains(currentProviderId)) {
163                // not something we have to dispatch, reuse the key
164                return managedBlob.getKey();
165            }
166            dispatch = blobDispatcher.getBlobProvider(doc, blob, xpath);
167            if (dispatch.providerId.equals(currentProviderId)) {
168                // same provider, just reuse the key
169                return managedBlob.getKey();
170            }
171        }
172        if (dispatch == null) {
173            dispatch = blobDispatcher.getBlobProvider(doc, blob, xpath);
174        }
175        BlobProvider blobProvider = getBlobProvider(dispatch.providerId);
176        if (blobProvider == null) {
177            throw new NuxeoException("No registered blob provider with id: " + dispatch.providerId);
178        }
179        String key = blobProvider.writeBlob(blob);
180        if (dispatch.addPrefix) {
181            key = dispatch.providerId + ':' + key;
182        }
183        return key;
184    }
185
186    @Override
187    public InputStream getConvertedStream(Blob blob, String mimeType, DocumentModel doc) throws IOException {
188        DocumentBlobProvider blobProvider = getDocumentBlobProvider(blob);
189        if (blobProvider == null) {
190            return null;
191        }
192        return blobProvider.getConvertedStream((ManagedBlob) blob, mimeType, doc);
193    }
194
195    protected void freezeVersion(BlobAccessor accessor, Document doc) {
196        Blob blob = accessor.getBlob();
197        DocumentBlobProvider blobProvider = getDocumentBlobProvider(blob);
198        if (blobProvider == null) {
199            return;
200        }
201        try {
202            Blob newBlob = blobProvider.freezeVersion((ManagedBlob) blob, doc);
203            if (newBlob != null) {
204                accessor.setBlob(newBlob);
205            }
206        } catch (IOException e) {
207            throw new RuntimeException(e);
208        }
209    }
210
211    @Override
212    public void freezeVersion(Document doc) {
213        // finds all blobs, then ask their providers if there's anything to do on check in
214        doc.visitBlobs(accessor -> freezeVersion(accessor, doc));
215    }
216
217    @Override
218    public void notifyChanges(Document doc, Set<String> xpaths) {
219        getBlobDispatcher().notifyChanges(doc, xpaths);
220    }
221
222    // find which GCs to use
223    // only GC the binary managers to which we dispatch blobs
224    protected List<BinaryGarbageCollector> getGarbageCollectors() {
225        List<BinaryGarbageCollector> gcs = new LinkedList<>();
226        for (String providerId : getBlobDispatcher().getBlobProviderIds()) {
227            BlobProvider blobProvider = getBlobProvider(providerId);
228            BinaryManager binaryManager = blobProvider.getBinaryManager();
229            if (binaryManager != null) {
230                gcs.add(binaryManager.getGarbageCollector());
231            }
232        }
233        return gcs;
234    }
235
236    @Override
237    public BinaryManagerStatus garbageCollectBinaries(boolean delete) {
238        List<BinaryGarbageCollector> gcs = getGarbageCollectors();
239        // start gc
240        long start = System.currentTimeMillis();
241        for (BinaryGarbageCollector gc : gcs) {
242            gc.start();
243        }
244        // in all repositories, mark referenced binaries
245        // the marking itself will call back into the appropriate gc's mark method
246        RepositoryService repositoryService = Framework.getService(RepositoryService.class);
247        for (String repositoryName : repositoryService.getRepositoryNames()) {
248            Repository repository = repositoryService.getRepository(repositoryName);
249            repository.markReferencedBinaries();
250        }
251        // stop gc
252        BinaryManagerStatus globalStatus = new BinaryManagerStatus();
253        for (BinaryGarbageCollector gc : gcs) {
254            gc.stop(delete);
255            BinaryManagerStatus status = gc.getStatus();
256            globalStatus.numBinaries += status.numBinaries;
257            globalStatus.sizeBinaries += status.sizeBinaries;
258            globalStatus.numBinariesGC += status.numBinariesGC;
259            globalStatus.sizeBinariesGC += status.sizeBinariesGC;
260        }
261        globalStatus.gcDuration = System.currentTimeMillis() - start;
262        return globalStatus;
263    }
264
265    @Override
266    public void markReferencedBinary(String key, String repositoryName) {
267        BlobProvider blobProvider = getBlobProvider(key, repositoryName);
268        BinaryManager binaryManager = blobProvider.getBinaryManager();
269        if (binaryManager != null) {
270            int colon = key.indexOf(':');
271            if (colon > 0) {
272                // if the key is in the "providerId:digest" format, keep only the real digest
273                key = key.substring(colon + 1);
274            }
275            binaryManager.getGarbageCollector().mark(key);
276        } else {
277            log.error("Unknown binary manager for key: " + key);
278        }
279    }
280
281    @Override
282    public boolean isBinariesGarbageCollectionInProgress() {
283        for (BinaryGarbageCollector gc : getGarbageCollectors()) {
284            if (gc.isInProgress()) {
285                return true;
286            }
287        }
288        return false;
289    }
290
291}