001/*
002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.liveconnect.core;
020
021import java.io.IOException;
022import java.io.Serializable;
023import java.io.UncheckedIOException;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.stream.Collectors;
031
032import org.apache.commons.lang3.StringUtils;
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
036import org.nuxeo.ecm.core.api.Blob;
037import org.nuxeo.ecm.core.api.CoreInstance;
038import org.nuxeo.ecm.core.api.CoreSession;
039import org.nuxeo.ecm.core.api.DocumentModel;
040import org.nuxeo.ecm.core.api.NuxeoException;
041import org.nuxeo.ecm.core.api.repository.RepositoryManager;
042import org.nuxeo.ecm.core.blob.AbstractBlobProvider;
043import org.nuxeo.ecm.core.blob.BlobInfo;
044import org.nuxeo.ecm.core.blob.BlobProvider;
045import org.nuxeo.ecm.core.blob.DocumentBlobProvider;
046import org.nuxeo.ecm.core.blob.ManagedBlob;
047import org.nuxeo.ecm.core.blob.SimpleManagedBlob;
048import org.nuxeo.ecm.core.cache.Cache;
049import org.nuxeo.ecm.core.cache.CacheService;
050import org.nuxeo.ecm.core.work.api.WorkManager;
051import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider;
052import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork;
053import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider;
054import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry;
055import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token;
056import org.nuxeo.ecm.platform.query.api.PageProvider;
057import org.nuxeo.ecm.platform.query.api.PageProviderService;
058import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider;
059import org.nuxeo.runtime.api.Framework;
060import org.nuxeo.runtime.transaction.TransactionHelper;
061
062import com.google.api.client.auth.oauth2.Credential;
063import com.google.common.base.Splitter;
064
065/**
066 * Basic implementation of {@link BlobProvider} for live connect.
067 *
068 * @param <O> The OAuth2 service provider type.
069 * @since 8.1
070 */
071public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider
072        implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider, DocumentBlobProvider {
073
074    private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class);
075
076    private static final String FILE_CACHE_PREFIX = "liveconnect_file_";
077
078    private static final char BLOB_KEY_SEPARATOR = ':';
079
080    static {
081        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONLiveConnectBlobDecoder());
082    }
083
084    /** Resource cache */
085    private Cache cache;
086
087    /**
088     * Should be overriden by subclasses needing something different.
089     */
090    @Override
091    public void close() {
092    }
093
094    /**
095     * Should be overriden by subclasses needing something different.
096     */
097    @Override
098    public Blob readBlob(BlobInfo blobInfo) throws IOException {
099        return toBlob(toFileInfo(blobInfo.key));
100    }
101
102    /**
103     * Should be overriden by subclasses needing something different.
104     */
105    @Override
106    public String writeBlob(Blob blob) throws IOException {
107        throw new UnsupportedOperationException("Writing a blob to live connect service is not supported");
108    }
109
110    @Override
111    public boolean performsExternalAccessControl(BlobInfo blobInfo) {
112        return true;
113    }
114
115    /**
116     * Should be overriden by subclasses needing something different.
117     */
118    @Override
119    public boolean isVersion(ManagedBlob blob) {
120        return toFileInfo(blob).getRevisionId().isPresent();
121    }
122
123    @Override
124    public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) {
125        List<DocumentModel> changedDocuments = new ArrayList<>();
126        for (DocumentModel doc : docs) {
127            final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue();
128            if (blob == null || isVersion(blob)) {
129                continue;
130            }
131            LiveConnectFileInfo fileInfo = toFileInfo(blob);
132            try {
133                LiveConnectFile file = retrieveFile(fileInfo);
134                putFileInCache(file);
135                if (hasChanged(blob, file)) {
136                    if (log.isTraceEnabled()) {
137                        log.trace("Updating blob=" + blob.key);
138                    }
139                    doc.setPropertyValue("content", toBlob(file));
140                    changedDocuments.add(doc);
141                }
142            } catch (IOException e) {
143                log.error("Could not update document=" + fileInfo, e);
144            }
145
146        }
147        return changedDocuments;
148    }
149
150    @Override
151    public void processDocumentsUpdate() {
152        final RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class);
153        final WorkManager workManager = Framework.getService(WorkManager.class);
154        for (String repositoryName : repositoryManager.getRepositoryNames()) {
155            try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
156
157                long offset = 0;
158                List<DocumentModel> nextDocumentsToBeUpdated;
159                PageProviderService ppService = Framework.getService(PageProviderService.class);
160                Map<String, Serializable> props = new HashMap<>();
161                props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session);
162                @SuppressWarnings("unchecked")
163                PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider(
164                        getPageProviderNameForUpdate(), null, null, null, props);
165                final long maxResult = pp.getPageSize();
166                do {
167                    pp.setCurrentPageOffset(offset);
168                    pp.refresh();
169                    nextDocumentsToBeUpdated = pp.getCurrentPage();
170
171                    if (nextDocumentsToBeUpdated.isEmpty()) {
172                        break;
173                    }
174                    List<String> docIds = nextDocumentsToBeUpdated.stream().map(DocumentModel::getId).collect(
175                            Collectors.toList());
176                    BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork(
177                            buildWorkId(repositoryName, offset), blobProviderId);
178                    work.setDocuments(repositoryName, docIds);
179                    workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true);
180                    offset += maxResult;
181                } while (nextDocumentsToBeUpdated.size() == maxResult);
182
183            }
184        }
185    }
186
187    private String buildWorkId(String repositoryName, long offset) {
188        return blobProviderId + ':' + repositoryName + ':' + offset;
189    }
190
191    /**
192     * Should be overriden by subclasses wanting to rely on a different fields.
193     */
194    protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) {
195        return StringUtils.isBlank(blob.getDigest()) || !blob.getDigest().equals(file.getDigest());
196    }
197
198    @Override
199    @SuppressWarnings("unchecked")
200    public O getOAuth2Provider() {
201        return (O) Framework.getService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId);
202    }
203
204    @Override
205    public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException {
206        LiveConnectFile file;
207        try {
208            file = getFile(fileInfo);
209        } catch (IOException e) {
210            // we don't want to crash everything if the remote file cannot be accessed
211            log.error("Failed to access file: " + fileInfo, e);
212            file = new ErrorLiveConnectFile(fileInfo);
213        }
214        return toBlob(file);
215    }
216
217    protected SimpleManagedBlob toBlob(LiveConnectFile file) {
218        BlobInfo blobInfo = new BlobInfo();
219        blobInfo.key = buildBlobKey(file.getInfo());
220        blobInfo.mimeType = file.getMimeType();
221        blobInfo.encoding = file.getEncoding();
222        blobInfo.filename = file.getFilename().replace('/', '-');
223        blobInfo.length = Long.valueOf(file.getFileSize());
224        blobInfo.digest = file.getDigest();
225        return new SimpleManagedBlob(blobInfo);
226    }
227
228    protected String buildBlobKey(LiveConnectFileInfo fileInfo) {
229        StringBuilder key = new StringBuilder(blobProviderId);
230        key.append(BLOB_KEY_SEPARATOR);
231        key.append(fileInfo.getUser());
232        key.append(BLOB_KEY_SEPARATOR);
233        key.append(fileInfo.getFileId());
234        if (fileInfo.getRevisionId().isPresent()) {
235            key.append(BLOB_KEY_SEPARATOR);
236            key.append(fileInfo.getRevisionId().get());
237        }
238        return key.toString();
239    }
240
241    protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) {
242        return toFileInfo(blob.getKey());
243    }
244
245    /**
246     * @since 8.4
247     */
248    protected LiveConnectFileInfo toFileInfo(String key) {
249        List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key);
250        // According to buildBlobKey we have :
251        // 0 - blobProviderId
252        // 1 - userId
253        // 2 - fileId
254        // 3 - revisionId (optional)
255        if (keyParts.size() < 3 || keyParts.size() > 4) {
256            throw new IllegalArgumentException("The key doesn't have a valid format=" + key);
257        }
258        return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null);
259    }
260
261    /**
262     * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it.
263     *
264     * @param fileInfo the file info
265     * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it
266     */
267    protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException {
268        LiveConnectFile file = getFileFromCache(fileInfo);
269        if (file == null) {
270            file = retrieveFile(fileInfo);
271            putFileInCache(file);
272        }
273        return file;
274    }
275
276    private Cache getCache() {
277        if (cache == null) {
278            cache = Framework.getService(CacheService.class).getCache(getCacheName());
279        }
280        return cache;
281    }
282
283    protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException {
284        return getCredential(fileInfo.getUser());
285    }
286
287    protected Credential getCredential(NuxeoOAuth2Token token) throws IOException {
288        return getCredential(token.getServiceLogin());
289    }
290
291    public final synchronized Credential getCredential(String user) throws IOException {
292        // suspend current transaction and start a new one responsible for access token update
293        try {
294            return TransactionHelper.runInNewTransaction(() -> retrieveAndRefreshCredential(user));
295        } catch (UncheckedIOException uioe) {
296            // re-throw IOException because methods implementing interface throwing IOException use this method
297            throw uioe.getCause();
298        }
299    }
300
301    /**
302     * Declare this method as {@code private} because we don't want upper class to override it and as there's concurrent
303     * access on it, don't let anyone calling it.
304     */
305    private Credential retrieveAndRefreshCredential(String user) {
306        try {
307            Credential credential = getCredentialFactory().build(user);
308            if (credential == null) {
309                String message = "No credentials found for user " + user + " and service " + blobProviderId;
310                // TODO NXP-23860 replace this log.error with a dedicated admin service to record missing credentials
311                log.error(message);
312                throw new IOException(message);
313            }
314            Long expiresInSeconds = credential.getExpiresInSeconds();
315            if (expiresInSeconds != null && expiresInSeconds.longValue() <= 0) {
316                credential.refreshToken();
317            }
318            return credential;
319        } catch (IOException e) {
320            throw new UncheckedIOException(e);
321        }
322    }
323
324    /**
325     * Should be overriden by subclasses needing another credential factory.
326     */
327    protected CredentialFactory getCredentialFactory() {
328        return new OAuth2CredentialFactory(getOAuth2Provider());
329    }
330
331    @SuppressWarnings("unchecked")
332    protected final <T extends Serializable> T getFromCache(String key) {
333        return (T) getCache().get(key);
334    }
335
336    protected final <T extends Serializable> void putInCache(String key, T object) {
337        getCache().put(key, object);
338    }
339
340    protected final void invalidateInCache(LiveConnectFileInfo fileInfo) {
341        getCache().invalidate(FILE_CACHE_PREFIX + buildBlobKey(fileInfo));
342    }
343
344    protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) {
345        return getFromCache(FILE_CACHE_PREFIX + buildBlobKey(fileInfo));
346    }
347
348    protected final void putFileInCache(LiveConnectFile file) {
349        putInCache(FILE_CACHE_PREFIX + buildBlobKey(file.getInfo()), file);
350    }
351
352    /**
353     * Parse a {@link URI}.
354     *
355     * @return the {@link URI} or null if it fails
356     */
357    protected URI asURI(String link) {
358        try {
359            return new URI(link);
360        } catch (URISyntaxException e) {
361            log.error("Invalid URI: " + link, e);
362            return null;
363        }
364    }
365
366    protected abstract String getCacheName();
367
368    protected abstract String getPageProviderNameForUpdate();
369
370    /**
371     * Retrieves the file with API.
372     *
373     * @param fileInfo the file info
374     * @return the file retrieved from API
375     */
376    protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException;
377
378}