001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.blob;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.net.URI;
024import java.util.Collections;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import javax.servlet.http.HttpServletRequest;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.ecm.core.api.Blob;
037import org.nuxeo.ecm.core.api.DocumentModel;
038import org.nuxeo.ecm.core.api.NuxeoException;
039import org.nuxeo.ecm.core.blob.BlobDispatcher.BlobDispatch;
040import org.nuxeo.ecm.core.blob.binary.BinaryBlobProvider;
041import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
042import org.nuxeo.ecm.core.blob.binary.BinaryManager;
043import org.nuxeo.ecm.core.blob.binary.BinaryManagerStatus;
044import org.nuxeo.ecm.core.model.Document;
045import org.nuxeo.ecm.core.model.Document.BlobAccessor;
046import org.nuxeo.ecm.core.model.Repository;
047import org.nuxeo.ecm.core.repository.RepositoryService;
048import org.nuxeo.runtime.api.Framework;
049import org.nuxeo.runtime.model.ComponentContext;
050import org.nuxeo.runtime.model.ComponentInstance;
051import org.nuxeo.runtime.model.DefaultComponent;
052import org.nuxeo.runtime.model.SimpleContributionRegistry;
053
054/**
055 * Implementation of the service managing the storage and retrieval of {@link Blob}s, through internally-registered
056 * {@link BlobProvider}s.
057 *
058 * @since 7.2
059 */
060public class BlobManagerComponent extends DefaultComponent implements BlobManager {
061
062    private static final Log log = LogFactory.getLog(BlobManagerComponent.class);
063
064    protected static final String XP = "configuration";
065
066    protected static BlobDispatcher DEFAULT_BLOB_DISPATCHER = new DefaultBlobDispatcher();
067
068    protected Deque<BlobDispatcherDescriptor> blobDispatcherDescriptorsRegistry = new LinkedList<>();
069
070    protected BlobProviderDescriptorRegistry blobProviderDescriptorsRegistry = new BlobProviderDescriptorRegistry();
071
072    protected Map<String, BlobProvider> blobProviders = new HashMap<>();
073
074    protected static class BlobProviderDescriptorRegistry extends SimpleContributionRegistry<BlobProviderDescriptor> {
075
076        @Override
077        public String getContributionId(BlobProviderDescriptor contrib) {
078            return contrib.name;
079        }
080
081        @Override
082        public BlobProviderDescriptor clone(BlobProviderDescriptor orig) {
083            return new BlobProviderDescriptor(orig);
084        }
085
086        @Override
087        public void merge(BlobProviderDescriptor src, BlobProviderDescriptor dst) {
088            dst.merge(src);
089        }
090
091        @Override
092        public boolean isSupportingMerge() {
093            return true;
094        }
095
096        public void clear() {
097            currentContribs.clear();
098        }
099
100        public BlobProviderDescriptor getBlobProviderDescriptor(String id) {
101            return getCurrentContribution(id);
102        }
103    }
104
105    @Override
106    public void deactivate(ComponentContext context) {
107        blobDispatcherDescriptorsRegistry.clear();
108        blobProviderDescriptorsRegistry.clear();
109        // close each blob provider
110        for (BlobProvider blobProvider : blobProviders.values()) {
111            blobProvider.close();
112        }
113        blobProviders.clear();
114    }
115
116    @Override
117    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
118        if (XP.equals(extensionPoint)) {
119            if (contribution instanceof BlobDispatcherDescriptor) {
120                registerBlobDispatcher((BlobDispatcherDescriptor) contribution);
121            } else if (contribution instanceof BlobProviderDescriptor) {
122                registerBlobProvider((BlobProviderDescriptor) contribution);
123            } else {
124                throw new NuxeoException("Invalid descriptor: " + contribution.getClass());
125            }
126        } else {
127            throw new NuxeoException("Invalid extension point: " + extensionPoint);
128        }
129    }
130
131    @Override
132    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
133        if (XP.equals(extensionPoint)) {
134            if (contribution instanceof BlobDispatcherDescriptor) {
135                unregisterBlobDispatcher((BlobDispatcherDescriptor) contribution);
136            } else if (contribution instanceof BlobProviderDescriptor) {
137                unregisterBlobProvider((BlobProviderDescriptor) contribution);
138            }
139        }
140    }
141
142    protected void registerBlobDispatcher(BlobDispatcherDescriptor descr) {
143        blobDispatcherDescriptorsRegistry.add(descr);
144    }
145
146    protected void unregisterBlobDispatcher(BlobDispatcherDescriptor descr) {
147        blobDispatcherDescriptorsRegistry.remove(descr);
148    }
149
150    protected BlobDispatcher getBlobDispatcher() {
151        BlobDispatcherDescriptor descr = blobDispatcherDescriptorsRegistry.peekLast();
152        if (descr == null) {
153            return DEFAULT_BLOB_DISPATCHER;
154        }
155        return descr.getBlobDispatcher();
156    }
157
158    // public for tests
159    public void registerBlobProvider(BlobProviderDescriptor descr) {
160        closeOldBlobProvider(descr.name);
161        blobProviderDescriptorsRegistry.addContribution(descr);
162        // lookup now to have immediate feedback on eror
163        getBlobProvider(descr.name);
164    }
165
166    // public for tests
167    public void unregisterBlobProvider(BlobProviderDescriptor descr) {
168        closeOldBlobProvider(descr.name);
169        blobProviderDescriptorsRegistry.removeContribution(descr);
170    }
171
172    /**
173     * We're about to change something about a contributed blob provider. Close the old one.
174     */
175    protected synchronized void closeOldBlobProvider(String id) {
176        BlobProvider blobProvider = blobProviders.remove(id);
177        if (blobProvider != null) {
178            blobProvider.close();
179        }
180    }
181
182    @Override
183    public synchronized BlobProvider getBlobProvider(String providerId) {
184        BlobProvider blobProvider = blobProviders.get(providerId);
185        if (blobProvider == null) {
186            BlobProviderDescriptor descr = blobProviderDescriptorsRegistry.getBlobProviderDescriptor(providerId);
187            if (descr == null) {
188                return null;
189            }
190            Class<?> klass = descr.klass;
191            Map<String, String> properties = descr.properties;
192            try {
193                if (BlobProvider.class.isAssignableFrom(klass)) {
194                    @SuppressWarnings("unchecked")
195                    Class<? extends BlobProvider> blobProviderClass = (Class<? extends BlobProvider>) klass;
196                    blobProvider = blobProviderClass.newInstance();
197                } else if (BinaryManager.class.isAssignableFrom(klass)) {
198                    @SuppressWarnings("unchecked")
199                    Class<? extends BinaryManager> binaryManagerClass = (Class<? extends BinaryManager>) klass;
200                    BinaryManager binaryManager = binaryManagerClass.newInstance();
201                    blobProvider = new BinaryBlobProvider(binaryManager);
202                } else {
203                    throw new RuntimeException("Unknown class for blob provider: " + klass);
204                }
205            } catch (ReflectiveOperationException e) {
206                throw new RuntimeException(e);
207            }
208            try {
209                blobProvider.initialize(providerId, properties);
210            } catch (IOException e) {
211                throw new RuntimeException(e);
212            }
213            blobProviders.put(providerId, blobProvider);
214        }
215        return blobProvider;
216    }
217
218    /**
219     * {@inheritDoc}
220     * <p>
221     * The {@link BlobInfo} (coming from the database) contains the blob key, which may or may not be prefixed by a blob
222     * provider id.
223     */
224    @Override
225    public Blob readBlob(BlobInfo blobInfo, String repositoryName) throws IOException {
226        String key = blobInfo.key;
227        if (key == null) {
228            return null;
229        }
230        BlobProvider blobProvider = getBlobProvider(key, repositoryName);
231        if (blobProvider == null) {
232            throw new NuxeoException("No registered blob provider for key: " + key);
233        }
234        return blobProvider.readBlob(blobInfo);
235    }
236
237    protected BlobProvider getBlobProvider(String key, String repositoryName) {
238        int colon = key.indexOf(':');
239        String providerId;
240        if (colon < 0) {
241            // no prefix, use the blob dispatcher to find the blob provider id
242            providerId = getBlobDispatcher().getBlobProvider(repositoryName);
243        } else {
244            // use the prefix as blob provider id
245            providerId = key.substring(0, colon);
246        }
247        return getBlobProvider(providerId);
248    }
249
250    /**
251     * {@inheritDoc}
252     * <p>
253     * If the blob is managed and already uses the provider that's expected for this blob and document, there is no need
254     * to recompute a key. Otherwise, go through the blob provider.
255     */
256    @Override
257    public String writeBlob(Blob blob, Document doc) throws IOException {
258        BlobDispatcher blobDispatcher = getBlobDispatcher();
259        BlobDispatch dispatch = null;
260        if (blob instanceof ManagedBlob) {
261            ManagedBlob managedBlob = (ManagedBlob) blob;
262            String currentProviderId = managedBlob.getProviderId();
263            // is it something we don't have to dispatch?
264            if (!blobDispatcher.getBlobProviderIds().contains(currentProviderId)) {
265                // not something we have to dispatch, reuse the key
266                return managedBlob.getKey();
267            }
268            dispatch = blobDispatcher.getBlobProvider(doc, blob);
269            if (dispatch.providerId.equals(currentProviderId)) {
270                // same provider, just reuse the key
271                return managedBlob.getKey();
272            }
273        }
274        if (dispatch == null) {
275            dispatch = blobDispatcher.getBlobProvider(doc, blob);
276        }
277        BlobProvider blobProvider = getBlobProvider(dispatch.providerId);
278        if (blobProvider == null) {
279            throw new NuxeoException("No registered blob provider with id: " + dispatch.providerId);
280        }
281        String key = blobProvider.writeBlob(blob, doc);
282        if (dispatch.addPrefix) {
283            key = dispatch.providerId + ':' + key;
284        }
285        return key;
286    }
287
288    @Override
289    public BlobProvider getBlobProvider(Blob blob) {
290        if (!(blob instanceof ManagedBlob)) {
291            return null;
292        }
293        ManagedBlob managedBlob = (ManagedBlob) blob;
294        return getBlobProvider(managedBlob.getProviderId());
295    }
296
297    @Override
298    public InputStream getStream(Blob blob) throws IOException {
299        BlobProvider blobProvider = getBlobProvider(blob);
300        if (blobProvider == null) {
301            return null;
302        }
303        return blobProvider.getStream((ManagedBlob) blob);
304    }
305
306    @Override
307    public InputStream getThumbnail(Blob blob) throws IOException {
308        BlobProvider blobProvider = getBlobProvider(blob);
309        if (blobProvider == null) {
310            return null;
311        }
312        return blobProvider.getThumbnail((ManagedBlob) blob);
313    }
314
315    @Override
316    public URI getURI(Blob blob, UsageHint hint, HttpServletRequest servletRequest) throws IOException {
317        BlobProvider blobProvider = getBlobProvider(blob);
318        if (blobProvider == null) {
319            return null;
320        }
321        return blobProvider.getURI((ManagedBlob) blob, hint, servletRequest);
322    }
323
324    @Override
325    public Map<String, URI> getAvailableConversions(Blob blob, UsageHint hint) throws IOException {
326        BlobProvider blobProvider = getBlobProvider(blob);
327        if (blobProvider == null) {
328            return Collections.emptyMap();
329        }
330        return blobProvider.getAvailableConversions((ManagedBlob) blob, hint);
331    }
332
333    @Override
334    public InputStream getConvertedStream(Blob blob, String mimeType, DocumentModel doc) throws IOException {
335        BlobProvider blobProvider = getBlobProvider(blob);
336        if (blobProvider == null) {
337            return null;
338        }
339        return blobProvider.getConvertedStream((ManagedBlob) blob, mimeType, doc);
340    }
341
342    protected void freezeVersion(BlobAccessor accessor, Document doc) {
343        Blob blob = accessor.getBlob();
344        BlobProvider blobProvider = getBlobProvider(blob);
345        if (blobProvider == null) {
346            return;
347        }
348        try {
349            Blob newBlob = blobProvider.freezeVersion((ManagedBlob) blob, doc);
350            if (newBlob != null) {
351                accessor.setBlob(newBlob);
352            }
353        } catch (IOException e) {
354            throw new RuntimeException(e);
355        }
356    }
357
358    @Override
359    public Map<String, BlobProvider> getBlobProviders() {
360        return blobProviders;
361    }
362
363    @Override
364    public void freezeVersion(Document doc) {
365        // finds all blobs, then ask their providers if there's anything to do on check in
366        doc.visitBlobs(accessor -> freezeVersion(accessor, doc));
367    }
368
369    @Override
370    public void notifyChanges(Document doc, Set<String> xpaths) {
371        getBlobDispatcher().notifyChanges(doc, xpaths);
372    }
373
374    // find which GCs to use
375    // only GC the binary managers to which we dispatch blobs
376    protected List<BinaryGarbageCollector> getGarbageCollectors() {
377        List<BinaryGarbageCollector> gcs = new LinkedList<>();
378        for (String providerId : getBlobDispatcher().getBlobProviderIds()) {
379            BlobProvider blobProvider = getBlobProvider(providerId);
380            BinaryManager binaryManager = blobProvider.getBinaryManager();
381            if (binaryManager != null) {
382                gcs.add(binaryManager.getGarbageCollector());
383            }
384        }
385        return gcs;
386    }
387
388    @Override
389    public BinaryManagerStatus garbageCollectBinaries(boolean delete) {
390        List<BinaryGarbageCollector> gcs = getGarbageCollectors();
391        // start gc
392        long start = System.currentTimeMillis();
393        for (BinaryGarbageCollector gc : gcs) {
394            gc.start();
395        }
396        // in all repositories, mark referenced binaries
397        // the marking itself will call back into the appropriate gc's mark method
398        RepositoryService repositoryService = Framework.getService(RepositoryService.class);
399        for (String repositoryName : repositoryService.getRepositoryNames()) {
400            Repository repository = repositoryService.getRepository(repositoryName);
401            repository.markReferencedBinaries();
402        }
403        // stop gc
404        BinaryManagerStatus globalStatus = new BinaryManagerStatus();
405        for (BinaryGarbageCollector gc : gcs) {
406            gc.stop(delete);
407            BinaryManagerStatus status = gc.getStatus();
408            globalStatus.numBinaries += status.numBinaries;
409            globalStatus.sizeBinaries += status.sizeBinaries;
410            globalStatus.numBinariesGC += status.numBinariesGC;
411            globalStatus.sizeBinariesGC += status.sizeBinariesGC;
412        }
413        globalStatus.gcDuration = System.currentTimeMillis() - start;
414        return globalStatus;
415    }
416
417    @Override
418    public void markReferencedBinary(String key, String repositoryName) {
419        BlobProvider blobProvider = getBlobProvider(key, repositoryName);
420        BinaryManager binaryManager = blobProvider.getBinaryManager();
421        if (binaryManager != null) {
422            int colon = key.indexOf(':');
423            if (colon > 0) {
424                // if the key is in the "providerId:digest" format, keep only the real digest
425                key = key.substring(colon + 1);
426            }
427            binaryManager.getGarbageCollector().mark(key);
428        } else {
429            log.error("Unknown binary manager for key: " + key);
430        }
431    }
432
433    @Override
434    public boolean isBinariesGarbageCollectionInProgress() {
435        for (BinaryGarbageCollector gc : getGarbageCollectors()) {
436            if (gc.isInProgress()) {
437                return true;
438            }
439        }
440        return false;
441    }
442
443}