001/*
002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.liveconnect.core;
020
021import java.io.IOException;
022import java.io.Serializable;
023import java.net.URI;
024import java.net.URISyntaxException;
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.stream.Collectors;
030
031import org.apache.commons.lang3.StringUtils;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.api.Blob;
035import org.nuxeo.ecm.core.api.CoreInstance;
036import org.nuxeo.ecm.core.api.CoreSession;
037import org.nuxeo.ecm.core.api.DocumentModel;
038import org.nuxeo.ecm.core.api.repository.RepositoryManager;
039import org.nuxeo.ecm.core.blob.AbstractBlobProvider;
040import org.nuxeo.ecm.core.blob.BlobManager.BlobInfo;
041import org.nuxeo.ecm.core.blob.BlobProvider;
042import org.nuxeo.ecm.core.blob.ManagedBlob;
043import org.nuxeo.ecm.core.blob.SimpleManagedBlob;
044import org.nuxeo.ecm.core.cache.Cache;
045import org.nuxeo.ecm.core.cache.CacheService;
046import org.nuxeo.ecm.core.model.Document;
047import org.nuxeo.ecm.core.work.api.WorkManager;
048import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider;
049import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork;
050import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider;
051import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry;
052import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token;
053import org.nuxeo.ecm.platform.query.api.PageProvider;
054import org.nuxeo.ecm.platform.query.api.PageProviderService;
055import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider;
056import org.nuxeo.runtime.api.Framework;
057
058import com.google.api.client.auth.oauth2.Credential;
059import com.google.common.base.Splitter;
060
061/**
062 * Basic implementation of {@link BlobProvider} for live connect.
063 *
064 * @param <O> The OAuth2 service provider type.
065 * @since 8.1
066 */
067public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider
068        implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider {
069
070    private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class);
071
072    private static final String FILE_CACHE_PREFIX = "liveconnect_file_";
073
074    private static final char BLOB_KEY_SEPARATOR = ':';
075
076    /** Resource cache */
077    private Cache cache;
078
079    /**
080     * Should be overriden by subclasses needing something different.
081     */
082    @Override
083    public void close() {
084    }
085
086    /**
087     * Should be overriden by subclasses needing something different.
088     */
089    @Override
090    public Blob readBlob(BlobInfo blobInfo) throws IOException {
091        return new SimpleManagedBlob(blobInfo);
092    }
093
094    /**
095     * Should be overriden by subclasses needing something different.
096     */
097    @Override
098    public String writeBlob(Blob blob, Document doc) throws IOException {
099        throw new UnsupportedOperationException("Writing a blob to live connect service is not supported");
100    }
101
102    /**
103     * Should be overriden by subclasses needing something different.
104     */
105    @Override
106    public boolean isVersion(ManagedBlob blob) {
107        return toFileInfo(blob).getRevisionId().isPresent();
108    }
109
110    @Override
111    public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) {
112        List<DocumentModel> changedDocuments = new ArrayList<>();
113        for (DocumentModel doc : docs) {
114            final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue();
115            if (blob == null || isVersion(blob)) {
116                continue;
117            }
118            LiveConnectFileInfo fileInfo = toFileInfo(blob);
119            try {
120                LiveConnectFile file = retrieveFile(fileInfo);
121                putFileInCache(file);
122                if (hasChanged(blob, file)) {
123                    if (log.isTraceEnabled()) {
124                        log.trace("Updating blob=" + blob.key);
125                    }
126                    doc.setPropertyValue("content", toBlob(file));
127                    changedDocuments.add(doc);
128                }
129            } catch (IOException e) {
130                log.error("Could not update document=" + fileInfo, e);
131            }
132
133        }
134        return changedDocuments;
135    }
136
137    @Override
138    public void processDocumentsUpdate() {
139        final RepositoryManager repositoryManager = Framework.getLocalService(RepositoryManager.class);
140        final WorkManager workManager = Framework.getLocalService(WorkManager.class);
141        for (String repositoryName : repositoryManager.getRepositoryNames()) {
142            try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) {
143
144                long offset = 0;
145                List<DocumentModel> nextDocumentsToBeUpdated;
146                PageProviderService ppService = Framework.getService(PageProviderService.class);
147                Map<String, Serializable> props = new HashMap<>();
148                props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session);
149                @SuppressWarnings("unchecked")
150                PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider(
151                        getPageProviderNameForUpdate(), null, null, null, props);
152                final long maxResult = pp.getPageSize();
153                do {
154                    pp.setCurrentPageOffset(offset);
155                    pp.refresh();
156                    nextDocumentsToBeUpdated = pp.getCurrentPage();
157
158                    if (nextDocumentsToBeUpdated.isEmpty()) {
159                        break;
160                    }
161                    List<String> docIds = nextDocumentsToBeUpdated.stream().map(DocumentModel::getId)
162                            .collect(Collectors.toList());
163                    BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork(buildWorkId(
164                            repositoryName, offset), blobProviderId);
165                    work.setDocuments(repositoryName, docIds);
166                    workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true);
167                    offset += maxResult;
168                } while (nextDocumentsToBeUpdated.size() == maxResult);
169
170            }
171        }
172    }
173
174    private String buildWorkId(String repositoryName, long offset) {
175        return blobProviderId + ':' + repositoryName + ':' + offset;
176    }
177
178    /**
179     * Should be overriden by subclasses wanting to rely on a different fields.
180     */
181    protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) {
182        return StringUtils.isBlank(blob.getDigest()) || !blob.getDigest().equals(file.getDigest());
183    }
184
185    @Override
186    @SuppressWarnings("unchecked")
187    public O getOAuth2Provider() {
188        return (O) Framework.getLocalService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId);
189    }
190
191    @Override
192    public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException {
193        LiveConnectFile file = getFile(fileInfo);
194        return toBlob(file);
195    }
196
197    protected SimpleManagedBlob toBlob(LiveConnectFile file) {
198        BlobInfo blobInfo = new BlobInfo();
199        blobInfo.key = buildBlobKey(file.getInfo());
200        blobInfo.mimeType = file.getMimeType();
201        blobInfo.encoding = file.getEncoding();
202        blobInfo.filename = file.getFilename().replace('/', '-');
203        blobInfo.length = file.getFileSize();
204        blobInfo.digest = file.getDigest();
205        return new SimpleManagedBlob(blobInfo);
206    }
207
208    protected String buildBlobKey(LiveConnectFileInfo fileInfo) {
209        StringBuilder key = new StringBuilder(blobProviderId);
210        key.append(BLOB_KEY_SEPARATOR);
211        key.append(fileInfo.getUser());
212        key.append(BLOB_KEY_SEPARATOR);
213        key.append(fileInfo.getFileId());
214        if (fileInfo.getRevisionId().isPresent()) {
215            key.append(BLOB_KEY_SEPARATOR);
216            key.append(fileInfo.getRevisionId().get());
217        }
218        return key.toString();
219    }
220
221    protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) {
222        String key = blob.getKey();
223        List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key);
224        // According to buildBlobKey we have :
225        // 0 - blobProviderId
226        // 1 - userId
227        // 2 - fileId
228        // 3 - revisionId (optional)
229        if (keyParts.size() < 3 || keyParts.size() > 4) {
230            throw new IllegalArgumentException("The key doesn't have a valid format=" + key);
231        }
232        return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null);
233    }
234
235    /**
236     * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it.
237     *
238     * @param fileInfo the file info
239     * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it
240     */
241    protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException {
242        LiveConnectFile file = getFileFromCache(fileInfo);
243        if (file == null) {
244            file = retrieveFile(fileInfo);
245            putFileInCache(file);
246        }
247        return file;
248    }
249
250    private Cache getCache() {
251        if (cache == null) {
252            cache = Framework.getService(CacheService.class).getCache(getCacheName());
253        }
254        return cache;
255    }
256
257    protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException {
258        return getCredential(fileInfo.getUser());
259    }
260
261    protected Credential getCredential(NuxeoOAuth2Token token) throws IOException {
262        return getCredential(token.getServiceLogin());
263    }
264
265    public final Credential getCredential(String user) throws IOException {
266        Credential credential = getCredentialFactory().build(user);
267        Long expiresInSeconds = credential.getExpiresInSeconds();
268        if (expiresInSeconds != null && expiresInSeconds <= 0) {
269            credential.refreshToken();
270        }
271        return credential;
272    }
273
274    /**
275     * Should be overriden by subclasses needing another credential factory.
276     */
277    protected CredentialFactory getCredentialFactory() {
278        return new OAuth2CredentialFactory(getOAuth2Provider());
279    }
280
281    @SuppressWarnings("unchecked")
282    protected final <T extends Serializable> T getFromCache(String key) {
283        return (T) getCache().get(key);
284    }
285
286    protected final <T extends Serializable> void putInCache(String key, T object) {
287        getCache().put(key, object);
288    }
289
290    protected final void invalidateInCache(String key) {
291        getCache().invalidate(key);
292    }
293
294    protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) {
295        return getFromCache(FILE_CACHE_PREFIX + fileInfo.getFileId());
296    }
297
298    protected final void putFileInCache(LiveConnectFile file) {
299        putInCache(FILE_CACHE_PREFIX + file.getInfo().getFileId(), file);
300    }
301
302    /**
303     * Parse a {@link URI}.
304     *
305     * @return the {@link URI} or null if it fails
306     */
307    protected URI asURI(String link) {
308        try {
309            return new URI(link);
310        } catch (URISyntaxException e) {
311            log.error("Invalid URI: " + link, e);
312            return null;
313        }
314    }
315
316    protected abstract String getCacheName();
317
318    protected abstract String getPageProviderNameForUpdate();
319
320    /**
321     * Retrieves the file with API.
322     *
323     * @param fileInfo the file info
324     * @return the file retrieved from API
325     */
326    protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException;
327
328}