001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Florent Guillaume
016 */
017package org.nuxeo.ecm.core.blob;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.net.URI;
022import java.util.Collections;
023import java.util.Deque;
024import java.util.HashMap;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import javax.servlet.http.HttpServletRequest;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.api.Blob;
035import org.nuxeo.ecm.core.api.DocumentModel;
036import org.nuxeo.ecm.core.api.NuxeoException;
037import org.nuxeo.ecm.core.blob.BlobDispatcher.BlobDispatch;
038import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider;
039import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
040import org.nuxeo.ecm.core.blob.binary.BinaryManager;
041import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
042import org.nuxeo.ecm.core.model.Document;
043import org.nuxeo.ecm.core.model.Document.BlobAccessor;
044import org.nuxeo.ecm.core.model.Repository;
045import org.nuxeo.ecm.core.repository.RepositoryService;
046import org.nuxeo.runtime.api.Framework;
047import org.nuxeo.runtime.model.ComponentContext;
048import org.nuxeo.runtime.model.ComponentInstance;
049import org.nuxeo.runtime.model.DefaultComponent;
050import org.nuxeo.runtime.model.SimpleContributionRegistry;
051
052/**
053 * Implementation of the service managing the storage and retrieval of {@link Blob}s, through internally-registered
054 * {@link BlobProvider}s.
055 *
056 * @since 7.2
057 */
058public class BlobManagerComponent extends DefaultComponent implements BlobManager {
059
    /** Logger of this component. */
    private static final Log log = LogFactory.getLog(BlobManagerComponent.class);

    /** Name of the only extension point accepted when registering or unregistering contributions. */
    protected static final String XP = "configuration";

    /** Dispatcher returned by {@link #getBlobDispatcher()} when no dispatcher descriptor is registered. */
    protected static BlobDispatcher DEFAULT_BLOB_DISPATCHER = new DefaultBlobDispatcher();

    /** Registered dispatcher descriptors, in registration order; {@link #getBlobDispatcher()} uses the last one. */
    protected Deque<BlobDispatcherDescriptor> blobDispatcherDescriptorsRegistry = new LinkedList<>();

    /** Registry of the contributed blob provider descriptors, identified by descriptor name. */
    protected BlobProviderDescriptorRegistry blobProviderDescriptorsRegistry = new BlobProviderDescriptorRegistry();

    /** Instantiated blob providers by id, filled on demand by {@link #getBlobProvider(String)}. */
    protected Map<String, BlobProvider> blobProviders = new HashMap<>();
071
072    protected static class BlobProviderDescriptorRegistry extends SimpleContributionRegistry<BlobProviderDescriptor> {
073
074        @Override
075        public String getContributionId(BlobProviderDescriptor contrib) {
076            return contrib.name;
077        }
078
079        @Override
080        public BlobProviderDescriptor clone(BlobProviderDescriptor orig) {
081            return new BlobProviderDescriptor(orig);
082        }
083
084        @Override
085        public void merge(BlobProviderDescriptor src, BlobProviderDescriptor dst) {
086            dst.merge(src);
087        }
088
089        @Override
090        public boolean isSupportingMerge() {
091            return true;
092        }
093
094        public void clear() {
095            currentContribs.clear();
096        }
097
098        public BlobProviderDescriptor getBlobProviderDescriptor(String id) {
099            return getCurrentContribution(id);
100        }
101    }
102
103    @Override
104    public void deactivate(ComponentContext context) {
105        blobDispatcherDescriptorsRegistry.clear();
106        blobProviderDescriptorsRegistry.clear();
107        // close each blob provider
108        for (BlobProvider blobProvider : blobProviders.values()) {
109            blobProvider.close();
110        }
111        blobProviders.clear();
112    }
113
114    @Override
115    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
116        if (XP.equals(extensionPoint)) {
117            if (contribution instanceof BlobDispatcherDescriptor) {
118                registerBlobDispatcher((BlobDispatcherDescriptor) contribution);
119            } else if (contribution instanceof BlobProviderDescriptor) {
120                registerBlobProvider((BlobProviderDescriptor) contribution);
121            } else {
122                throw new NuxeoException("Invalid descriptor: " + contribution.getClass());
123            }
124        } else {
125            throw new NuxeoException("Invalid extension point: " + extensionPoint);
126        }
127    }
128
129    @Override
130    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
131        if (XP.equals(extensionPoint)) {
132            if (contribution instanceof BlobDispatcherDescriptor) {
133                unregisterBlobDispatcher((BlobDispatcherDescriptor) contribution);
134            } else if (contribution instanceof BlobProviderDescriptor) {
135                unregisterBlobProvider((BlobProviderDescriptor) contribution);
136            }
137        }
138    }
139
140    protected void registerBlobDispatcher(BlobDispatcherDescriptor descr) {
141        blobDispatcherDescriptorsRegistry.add(descr);
142    }
143
144    protected void unregisterBlobDispatcher(BlobDispatcherDescriptor descr) {
145        blobDispatcherDescriptorsRegistry.remove(descr);
146    }
147
148    protected BlobDispatcher getBlobDispatcher() {
149        BlobDispatcherDescriptor descr = blobDispatcherDescriptorsRegistry.peekLast();
150        if (descr == null) {
151            return DEFAULT_BLOB_DISPATCHER;
152        }
153        return descr.getBlobDispatcher();
154    }
155
156    // public for tests
157    public void registerBlobProvider(BlobProviderDescriptor descr) {
158        closeOldBlobProvider(descr.name);
159        blobProviderDescriptorsRegistry.addContribution(descr);
160        // lookup now to have immediate feedback on eror
161        getBlobProvider(descr.name);
162    }
163
164    // public for tests
165    public void unregisterBlobProvider(BlobProviderDescriptor descr) {
166        closeOldBlobProvider(descr.name);
167        blobProviderDescriptorsRegistry.removeContribution(descr);
168    }
169
170    /**
171     * We're about to change something about a contributed blob provider. Close the old one.
172     */
173    protected synchronized void closeOldBlobProvider(String id) {
174        BlobProvider blobProvider = blobProviders.remove(id);
175        if (blobProvider != null) {
176            blobProvider.close();
177        }
178    }
179
180    @Override
181    public synchronized BlobProvider getBlobProvider(String providerId) {
182        BlobProvider blobProvider = blobProviders.get(providerId);
183        if (blobProvider == null) {
184            BlobProviderDescriptor descr = blobProviderDescriptorsRegistry.getBlobProviderDescriptor(providerId);
185            if (descr == null) {
186                return null;
187            }
188            Class<?> klass = descr.klass;
189            Map<String, String> properties = descr.properties;
190            try {
191                if (BlobProvider.class.isAssignableFrom(klass)) {
192                    @SuppressWarnings("unchecked")
193                    Class<? extends BlobProvider> blobProviderClass = (Class<? extends BlobProvider>) klass;
194                    blobProvider = blobProviderClass.newInstance();
195                } else if (BinaryManager.class.isAssignableFrom(klass)) {
196                    @SuppressWarnings("unchecked")
197                    Class<? extends BinaryManager> binaryManagerClass = (Class<? extends BinaryManager>) klass;
198                    BinaryManager binaryManager = binaryManagerClass.newInstance();
199                    blobProvider = new BinaryBlobProvider(binaryManager);
200                } else {
201                    throw new RuntimeException("Unknown class for blob provider: " + klass);
202                }
203            } catch (ReflectiveOperationException e) {
204                throw new RuntimeException(e);
205            }
206            try {
207                blobProvider.initialize(providerId, properties);
208            } catch (IOException e) {
209                throw new RuntimeException(e);
210            }
211            blobProviders.put(providerId, blobProvider);
212        }
213        return blobProvider;
214    }
215
216    /**
217     * {@inheritDoc}
218     * <p>
219     * The {@link BlobInfo} (coming from the database) contains the blob key, which may or may not be prefixed by a blob
220     * provider id.
221     */
222    @Override
223    public Blob readBlob(BlobInfo blobInfo, String repositoryName) throws IOException {
224        String key = blobInfo.key;
225        if (key == null) {
226            return null;
227        }
228        BlobProvider blobProvider = getBlobProvider(key, repositoryName);
229        if (blobProvider == null) {
230            throw new NuxeoException("No registered blob provider for key: " + key);
231        }
232        return blobProvider.readBlob(blobInfo);
233    }
234
235    protected BlobProvider getBlobProvider(String key, String repositoryName) {
236        int colon = key.indexOf(':');
237        String providerId;
238        if (colon < 0) {
239            // no prefix, use the blob dispatcher to find the blob provider id
240            providerId = getBlobDispatcher().getBlobProvider(repositoryName);
241        } else {
242            // use the prefix as blob provider id
243            providerId = key.substring(0, colon);
244        }
245        return getBlobProvider(providerId);
246    }
247
248    /**
249     * {@inheritDoc}
250     * <p>
251     * If the blob is managed and already uses the provider that's expected for this blob and document, there is no need
252     * to recompute a key. Otherwise, go through the blob provider.
253     */
254    @Override
255    public String writeBlob(Blob blob, Document doc) throws IOException {
256        BlobDispatcher blobDispatcher = getBlobDispatcher();
257        BlobDispatch dispatch = null;
258        if (blob instanceof ManagedBlob) {
259            ManagedBlob managedBlob = (ManagedBlob) blob;
260            String currentProviderId = managedBlob.getProviderId();
261            // is it something we don't have to dispatch?
262            if (!blobDispatcher.getBlobProviderIds().contains(currentProviderId)) {
263                // not something we have to dispatch, reuse the key
264                return managedBlob.getKey();
265            }
266            dispatch = blobDispatcher.getBlobProvider(doc, blob);
267            if (dispatch.providerId.equals(currentProviderId)) {
268                // same provider, just reuse the key
269                return managedBlob.getKey();
270            }
271        }
272        if (dispatch == null) {
273            dispatch = blobDispatcher.getBlobProvider(doc, blob);
274        }
275        BlobProvider blobProvider = getBlobProvider(dispatch.providerId);
276        if (blobProvider == null) {
277            throw new NuxeoException("No registered blob provider with id: " + dispatch.providerId);
278        }
279        String key = blobProvider.writeBlob(blob, doc);
280        if (dispatch.addPrefix) {
281            key = dispatch.providerId + ':' + key;
282        }
283        return key;
284    }
285
286    @Override
287    public BlobProvider getBlobProvider(Blob blob) {
288        if (!(blob instanceof ManagedBlob)) {
289            return null;
290        }
291        ManagedBlob managedBlob = (ManagedBlob) blob;
292        return getBlobProvider(managedBlob.getProviderId());
293    }
294
295    @Override
296    public InputStream getStream(Blob blob) throws IOException {
297        BlobProvider blobProvider = getBlobProvider(blob);
298        if (blobProvider == null) {
299            return null;
300        }
301        return blobProvider.getStream((ManagedBlob) blob);
302    }
303
304    @Override
305    public InputStream getThumbnail(Blob blob) throws IOException {
306        BlobProvider blobProvider = getBlobProvider(blob);
307        if (blobProvider == null) {
308            return null;
309        }
310        return blobProvider.getThumbnail((ManagedBlob) blob);
311    }
312
313    @Override
314    public URI getURI(Blob blob, UsageHint hint, HttpServletRequest servletRequest) throws IOException {
315        BlobProvider blobProvider = getBlobProvider(blob);
316        if (blobProvider == null) {
317            return null;
318        }
319        return blobProvider.getURI((ManagedBlob) blob, hint, servletRequest);
320    }
321
322    @Override
323    public Map<String, URI> getAvailableConversions(Blob blob, UsageHint hint) throws IOException {
324        BlobProvider blobProvider = getBlobProvider(blob);
325        if (blobProvider == null) {
326            return Collections.emptyMap();
327        }
328        return blobProvider.getAvailableConversions((ManagedBlob) blob, hint);
329    }
330
331    @Override
332    public InputStream getConvertedStream(Blob blob, String mimeType, DocumentModel doc) throws IOException {
333        BlobProvider blobProvider = getBlobProvider(blob);
334        if (blobProvider == null) {
335            return null;
336        }
337        return blobProvider.getConvertedStream((ManagedBlob) blob, mimeType, doc);
338    }
339
340    protected void freezeVersion(BlobAccessor accessor, Document doc) {
341        Blob blob = accessor.getBlob();
342        BlobProvider blobProvider = getBlobProvider(blob);
343        if (blobProvider == null) {
344            return;
345        }
346        try {
347            Blob newBlob = blobProvider.freezeVersion((ManagedBlob) blob, doc);
348            if (newBlob != null) {
349                accessor.setBlob(newBlob);
350            }
351        } catch (IOException e) {
352            throw new RuntimeException(e);
353        }
354    }
355
356    @Override
357    public Map<String, BlobProvider> getBlobProviders() {
358        return blobProviders;
359    }
360
361    @Override
362    public void freezeVersion(Document doc) {
363        // finds all blobs, then ask their providers if there's anything to do on check in
364        doc.visitBlobs(accessor -> freezeVersion(accessor, doc));
365    }
366
367    @Override
368    public void notifyChanges(Document doc, Set<String> xpaths) {
369        getBlobDispatcher().notifyChanges(doc, xpaths);
370    }
371
372    // find which GCs to use
373    // only GC the binary managers to which we dispatch blobs
374    protected List<BinaryGarbageCollector> getGarbageCollectors() {
375        List<BinaryGarbageCollector> gcs = new LinkedList<>();
376        for (String providerId : getBlobDispatcher().getBlobProviderIds()) {
377            BlobProvider blobProvider = getBlobProvider(providerId);
378            BinaryManager binaryManager = blobProvider.getBinaryManager();
379            if (binaryManager != null) {
380                gcs.add(binaryManager.getGarbageCollector());
381            }
382        }
383        return gcs;
384    }
385
386    @Override
387    public BinaryManagerStatus garbageCollectBinaries(boolean delete) {
388        List<BinaryGarbageCollector> gcs = getGarbageCollectors();
389        // start gc
390        long start = System.currentTimeMillis();
391        for (BinaryGarbageCollector gc : gcs) {
392            gc.start();
393        }
394        // in all repositories, mark referenced binaries
395        // the marking itself will call back into the appropriate gc's mark method
396        RepositoryService repositoryService = Framework.getService(RepositoryService.class);
397        for (String repositoryName : repositoryService.getRepositoryNames()) {
398            Repository repository = repositoryService.getRepository(repositoryName);
399            repository.markReferencedBinaries();
400        }
401        // stop gc
402        BinaryManagerStatus globalStatus = new BinaryManagerStatus();
403        for (BinaryGarbageCollector gc : gcs) {
404            gc.stop(delete);
405            BinaryManagerStatus status = gc.getStatus();
406            globalStatus.numBinaries += status.numBinaries;
407            globalStatus.sizeBinaries += status.sizeBinaries;
408            globalStatus.numBinariesGC += status.numBinariesGC;
409            globalStatus.sizeBinariesGC += status.sizeBinariesGC;
410        }
411        globalStatus.gcDuration = System.currentTimeMillis() - start;
412        return globalStatus;
413    }
414
415    @Override
416    public void markReferencedBinary(String key, String repositoryName) {
417        BlobProvider blobProvider = getBlobProvider(key, repositoryName);
418        BinaryManager binaryManager = blobProvider.getBinaryManager();
419        if (binaryManager != null) {
420            int colon = key.indexOf(':');
421            if (colon > 0) {
422                // if the key is in the "providerId:digest" format, keep only the real digest
423                key = key.substring(colon + 1);
424            }
425            binaryManager.getGarbageCollector().mark(key);
426        } else {
427            log.error("Unknown binary manager for key: " + key);
428        }
429    }
430
431    @Override
432    public boolean isBinariesGarbageCollectionInProgress() {
433        for (BinaryGarbageCollector gc : getGarbageCollectors()) {
434            if (gc.isInProgress()) {
435                return true;
436            }
437        }
438        return false;
439    }
440
441}