/*
 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Kevin Leturc
 */
package org.nuxeo.ecm.liveconnect.core;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.repository.RepositoryManager;
import org.nuxeo.ecm.core.blob.AbstractBlobProvider;
import org.nuxeo.ecm.core.blob.BlobInfo;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.DocumentBlobProvider;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.blob.SimpleManagedBlob;
import org.nuxeo.ecm.core.cache.Cache;
import org.nuxeo.ecm.core.cache.CacheService;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider;
import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork;
import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider;
import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry;
import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token;
import org.nuxeo.ecm.platform.query.api.PageProvider;
import org.nuxeo.ecm.platform.query.api.PageProviderService;
import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.transaction.TransactionHelper;

import com.google.api.client.auth.oauth2.Credential;
import com.google.common.base.Splitter;

/**
 * Basic implementation of {@link BlobProvider} for live connect.
 * <p>
 * Blobs handled by this provider are identified by a key of the form
 * {@code blobProviderId:user:fileId[:revisionId]} (see {@link #buildBlobKey(LiveConnectFileInfo)} and
 * {@link #toFileInfo(String)}). Remote file descriptions are kept in the cache named by {@link #getCacheName()}.
 *
 * @param <O> The OAuth2 service provider type.
 * @since 8.1
 */
public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider
        implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider, DocumentBlobProvider {

    private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class);

    /** Prefix prepended to the blob key to build the cache key of a {@link LiveConnectFile}. */
    private static final String FILE_CACHE_PREFIX = "liveconnect_file_";

    /** Separator between the parts of a blob key. */
    private static final char BLOB_KEY_SEPARATOR = ':';

    static {
        // make the JSON decoder aware of live connect blobs as soon as a provider class is loaded
        ComplexTypeJSONDecoder.registerBlobDecoder(new JSONLiveConnectBlobDecoder());
    }

    /** Resource cache, resolved lazily by {@link #getCache()}. */
    private Cache cache;

    /**
     * Does nothing. Should be overridden by subclasses needing something different.
     */
    @Override
    public void close() {
    }

    /**
     * Builds the blob described by the key of the given blob info.
     * <p>
     * Should be overridden by subclasses needing something different.
     *
     * @param blobInfo the blob info, of which only the {@code key} is read
     * @return the blob built from the file info decoded from the key
     * @throws IllegalArgumentException if the key doesn't have the expected format
     */
    @Override
    public Blob readBlob(BlobInfo blobInfo) throws IOException {
        return toBlob(toFileInfo(blobInfo.key));
    }

    /**
     * Always fails: writing is not supported by this base implementation.
     * <p>
     * Should be overridden by subclasses needing something different.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public String writeBlob(Blob blob) throws IOException {
        throw new UnsupportedOperationException("Writing a blob to live connect service is not supported");
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean performsExternalAccessControl(BlobInfo blobInfo) {
        return true;
    }

    /**
     * @return the value of {@code supportsUserUpdate()}
     */
    @Override
    public boolean supportsSync() {
        return supportsUserUpdate();
    }

    /**
     * Tells whether the blob key carries a revision id.
     * <p>
     * Should be overridden by subclasses needing something different.
     *
     * @param blob the blob whose key is inspected
     * @return {@code true} if the file info decoded from the blob key has a revision id
     */
    @Override
    public boolean isVersion(ManagedBlob blob) {
        return toFileInfo(blob).getRevisionId().isPresent();
    }

    /**
     * Refreshes the {@code content} blob of each given document from the remote service.
     * <p>
     * Documents without content, or whose blob {@link #isVersion(ManagedBlob) is a version}, are skipped. For the
     * others the remote file is always retrieved with the API (the cache is not read, only refreshed) and, when
     * {@link #hasChanged(SimpleManagedBlob, LiveConnectFile)} says so, the {@code content} property is replaced.
     * Documents are not saved by this method. A document whose remote file cannot be retrieved is logged and left
     * untouched.
     *
     * @param docs the documents to check
     * @return the documents whose {@code content} property has been replaced
     */
    @Override
    public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) {
        List<DocumentModel> changedDocuments = new ArrayList<>();
        for (DocumentModel doc : docs) {
            final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue();
            if (blob == null || isVersion(blob)) {
                continue;
            }
            LiveConnectFileInfo fileInfo = toFileInfo(blob);
            try {
                LiveConnectFile file = retrieveFile(fileInfo);
                putFileInCache(file);
                if (hasChanged(blob, file)) {
                    if (log.isTraceEnabled()) {
                        log.trace("Updating blob=" + blob.key);
                    }
                    doc.setPropertyValue("content", toBlob(file));
                    changedDocuments.add(doc);
                }
            } catch (IOException e) {
                // best effort: one unreachable file must not prevent the other documents from being checked
                log.error("Could not update document=" + fileInfo, e);
            }
        }
        return changedDocuments;
    }

    /**
     * Schedules, for every repository, the works updating the documents returned by the page provider named
     * {@link #getPageProviderNameForUpdate()}.
     * <p>
     * The page provider is walked page by page with a system session; one {@link BlobProviderDocumentsUpdateWork} is
     * scheduled per non-empty page, with an id derived from the repository name and the page offset so that a page
     * already scheduled is not scheduled twice.
     */
    @Override
    public void processDocumentsUpdate() {
        final RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class);
        final WorkManager workManager = Framework.getService(WorkManager.class);
        // the service lookup doesn't depend on the repository, do it once
        final PageProviderService ppService = Framework.getService(PageProviderService.class);
        for (String repositoryName : repositoryManager.getRepositoryNames()) {
            CoreSession session = CoreInstance.getCoreSessionSystem(repositoryName);
            long offset = 0;
            List<DocumentModel> nextDocumentsToBeUpdated;
            Map<String, Serializable> props = new HashMap<>();
            props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session);
            @SuppressWarnings("unchecked")
            PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider(
                    getPageProviderNameForUpdate(), null, null, null, props);
            final long maxResult = pp.getPageSize();
            do {
                pp.setCurrentPageOffset(offset);
                pp.refresh();
                nextDocumentsToBeUpdated = pp.getCurrentPage();

                if (nextDocumentsToBeUpdated.isEmpty()) {
                    break;
                }
                List<String> docIds = nextDocumentsToBeUpdated.stream()
                                                              .map(DocumentModel::getId)
                                                              .collect(Collectors.toList());
                BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork(
                        buildWorkId(repositoryName, offset), blobProviderId);
                work.setDocuments(repositoryName, docIds);
                // NOTE(review): the last argument is presumably "afterCommit" - confirm against WorkManager
                workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true);
                offset += maxResult;
                // a page shorter than the page size is the last one
            } while (nextDocumentsToBeUpdated.size() == maxResult);
        }
    }

    /**
     * Builds the id of the update work handling the page starting at {@code offset} in the given repository.
     */
    private String buildWorkId(String repositoryName, long offset) {
        return blobProviderId + ':' + repositoryName + ':' + offset;
    }

    /**
     * Tells whether the remote file differs from the stored blob, by comparing their digests. A blob with a blank
     * digest is always considered changed.
     * <p>
     * Should be overridden by subclasses wanting to rely on different fields.
     *
     * @param blob the blob currently stored on the document
     * @param file the file retrieved from the remote service
     * @return {@code true} if the blob has to be replaced
     */
    protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) {
        String digest = blob.getDigest();
        return StringUtils.isBlank(digest) || !digest.equals(file.getDigest());
    }

    /**
     * @return the provider registered in the {@link OAuth2ServiceProviderRegistry} under this blob provider id
     */
    @Override
    @SuppressWarnings("unchecked")
    public O getOAuth2Provider() {
        return (O) Framework.getService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId);
    }

    /**
     * Builds the blob of the file described by the given file info.
     * <p>
     * If the file can be obtained neither from cache nor from the API, the failure is logged and the blob is built
     * from an {@link ErrorLiveConnectFile} instead of propagating the {@link IOException}.
     *
     * @param fileInfo the file info
     * @return the blob describing the file
     */
    @Override
    public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException {
        LiveConnectFile file;
        try {
            file = getFile(fileInfo);
        } catch (IOException e) {
            // we don't want to crash everything if the remote file cannot be accessed
            log.error("Failed to access file: " + fileInfo, e);
            file = new ErrorLiveConnectFile(fileInfo);
        }
        return toBlob(file);
    }

    /**
     * Builds a blob from the metadata of the given file. Any {@code '/'} in the filename is replaced by {@code '-'}.
     *
     * @param file the file
     * @return the blob describing the file
     */
    protected SimpleManagedBlob toBlob(LiveConnectFile file) {
        BlobInfo blobInfo = new BlobInfo();
        blobInfo.key = buildBlobKey(file.getInfo());
        blobInfo.mimeType = file.getMimeType();
        blobInfo.encoding = file.getEncoding();
        blobInfo.filename = file.getFilename().replace('/', '-');
        blobInfo.length = Long.valueOf(file.getFileSize());
        blobInfo.digest = file.getDigest();
        return new SimpleManagedBlob(blobInfo);
    }

    /**
     * Builds the blob key {@code blobProviderId:user:fileId[:revisionId]} of the given file info; the revision part
     * is only appended when the file info has a revision id.
     *
     * @param fileInfo the file info
     * @return the blob key
     */
    protected String buildBlobKey(LiveConnectFileInfo fileInfo) {
        StringBuilder key = new StringBuilder(blobProviderId);
        key.append(BLOB_KEY_SEPARATOR);
        key.append(fileInfo.getUser());
        key.append(BLOB_KEY_SEPARATOR);
        key.append(fileInfo.getFileId());
        Optional<String> revisionId = fileInfo.getRevisionId();
        revisionId.ifPresent(id -> key.append(BLOB_KEY_SEPARATOR).append(id));
        return key.toString();
    }

    /**
     * Decodes the key of the given blob, see {@link #toFileInfo(String)}.
     */
    protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) {
        return toFileInfo(blob.getKey());
    }

    /**
     * Decodes a blob key built by {@link #buildBlobKey(LiveConnectFileInfo)}.
     *
     * @param key the blob key
     * @return the file info holding the user, the file id and, if the key has one, the revision id
     * @throws IllegalArgumentException if the key doesn't have 3 or 4 parts
     * @since 8.4
     */
    protected LiveConnectFileInfo toFileInfo(String key) {
        List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key);
        // According to buildBlobKey we have :
        // 0 - blobProviderId
        // 1 - userId
        // 2 - fileId
        // 3 - revisionId (optional)
        if (keyParts.size() < 3 || keyParts.size() > 4) {
            throw new IllegalArgumentException("The key doesn't have a valid format=" + key);
        }
        return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null);
    }

    /**
     * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it.
     *
     * @param fileInfo the file info
     * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it
     */
    protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException {
        LiveConnectFile file = getFileFromCache(fileInfo);
        if (file == null) {
            file = retrieveFile(fileInfo);
            putFileInCache(file);
        }
        return file;
    }

    /**
     * Returns the cache named {@link #getCacheName()}, looking it up in the {@link CacheService} on first use.
     */
    private Cache getCache() {
        if (cache == null) {
            cache = Framework.getService(CacheService.class).getCache(getCacheName());
        }
        return cache;
    }

    /**
     * @return the credential of the user owning the given file, see {@link #getCredential(String)}
     */
    protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException {
        return getCredential(fileInfo.getUser());
    }

    /**
     * @return the credential of the service login of the given token, see {@link #getCredential(String)}
     */
    protected Credential getCredential(NuxeoOAuth2Token token) throws IOException {
        return getCredential(token.getServiceLogin());
    }

    /**
     * Returns the credential of the given user, refreshing its access token if it is expired.
     * <p>
     * The retrieval runs in a new transaction and calls on the same provider instance are serialized.
     *
     * @param user the user
     * @return the credential, never {@code null}
     * @throws IOException if there's no credential for the user or if the retrieval or refresh fails
     */
    public final synchronized Credential getCredential(String user) throws IOException {
        // suspend current transaction and start a new one responsible for access token update
        try {
            return TransactionHelper.runInNewTransaction(() -> retrieveAndRefreshCredential(user));
        } catch (UncheckedIOException uioe) {
            // re-throw IOException because methods implementing interface throwing IOException use this method
            throw uioe.getCause();
        }
    }

    /**
     * Builds the credential of the given user and refreshes its token when the remaining lifetime is known and not
     * positive. Any {@link IOException} is wrapped in an {@link UncheckedIOException} so that the method can be used
     * as the lambda body in {@link #getCredential(String)}.
     * <p>
     * Kept {@code private} so that subclasses can neither override it nor call it directly: access has to go through
     * the synchronized {@link #getCredential(String)}.
     */
    private Credential retrieveAndRefreshCredential(String user) {
        try {
            Credential credential = getCredentialFactory().build(user);
            if (credential == null) {
                String message = "No credentials found for user " + user + " and service " + blobProviderId;
                // TODO NXP-23860 replace this log.error with a dedicated admin service to record missing credentials
                log.error(message);
                throw new IOException(message);
            }
            Long expiresInSeconds = credential.getExpiresInSeconds();
            if (expiresInSeconds != null && expiresInSeconds.longValue() <= 0) {
                credential.refreshToken();
            }
            return credential;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Returns a new {@link OAuth2CredentialFactory} backed by {@link #getOAuth2Provider()}.
     * <p>
     * Should be overridden by subclasses needing another credential factory.
     */
    protected CredentialFactory getCredentialFactory() {
        return new OAuth2CredentialFactory(getOAuth2Provider());
    }

    /**
     * Reads a value from the cache.
     *
     * @param key the cache key
     * @return the value returned by the cache for the key, cast to the type expected by the caller
     */
    @SuppressWarnings("unchecked")
    protected final <T extends Serializable> T getFromCache(String key) {
        return (T) getCache().get(key);
    }

    /**
     * Stores a value in the cache.
     *
     * @param key the cache key
     * @param object the value to store
     */
    protected final <T extends Serializable> void putInCache(String key, T object) {
        getCache().put(key, object);
    }

    /**
     * Invalidates the cache entry of the file described by the given file info.
     */
    protected final void invalidateInCache(LiveConnectFileInfo fileInfo) {
        getCache().invalidate(FILE_CACHE_PREFIX + buildBlobKey(fileInfo));
    }

    /**
     * @return the value cached for the file described by the given file info
     */
    protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) {
        return getFromCache(FILE_CACHE_PREFIX + buildBlobKey(fileInfo));
    }

    /**
     * Caches the given file under the key derived from its file info.
     */
    protected final void putFileInCache(LiveConnectFile file) {
        putInCache(FILE_CACHE_PREFIX + buildBlobKey(file.getInfo()), file);
    }

    /**
     * Parse a {@link URI}.
     *
     * @return the {@link URI} or null if it fails
     */
    protected URI asURI(String link) {
        try {
            return new URI(link);
        } catch (URISyntaxException e) {
            log.error("Invalid URI: " + link, e);
            return null;
        }
    }

    /**
     * @return the name of the cache to look up in the {@link CacheService}
     */
    protected abstract String getCacheName();

    /**
     * @return the name of the page provider listing the documents to update in {@link #processDocumentsUpdate()}
     */
    protected abstract String getPageProviderNameForUpdate();

    /**
     * Retrieves the file with API.
     *
     * @param fileInfo the file info
     * @return the file retrieved from API
     */
    protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException;

}