001/* 002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.liveconnect.core; 020 021import java.io.IOException; 022import java.io.Serializable; 023import java.net.URI; 024import java.net.URISyntaxException; 025import java.util.ArrayList; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.stream.Collectors; 030 031import org.apache.commons.lang3.StringUtils; 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.nuxeo.ecm.core.api.Blob; 035import org.nuxeo.ecm.core.api.CoreInstance; 036import org.nuxeo.ecm.core.api.CoreSession; 037import org.nuxeo.ecm.core.api.DocumentModel; 038import org.nuxeo.ecm.core.api.NuxeoException; 039import org.nuxeo.ecm.core.api.repository.RepositoryManager; 040import org.nuxeo.ecm.core.blob.AbstractBlobProvider; 041import org.nuxeo.ecm.core.blob.BlobManager.BlobInfo; 042import org.nuxeo.ecm.core.blob.BlobProvider; 043import org.nuxeo.ecm.core.blob.ManagedBlob; 044import org.nuxeo.ecm.core.blob.SimpleManagedBlob; 045import org.nuxeo.ecm.core.cache.Cache; 046import org.nuxeo.ecm.core.cache.CacheService; 047import org.nuxeo.ecm.core.model.Document; 048import org.nuxeo.ecm.core.work.api.WorkManager; 049import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider; 050import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork; 051import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider; 052import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry; 053import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token; 054import org.nuxeo.ecm.platform.query.api.PageProvider; 055import org.nuxeo.ecm.platform.query.api.PageProviderService; 056import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider; 057import org.nuxeo.runtime.api.Framework; 058 059import com.google.api.client.auth.oauth2.Credential; 060import com.google.common.base.Splitter; 061 062/** 063 * Basic implementation of {@link BlobProvider} for live connect. 064 * 065 * @param <O> The OAuth2 service provider type. 066 * @since 8.1 067 */ 068public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider 069 implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider { 070 071 private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class); 072 073 private static final String FILE_CACHE_PREFIX = "liveconnect_file_"; 074 075 private static final char BLOB_KEY_SEPARATOR = ':'; 076 077 /** Resource cache */ 078 private Cache cache; 079 080 /** 081 * Should be overriden by subclasses needing something different. 082 */ 083 @Override 084 public void close() { 085 } 086 087 /** 088 * Should be overriden by subclasses needing something different. 089 */ 090 @Override 091 public Blob readBlob(BlobInfo blobInfo) throws IOException { 092 return new SimpleManagedBlob(blobInfo); 093 } 094 095 /** 096 * Should be overriden by subclasses needing something different. 097 */ 098 @Override 099 public String writeBlob(Blob blob, Document doc) throws IOException { 100 throw new UnsupportedOperationException("Writing a blob to live connect service is not supported"); 101 } 102 103 /** 104 * Should be overriden by subclasses needing something different. 105 */ 106 @Override 107 public boolean isVersion(ManagedBlob blob) { 108 return toFileInfo(blob).getRevisionId().isPresent(); 109 } 110 111 @Override 112 public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) { 113 List<DocumentModel> changedDocuments = new ArrayList<>(); 114 for (DocumentModel doc : docs) { 115 final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue(); 116 if (blob == null || isVersion(blob)) { 117 continue; 118 } 119 LiveConnectFileInfo fileInfo = toFileInfo(blob); 120 try { 121 LiveConnectFile file = retrieveFile(fileInfo); 122 putFileInCache(file); 123 if (hasChanged(blob, file)) { 124 if (log.isTraceEnabled()) { 125 log.trace("Updating blob=" + blob.key); 126 } 127 doc.setPropertyValue("content", toBlob(file)); 128 changedDocuments.add(doc); 129 } 130 } catch (IOException e) { 131 log.error("Could not update document=" + fileInfo, e); 132 } 133 134 } 135 return changedDocuments; 136 } 137 138 @Override 139 public void processDocumentsUpdate() { 140 final RepositoryManager repositoryManager = Framework.getLocalService(RepositoryManager.class); 141 final WorkManager workManager = Framework.getLocalService(WorkManager.class); 142 for (String repositoryName : repositoryManager.getRepositoryNames()) { 143 try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) { 144 145 long offset = 0; 146 List<DocumentModel> nextDocumentsToBeUpdated; 147 PageProviderService ppService = Framework.getService(PageProviderService.class); 148 Map<String, Serializable> props = new HashMap<>(); 149 props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session); 150 @SuppressWarnings("unchecked") 151 PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider( 152 getPageProviderNameForUpdate(), null, null, null, props); 153 final long maxResult = pp.getPageSize(); 154 do { 155 pp.setCurrentPageOffset(offset); 156 pp.refresh(); 157 nextDocumentsToBeUpdated = pp.getCurrentPage(); 158 159 if (nextDocumentsToBeUpdated.isEmpty()) { 160 break; 161 } 162 List<String> docIds = nextDocumentsToBeUpdated.stream().map(DocumentModel::getId) 163 .collect(Collectors.toList()); 164 BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork(buildWorkId( 165 repositoryName, offset), blobProviderId); 166 work.setDocuments(repositoryName, docIds); 167 workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true); 168 offset += maxResult; 169 } while (nextDocumentsToBeUpdated.size() == maxResult); 170 171 } 172 } 173 } 174 175 private String buildWorkId(String repositoryName, long offset) { 176 return blobProviderId + ':' + repositoryName + ':' + offset; 177 } 178 179 /** 180 * Should be overriden by subclasses wanting to rely on a different fields. 181 */ 182 protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) { 183 return StringUtils.isBlank(blob.getDigest()) || !blob.getDigest().equals(file.getDigest()); 184 } 185 186 @Override 187 @SuppressWarnings("unchecked") 188 public O getOAuth2Provider() { 189 return (O) Framework.getLocalService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId); 190 } 191 192 @Override 193 public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException { 194 LiveConnectFile file = getFile(fileInfo); 195 return toBlob(file); 196 } 197 198 protected SimpleManagedBlob toBlob(LiveConnectFile file) { 199 BlobInfo blobInfo = new BlobInfo(); 200 blobInfo.key = buildBlobKey(file.getInfo()); 201 blobInfo.mimeType = file.getMimeType(); 202 blobInfo.encoding = file.getEncoding(); 203 blobInfo.filename = file.getFilename().replace('/', '-'); 204 blobInfo.length = file.getFileSize(); 205 blobInfo.digest = file.getDigest(); 206 return new SimpleManagedBlob(blobInfo); 207 } 208 209 protected String buildBlobKey(LiveConnectFileInfo fileInfo) { 210 StringBuilder key = new StringBuilder(blobProviderId); 211 key.append(BLOB_KEY_SEPARATOR); 212 key.append(fileInfo.getUser()); 213 key.append(BLOB_KEY_SEPARATOR); 214 key.append(fileInfo.getFileId()); 215 if (fileInfo.getRevisionId().isPresent()) { 216 key.append(BLOB_KEY_SEPARATOR); 217 key.append(fileInfo.getRevisionId().get()); 218 } 219 return key.toString(); 220 } 221 222 protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) { 223 String key = blob.getKey(); 224 List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key); 225 // According to buildBlobKey we have : 226 // 0 - blobProviderId 227 // 1 - userId 228 // 2 - fileId 229 // 3 - revisionId (optional) 230 if (keyParts.size() < 3 || keyParts.size() > 4) { 231 throw new IllegalArgumentException("The key doesn't have a valid format=" + key); 232 } 233 return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null); 234 } 235 236 /** 237 * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it. 238 * 239 * @param fileInfo the file info 240 * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it 241 */ 242 protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException { 243 LiveConnectFile file = getFileFromCache(fileInfo); 244 if (file == null) { 245 file = retrieveFile(fileInfo); 246 putFileInCache(file); 247 } 248 return file; 249 } 250 251 private Cache getCache() { 252 if (cache == null) { 253 cache = Framework.getService(CacheService.class).getCache(getCacheName()); 254 } 255 return cache; 256 } 257 258 protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException { 259 return getCredential(fileInfo.getUser()); 260 } 261 262 protected Credential getCredential(NuxeoOAuth2Token token) throws IOException { 263 return getCredential(token.getServiceLogin()); 264 } 265 266 public final synchronized Credential getCredential(String user) throws IOException { 267 Credential credential = getCredentialFactory().build(user); 268 if (credential == null) { 269 throw new NuxeoException("No credentials found for user " + user + " and service " + blobProviderId); 270 } 271 Long expiresInSeconds = credential.getExpiresInSeconds(); 272 if (expiresInSeconds != null && expiresInSeconds <= 0) { 273 credential.refreshToken(); 274 } 275 return credential; 276 } 277 278 /** 279 * Should be overriden by subclasses needing another credential factory. 280 */ 281 protected CredentialFactory getCredentialFactory() { 282 return new OAuth2CredentialFactory(getOAuth2Provider()); 283 } 284 285 @SuppressWarnings("unchecked") 286 protected final <T extends Serializable> T getFromCache(String key) { 287 return (T) getCache().get(key); 288 } 289 290 protected final <T extends Serializable> void putInCache(String key, T object) { 291 getCache().put(key, object); 292 } 293 294 protected final void invalidateInCache(String key) { 295 getCache().invalidate(key); 296 } 297 298 protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) { 299 return getFromCache(FILE_CACHE_PREFIX + fileInfo.getFileId()); 300 } 301 302 protected final void putFileInCache(LiveConnectFile file) { 303 putInCache(FILE_CACHE_PREFIX + file.getInfo().getFileId(), file); 304 } 305 306 /** 307 * Parse a {@link URI}. 308 * 309 * @return the {@link URI} or null if it fails 310 */ 311 protected URI asURI(String link) { 312 try { 313 return new URI(link); 314 } catch (URISyntaxException e) { 315 log.error("Invalid URI: " + link, e); 316 return null; 317 } 318 } 319 320 protected abstract String getCacheName(); 321 322 protected abstract String getPageProviderNameForUpdate(); 323 324 /** 325 * Retrieves the file with API. 326 * 327 * @param fileInfo the file info 328 * @return the file retrieved from API 329 */ 330 protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException; 331 332}