001/*
002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.liveconnect.core;
020
021import java.io.IOException;
022import java.io.Serializable;
023import java.io.UncheckedIOException;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.stream.Collectors;
031
032import org.apache.commons.lang3.StringUtils;
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
036import org.nuxeo.ecm.core.api.Blob;
037import org.nuxeo.ecm.core.api.CoreInstance;
038import org.nuxeo.ecm.core.api.CoreSession;
039import org.nuxeo.ecm.core.api.DocumentModel;
040import org.nuxeo.ecm.core.api.NuxeoException;
041import org.nuxeo.ecm.core.api.repository.RepositoryManager;
042import org.nuxeo.ecm.core.blob.AbstractBlobProvider;
043import org.nuxeo.ecm.core.blob.BlobInfo;
044import org.nuxeo.ecm.core.blob.BlobProvider;
045import org.nuxeo.ecm.core.blob.DocumentBlobProvider;
046import org.nuxeo.ecm.core.blob.ManagedBlob;
047import org.nuxeo.ecm.core.blob.SimpleManagedBlob;
048import org.nuxeo.ecm.core.cache.Cache;
049import org.nuxeo.ecm.core.cache.CacheService;
050import org.nuxeo.ecm.core.work.api.WorkManager;
051import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider;
052import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork;
053import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider;
054import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry;
055import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token;
056import org.nuxeo.ecm.platform.query.api.PageProvider;
057import org.nuxeo.ecm.platform.query.api.PageProviderService;
058import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider;
059import org.nuxeo.runtime.api.Framework;
060import org.nuxeo.runtime.transaction.TransactionHelper;
061
062import com.google.api.client.auth.oauth2.Credential;
063import com.google.common.base.Splitter;
064
065/**
066 * Basic implementation of {@link BlobProvider} for live connect.
067 *
068 * @param <O> The OAuth2 service provider type.
069 * @since 8.1
070 */
071public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider
072        implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider, DocumentBlobProvider {
073
074    private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class);
075
076    private static final String FILE_CACHE_PREFIX = "liveconnect_file_";
077
078    private static final char BLOB_KEY_SEPARATOR = ':';
079
080    static {
081        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONLiveConnectBlobDecoder());
082    }
083
084    /** Resource cache */
085    private Cache cache;
086
087    /**
088     * Should be overriden by subclasses needing something different.
089     */
090    @Override
091    public void close() {
092    }
093
094    /**
095     * Should be overriden by subclasses needing something different.
096     */
097    @Override
098    public Blob readBlob(BlobInfo blobInfo) throws IOException {
099        return toBlob(toFileInfo(blobInfo.key));
100    }
101
102    /**
103     * Should be overriden by subclasses needing something different.
104     */
105    @Override
106    public String writeBlob(Blob blob) throws IOException {
107        throw new UnsupportedOperationException("Writing a blob to live connect service is not supported");
108    }
109
110    @Override
111    public boolean performsExternalAccessControl(BlobInfo blobInfo) {
112        return true;
113    }
114
115    /**
116     * Should be overriden by subclasses needing something different.
117     */
118    @Override
119    public boolean isVersion(ManagedBlob blob) {
120        return toFileInfo(blob).getRevisionId().isPresent();
121    }
122
123    @Override
124    public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) {
125        List<DocumentModel> changedDocuments = new ArrayList<>();
126        for (DocumentModel doc : docs) {
127            final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue();
128            if (blob == null || isVersion(blob)) {
129                continue;
130            }
131            LiveConnectFileInfo fileInfo = toFileInfo(blob);
132            try {
133                LiveConnectFile file = retrieveFile(fileInfo);
134                putFileInCache(file);
135                if (hasChanged(blob, file)) {
136                    if (log.isTraceEnabled()) {
137                        log.trace("Updating blob=" + blob.key);
138                    }
139                    doc.setPropertyValue("content", toBlob(file));
140                    changedDocuments.add(doc);
141                }
142            } catch (IOException e) {
143                log.error("Could not update document=" + fileInfo, e);
144            }
145
146        }
147        return changedDocuments;
148    }
149
150    @Override
151    public void processDocumentsUpdate() {
152        final RepositoryManager repositoryManager = Framework.getLocalService(RepositoryManager.class);
153        final WorkManager workManager = Framework.getLocalService(WorkManager.class);
154        for (String repositoryName : repositoryManager.getRepositoryNames()) {
155            try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
156
157                long offset = 0;
158                List<DocumentModel> nextDocumentsToBeUpdated;
159                PageProviderService ppService = Framework.getService(PageProviderService.class);
160                Map<String, Serializable> props = new HashMap<>();
161                props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session);
162                @SuppressWarnings("unchecked")
163                PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider(
164                        getPageProviderNameForUpdate(), null, null, null, props);
165                final long maxResult = pp.getPageSize();
166                do {
167                    pp.setCurrentPageOffset(offset);
168                    pp.refresh();
169                    nextDocumentsToBeUpdated = pp.getCurrentPage();
170
171                    if (nextDocumentsToBeUpdated.isEmpty()) {
172                        break;
173                    }
174                    List<String> docIds = nextDocumentsToBeUpdated.stream().map(DocumentModel::getId).collect(
175                            Collectors.toList());
176                    BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork(
177                            buildWorkId(repositoryName, offset), blobProviderId);
178                    work.setDocuments(repositoryName, docIds);
179                    workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true);
180                    offset += maxResult;
181                } while (nextDocumentsToBeUpdated.size() == maxResult);
182
183            }
184        }
185    }
186
187    private String buildWorkId(String repositoryName, long offset) {
188        return blobProviderId + ':' + repositoryName + ':' + offset;
189    }
190
191    /**
192     * Should be overriden by subclasses wanting to rely on a different fields.
193     */
194    protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) {
195        return StringUtils.isBlank(blob.getDigest()) || !blob.getDigest().equals(file.getDigest());
196    }
197
198    @Override
199    @SuppressWarnings("unchecked")
200    public O getOAuth2Provider() {
201        return (O) Framework.getLocalService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId);
202    }
203
204    @Override
205    public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException {
206        LiveConnectFile file = getFile(fileInfo);
207        return toBlob(file);
208    }
209
210    protected SimpleManagedBlob toBlob(LiveConnectFile file) {
211        BlobInfo blobInfo = new BlobInfo();
212        blobInfo.key = buildBlobKey(file.getInfo());
213        blobInfo.mimeType = file.getMimeType();
214        blobInfo.encoding = file.getEncoding();
215        blobInfo.filename = file.getFilename().replace('/', '-');
216        blobInfo.length = Long.valueOf(file.getFileSize());
217        blobInfo.digest = file.getDigest();
218        return new SimpleManagedBlob(blobInfo);
219    }
220
221    protected String buildBlobKey(LiveConnectFileInfo fileInfo) {
222        StringBuilder key = new StringBuilder(blobProviderId);
223        key.append(BLOB_KEY_SEPARATOR);
224        key.append(fileInfo.getUser());
225        key.append(BLOB_KEY_SEPARATOR);
226        key.append(fileInfo.getFileId());
227        if (fileInfo.getRevisionId().isPresent()) {
228            key.append(BLOB_KEY_SEPARATOR);
229            key.append(fileInfo.getRevisionId().get());
230        }
231        return key.toString();
232    }
233
234    protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) {
235        return toFileInfo(blob.getKey());
236    }
237
238    /**
239     * @since 8.4
240     */
241    protected LiveConnectFileInfo toFileInfo(String key) {
242        List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key);
243        // According to buildBlobKey we have :
244        // 0 - blobProviderId
245        // 1 - userId
246        // 2 - fileId
247        // 3 - revisionId (optional)
248        if (keyParts.size() < 3 || keyParts.size() > 4) {
249            throw new IllegalArgumentException("The key doesn't have a valid format=" + key);
250        }
251        return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null);
252    }
253
254    /**
255     * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it.
256     *
257     * @param fileInfo the file info
258     * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it
259     */
260    protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException {
261        LiveConnectFile file = getFileFromCache(fileInfo);
262        if (file == null) {
263            file = retrieveFile(fileInfo);
264            putFileInCache(file);
265        }
266        return file;
267    }
268
269    private Cache getCache() {
270        if (cache == null) {
271            cache = Framework.getService(CacheService.class).getCache(getCacheName());
272        }
273        return cache;
274    }
275
276    protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException {
277        return getCredential(fileInfo.getUser());
278    }
279
280    protected Credential getCredential(NuxeoOAuth2Token token) throws IOException {
281        return getCredential(token.getServiceLogin());
282    }
283
284    public final synchronized Credential getCredential(String user) throws IOException {
285        // suspend current transaction and start a new one responsible for access token update
286        try {
287            return TransactionHelper.runInNewTransaction(() -> retrieveAndRefreshCredential(user));
288        } catch (UncheckedIOException uioe) {
289            // re-throw IOException because methods implementing interface throwing IOException use this method
290            throw uioe.getCause();
291        }
292    }
293
294    /**
295     * Declare this method as {@code private} because we don't want upper class to override it and as there's concurrent
296     * access on it, don't let anyone calling it.
297     */
298    private Credential retrieveAndRefreshCredential(String user) {
299        try {
300            Credential credential = getCredentialFactory().build(user);
301            if (credential == null) {
302                throw new NuxeoException(
303                        "No credentials found for user " + user + " and service " + blobProviderId);
304            }
305            Long expiresInSeconds = credential.getExpiresInSeconds();
306            if (expiresInSeconds != null && expiresInSeconds.longValue() <= 0) {
307                credential.refreshToken();
308            }
309            return credential;
310        } catch (IOException e) {
311            throw new UncheckedIOException(e);
312        }
313    }
314
315    /**
316     * Should be overriden by subclasses needing another credential factory.
317     */
318    protected CredentialFactory getCredentialFactory() {
319        return new OAuth2CredentialFactory(getOAuth2Provider());
320    }
321
322    @SuppressWarnings("unchecked")
323    protected final <T extends Serializable> T getFromCache(String key) {
324        return (T) getCache().get(key);
325    }
326
327    protected final <T extends Serializable> void putInCache(String key, T object) {
328        getCache().put(key, object);
329    }
330
331    protected final void invalidateInCache(LiveConnectFileInfo fileInfo) {
332        getCache().invalidate(FILE_CACHE_PREFIX + buildBlobKey(fileInfo));
333    }
334
335    protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) {
336        return getFromCache(FILE_CACHE_PREFIX + buildBlobKey(fileInfo));
337    }
338
339    protected final void putFileInCache(LiveConnectFile file) {
340        putInCache(FILE_CACHE_PREFIX + buildBlobKey(file.getInfo()), file);
341    }
342
343    /**
344     * Parse a {@link URI}.
345     *
346     * @return the {@link URI} or null if it fails
347     */
348    protected URI asURI(String link) {
349        try {
350            return new URI(link);
351        } catch (URISyntaxException e) {
352            log.error("Invalid URI: " + link, e);
353            return null;
354        }
355    }
356
357    protected abstract String getCacheName();
358
359    protected abstract String getPageProviderNameForUpdate();
360
361    /**
362     * Retrieves the file with API.
363     *
364     * @param fileInfo the file info
365     * @return the file retrieved from API
366     */
367    protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException;
368
369}