001/*
002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.liveconnect.core;
020
021import java.io.IOException;
022import java.io.Serializable;
023import java.io.UncheckedIOException;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import java.util.stream.Collectors;
032
033import org.apache.commons.lang3.StringUtils;
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
037import org.nuxeo.ecm.core.api.Blob;
038import org.nuxeo.ecm.core.api.CloseableCoreSession;
039import org.nuxeo.ecm.core.api.CoreInstance;
040import org.nuxeo.ecm.core.api.CoreSession;
041import org.nuxeo.ecm.core.api.DocumentModel;
042import org.nuxeo.ecm.core.api.NuxeoException;
043import org.nuxeo.ecm.core.api.repository.RepositoryManager;
044import org.nuxeo.ecm.core.blob.AbstractBlobProvider;
045import org.nuxeo.ecm.core.blob.BlobInfo;
046import org.nuxeo.ecm.core.blob.BlobProvider;
047import org.nuxeo.ecm.core.blob.DocumentBlobProvider;
048import org.nuxeo.ecm.core.blob.ManagedBlob;
049import org.nuxeo.ecm.core.blob.SimpleManagedBlob;
050import org.nuxeo.ecm.core.cache.Cache;
051import org.nuxeo.ecm.core.cache.CacheService;
052import org.nuxeo.ecm.core.work.api.WorkManager;
053import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider;
054import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork;
055import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider;
056import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry;
057import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token;
058import org.nuxeo.ecm.platform.query.api.PageProvider;
059import org.nuxeo.ecm.platform.query.api.PageProviderService;
060import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider;
061import org.nuxeo.runtime.api.Framework;
062import org.nuxeo.runtime.transaction.TransactionHelper;
063
064import com.google.api.client.auth.oauth2.Credential;
065import com.google.common.base.Splitter;
066
067/**
068 * Basic implementation of {@link BlobProvider} for live connect.
069 *
070 * @param <O> The OAuth2 service provider type.
071 * @since 8.1
072 */
073public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider
074        implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider, DocumentBlobProvider {
075
076    private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class);
077
078    private static final String FILE_CACHE_PREFIX = "liveconnect_file_";
079
080    private static final char BLOB_KEY_SEPARATOR = ':';
081
082    static {
083        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONLiveConnectBlobDecoder());
084    }
085
086    /** Resource cache */
087    private Cache cache;
088
089    /**
090     * Should be overriden by subclasses needing something different.
091     */
092    @Override
093    public void close() {
094    }
095
096    /**
097     * Should be overriden by subclasses needing something different.
098     */
099    @Override
100    public Blob readBlob(BlobInfo blobInfo) throws IOException {
101        return toBlob(toFileInfo(blobInfo.key));
102    }
103
104    /**
105     * Should be overriden by subclasses needing something different.
106     */
107    @Override
108    public String writeBlob(Blob blob) throws IOException {
109        throw new UnsupportedOperationException("Writing a blob to live connect service is not supported");
110    }
111
112    @Override
113    public boolean performsExternalAccessControl(BlobInfo blobInfo) {
114        return true;
115    }
116
117    /**
118     * Should be overriden by subclasses needing something different.
119     */
120    @Override
121    public boolean isVersion(ManagedBlob blob) {
122        return toFileInfo(blob).getRevisionId().isPresent();
123    }
124
125    @Override
126    public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) {
127        List<DocumentModel> changedDocuments = new ArrayList<>();
128        for (DocumentModel doc : docs) {
129            final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue();
130            if (blob == null || isVersion(blob)) {
131                continue;
132            }
133            LiveConnectFileInfo fileInfo = toFileInfo(blob);
134            try {
135                LiveConnectFile file = retrieveFile(fileInfo);
136                putFileInCache(file);
137                if (hasChanged(blob, file)) {
138                    if (log.isTraceEnabled()) {
139                        log.trace("Updating blob=" + blob.key);
140                    }
141                    doc.setPropertyValue("content", toBlob(file));
142                    changedDocuments.add(doc);
143                }
144            } catch (IOException e) {
145                log.error("Could not update document=" + fileInfo, e);
146            }
147
148        }
149        return changedDocuments;
150    }
151
152    @Override
153    public void processDocumentsUpdate() {
154        final RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class);
155        final WorkManager workManager = Framework.getService(WorkManager.class);
156        for (String repositoryName : repositoryManager.getRepositoryNames()) {
157            try (CloseableCoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
158
159                long offset = 0;
160                List<DocumentModel> nextDocumentsToBeUpdated;
161                PageProviderService ppService = Framework.getService(PageProviderService.class);
162                Map<String, Serializable> props = new HashMap<>();
163                props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session);
164                @SuppressWarnings("unchecked")
165                PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider(
166                        getPageProviderNameForUpdate(), null, null, null, props);
167                final long maxResult = pp.getPageSize();
168                do {
169                    pp.setCurrentPageOffset(offset);
170                    pp.refresh();
171                    nextDocumentsToBeUpdated = pp.getCurrentPage();
172
173                    if (nextDocumentsToBeUpdated.isEmpty()) {
174                        break;
175                    }
176                    List<String> docIds = nextDocumentsToBeUpdated.stream().map(DocumentModel::getId).collect(
177                            Collectors.toList());
178                    BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork(
179                            buildWorkId(repositoryName, offset), blobProviderId);
180                    work.setDocuments(repositoryName, docIds);
181                    workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true);
182                    offset += maxResult;
183                } while (nextDocumentsToBeUpdated.size() == maxResult);
184
185            }
186        }
187    }
188
189    private String buildWorkId(String repositoryName, long offset) {
190        return blobProviderId + ':' + repositoryName + ':' + offset;
191    }
192
193    /**
194     * Should be overriden by subclasses wanting to rely on a different fields.
195     */
196    protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) {
197        return StringUtils.isBlank(blob.getDigest()) || !blob.getDigest().equals(file.getDigest());
198    }
199
200    @Override
201    @SuppressWarnings("unchecked")
202    public O getOAuth2Provider() {
203        return (O) Framework.getService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId);
204    }
205
206    @Override
207    public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException {
208        LiveConnectFile file;
209        try {
210            file = getFile(fileInfo);
211        } catch (IOException e) {
212            // we don't want to crash everything if the remote file cannot be accessed
213            log.error("Failed to access file: " + fileInfo, e);
214            file = new ErrorLiveConnectFile(fileInfo);
215        }
216        return toBlob(file);
217    }
218
219    protected SimpleManagedBlob toBlob(LiveConnectFile file) {
220        BlobInfo blobInfo = new BlobInfo();
221        blobInfo.key = buildBlobKey(file.getInfo());
222        blobInfo.mimeType = file.getMimeType();
223        blobInfo.encoding = file.getEncoding();
224        blobInfo.filename = file.getFilename().replace('/', '-');
225        blobInfo.length = Long.valueOf(file.getFileSize());
226        blobInfo.digest = file.getDigest();
227        return new SimpleManagedBlob(blobInfo);
228    }
229
230    protected String buildBlobKey(LiveConnectFileInfo fileInfo) {
231        StringBuilder key = new StringBuilder(blobProviderId);
232        key.append(BLOB_KEY_SEPARATOR);
233        key.append(fileInfo.getUser());
234        key.append(BLOB_KEY_SEPARATOR);
235        key.append(fileInfo.getFileId());
236        Optional<String> revisionId = fileInfo.getRevisionId();
237        if (revisionId.isPresent()) {
238            key.append(BLOB_KEY_SEPARATOR);
239            key.append(revisionId.get());
240        }
241        return key.toString();
242    }
243
244    protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) {
245        return toFileInfo(blob.getKey());
246    }
247
248    /**
249     * @since 8.4
250     */
251    protected LiveConnectFileInfo toFileInfo(String key) {
252        List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key);
253        // According to buildBlobKey we have :
254        // 0 - blobProviderId
255        // 1 - userId
256        // 2 - fileId
257        // 3 - revisionId (optional)
258        if (keyParts.size() < 3 || keyParts.size() > 4) {
259            throw new IllegalArgumentException("The key doesn't have a valid format=" + key);
260        }
261        return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null);
262    }
263
264    /**
265     * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it.
266     *
267     * @param fileInfo the file info
268     * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it
269     */
270    protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException {
271        LiveConnectFile file = getFileFromCache(fileInfo);
272        if (file == null) {
273            file = retrieveFile(fileInfo);
274            putFileInCache(file);
275        }
276        return file;
277    }
278
279    private Cache getCache() {
280        if (cache == null) {
281            cache = Framework.getService(CacheService.class).getCache(getCacheName());
282        }
283        return cache;
284    }
285
286    protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException {
287        return getCredential(fileInfo.getUser());
288    }
289
290    protected Credential getCredential(NuxeoOAuth2Token token) throws IOException {
291        return getCredential(token.getServiceLogin());
292    }
293
294    public final synchronized Credential getCredential(String user) throws IOException {
295        // suspend current transaction and start a new one responsible for access token update
296        try {
297            return TransactionHelper.runInNewTransaction(() -> retrieveAndRefreshCredential(user));
298        } catch (UncheckedIOException uioe) {
299            // re-throw IOException because methods implementing interface throwing IOException use this method
300            throw uioe.getCause();
301        }
302    }
303
304    /**
305     * Declare this method as {@code private} because we don't want upper class to override it and as there's concurrent
306     * access on it, don't let anyone calling it.
307     */
308    private Credential retrieveAndRefreshCredential(String user) {
309        try {
310            Credential credential = getCredentialFactory().build(user);
311            if (credential == null) {
312                String message = "No credentials found for user " + user + " and service " + blobProviderId;
313                // TODO NXP-23860 replace this log.error with a dedicated admin service to record missing credentials
314                log.error(message);
315                throw new IOException(message);
316            }
317            Long expiresInSeconds = credential.getExpiresInSeconds();
318            if (expiresInSeconds != null && expiresInSeconds.longValue() <= 0) {
319                credential.refreshToken();
320            }
321            return credential;
322        } catch (IOException e) {
323            throw new UncheckedIOException(e);
324        }
325    }
326
327    /**
328     * Should be overriden by subclasses needing another credential factory.
329     */
330    protected CredentialFactory getCredentialFactory() {
331        return new OAuth2CredentialFactory(getOAuth2Provider());
332    }
333
334    @SuppressWarnings("unchecked")
335    protected final <T extends Serializable> T getFromCache(String key) {
336        return (T) getCache().get(key);
337    }
338
339    protected final <T extends Serializable> void putInCache(String key, T object) {
340        getCache().put(key, object);
341    }
342
343    protected final void invalidateInCache(LiveConnectFileInfo fileInfo) {
344        getCache().invalidate(FILE_CACHE_PREFIX + buildBlobKey(fileInfo));
345    }
346
347    protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) {
348        return getFromCache(FILE_CACHE_PREFIX + buildBlobKey(fileInfo));
349    }
350
351    protected final void putFileInCache(LiveConnectFile file) {
352        putInCache(FILE_CACHE_PREFIX + buildBlobKey(file.getInfo()), file);
353    }
354
355    /**
356     * Parse a {@link URI}.
357     *
358     * @return the {@link URI} or null if it fails
359     */
360    protected URI asURI(String link) {
361        try {
362            return new URI(link);
363        } catch (URISyntaxException e) {
364            log.error("Invalid URI: " + link, e);
365            return null;
366        }
367    }
368
369    protected abstract String getCacheName();
370
371    protected abstract String getPageProviderNameForUpdate();
372
373    /**
374     * Retrieves the file with API.
375     *
376     * @param fileInfo the file info
377     * @return the file retrieved from API
378     */
379    protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException;
380
381}