001/* 002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.liveconnect.core; 020 021import java.io.IOException; 022import java.io.Serializable; 023import java.net.URI; 024import java.net.URISyntaxException; 025import java.util.ArrayList; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.stream.Collectors; 030 031import org.apache.commons.lang3.StringUtils; 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder; 035import org.nuxeo.ecm.core.api.Blob; 036import org.nuxeo.ecm.core.api.CoreInstance; 037import org.nuxeo.ecm.core.api.CoreSession; 038import org.nuxeo.ecm.core.api.DocumentModel; 039import org.nuxeo.ecm.core.api.NuxeoException; 040import org.nuxeo.ecm.core.api.repository.RepositoryManager; 041import org.nuxeo.ecm.core.blob.AbstractBlobProvider; 042import org.nuxeo.ecm.core.blob.BlobManager.BlobInfo; 043import org.nuxeo.ecm.core.blob.BlobProvider; 044import org.nuxeo.ecm.core.blob.ManagedBlob; 045import org.nuxeo.ecm.core.blob.SimpleManagedBlob; 046import org.nuxeo.ecm.core.cache.Cache; 047import org.nuxeo.ecm.core.cache.CacheService; 048import org.nuxeo.ecm.core.model.Document; 049import org.nuxeo.ecm.core.work.api.WorkManager; 050import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider; 051import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork; 052import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider; 053import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry; 054import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token; 055import org.nuxeo.ecm.platform.query.api.PageProvider; 056import org.nuxeo.ecm.platform.query.api.PageProviderService; 057import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider; 058import org.nuxeo.runtime.api.Framework; 059 060import com.google.api.client.auth.oauth2.Credential; 061import com.google.common.base.Splitter; 062 063/** 064 * Basic implementation of {@link BlobProvider} for live connect. 065 * 066 * @param <O> The OAuth2 service provider type. 067 * @since 8.1 068 */ 069public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider 070 implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider { 071 072 private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class); 073 074 private static final String FILE_CACHE_PREFIX = "liveconnect_file_"; 075 076 private static final char BLOB_KEY_SEPARATOR = ':'; 077 078 static { 079 ComplexTypeJSONDecoder.registerBlobDecoder(new JSONLiveConnectBlobDecoder()); 080 } 081 082 /** Resource cache */ 083 private Cache cache; 084 085 /** 086 * Should be overriden by subclasses needing something different. 087 */ 088 @Override 089 public void close() { 090 } 091 092 /** 093 * Should be overriden by subclasses needing something different. 094 */ 095 @Override 096 public Blob readBlob(BlobInfo blobInfo) throws IOException { 097 return toBlob(toFileInfo(blobInfo.key)); 098 } 099 100 /** 101 * Should be overriden by subclasses needing something different. 102 */ 103 @Override 104 public String writeBlob(Blob blob, Document doc) throws IOException { 105 throw new UnsupportedOperationException("Writing a blob to live connect service is not supported"); 106 } 107 108 @Override 109 public boolean performsExternalAccessControl(BlobInfo blobInfo) { 110 return true; 111 } 112 113 /** 114 * Should be overriden by subclasses needing something different. 115 */ 116 @Override 117 public boolean isVersion(ManagedBlob blob) { 118 return toFileInfo(blob).getRevisionId().isPresent(); 119 } 120 121 @Override 122 public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) { 123 List<DocumentModel> changedDocuments = new ArrayList<>(); 124 for (DocumentModel doc : docs) { 125 final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue(); 126 if (blob == null || isVersion(blob)) { 127 continue; 128 } 129 LiveConnectFileInfo fileInfo = toFileInfo(blob); 130 try { 131 LiveConnectFile file = retrieveFile(fileInfo); 132 putFileInCache(file); 133 if (hasChanged(blob, file)) { 134 if (log.isTraceEnabled()) { 135 log.trace("Updating blob=" + blob.key); 136 } 137 doc.setPropertyValue("content", toBlob(file)); 138 changedDocuments.add(doc); 139 } 140 } catch (IOException e) { 141 log.error("Could not update document=" + fileInfo, e); 142 } 143 144 } 145 return changedDocuments; 146 } 147 148 @Override 149 public void processDocumentsUpdate() { 150 final RepositoryManager repositoryManager = Framework.getLocalService(RepositoryManager.class); 151 final WorkManager workManager = Framework.getLocalService(WorkManager.class); 152 for (String repositoryName : repositoryManager.getRepositoryNames()) { 153 try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) { 154 155 long offset = 0; 156 List<DocumentModel> nextDocumentsToBeUpdated; 157 PageProviderService ppService = Framework.getService(PageProviderService.class); 158 Map<String, Serializable> props = new HashMap<>(); 159 props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session); 160 @SuppressWarnings("unchecked") 161 PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider( 162 getPageProviderNameForUpdate(), null, null, null, props); 163 final long maxResult = pp.getPageSize(); 164 do { 165 pp.setCurrentPageOffset(offset); 166 pp.refresh(); 167 nextDocumentsToBeUpdated = pp.getCurrentPage(); 168 169 if (nextDocumentsToBeUpdated.isEmpty()) { 170 break; 171 } 172 List<String> docIds = nextDocumentsToBeUpdated.stream().map(DocumentModel::getId) 173 .collect(Collectors.toList()); 174 BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork(buildWorkId( 175 repositoryName, offset), blobProviderId); 176 work.setDocuments(repositoryName, docIds); 177 workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true); 178 offset += maxResult; 179 } while (nextDocumentsToBeUpdated.size() == maxResult); 180 181 } 182 } 183 } 184 185 private String buildWorkId(String repositoryName, long offset) { 186 return blobProviderId + ':' + repositoryName + ':' + offset; 187 } 188 189 /** 190 * Should be overriden by subclasses wanting to rely on a different fields. 191 */ 192 protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) { 193 return StringUtils.isBlank(blob.getDigest()) || !blob.getDigest().equals(file.getDigest()); 194 } 195 196 @Override 197 @SuppressWarnings("unchecked") 198 public O getOAuth2Provider() { 199 return (O) Framework.getLocalService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId); 200 } 201 202 @Override 203 public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException { 204 LiveConnectFile file = getFile(fileInfo); 205 return toBlob(file); 206 } 207 208 protected SimpleManagedBlob toBlob(LiveConnectFile file) { 209 BlobInfo blobInfo = new BlobInfo(); 210 blobInfo.key = buildBlobKey(file.getInfo()); 211 blobInfo.mimeType = file.getMimeType(); 212 blobInfo.encoding = file.getEncoding(); 213 blobInfo.filename = file.getFilename().replace('/', '-'); 214 blobInfo.length = file.getFileSize(); 215 blobInfo.digest = file.getDigest(); 216 return new SimpleManagedBlob(blobInfo); 217 } 218 219 protected String buildBlobKey(LiveConnectFileInfo fileInfo) { 220 StringBuilder key = new StringBuilder(blobProviderId); 221 key.append(BLOB_KEY_SEPARATOR); 222 key.append(fileInfo.getUser()); 223 key.append(BLOB_KEY_SEPARATOR); 224 key.append(fileInfo.getFileId()); 225 if (fileInfo.getRevisionId().isPresent()) { 226 key.append(BLOB_KEY_SEPARATOR); 227 key.append(fileInfo.getRevisionId().get()); 228 } 229 return key.toString(); 230 } 231 232 protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) { 233 return toFileInfo(blob.getKey()); 234 } 235 236 /** 237 * @since 8.4 238 */ 239 protected LiveConnectFileInfo toFileInfo(String key) { 240 List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key); 241 // According to buildBlobKey we have : 242 // 0 - blobProviderId 243 // 1 - userId 244 // 2 - fileId 245 // 3 - revisionId (optional) 246 if (keyParts.size() < 3 || keyParts.size() > 4) { 247 throw new IllegalArgumentException("The key doesn't have a valid format=" + key); 248 } 249 return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null); 250 } 251 252 /** 253 * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it. 254 * 255 * @param fileInfo the file info 256 * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it 257 */ 258 protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException { 259 LiveConnectFile file = getFileFromCache(fileInfo); 260 if (file == null) { 261 file = retrieveFile(fileInfo); 262 putFileInCache(file); 263 } 264 return file; 265 } 266 267 private Cache getCache() { 268 if (cache == null) { 269 cache = Framework.getService(CacheService.class).getCache(getCacheName()); 270 } 271 return cache; 272 } 273 274 protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException { 275 return getCredential(fileInfo.getUser()); 276 } 277 278 protected Credential getCredential(NuxeoOAuth2Token token) throws IOException { 279 return getCredential(token.getServiceLogin()); 280 } 281 282 public final synchronized Credential getCredential(String user) throws IOException { 283 Credential credential = getCredentialFactory().build(user); 284 if (credential == null) { 285 throw new NuxeoException("No credentials found for user " + user + " and service " + blobProviderId); 286 } 287 Long expiresInSeconds = credential.getExpiresInSeconds(); 288 if (expiresInSeconds != null && expiresInSeconds <= 0) { 289 credential.refreshToken(); 290 } 291 return credential; 292 } 293 294 /** 295 * Should be overriden by subclasses needing another credential factory. 296 */ 297 protected CredentialFactory getCredentialFactory() { 298 return new OAuth2CredentialFactory(getOAuth2Provider()); 299 } 300 301 @SuppressWarnings("unchecked") 302 protected final <T extends Serializable> T getFromCache(String key) { 303 return (T) getCache().get(key); 304 } 305 306 protected final <T extends Serializable> void putInCache(String key, T object) { 307 getCache().put(key, object); 308 } 309 310 protected final void invalidateInCache(LiveConnectFileInfo fileInfo) { 311 getCache().invalidate(FILE_CACHE_PREFIX + buildBlobKey(fileInfo)); 312 } 313 314 protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) { 315 return getFromCache(FILE_CACHE_PREFIX + buildBlobKey(fileInfo)); 316 } 317 318 protected final void putFileInCache(LiveConnectFile file) { 319 putInCache(FILE_CACHE_PREFIX + buildBlobKey(file.getInfo()), file); 320 } 321 322 /** 323 * Parse a {@link URI}. 324 * 325 * @return the {@link URI} or null if it fails 326 */ 327 protected URI asURI(String link) { 328 try { 329 return new URI(link); 330 } catch (URISyntaxException e) { 331 log.error("Invalid URI: " + link, e); 332 return null; 333 } 334 } 335 336 protected abstract String getCacheName(); 337 338 protected abstract String getPageProviderNameForUpdate(); 339 340 /** 341 * Retrieves the file with API. 342 * 343 * @param fileInfo the file info 344 * @return the file retrieved from API 345 */ 346 protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException; 347 348}