001/*
002 * (C) Copyright 2015-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.blob;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.util.Calendar;
024import java.util.Deque;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.Set;
028import java.util.function.Consumer;
029import java.util.function.Supplier;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.ecm.core.api.Blob;
034import org.nuxeo.ecm.core.api.DocumentModel;
035import org.nuxeo.ecm.core.api.DocumentSecurityException;
036import org.nuxeo.ecm.core.api.NuxeoException;
037import org.nuxeo.ecm.core.api.model.PropertyNotFoundException;
038import org.nuxeo.ecm.core.blob.BlobDispatcher.BlobDispatch;
039import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
040import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
041import org.nuxeo.ecm.core.model.Document;
042import org.nuxeo.ecm.core.model.Document.BlobAccessor;
043import org.nuxeo.ecm.core.model.Repository;
044import org.nuxeo.ecm.core.repository.RepositoryService;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.model.ComponentContext;
047import org.nuxeo.runtime.model.ComponentInstance;
048import org.nuxeo.runtime.model.DefaultComponent;
049import org.nuxeo.runtime.transaction.TransactionHelper;
050
051/**
052 * Implementation of the service managing {@link Blob}s associated to a {@link Document} or a repository.
053 *
054 * @since 9.2
055 */
056public class DocumentBlobManagerComponent extends DefaultComponent implements DocumentBlobManager {
057
058    private static final Log log = LogFactory.getLog(DocumentBlobManagerComponent.class);
059
060    protected static final String XP = "configuration";
061
062    protected static final BlobDispatcher DEFAULT_BLOB_DISPATCHER = new DefaultBlobDispatcher();
063
064    protected static final int BINARY_GC_TX_TIMEOUT_SEC = 86_400; // 1 day
065
066    // in these low-level APIs we deal with unprefixed xpaths, so not file:content
067    protected static final String MAIN_BLOB_XPATH = "content";
068
069    protected Deque<BlobDispatcherDescriptor> blobDispatcherDescriptorsRegistry = new LinkedList<>();
070
071    @Override
072    public void deactivate(ComponentContext context) {
073        blobDispatcherDescriptorsRegistry.clear();
074    }
075
076    @Override
077    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
078        if (XP.equals(extensionPoint)) {
079            if (contribution instanceof BlobDispatcherDescriptor) {
080                registerBlobDispatcher((BlobDispatcherDescriptor) contribution);
081            } else {
082                throw new NuxeoException("Invalid descriptor: " + contribution.getClass());
083            }
084        } else {
085            throw new NuxeoException("Invalid extension point: " + extensionPoint);
086        }
087    }
088
089    @Override
090    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
091        if (XP.equals(extensionPoint)) {
092            if (contribution instanceof BlobDispatcherDescriptor) {
093                unregisterBlobDispatcher((BlobDispatcherDescriptor) contribution);
094            }
095        }
096    }
097
098    protected void registerBlobDispatcher(BlobDispatcherDescriptor descr) {
099        blobDispatcherDescriptorsRegistry.add(descr);
100    }
101
102    protected void unregisterBlobDispatcher(BlobDispatcherDescriptor descr) {
103        blobDispatcherDescriptorsRegistry.remove(descr);
104    }
105
106    protected BlobDispatcher getBlobDispatcher() {
107        BlobDispatcherDescriptor descr = blobDispatcherDescriptorsRegistry.peekLast();
108        if (descr == null) {
109            return DEFAULT_BLOB_DISPATCHER;
110        }
111        return descr.getBlobDispatcher();
112    }
113
114    protected BlobProvider getBlobProvider(String providerId) {
115        return Framework.getService(BlobManager.class).getBlobProvider(providerId);
116    }
117
118    protected DocumentBlobProvider getDocumentBlobProvider(Blob blob) {
119        BlobProvider blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blob);
120        if (blobProvider instanceof DocumentBlobProvider) {
121            return (DocumentBlobProvider) blobProvider;
122        }
123        return null;
124    }
125
126    /**
127     * {@inheritDoc}
128     * <p>
129     * The {@link BlobInfo} (coming from the database) contains the blob key, which may or may not be prefixed by a blob
130     * provider id.
131     */
132    @Override
133    public Blob readBlob(BlobInfo blobInfo, Document doc, String xpath) throws IOException {
134        return readBlob(blobInfo, doc, xpath, doc.getRepositoryName());
135    }
136
137    // helper used while deprecated signature below is kept
138    protected Blob readBlob(BlobInfo blobInfo, Document doc, String xpath, String repositoryName) throws IOException {
139        String key = blobInfo.key;
140        if (key == null) {
141            return null;
142        }
143        BlobProvider blobProvider = getBlobProvider(key, repositoryName);
144        if (blobProvider == null) {
145            throw new NuxeoException("No registered blob provider for key: " + key);
146        }
147        return blobProvider.readBlob(new BlobInfoContext(blobInfo, doc, xpath));
148    }
149
150    /**
151     * {@inheritDoc}
152     * <p>
153     * The {@link BlobInfo} (coming from the database) contains the blob key, which may or may not be prefixed by a blob
154     * provider id.
155     *
156     * @deprecated since 11.1, use {@link #readBlob(BlobInfo, Document, String)} instead
157     */
158    @Deprecated
159    @Override
160    public Blob readBlob(BlobInfo blobInfo, String repositoryName) throws IOException {
161        return readBlob(blobInfo, null, null, repositoryName);
162    }
163
164    protected BlobProvider getBlobProvider(String key, String repositoryName) {
165        int colon = key.indexOf(':');
166        String providerId;
167        if (colon < 0) {
168            // no prefix, use the blob dispatcher to find the blob provider id
169            providerId = getBlobDispatcher().getBlobProvider(repositoryName);
170        } else {
171            // use the prefix as blob provider id
172            providerId = key.substring(0, colon);
173        }
174        return getBlobProvider(providerId);
175    }
176
177    /**
178     * {@inheritDoc}
179     * <p>
180     * If the blob is managed and already uses the provider that's expected for this blob and document, there is no need
181     * to recompute a key. Otherwise, go through the blob provider.
182     */
183    @Override
184    public String writeBlob(Blob blob, Document doc, String xpath) throws IOException {
185        if (blob == null) {
186            if (MAIN_BLOB_XPATH.equals(xpath) && doc.isUnderRetentionOrLegalHold()) {
187                throw new DocumentSecurityException(
188                        "Cannot delete blob from document " + doc.getUUID() + ", it is under retention / hold");
189            }
190            return null;
191        }
192
193        BlobDispatcher blobDispatcher = getBlobDispatcher();
194        BlobDispatch dispatch = null;
195        if (blob instanceof ManagedBlob) {
196            ManagedBlob managedBlob = (ManagedBlob) blob;
197            String currentProviderId = managedBlob.getProviderId();
198            // is the blob non-transient, so that reusing the key is an option?
199            if (!getBlobProvider(currentProviderId).isTransient()) {
200                // is it something we don't have to dispatch?
201                if (!blobDispatcher.getBlobProviderIds().contains(currentProviderId)) {
202                    // not something we have to dispatch, reuse the key
203                    return managedBlob.getKey();
204                }
205                dispatch = blobDispatcher.getBlobProvider(doc, blob, xpath);
206                if (dispatch.providerId.equals(currentProviderId)) {
207                    // same provider, just reuse the key
208                    return managedBlob.getKey();
209                }
210            }
211        }
212        if (dispatch == null) {
213            dispatch = blobDispatcher.getBlobProvider(doc, blob, xpath);
214        }
215        BlobProvider blobProvider = getBlobProvider(dispatch.providerId);
216        if (blobProvider == null) {
217            throw new NuxeoException("No registered blob provider with id: " + dispatch.providerId);
218        }
219        if (MAIN_BLOB_XPATH.equals(xpath) && blobProvider.isRecordMode() && doc.isUnderRetentionOrLegalHold()) {
220            throw new DocumentSecurityException(
221                    "Cannot change blob from document " + doc.getUUID() + ", it is under retention / hold");
222        }
223        String key = blobProvider.writeBlob(new BlobContext(blob, doc, xpath));
224        if (dispatch.addPrefix) {
225            key = dispatch.providerId + ':' + key;
226        }
227        return key;
228    }
229
230    @Override
231    public InputStream getConvertedStream(Blob blob, String mimeType, DocumentModel doc) throws IOException {
232        DocumentBlobProvider blobProvider = getDocumentBlobProvider(blob);
233        if (blobProvider == null) {
234            return null;
235        }
236        return blobProvider.getConvertedStream((ManagedBlob) blob, mimeType, doc);
237    }
238
239    protected void freezeVersion(BlobAccessor accessor, Document doc) {
240        Blob blob = accessor.getBlob();
241        DocumentBlobProvider blobProvider = getDocumentBlobProvider(blob);
242        if (blobProvider == null) {
243            return;
244        }
245        try {
246            Blob newBlob = blobProvider.freezeVersion((ManagedBlob) blob, doc);
247            if (newBlob != null) {
248                accessor.setBlob(newBlob);
249            }
250        } catch (IOException e) {
251            throw new NuxeoException(e);
252        }
253    }
254
255    @Override
256    public void freezeVersion(Document doc) {
257        // finds all blobs, then ask their providers if there's anything to do on check in
258        doc.visitBlobs(accessor -> freezeVersion(accessor, doc));
259    }
260
261    @Override
262    public void notifyChanges(Document doc, Set<String> xpaths) {
263        getBlobDispatcher().notifyChanges(doc, xpaths);
264    }
265
266    @Override
267    public void notifyMakeRecord(Document doc) {
268        getBlobDispatcher().notifyMakeRecord(doc);
269    }
270
271    @Override
272    public void notifyAfterCopy(Document doc) {
273        getBlobDispatcher().notifyAfterCopy(doc);
274    }
275
276    @Override
277    public void notifyBeforeRemove(Document doc) {
278        getBlobDispatcher().notifyBeforeRemove(doc);
279    }
280
281    @Override
282    public void notifySetRetainUntil(Document doc, Calendar retainUntil) {
283        updateBlob(doc, context -> context.withUpdateRetainUntil(retainUntil));
284    }
285
286    @Override
287    public void notifySetLegalHold(Document doc, boolean hold) {
288        updateBlob(doc, context -> context.withUpdateLegalHold(hold));
289    }
290
291    public void updateBlob(Document doc, Consumer<BlobUpdateContext> contextFiller) {
292        ManagedBlob blob = getMainBlob(doc);
293        if (blob == null) {
294            return;
295        }
296        BlobProvider blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blob);
297        if (blobProvider == null) {
298            log.error("No blob provider found for blob: " + blob.getKey());
299            return;
300        }
301        try {
302            String key = stripBlobKeyPrefix(blob.getKey());
303            BlobUpdateContext blobUpdateContext = new BlobUpdateContext(key);
304            contextFiller.accept(blobUpdateContext);
305            blobProvider.updateBlob(blobUpdateContext);
306        } catch (IOException e) {
307            throw new NuxeoException(e);
308        }
309    }
310
311    protected ManagedBlob getMainBlob(Document doc) {
312        Blob blob;
313        try {
314            blob = (Blob) doc.getValue(MAIN_BLOB_XPATH);
315        } catch (PropertyNotFoundException | ClassCastException e) {
316            // not a standard file schema
317            return null;
318        }
319        if (blob == null) {
320            // no blob in this document
321            return null;
322        }
323        if (blob instanceof ManagedBlob) {
324            return (ManagedBlob) blob;
325        }
326        log.error("Blob is not managed: " + blob);
327        return null;
328    }
329
330    // TODO restore to version also changes the blob
331
332    protected String stripBlobKeyPrefix(String key) {
333        int colon = key.indexOf(':');
334        if (colon >= 0) {
335            key = key.substring(colon + 1);
336        }
337        return key;
338    }
339
340    // find which GCs to use
341    // only GC the binary managers to which we dispatch blobs
342    protected List<BinaryGarbageCollector> getGarbageCollectors() {
343        List<BinaryGarbageCollector> gcs = new LinkedList<>();
344        for (String providerId : getBlobDispatcher().getBlobProviderIds()) {
345            BlobProvider blobProvider = getBlobProvider(providerId);
346            BinaryGarbageCollector gc = blobProvider.getBinaryGarbageCollector();
347            if (gc != null) {
348                gcs.add(gc);
349            }
350        }
351        return gcs;
352    }
353
354    @Override
355    public BinaryManagerStatus garbageCollectBinaries(boolean delete) {
356        // do the GC in a long-running transaction to avoid timeouts
357        return runInTransaction(() -> {
358            List<BinaryGarbageCollector> gcs = getGarbageCollectors();
359            // start gc
360            long start = System.currentTimeMillis();
361            for (BinaryGarbageCollector gc : gcs) {
362                gc.start();
363            }
364            // in all repositories, mark referenced binaries
365            // the marking itself will call back into the appropriate gc's mark method
366            RepositoryService repositoryService = Framework.getService(RepositoryService.class);
367            for (String repositoryName : repositoryService.getRepositoryNames()) {
368                Repository repository = repositoryService.getRepository(repositoryName);
369                repository.markReferencedBinaries();
370            }
371            // stop gc
372            BinaryManagerStatus globalStatus = new BinaryManagerStatus();
373            for (BinaryGarbageCollector gc : gcs) {
374                gc.stop(delete);
375                BinaryManagerStatus status = gc.getStatus();
376                globalStatus.numBinaries += status.numBinaries;
377                globalStatus.sizeBinaries += status.sizeBinaries;
378                globalStatus.numBinariesGC += status.numBinariesGC;
379                globalStatus.sizeBinariesGC += status.sizeBinariesGC;
380            }
381            globalStatus.gcDuration = System.currentTimeMillis() - start;
382            return globalStatus;
383        }, BINARY_GC_TX_TIMEOUT_SEC);
384    }
385
386    /**
387     * Runs the given {@link Supplier} in a transaction with the given {@code timeout}.
388     *
389     * @since 11.1
390     */
391    protected <R> R runInTransaction(Supplier<R> supplier, int timeout) {
392        if (TransactionHelper.isTransactionMarkedRollback()) {
393            throw new NuxeoException("Cannot run supplier when current transaction is marked rollback.");
394        }
395        boolean txActive = TransactionHelper.isTransactionActive();
396        boolean txStarted = false;
397        try {
398            if (txActive) {
399                TransactionHelper.commitOrRollbackTransaction();
400            }
401            txStarted = TransactionHelper.startTransaction(timeout);
402            return supplier.get();
403        } finally {
404            if (txStarted) {
405                TransactionHelper.commitOrRollbackTransaction();
406            }
407            if (txActive) {
408                // go back to default transaction timeout
409                TransactionHelper.startTransaction();
410            }
411        }
412    }
413
414    @Override
415    public void markReferencedBinary(String key, String repositoryName) {
416        BlobProvider blobProvider = getBlobProvider(key, repositoryName);
417        BinaryGarbageCollector gc = blobProvider.getBinaryGarbageCollector();
418        if (gc != null) {
419            key = stripBlobKeyPrefix(key);
420            gc.mark(key);
421        } else {
422            log.error("Unknown binary manager for key: " + key);
423        }
424    }
425
426    @Override
427    public boolean isBinariesGarbageCollectionInProgress() {
428        for (BinaryGarbageCollector gc : getGarbageCollectors()) {
429            if (gc.isInProgress()) {
430                return true;
431            }
432        }
433        return false;
434    }
435
436}