001/* 002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.liveconnect.core; 020 021import java.io.IOException; 022import java.io.Serializable; 023import java.io.UncheckedIOException; 024import java.net.URI; 025import java.net.URISyntaxException; 026import java.util.ArrayList; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030import java.util.stream.Collectors; 031 032import org.apache.commons.lang3.StringUtils; 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder; 036import org.nuxeo.ecm.core.api.Blob; 037import org.nuxeo.ecm.core.api.CoreInstance; 038import org.nuxeo.ecm.core.api.CoreSession; 039import org.nuxeo.ecm.core.api.DocumentModel; 040import org.nuxeo.ecm.core.api.NuxeoException; 041import org.nuxeo.ecm.core.api.repository.RepositoryManager; 042import org.nuxeo.ecm.core.blob.AbstractBlobProvider; 043import org.nuxeo.ecm.core.blob.BlobInfo; 044import org.nuxeo.ecm.core.blob.BlobProvider; 045import org.nuxeo.ecm.core.blob.DocumentBlobProvider; 046import org.nuxeo.ecm.core.blob.ManagedBlob; 047import org.nuxeo.ecm.core.blob.SimpleManagedBlob; 048import org.nuxeo.ecm.core.cache.Cache; 049import org.nuxeo.ecm.core.cache.CacheService; 050import org.nuxeo.ecm.core.work.api.WorkManager; 051import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider; 052import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork; 053import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider; 054import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry; 055import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token; 056import org.nuxeo.ecm.platform.query.api.PageProvider; 057import org.nuxeo.ecm.platform.query.api.PageProviderService; 058import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider; 059import org.nuxeo.runtime.api.Framework; 060import org.nuxeo.runtime.transaction.TransactionHelper; 061 062import com.google.api.client.auth.oauth2.Credential; 063import com.google.common.base.Splitter; 064 065/** 066 * Basic implementation of {@link BlobProvider} for live connect. 067 * 068 * @param <O> The OAuth2 service provider type. 069 * @since 8.1 070 */ 071public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider 072 implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider, DocumentBlobProvider { 073 074 private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class); 075 076 private static final String FILE_CACHE_PREFIX = "liveconnect_file_"; 077 078 private static final char BLOB_KEY_SEPARATOR = ':'; 079 080 static { 081 ComplexTypeJSONDecoder.registerBlobDecoder(new JSONLiveConnectBlobDecoder()); 082 } 083 084 /** Resource cache */ 085 private Cache cache; 086 087 /** 088 * Should be overriden by subclasses needing something different. 089 */ 090 @Override 091 public void close() { 092 } 093 094 /** 095 * Should be overriden by subclasses needing something different. 096 */ 097 @Override 098 public Blob readBlob(BlobInfo blobInfo) throws IOException { 099 return toBlob(toFileInfo(blobInfo.key)); 100 } 101 102 /** 103 * Should be overriden by subclasses needing something different. 104 */ 105 @Override 106 public String writeBlob(Blob blob) throws IOException { 107 throw new UnsupportedOperationException("Writing a blob to live connect service is not supported"); 108 } 109 110 @Override 111 public boolean performsExternalAccessControl(BlobInfo blobInfo) { 112 return true; 113 } 114 115 /** 116 * Should be overriden by subclasses needing something different. 117 */ 118 @Override 119 public boolean isVersion(ManagedBlob blob) { 120 return toFileInfo(blob).getRevisionId().isPresent(); 121 } 122 123 @Override 124 public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) { 125 List<DocumentModel> changedDocuments = new ArrayList<>(); 126 for (DocumentModel doc : docs) { 127 final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue(); 128 if (blob == null || isVersion(blob)) { 129 continue; 130 } 131 LiveConnectFileInfo fileInfo = toFileInfo(blob); 132 try { 133 LiveConnectFile file = retrieveFile(fileInfo); 134 putFileInCache(file); 135 if (hasChanged(blob, file)) { 136 if (log.isTraceEnabled()) { 137 log.trace("Updating blob=" + blob.key); 138 } 139 doc.setPropertyValue("content", toBlob(file)); 140 changedDocuments.add(doc); 141 } 142 } catch (IOException e) { 143 log.error("Could not update document=" + fileInfo, e); 144 } 145 146 } 147 return changedDocuments; 148 } 149 150 @Override 151 public void processDocumentsUpdate() { 152 final RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class); 153 final WorkManager workManager = Framework.getService(WorkManager.class); 154 for (String repositoryName : repositoryManager.getRepositoryNames()) { 155 try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) { 156 157 long offset = 0; 158 List<DocumentModel> nextDocumentsToBeUpdated; 159 PageProviderService ppService = Framework.getService(PageProviderService.class); 160 Map<String, Serializable> props = new HashMap<>(); 161 props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session); 162 @SuppressWarnings("unchecked") 163 PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider( 164 getPageProviderNameForUpdate(), null, null, null, props); 165 final long maxResult = pp.getPageSize(); 166 do { 167 pp.setCurrentPageOffset(offset); 168 pp.refresh(); 169 nextDocumentsToBeUpdated = pp.getCurrentPage(); 170 171 if (nextDocumentsToBeUpdated.isEmpty()) { 172 break; 173 } 174 List<String> docIds = nextDocumentsToBeUpdated.stream().map(DocumentModel::getId).collect( 175 Collectors.toList()); 176 BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork( 177 buildWorkId(repositoryName, offset), blobProviderId); 178 work.setDocuments(repositoryName, docIds); 179 workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true); 180 offset += maxResult; 181 } while (nextDocumentsToBeUpdated.size() == maxResult); 182 183 } 184 } 185 } 186 187 private String buildWorkId(String repositoryName, long offset) { 188 return blobProviderId + ':' + repositoryName + ':' + offset; 189 } 190 191 /** 192 * Should be overriden by subclasses wanting to rely on a different fields. 193 */ 194 protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) { 195 return StringUtils.isBlank(blob.getDigest()) || !blob.getDigest().equals(file.getDigest()); 196 } 197 198 @Override 199 @SuppressWarnings("unchecked") 200 public O getOAuth2Provider() { 201 return (O) Framework.getService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId); 202 } 203 204 @Override 205 public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException { 206 LiveConnectFile file; 207 try { 208 file = getFile(fileInfo); 209 } catch (IOException e) { 210 // we don't want to crash everything if the remote file cannot be accessed 211 log.error("Failed to access file: " + fileInfo, e); 212 file = new ErrorLiveConnectFile(fileInfo); 213 } 214 return toBlob(file); 215 } 216 217 protected SimpleManagedBlob toBlob(LiveConnectFile file) { 218 BlobInfo blobInfo = new BlobInfo(); 219 blobInfo.key = buildBlobKey(file.getInfo()); 220 blobInfo.mimeType = file.getMimeType(); 221 blobInfo.encoding = file.getEncoding(); 222 blobInfo.filename = file.getFilename().replace('/', '-'); 223 blobInfo.length = Long.valueOf(file.getFileSize()); 224 blobInfo.digest = file.getDigest(); 225 return new SimpleManagedBlob(blobInfo); 226 } 227 228 protected String buildBlobKey(LiveConnectFileInfo fileInfo) { 229 StringBuilder key = new StringBuilder(blobProviderId); 230 key.append(BLOB_KEY_SEPARATOR); 231 key.append(fileInfo.getUser()); 232 key.append(BLOB_KEY_SEPARATOR); 233 key.append(fileInfo.getFileId()); 234 if (fileInfo.getRevisionId().isPresent()) { 235 key.append(BLOB_KEY_SEPARATOR); 236 key.append(fileInfo.getRevisionId().get()); 237 } 238 return key.toString(); 239 } 240 241 protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) { 242 return toFileInfo(blob.getKey()); 243 } 244 245 /** 246 * @since 8.4 247 */ 248 protected LiveConnectFileInfo toFileInfo(String key) { 249 List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key); 250 // According to buildBlobKey we have : 251 // 0 - blobProviderId 252 // 1 - userId 253 // 2 - fileId 254 // 3 - revisionId (optional) 255 if (keyParts.size() < 3 || keyParts.size() > 4) { 256 throw new IllegalArgumentException("The key doesn't have a valid format=" + key); 257 } 258 return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null); 259 } 260 261 /** 262 * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it. 263 * 264 * @param fileInfo the file info 265 * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it 266 */ 267 protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException { 268 LiveConnectFile file = getFileFromCache(fileInfo); 269 if (file == null) { 270 file = retrieveFile(fileInfo); 271 putFileInCache(file); 272 } 273 return file; 274 } 275 276 private Cache getCache() { 277 if (cache == null) { 278 cache = Framework.getService(CacheService.class).getCache(getCacheName()); 279 } 280 return cache; 281 } 282 283 protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException { 284 return getCredential(fileInfo.getUser()); 285 } 286 287 protected Credential getCredential(NuxeoOAuth2Token token) throws IOException { 288 return getCredential(token.getServiceLogin()); 289 } 290 291 public final synchronized Credential getCredential(String user) throws IOException { 292 // suspend current transaction and start a new one responsible for access token update 293 try { 294 return TransactionHelper.runInNewTransaction(() -> retrieveAndRefreshCredential(user)); 295 } catch (UncheckedIOException uioe) { 296 // re-throw IOException because methods implementing interface throwing IOException use this method 297 throw uioe.getCause(); 298 } 299 } 300 301 /** 302 * Declare this method as {@code private} because we don't want upper class to override it and as there's concurrent 303 * access on it, don't let anyone calling it. 304 */ 305 private Credential retrieveAndRefreshCredential(String user) { 306 try { 307 Credential credential = getCredentialFactory().build(user); 308 if (credential == null) { 309 String message = "No credentials found for user " + user + " and service " + blobProviderId; 310 // TODO NXP-23860 replace this log.error with a dedicated admin service to record missing credentials 311 log.error(message); 312 throw new IOException(message); 313 } 314 Long expiresInSeconds = credential.getExpiresInSeconds(); 315 if (expiresInSeconds != null && expiresInSeconds.longValue() <= 0) { 316 credential.refreshToken(); 317 } 318 return credential; 319 } catch (IOException e) { 320 throw new UncheckedIOException(e); 321 } 322 } 323 324 /** 325 * Should be overriden by subclasses needing another credential factory. 326 */ 327 protected CredentialFactory getCredentialFactory() { 328 return new OAuth2CredentialFactory(getOAuth2Provider()); 329 } 330 331 @SuppressWarnings("unchecked") 332 protected final <T extends Serializable> T getFromCache(String key) { 333 return (T) getCache().get(key); 334 } 335 336 protected final <T extends Serializable> void putInCache(String key, T object) { 337 getCache().put(key, object); 338 } 339 340 protected final void invalidateInCache(LiveConnectFileInfo fileInfo) { 341 getCache().invalidate(FILE_CACHE_PREFIX + buildBlobKey(fileInfo)); 342 } 343 344 protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) { 345 return getFromCache(FILE_CACHE_PREFIX + buildBlobKey(fileInfo)); 346 } 347 348 protected final void putFileInCache(LiveConnectFile file) { 349 putInCache(FILE_CACHE_PREFIX + buildBlobKey(file.getInfo()), file); 350 } 351 352 /** 353 * Parse a {@link URI}. 354 * 355 * @return the {@link URI} or null if it fails 356 */ 357 protected URI asURI(String link) { 358 try { 359 return new URI(link); 360 } catch (URISyntaxException e) { 361 log.error("Invalid URI: " + link, e); 362 return null; 363 } 364 } 365 366 protected abstract String getCacheName(); 367 368 protected abstract String getPageProviderNameForUpdate(); 369 370 /** 371 * Retrieves the file with API. 372 * 373 * @param fileInfo the file info 374 * @return the file retrieved from API 375 */ 376 protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException; 377 378}