001/*
002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.liveconnect.core;
020
021import java.io.IOException;
022import java.io.Serializable;
023import java.io.UncheckedIOException;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import java.util.stream.Collectors;
032
033import org.apache.commons.lang3.StringUtils;
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
037import org.nuxeo.ecm.core.api.Blob;
038import org.nuxeo.ecm.core.api.CoreInstance;
039import org.nuxeo.ecm.core.api.CoreSession;
040import org.nuxeo.ecm.core.api.DocumentModel;
041import org.nuxeo.ecm.core.api.repository.RepositoryManager;
042import org.nuxeo.ecm.core.blob.AbstractBlobProvider;
043import org.nuxeo.ecm.core.blob.BlobInfo;
044import org.nuxeo.ecm.core.blob.BlobProvider;
045import org.nuxeo.ecm.core.blob.DocumentBlobProvider;
046import org.nuxeo.ecm.core.blob.ManagedBlob;
047import org.nuxeo.ecm.core.blob.SimpleManagedBlob;
048import org.nuxeo.ecm.core.cache.Cache;
049import org.nuxeo.ecm.core.cache.CacheService;
050import org.nuxeo.ecm.core.work.api.WorkManager;
051import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider;
052import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork;
053import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider;
054import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry;
055import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token;
056import org.nuxeo.ecm.platform.query.api.PageProvider;
057import org.nuxeo.ecm.platform.query.api.PageProviderService;
058import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider;
059import org.nuxeo.runtime.api.Framework;
060import org.nuxeo.runtime.transaction.TransactionHelper;
061
062import com.google.api.client.auth.oauth2.Credential;
063import com.google.common.base.Splitter;
064
065/**
066 * Basic implementation of {@link BlobProvider} for live connect.
067 *
068 * @param <O> The OAuth2 service provider type.
069 * @since 8.1
070 */
071public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider
072        implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider, DocumentBlobProvider {
073
074    private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class);
075
076    private static final String FILE_CACHE_PREFIX = "liveconnect_file_";
077
078    private static final char BLOB_KEY_SEPARATOR = ':';
079
080    static {
081        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONLiveConnectBlobDecoder());
082    }
083
084    /** Resource cache */
085    private Cache cache;
086
087    /**
088     * Should be overriden by subclasses needing something different.
089     */
090    @Override
091    public void close() {
092    }
093
094    /**
095     * Should be overriden by subclasses needing something different.
096     */
097    @Override
098    public Blob readBlob(BlobInfo blobInfo) throws IOException {
099        return toBlob(toFileInfo(blobInfo.key));
100    }
101
102    /**
103     * Should be overriden by subclasses needing something different.
104     */
105    @Override
106    public String writeBlob(Blob blob) throws IOException {
107        throw new UnsupportedOperationException("Writing a blob to live connect service is not supported");
108    }
109
110    @Override
111    public boolean performsExternalAccessControl(BlobInfo blobInfo) {
112        return true;
113    }
114
115    @Override
116    public boolean supportsSync() {
117        return supportsUserUpdate();
118    }
119
120    /**
121     * Should be overriden by subclasses needing something different.
122     */
123    @Override
124    public boolean isVersion(ManagedBlob blob) {
125        return toFileInfo(blob).getRevisionId().isPresent();
126    }
127
128    @Override
129    public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) {
130        List<DocumentModel> changedDocuments = new ArrayList<>();
131        for (DocumentModel doc : docs) {
132            final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue();
133            if (blob == null || isVersion(blob)) {
134                continue;
135            }
136            LiveConnectFileInfo fileInfo = toFileInfo(blob);
137            try {
138                LiveConnectFile file = retrieveFile(fileInfo);
139                putFileInCache(file);
140                if (hasChanged(blob, file)) {
141                    if (log.isTraceEnabled()) {
142                        log.trace("Updating blob=" + blob.key);
143                    }
144                    doc.setPropertyValue("content", toBlob(file));
145                    changedDocuments.add(doc);
146                }
147            } catch (IOException e) {
148                log.error("Could not update document=" + fileInfo, e);
149            }
150
151        }
152        return changedDocuments;
153    }
154
155    @Override
156    public void processDocumentsUpdate() {
157        final RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class);
158        final WorkManager workManager = Framework.getService(WorkManager.class);
159        for (String repositoryName : repositoryManager.getRepositoryNames()) {
160            CoreSession session = CoreInstance.getCoreSessionSystem(repositoryName);
161            long offset = 0;
162            List<DocumentModel> nextDocumentsToBeUpdated;
163            PageProviderService ppService = Framework.getService(PageProviderService.class);
164            Map<String, Serializable> props = new HashMap<>();
165            props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session);
166            @SuppressWarnings("unchecked")
167            PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider(
168                    getPageProviderNameForUpdate(), null, null, null, props);
169            final long maxResult = pp.getPageSize();
170            do {
171                pp.setCurrentPageOffset(offset);
172                pp.refresh();
173                nextDocumentsToBeUpdated = pp.getCurrentPage();
174
175                if (nextDocumentsToBeUpdated.isEmpty()) {
176                    break;
177                }
178                List<String> docIds = nextDocumentsToBeUpdated.stream()
179                                                              .map(DocumentModel::getId)
180                                                              .collect(Collectors.toList());
181                BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork(
182                        buildWorkId(repositoryName, offset), blobProviderId);
183                work.setDocuments(repositoryName, docIds);
184                workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true);
185                offset += maxResult;
186            } while (nextDocumentsToBeUpdated.size() == maxResult);
187        }
188    }
189
190    private String buildWorkId(String repositoryName, long offset) {
191        return blobProviderId + ':' + repositoryName + ':' + offset;
192    }
193
194    /**
195     * Should be overriden by subclasses wanting to rely on a different fields.
196     */
197    protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) {
198        return StringUtils.isBlank(blob.getDigest()) || !blob.getDigest().equals(file.getDigest());
199    }
200
201    @Override
202    @SuppressWarnings("unchecked")
203    public O getOAuth2Provider() {
204        return (O) Framework.getService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId);
205    }
206
207    @Override
208    public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException {
209        LiveConnectFile file;
210        try {
211            file = getFile(fileInfo);
212        } catch (IOException e) {
213            // we don't want to crash everything if the remote file cannot be accessed
214            log.error("Failed to access file: " + fileInfo, e);
215            file = new ErrorLiveConnectFile(fileInfo);
216        }
217        return toBlob(file);
218    }
219
220    protected SimpleManagedBlob toBlob(LiveConnectFile file) {
221        BlobInfo blobInfo = new BlobInfo();
222        blobInfo.key = buildBlobKey(file.getInfo());
223        blobInfo.mimeType = file.getMimeType();
224        blobInfo.encoding = file.getEncoding();
225        blobInfo.filename = file.getFilename().replace('/', '-');
226        blobInfo.length = Long.valueOf(file.getFileSize());
227        blobInfo.digest = file.getDigest();
228        return new SimpleManagedBlob(blobInfo);
229    }
230
231    protected String buildBlobKey(LiveConnectFileInfo fileInfo) {
232        StringBuilder key = new StringBuilder(blobProviderId);
233        key.append(BLOB_KEY_SEPARATOR);
234        key.append(fileInfo.getUser());
235        key.append(BLOB_KEY_SEPARATOR);
236        key.append(fileInfo.getFileId());
237        Optional<String> revisionId = fileInfo.getRevisionId();
238        if (revisionId.isPresent()) {
239            key.append(BLOB_KEY_SEPARATOR);
240            key.append(revisionId.get());
241        }
242        return key.toString();
243    }
244
245    protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) {
246        return toFileInfo(blob.getKey());
247    }
248
249    /**
250     * @since 8.4
251     */
252    protected LiveConnectFileInfo toFileInfo(String key) {
253        List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key);
254        // According to buildBlobKey we have :
255        // 0 - blobProviderId
256        // 1 - userId
257        // 2 - fileId
258        // 3 - revisionId (optional)
259        if (keyParts.size() < 3 || keyParts.size() > 4) {
260            throw new IllegalArgumentException("The key doesn't have a valid format=" + key);
261        }
262        return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null);
263    }
264
265    /**
266     * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it.
267     *
268     * @param fileInfo the file info
269     * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it
270     */
271    protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException {
272        LiveConnectFile file = getFileFromCache(fileInfo);
273        if (file == null) {
274            file = retrieveFile(fileInfo);
275            putFileInCache(file);
276        }
277        return file;
278    }
279
280    private Cache getCache() {
281        if (cache == null) {
282            cache = Framework.getService(CacheService.class).getCache(getCacheName());
283        }
284        return cache;
285    }
286
287    protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException {
288        return getCredential(fileInfo.getUser());
289    }
290
291    protected Credential getCredential(NuxeoOAuth2Token token) throws IOException {
292        return getCredential(token.getServiceLogin());
293    }
294
295    public final synchronized Credential getCredential(String user) throws IOException {
296        // suspend current transaction and start a new one responsible for access token update
297        try {
298            return TransactionHelper.runInNewTransaction(() -> retrieveAndRefreshCredential(user));
299        } catch (UncheckedIOException uioe) {
300            // re-throw IOException because methods implementing interface throwing IOException use this method
301            throw uioe.getCause();
302        }
303    }
304
305    /**
306     * Declare this method as {@code private} because we don't want upper class to override it and as there's concurrent
307     * access on it, don't let anyone calling it.
308     */
309    private Credential retrieveAndRefreshCredential(String user) {
310        try {
311            Credential credential = getCredentialFactory().build(user);
312            if (credential == null) {
313                String message = "No credentials found for user " + user + " and service " + blobProviderId;
314                // TODO NXP-23860 replace this log.error with a dedicated admin service to record missing credentials
315                log.error(message);
316                throw new IOException(message);
317            }
318            Long expiresInSeconds = credential.getExpiresInSeconds();
319            if (expiresInSeconds != null && expiresInSeconds.longValue() <= 0) {
320                credential.refreshToken();
321            }
322            return credential;
323        } catch (IOException e) {
324            throw new UncheckedIOException(e);
325        }
326    }
327
328    /**
329     * Should be overriden by subclasses needing another credential factory.
330     */
331    protected CredentialFactory getCredentialFactory() {
332        return new OAuth2CredentialFactory(getOAuth2Provider());
333    }
334
335    @SuppressWarnings("unchecked")
336    protected final <T extends Serializable> T getFromCache(String key) {
337        return (T) getCache().get(key);
338    }
339
340    protected final <T extends Serializable> void putInCache(String key, T object) {
341        getCache().put(key, object);
342    }
343
344    protected final void invalidateInCache(LiveConnectFileInfo fileInfo) {
345        getCache().invalidate(FILE_CACHE_PREFIX + buildBlobKey(fileInfo));
346    }
347
348    protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) {
349        return getFromCache(FILE_CACHE_PREFIX + buildBlobKey(fileInfo));
350    }
351
352    protected final void putFileInCache(LiveConnectFile file) {
353        putInCache(FILE_CACHE_PREFIX + buildBlobKey(file.getInfo()), file);
354    }
355
356    /**
357     * Parse a {@link URI}.
358     *
359     * @return the {@link URI} or null if it fails
360     */
361    protected URI asURI(String link) {
362        try {
363            return new URI(link);
364        } catch (URISyntaxException e) {
365            log.error("Invalid URI: " + link, e);
366            return null;
367        }
368    }
369
370    protected abstract String getCacheName();
371
372    protected abstract String getPageProviderNameForUpdate();
373
374    /**
375     * Retrieves the file with API.
376     *
377     * @param fileInfo the file info
378     * @return the file retrieved from API
379     */
380    protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException;
381
382}