001/* 002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.liveconnect.core; 020 021import java.io.IOException; 022import java.io.Serializable; 023import java.io.UncheckedIOException; 024import java.net.URI; 025import java.net.URISyntaxException; 026import java.util.ArrayList; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030import java.util.stream.Collectors; 031 032import org.apache.commons.lang3.StringUtils; 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder; 036import org.nuxeo.ecm.core.api.Blob; 037import org.nuxeo.ecm.core.api.CoreInstance; 038import org.nuxeo.ecm.core.api.CoreSession; 039import org.nuxeo.ecm.core.api.DocumentModel; 040import org.nuxeo.ecm.core.api.NuxeoException; 041import org.nuxeo.ecm.core.api.repository.RepositoryManager; 042import org.nuxeo.ecm.core.blob.AbstractBlobProvider; 043import org.nuxeo.ecm.core.blob.BlobInfo; 044import org.nuxeo.ecm.core.blob.BlobProvider; 045import org.nuxeo.ecm.core.blob.DocumentBlobProvider; 046import org.nuxeo.ecm.core.blob.ManagedBlob; 047import org.nuxeo.ecm.core.blob.SimpleManagedBlob; 048import org.nuxeo.ecm.core.cache.Cache; 049import org.nuxeo.ecm.core.cache.CacheService; 050import org.nuxeo.ecm.core.work.api.WorkManager; 051import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider; 052import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork; 053import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider; 054import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry; 055import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token; 056import org.nuxeo.ecm.platform.query.api.PageProvider; 057import org.nuxeo.ecm.platform.query.api.PageProviderService; 058import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider; 059import org.nuxeo.runtime.api.Framework; 060import org.nuxeo.runtime.transaction.TransactionHelper; 061 062import com.google.api.client.auth.oauth2.Credential; 063import com.google.common.base.Splitter; 064 065/** 066 * Basic implementation of {@link BlobProvider} for live connect. 067 * 068 * @param <O> The OAuth2 service provider type. 069 * @since 8.1 070 */ 071public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider 072 implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider, DocumentBlobProvider { 073 074 private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class); 075 076 private static final String FILE_CACHE_PREFIX = "liveconnect_file_"; 077 078 private static final char BLOB_KEY_SEPARATOR = ':'; 079 080 static { 081 ComplexTypeJSONDecoder.registerBlobDecoder(new JSONLiveConnectBlobDecoder()); 082 } 083 084 /** Resource cache */ 085 private Cache cache; 086 087 /** 088 * Should be overriden by subclasses needing something different. 089 */ 090 @Override 091 public void close() { 092 } 093 094 /** 095 * Should be overriden by subclasses needing something different. 096 */ 097 @Override 098 public Blob readBlob(BlobInfo blobInfo) throws IOException { 099 return toBlob(toFileInfo(blobInfo.key)); 100 } 101 102 /** 103 * Should be overriden by subclasses needing something different. 104 */ 105 @Override 106 public String writeBlob(Blob blob) throws IOException { 107 throw new UnsupportedOperationException("Writing a blob to live connect service is not supported"); 108 } 109 110 @Override 111 public boolean performsExternalAccessControl(BlobInfo blobInfo) { 112 return true; 113 } 114 115 /** 116 * Should be overriden by subclasses needing something different. 117 */ 118 @Override 119 public boolean isVersion(ManagedBlob blob) { 120 return toFileInfo(blob).getRevisionId().isPresent(); 121 } 122 123 @Override 124 public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) { 125 List<DocumentModel> changedDocuments = new ArrayList<>(); 126 for (DocumentModel doc : docs) { 127 final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue(); 128 if (blob == null || isVersion(blob)) { 129 continue; 130 } 131 LiveConnectFileInfo fileInfo = toFileInfo(blob); 132 try { 133 LiveConnectFile file = retrieveFile(fileInfo); 134 putFileInCache(file); 135 if (hasChanged(blob, file)) { 136 if (log.isTraceEnabled()) { 137 log.trace("Updating blob=" + blob.key); 138 } 139 doc.setPropertyValue("content", toBlob(file)); 140 changedDocuments.add(doc); 141 } 142 } catch (IOException e) { 143 log.error("Could not update document=" + fileInfo, e); 144 } 145 146 } 147 return changedDocuments; 148 } 149 150 @Override 151 public void processDocumentsUpdate() { 152 final RepositoryManager repositoryManager = Framework.getLocalService(RepositoryManager.class); 153 final WorkManager workManager = Framework.getLocalService(WorkManager.class); 154 for (String repositoryName : repositoryManager.getRepositoryNames()) { 155 try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) { 156 157 long offset = 0; 158 List<DocumentModel> nextDocumentsToBeUpdated; 159 PageProviderService ppService = Framework.getService(PageProviderService.class); 160 Map<String, Serializable> props = new HashMap<>(); 161 props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session); 162 @SuppressWarnings("unchecked") 163 PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider( 164 getPageProviderNameForUpdate(), null, null, null, props); 165 final long maxResult = pp.getPageSize(); 166 do { 167 pp.setCurrentPageOffset(offset); 168 pp.refresh(); 169 nextDocumentsToBeUpdated = pp.getCurrentPage(); 170 171 if (nextDocumentsToBeUpdated.isEmpty()) { 172 break; 173 } 174 List<String> docIds = nextDocumentsToBeUpdated.stream().map(DocumentModel::getId).collect( 175 Collectors.toList()); 176 BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork( 177 buildWorkId(repositoryName, offset), blobProviderId); 178 work.setDocuments(repositoryName, docIds); 179 workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true); 180 offset += maxResult; 181 } while (nextDocumentsToBeUpdated.size() == maxResult); 182 183 } 184 } 185 } 186 187 private String buildWorkId(String repositoryName, long offset) { 188 return blobProviderId + ':' + repositoryName + ':' + offset; 189 } 190 191 /** 192 * Should be overriden by subclasses wanting to rely on a different fields. 193 */ 194 protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) { 195 return StringUtils.isBlank(blob.getDigest()) || !blob.getDigest().equals(file.getDigest()); 196 } 197 198 @Override 199 @SuppressWarnings("unchecked") 200 public O getOAuth2Provider() { 201 return (O) Framework.getLocalService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId); 202 } 203 204 @Override 205 public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException { 206 LiveConnectFile file = getFile(fileInfo); 207 return toBlob(file); 208 } 209 210 protected SimpleManagedBlob toBlob(LiveConnectFile file) { 211 BlobInfo blobInfo = new BlobInfo(); 212 blobInfo.key = buildBlobKey(file.getInfo()); 213 blobInfo.mimeType = file.getMimeType(); 214 blobInfo.encoding = file.getEncoding(); 215 blobInfo.filename = file.getFilename().replace('/', '-'); 216 blobInfo.length = Long.valueOf(file.getFileSize()); 217 blobInfo.digest = file.getDigest(); 218 return new SimpleManagedBlob(blobInfo); 219 } 220 221 protected String buildBlobKey(LiveConnectFileInfo fileInfo) { 222 StringBuilder key = new StringBuilder(blobProviderId); 223 key.append(BLOB_KEY_SEPARATOR); 224 key.append(fileInfo.getUser()); 225 key.append(BLOB_KEY_SEPARATOR); 226 key.append(fileInfo.getFileId()); 227 if (fileInfo.getRevisionId().isPresent()) { 228 key.append(BLOB_KEY_SEPARATOR); 229 key.append(fileInfo.getRevisionId().get()); 230 } 231 return key.toString(); 232 } 233 234 protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) { 235 return toFileInfo(blob.getKey()); 236 } 237 238 /** 239 * @since 8.4 240 */ 241 protected LiveConnectFileInfo toFileInfo(String key) { 242 List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key); 243 // According to buildBlobKey we have : 244 // 0 - blobProviderId 245 // 1 - userId 246 // 2 - fileId 247 // 3 - revisionId (optional) 248 if (keyParts.size() < 3 || keyParts.size() > 4) { 249 throw new IllegalArgumentException("The key doesn't have a valid format=" + key); 250 } 251 return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null); 252 } 253 254 /** 255 * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it. 256 * 257 * @param fileInfo the file info 258 * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it 259 */ 260 protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException { 261 LiveConnectFile file = getFileFromCache(fileInfo); 262 if (file == null) { 263 file = retrieveFile(fileInfo); 264 putFileInCache(file); 265 } 266 return file; 267 } 268 269 private Cache getCache() { 270 if (cache == null) { 271 cache = Framework.getService(CacheService.class).getCache(getCacheName()); 272 } 273 return cache; 274 } 275 276 protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException { 277 return getCredential(fileInfo.getUser()); 278 } 279 280 protected Credential getCredential(NuxeoOAuth2Token token) throws IOException { 281 return getCredential(token.getServiceLogin()); 282 } 283 284 public final synchronized Credential getCredential(String user) throws IOException { 285 // suspend current transaction and start a new one responsible for access token update 286 try { 287 return TransactionHelper.runInNewTransaction(() -> retrieveAndRefreshCredential(user)); 288 } catch (UncheckedIOException uioe) { 289 // re-throw IOException because methods implementing interface throwing IOException use this method 290 throw uioe.getCause(); 291 } 292 } 293 294 /** 295 * Declare this method as {@code private} because we don't want upper class to override it and as there's concurrent 296 * access on it, don't let anyone calling it. 297 */ 298 private Credential retrieveAndRefreshCredential(String user) { 299 try { 300 Credential credential = getCredentialFactory().build(user); 301 if (credential == null) { 302 throw new NuxeoException( 303 "No credentials found for user " + user + " and service " + blobProviderId); 304 } 305 Long expiresInSeconds = credential.getExpiresInSeconds(); 306 if (expiresInSeconds != null && expiresInSeconds.longValue() <= 0) { 307 credential.refreshToken(); 308 } 309 return credential; 310 } catch (IOException e) { 311 throw new UncheckedIOException(e); 312 } 313 } 314 315 /** 316 * Should be overriden by subclasses needing another credential factory. 317 */ 318 protected CredentialFactory getCredentialFactory() { 319 return new OAuth2CredentialFactory(getOAuth2Provider()); 320 } 321 322 @SuppressWarnings("unchecked") 323 protected final <T extends Serializable> T getFromCache(String key) { 324 return (T) getCache().get(key); 325 } 326 327 protected final <T extends Serializable> void putInCache(String key, T object) { 328 getCache().put(key, object); 329 } 330 331 protected final void invalidateInCache(LiveConnectFileInfo fileInfo) { 332 getCache().invalidate(FILE_CACHE_PREFIX + buildBlobKey(fileInfo)); 333 } 334 335 protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) { 336 return getFromCache(FILE_CACHE_PREFIX + buildBlobKey(fileInfo)); 337 } 338 339 protected final void putFileInCache(LiveConnectFile file) { 340 putInCache(FILE_CACHE_PREFIX + buildBlobKey(file.getInfo()), file); 341 } 342 343 /** 344 * Parse a {@link URI}. 345 * 346 * @return the {@link URI} or null if it fails 347 */ 348 protected URI asURI(String link) { 349 try { 350 return new URI(link); 351 } catch (URISyntaxException e) { 352 log.error("Invalid URI: " + link, e); 353 return null; 354 } 355 } 356 357 protected abstract String getCacheName(); 358 359 protected abstract String getPageProviderNameForUpdate(); 360 361 /** 362 * Retrieves the file with API. 363 * 364 * @param fileInfo the file info 365 * @return the file retrieved from API 366 */ 367 protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException; 368 369}