001/*
002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.liveconnect.core;
020
021import java.io.IOException;
022import java.io.Serializable;
023import java.net.URI;
024import java.net.URISyntaxException;
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.stream.Collectors;
030
031import org.apache.commons.lang3.StringUtils;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
035import org.nuxeo.ecm.core.api.Blob;
036import org.nuxeo.ecm.core.api.CoreInstance;
037import org.nuxeo.ecm.core.api.CoreSession;
038import org.nuxeo.ecm.core.api.DocumentModel;
039import org.nuxeo.ecm.core.api.NuxeoException;
040import org.nuxeo.ecm.core.api.repository.RepositoryManager;
041import org.nuxeo.ecm.core.blob.AbstractBlobProvider;
042import org.nuxeo.ecm.core.blob.BlobManager.BlobInfo;
043import org.nuxeo.ecm.core.blob.BlobProvider;
044import org.nuxeo.ecm.core.blob.ManagedBlob;
045import org.nuxeo.ecm.core.blob.SimpleManagedBlob;
046import org.nuxeo.ecm.core.cache.Cache;
047import org.nuxeo.ecm.core.cache.CacheService;
048import org.nuxeo.ecm.core.model.Document;
049import org.nuxeo.ecm.core.work.api.WorkManager;
050import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider;
051import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork;
052import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider;
053import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry;
054import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token;
055import org.nuxeo.ecm.platform.query.api.PageProvider;
056import org.nuxeo.ecm.platform.query.api.PageProviderService;
057import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider;
058import org.nuxeo.runtime.api.Framework;
059
060import com.google.api.client.auth.oauth2.Credential;
061import com.google.common.base.Splitter;
062
063/**
064 * Basic implementation of {@link BlobProvider} for live connect.
065 *
066 * @param <O> The OAuth2 service provider type.
067 * @since 8.1
068 */
069public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider
070        implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider {
071
072    private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class);
073
074    private static final String FILE_CACHE_PREFIX = "liveconnect_file_";
075
076    private static final char BLOB_KEY_SEPARATOR = ':';
077
078    static {
079        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONLiveConnectBlobDecoder());
080    }
081
082    /** Resource cache */
083    private Cache cache;
084
085    /**
086     * Should be overriden by subclasses needing something different.
087     */
088    @Override
089    public void close() {
090    }
091
092    /**
093     * Should be overriden by subclasses needing something different.
094     */
095    @Override
096    public Blob readBlob(BlobInfo blobInfo) throws IOException {
097        return toBlob(toFileInfo(blobInfo.key));
098    }
099
100    /**
101     * Should be overriden by subclasses needing something different.
102     */
103    @Override
104    public String writeBlob(Blob blob, Document doc) throws IOException {
105        throw new UnsupportedOperationException("Writing a blob to live connect service is not supported");
106    }
107
108    @Override
109    public boolean performsExternalAccessControl(BlobInfo blobInfo) {
110        return true;
111    }
112
113    /**
114     * Should be overriden by subclasses needing something different.
115     */
116    @Override
117    public boolean isVersion(ManagedBlob blob) {
118        return toFileInfo(blob).getRevisionId().isPresent();
119    }
120
121    @Override
122    public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) {
123        List<DocumentModel> changedDocuments = new ArrayList<>();
124        for (DocumentModel doc : docs) {
125            final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue();
126            if (blob == null || isVersion(blob)) {
127                continue;
128            }
129            LiveConnectFileInfo fileInfo = toFileInfo(blob);
130            try {
131                LiveConnectFile file = retrieveFile(fileInfo);
132                putFileInCache(file);
133                if (hasChanged(blob, file)) {
134                    if (log.isTraceEnabled()) {
135                        log.trace("Updating blob=" + blob.key);
136                    }
137                    doc.setPropertyValue("content", toBlob(file));
138                    changedDocuments.add(doc);
139                }
140            } catch (IOException e) {
141                log.error("Could not update document=" + fileInfo, e);
142            }
143
144        }
145        return changedDocuments;
146    }
147
148    @Override
149    public void processDocumentsUpdate() {
150        final RepositoryManager repositoryManager = Framework.getLocalService(RepositoryManager.class);
151        final WorkManager workManager = Framework.getLocalService(WorkManager.class);
152        for (String repositoryName : repositoryManager.getRepositoryNames()) {
153            try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
154
155                long offset = 0;
156                List<DocumentModel> nextDocumentsToBeUpdated;
157                PageProviderService ppService = Framework.getService(PageProviderService.class);
158                Map<String, Serializable> props = new HashMap<>();
159                props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session);
160                @SuppressWarnings("unchecked")
161                PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider(
162                        getPageProviderNameForUpdate(), null, null, null, props);
163                final long maxResult = pp.getPageSize();
164                do {
165                    pp.setCurrentPageOffset(offset);
166                    pp.refresh();
167                    nextDocumentsToBeUpdated = pp.getCurrentPage();
168
169                    if (nextDocumentsToBeUpdated.isEmpty()) {
170                        break;
171                    }
172                    List<String> docIds = nextDocumentsToBeUpdated.stream().map(DocumentModel::getId)
173                            .collect(Collectors.toList());
174                    BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork(buildWorkId(
175                            repositoryName, offset), blobProviderId);
176                    work.setDocuments(repositoryName, docIds);
177                    workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true);
178                    offset += maxResult;
179                } while (nextDocumentsToBeUpdated.size() == maxResult);
180
181            }
182        }
183    }
184
185    private String buildWorkId(String repositoryName, long offset) {
186        return blobProviderId + ':' + repositoryName + ':' + offset;
187    }
188
189    /**
190     * Should be overriden by subclasses wanting to rely on a different fields.
191     */
192    protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) {
193        return StringUtils.isBlank(blob.getDigest()) || !blob.getDigest().equals(file.getDigest());
194    }
195
196    @Override
197    @SuppressWarnings("unchecked")
198    public O getOAuth2Provider() {
199        return (O) Framework.getLocalService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId);
200    }
201
202    @Override
203    public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException {
204        LiveConnectFile file = getFile(fileInfo);
205        return toBlob(file);
206    }
207
208    protected SimpleManagedBlob toBlob(LiveConnectFile file) {
209        BlobInfo blobInfo = new BlobInfo();
210        blobInfo.key = buildBlobKey(file.getInfo());
211        blobInfo.mimeType = file.getMimeType();
212        blobInfo.encoding = file.getEncoding();
213        blobInfo.filename = file.getFilename().replace('/', '-');
214        blobInfo.length = file.getFileSize();
215        blobInfo.digest = file.getDigest();
216        return new SimpleManagedBlob(blobInfo);
217    }
218
219    protected String buildBlobKey(LiveConnectFileInfo fileInfo) {
220        StringBuilder key = new StringBuilder(blobProviderId);
221        key.append(BLOB_KEY_SEPARATOR);
222        key.append(fileInfo.getUser());
223        key.append(BLOB_KEY_SEPARATOR);
224        key.append(fileInfo.getFileId());
225        if (fileInfo.getRevisionId().isPresent()) {
226            key.append(BLOB_KEY_SEPARATOR);
227            key.append(fileInfo.getRevisionId().get());
228        }
229        return key.toString();
230    }
231
232    protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) {
233        return toFileInfo(blob.getKey());
234    }
235
236    /**
237     * @since 8.4
238     */
239    protected LiveConnectFileInfo toFileInfo(String key) {
240        List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key);
241        // According to buildBlobKey we have :
242        // 0 - blobProviderId
243        // 1 - userId
244        // 2 - fileId
245        // 3 - revisionId (optional)
246        if (keyParts.size() < 3 || keyParts.size() > 4) {
247            throw new IllegalArgumentException("The key doesn't have a valid format=" + key);
248        }
249        return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null);
250    }
251
252    /**
253     * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it.
254     *
255     * @param fileInfo the file info
256     * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it
257     */
258    protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException {
259        LiveConnectFile file = getFileFromCache(fileInfo);
260        if (file == null) {
261            file = retrieveFile(fileInfo);
262            putFileInCache(file);
263        }
264        return file;
265    }
266
267    private Cache getCache() {
268        if (cache == null) {
269            cache = Framework.getService(CacheService.class).getCache(getCacheName());
270        }
271        return cache;
272    }
273
274    protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException {
275        return getCredential(fileInfo.getUser());
276    }
277
278    protected Credential getCredential(NuxeoOAuth2Token token) throws IOException {
279        return getCredential(token.getServiceLogin());
280    }
281
282    public final synchronized Credential getCredential(String user) throws IOException {
283        Credential credential = getCredentialFactory().build(user);
284        if (credential == null) {
285            throw new NuxeoException("No credentials found for user " + user + " and service " + blobProviderId);
286        }
287        Long expiresInSeconds = credential.getExpiresInSeconds();
288        if (expiresInSeconds != null && expiresInSeconds <= 0) {
289            credential.refreshToken();
290        }
291        return credential;
292    }
293
294    /**
295     * Should be overriden by subclasses needing another credential factory.
296     */
297    protected CredentialFactory getCredentialFactory() {
298        return new OAuth2CredentialFactory(getOAuth2Provider());
299    }
300
301    @SuppressWarnings("unchecked")
302    protected final <T extends Serializable> T getFromCache(String key) {
303        return (T) getCache().get(key);
304    }
305
306    protected final <T extends Serializable> void putInCache(String key, T object) {
307        getCache().put(key, object);
308    }
309
310    protected final void invalidateInCache(LiveConnectFileInfo fileInfo) {
311        getCache().invalidate(FILE_CACHE_PREFIX + buildBlobKey(fileInfo));
312    }
313
314    protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) {
315        return getFromCache(FILE_CACHE_PREFIX + buildBlobKey(fileInfo));
316    }
317
318    protected final void putFileInCache(LiveConnectFile file) {
319        putInCache(FILE_CACHE_PREFIX + buildBlobKey(file.getInfo()), file);
320    }
321
322    /**
323     * Parse a {@link URI}.
324     *
325     * @return the {@link URI} or null if it fails
326     */
327    protected URI asURI(String link) {
328        try {
329            return new URI(link);
330        } catch (URISyntaxException e) {
331            log.error("Invalid URI: " + link, e);
332            return null;
333        }
334    }
335
336    protected abstract String getCacheName();
337
338    protected abstract String getPageProviderNameForUpdate();
339
340    /**
341     * Retrieves the file with API.
342     *
343     * @param fileInfo the file info
344     * @return the file retrieved from API
345     */
346    protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException;
347
348}