001/*
002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.liveconnect.core;
020
021import java.io.IOException;
022import java.io.Serializable;
023import java.net.URI;
024import java.net.URISyntaxException;
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.stream.Collectors;
030
031import org.apache.commons.lang3.StringUtils;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.api.Blob;
035import org.nuxeo.ecm.core.api.CoreInstance;
036import org.nuxeo.ecm.core.api.CoreSession;
037import org.nuxeo.ecm.core.api.DocumentModel;
038import org.nuxeo.ecm.core.api.NuxeoException;
039import org.nuxeo.ecm.core.api.repository.RepositoryManager;
040import org.nuxeo.ecm.core.blob.AbstractBlobProvider;
041import org.nuxeo.ecm.core.blob.BlobManager.BlobInfo;
042import org.nuxeo.ecm.core.blob.BlobProvider;
043import org.nuxeo.ecm.core.blob.ManagedBlob;
044import org.nuxeo.ecm.core.blob.SimpleManagedBlob;
045import org.nuxeo.ecm.core.cache.Cache;
046import org.nuxeo.ecm.core.cache.CacheService;
047import org.nuxeo.ecm.core.model.Document;
048import org.nuxeo.ecm.core.work.api.WorkManager;
049import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider;
050import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork;
051import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider;
052import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry;
053import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token;
054import org.nuxeo.ecm.platform.query.api.PageProvider;
055import org.nuxeo.ecm.platform.query.api.PageProviderService;
056import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider;
057import org.nuxeo.runtime.api.Framework;
058
059import com.google.api.client.auth.oauth2.Credential;
060import com.google.common.base.Splitter;
061
062/**
063 * Basic implementation of {@link BlobProvider} for live connect.
064 *
065 * @param <O> The OAuth2 service provider type.
066 * @since 8.1
067 */
068public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider
069        implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider {
070
071    private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class);
072
073    private static final String FILE_CACHE_PREFIX = "liveconnect_file_";
074
075    private static final char BLOB_KEY_SEPARATOR = ':';
076
077    /** Resource cache */
078    private Cache cache;
079
080    /**
081     * Should be overriden by subclasses needing something different.
082     */
083    @Override
084    public void close() {
085    }
086
087    /**
088     * Should be overriden by subclasses needing something different.
089     */
090    @Override
091    public Blob readBlob(BlobInfo blobInfo) throws IOException {
092        return new SimpleManagedBlob(blobInfo);
093    }
094
095    /**
096     * Should be overriden by subclasses needing something different.
097     */
098    @Override
099    public String writeBlob(Blob blob, Document doc) throws IOException {
100        throw new UnsupportedOperationException("Writing a blob to live connect service is not supported");
101    }
102
103    /**
104     * Should be overriden by subclasses needing something different.
105     */
106    @Override
107    public boolean isVersion(ManagedBlob blob) {
108        return toFileInfo(blob).getRevisionId().isPresent();
109    }
110
111    @Override
112    public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) {
113        List<DocumentModel> changedDocuments = new ArrayList<>();
114        for (DocumentModel doc : docs) {
115            final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue();
116            if (blob == null || isVersion(blob)) {
117                continue;
118            }
119            LiveConnectFileInfo fileInfo = toFileInfo(blob);
120            try {
121                LiveConnectFile file = retrieveFile(fileInfo);
122                putFileInCache(file);
123                if (hasChanged(blob, file)) {
124                    if (log.isTraceEnabled()) {
125                        log.trace("Updating blob=" + blob.key);
126                    }
127                    doc.setPropertyValue("content", toBlob(file));
128                    changedDocuments.add(doc);
129                }
130            } catch (IOException e) {
131                log.error("Could not update document=" + fileInfo, e);
132            }
133
134        }
135        return changedDocuments;
136    }
137
138    @Override
139    public void processDocumentsUpdate() {
140        final RepositoryManager repositoryManager = Framework.getLocalService(RepositoryManager.class);
141        final WorkManager workManager = Framework.getLocalService(WorkManager.class);
142        for (String repositoryName : repositoryManager.getRepositoryNames()) {
143            try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
144
145                long offset = 0;
146                List<DocumentModel> nextDocumentsToBeUpdated;
147                PageProviderService ppService = Framework.getService(PageProviderService.class);
148                Map<String, Serializable> props = new HashMap<>();
149                props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session);
150                @SuppressWarnings("unchecked")
151                PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider(
152                        getPageProviderNameForUpdate(), null, null, null, props);
153                final long maxResult = pp.getPageSize();
154                do {
155                    pp.setCurrentPageOffset(offset);
156                    pp.refresh();
157                    nextDocumentsToBeUpdated = pp.getCurrentPage();
158
159                    if (nextDocumentsToBeUpdated.isEmpty()) {
160                        break;
161                    }
162                    List<String> docIds = nextDocumentsToBeUpdated.stream().map(DocumentModel::getId)
163                            .collect(Collectors.toList());
164                    BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork(buildWorkId(
165                            repositoryName, offset), blobProviderId);
166                    work.setDocuments(repositoryName, docIds);
167                    workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true);
168                    offset += maxResult;
169                } while (nextDocumentsToBeUpdated.size() == maxResult);
170
171            }
172        }
173    }
174
175    private String buildWorkId(String repositoryName, long offset) {
176        return blobProviderId + ':' + repositoryName + ':' + offset;
177    }
178
179    /**
180     * Should be overriden by subclasses wanting to rely on a different fields.
181     */
182    protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) {
183        return StringUtils.isBlank(blob.getDigest()) || !blob.getDigest().equals(file.getDigest());
184    }
185
186    @Override
187    @SuppressWarnings("unchecked")
188    public O getOAuth2Provider() {
189        return (O) Framework.getLocalService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId);
190    }
191
192    @Override
193    public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException {
194        LiveConnectFile file = getFile(fileInfo);
195        return toBlob(file);
196    }
197
198    protected SimpleManagedBlob toBlob(LiveConnectFile file) {
199        BlobInfo blobInfo = new BlobInfo();
200        blobInfo.key = buildBlobKey(file.getInfo());
201        blobInfo.mimeType = file.getMimeType();
202        blobInfo.encoding = file.getEncoding();
203        blobInfo.filename = file.getFilename().replace('/', '-');
204        blobInfo.length = file.getFileSize();
205        blobInfo.digest = file.getDigest();
206        return new SimpleManagedBlob(blobInfo);
207    }
208
209    protected String buildBlobKey(LiveConnectFileInfo fileInfo) {
210        StringBuilder key = new StringBuilder(blobProviderId);
211        key.append(BLOB_KEY_SEPARATOR);
212        key.append(fileInfo.getUser());
213        key.append(BLOB_KEY_SEPARATOR);
214        key.append(fileInfo.getFileId());
215        if (fileInfo.getRevisionId().isPresent()) {
216            key.append(BLOB_KEY_SEPARATOR);
217            key.append(fileInfo.getRevisionId().get());
218        }
219        return key.toString();
220    }
221
222    protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) {
223        String key = blob.getKey();
224        List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key);
225        // According to buildBlobKey we have :
226        // 0 - blobProviderId
227        // 1 - userId
228        // 2 - fileId
229        // 3 - revisionId (optional)
230        if (keyParts.size() < 3 || keyParts.size() > 4) {
231            throw new IllegalArgumentException("The key doesn't have a valid format=" + key);
232        }
233        return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null);
234    }
235
236    /**
237     * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it.
238     *
239     * @param fileInfo the file info
240     * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it
241     */
242    protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException {
243        LiveConnectFile file = getFileFromCache(fileInfo);
244        if (file == null) {
245            file = retrieveFile(fileInfo);
246            putFileInCache(file);
247        }
248        return file;
249    }
250
251    private Cache getCache() {
252        if (cache == null) {
253            cache = Framework.getService(CacheService.class).getCache(getCacheName());
254        }
255        return cache;
256    }
257
258    protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException {
259        return getCredential(fileInfo.getUser());
260    }
261
262    protected Credential getCredential(NuxeoOAuth2Token token) throws IOException {
263        return getCredential(token.getServiceLogin());
264    }
265
266    public final synchronized Credential getCredential(String user) throws IOException {
267        Credential credential = getCredentialFactory().build(user);
268        if (credential == null) {
269            throw new NuxeoException("No credentials found for user " + user + " and service " + blobProviderId);
270        }
271        Long expiresInSeconds = credential.getExpiresInSeconds();
272        if (expiresInSeconds != null && expiresInSeconds <= 0) {
273            credential.refreshToken();
274        }
275        return credential;
276    }
277
278    /**
279     * Should be overriden by subclasses needing another credential factory.
280     */
281    protected CredentialFactory getCredentialFactory() {
282        return new OAuth2CredentialFactory(getOAuth2Provider());
283    }
284
285    @SuppressWarnings("unchecked")
286    protected final <T extends Serializable> T getFromCache(String key) {
287        return (T) getCache().get(key);
288    }
289
290    protected final <T extends Serializable> void putInCache(String key, T object) {
291        getCache().put(key, object);
292    }
293
294    protected final void invalidateInCache(String key) {
295        getCache().invalidate(key);
296    }
297
298    protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) {
299        return getFromCache(FILE_CACHE_PREFIX + fileInfo.getFileId());
300    }
301
302    protected final void putFileInCache(LiveConnectFile file) {
303        putInCache(FILE_CACHE_PREFIX + file.getInfo().getFileId(), file);
304    }
305
306    /**
307     * Parse a {@link URI}.
308     *
309     * @return the {@link URI} or null if it fails
310     */
311    protected URI asURI(String link) {
312        try {
313            return new URI(link);
314        } catch (URISyntaxException e) {
315            log.error("Invalid URI: " + link, e);
316            return null;
317        }
318    }
319
320    protected abstract String getCacheName();
321
322    protected abstract String getPageProviderNameForUpdate();
323
324    /**
325     * Retrieves the file with API.
326     *
327     * @param fileInfo the file info
328     * @return the file retrieved from API
329     */
330    protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException;
331
332}