001/* 002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.liveconnect.core; 020 021import java.io.IOException; 022import java.io.Serializable; 023import java.io.UncheckedIOException; 024import java.net.URI; 025import java.net.URISyntaxException; 026import java.util.ArrayList; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030import java.util.Optional; 031import java.util.stream.Collectors; 032 033import org.apache.commons.lang3.StringUtils; 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder; 037import org.nuxeo.ecm.core.api.Blob; 038import org.nuxeo.ecm.core.api.CloseableCoreSession; 039import org.nuxeo.ecm.core.api.CoreInstance; 040import org.nuxeo.ecm.core.api.CoreSession; 041import org.nuxeo.ecm.core.api.DocumentModel; 042import org.nuxeo.ecm.core.api.NuxeoException; 043import org.nuxeo.ecm.core.api.repository.RepositoryManager; 044import org.nuxeo.ecm.core.blob.AbstractBlobProvider; 045import org.nuxeo.ecm.core.blob.BlobInfo; 046import org.nuxeo.ecm.core.blob.BlobProvider; 047import org.nuxeo.ecm.core.blob.DocumentBlobProvider; 048import org.nuxeo.ecm.core.blob.ManagedBlob; 049import org.nuxeo.ecm.core.blob.SimpleManagedBlob; 050import org.nuxeo.ecm.core.cache.Cache; 051import org.nuxeo.ecm.core.cache.CacheService; 052import org.nuxeo.ecm.core.work.api.WorkManager; 053import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider; 054import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork; 055import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider; 056import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry; 057import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token; 058import org.nuxeo.ecm.platform.query.api.PageProvider; 059import org.nuxeo.ecm.platform.query.api.PageProviderService; 060import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider; 061import org.nuxeo.runtime.api.Framework; 062import org.nuxeo.runtime.transaction.TransactionHelper; 063 064import com.google.api.client.auth.oauth2.Credential; 065import com.google.common.base.Splitter; 066 067/** 068 * Basic implementation of {@link BlobProvider} for live connect. 069 * 070 * @param <O> The OAuth2 service provider type. 071 * @since 8.1 072 */ 073public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider 074 implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider, DocumentBlobProvider { 075 076 private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class); 077 078 private static final String FILE_CACHE_PREFIX = "liveconnect_file_"; 079 080 private static final char BLOB_KEY_SEPARATOR = ':'; 081 082 static { 083 ComplexTypeJSONDecoder.registerBlobDecoder(new JSONLiveConnectBlobDecoder()); 084 } 085 086 /** Resource cache */ 087 private Cache cache; 088 089 /** 090 * Should be overriden by subclasses needing something different. 091 */ 092 @Override 093 public void close() { 094 } 095 096 /** 097 * Should be overriden by subclasses needing something different. 098 */ 099 @Override 100 public Blob readBlob(BlobInfo blobInfo) throws IOException { 101 return toBlob(toFileInfo(blobInfo.key)); 102 } 103 104 /** 105 * Should be overriden by subclasses needing something different. 106 */ 107 @Override 108 public String writeBlob(Blob blob) throws IOException { 109 throw new UnsupportedOperationException("Writing a blob to live connect service is not supported"); 110 } 111 112 @Override 113 public boolean performsExternalAccessControl(BlobInfo blobInfo) { 114 return true; 115 } 116 117 /** 118 * Should be overriden by subclasses needing something different. 119 */ 120 @Override 121 public boolean isVersion(ManagedBlob blob) { 122 return toFileInfo(blob).getRevisionId().isPresent(); 123 } 124 125 @Override 126 public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) { 127 List<DocumentModel> changedDocuments = new ArrayList<>(); 128 for (DocumentModel doc : docs) { 129 final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue(); 130 if (blob == null || isVersion(blob)) { 131 continue; 132 } 133 LiveConnectFileInfo fileInfo = toFileInfo(blob); 134 try { 135 LiveConnectFile file = retrieveFile(fileInfo); 136 putFileInCache(file); 137 if (hasChanged(blob, file)) { 138 if (log.isTraceEnabled()) { 139 log.trace("Updating blob=" + blob.key); 140 } 141 doc.setPropertyValue("content", toBlob(file)); 142 changedDocuments.add(doc); 143 } 144 } catch (IOException e) { 145 log.error("Could not update document=" + fileInfo, e); 146 } 147 148 } 149 return changedDocuments; 150 } 151 152 @Override 153 public void processDocumentsUpdate() { 154 final RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class); 155 final WorkManager workManager = Framework.getService(WorkManager.class); 156 for (String repositoryName : repositoryManager.getRepositoryNames()) { 157 try (CloseableCoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) { 158 159 long offset = 0; 160 List<DocumentModel> nextDocumentsToBeUpdated; 161 PageProviderService ppService = Framework.getService(PageProviderService.class); 162 Map<String, Serializable> props = new HashMap<>(); 163 props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session); 164 @SuppressWarnings("unchecked") 165 PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider( 166 getPageProviderNameForUpdate(), null, null, null, props); 167 final long maxResult = pp.getPageSize(); 168 do { 169 pp.setCurrentPageOffset(offset); 170 pp.refresh(); 171 nextDocumentsToBeUpdated = pp.getCurrentPage(); 172 173 if (nextDocumentsToBeUpdated.isEmpty()) { 174 break; 175 } 176 List<String> docIds = nextDocumentsToBeUpdated.stream().map(DocumentModel::getId).collect( 177 Collectors.toList()); 178 BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork( 179 buildWorkId(repositoryName, offset), blobProviderId); 180 work.setDocuments(repositoryName, docIds); 181 workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true); 182 offset += maxResult; 183 } while (nextDocumentsToBeUpdated.size() == maxResult); 184 185 } 186 } 187 } 188 189 private String buildWorkId(String repositoryName, long offset) { 190 return blobProviderId + ':' + repositoryName + ':' + offset; 191 } 192 193 /** 194 * Should be overriden by subclasses wanting to rely on a different fields. 195 */ 196 protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) { 197 return StringUtils.isBlank(blob.getDigest()) || !blob.getDigest().equals(file.getDigest()); 198 } 199 200 @Override 201 @SuppressWarnings("unchecked") 202 public O getOAuth2Provider() { 203 return (O) Framework.getService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId); 204 } 205 206 @Override 207 public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException { 208 LiveConnectFile file; 209 try { 210 file = getFile(fileInfo); 211 } catch (IOException e) { 212 // we don't want to crash everything if the remote file cannot be accessed 213 log.error("Failed to access file: " + fileInfo, e); 214 file = new ErrorLiveConnectFile(fileInfo); 215 } 216 return toBlob(file); 217 } 218 219 protected SimpleManagedBlob toBlob(LiveConnectFile file) { 220 BlobInfo blobInfo = new BlobInfo(); 221 blobInfo.key = buildBlobKey(file.getInfo()); 222 blobInfo.mimeType = file.getMimeType(); 223 blobInfo.encoding = file.getEncoding(); 224 blobInfo.filename = file.getFilename().replace('/', '-'); 225 blobInfo.length = Long.valueOf(file.getFileSize()); 226 blobInfo.digest = file.getDigest(); 227 return new SimpleManagedBlob(blobInfo); 228 } 229 230 protected String buildBlobKey(LiveConnectFileInfo fileInfo) { 231 StringBuilder key = new StringBuilder(blobProviderId); 232 key.append(BLOB_KEY_SEPARATOR); 233 key.append(fileInfo.getUser()); 234 key.append(BLOB_KEY_SEPARATOR); 235 key.append(fileInfo.getFileId()); 236 Optional<String> revisionId = fileInfo.getRevisionId(); 237 if (revisionId.isPresent()) { 238 key.append(BLOB_KEY_SEPARATOR); 239 key.append(revisionId.get()); 240 } 241 return key.toString(); 242 } 243 244 protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) { 245 return toFileInfo(blob.getKey()); 246 } 247 248 /** 249 * @since 8.4 250 */ 251 protected LiveConnectFileInfo toFileInfo(String key) { 252 List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key); 253 // According to buildBlobKey we have : 254 // 0 - blobProviderId 255 // 1 - userId 256 // 2 - fileId 257 // 3 - revisionId (optional) 258 if (keyParts.size() < 3 || keyParts.size() > 4) { 259 throw new IllegalArgumentException("The key doesn't have a valid format=" + key); 260 } 261 return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null); 262 } 263 264 /** 265 * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it. 266 * 267 * @param fileInfo the file info 268 * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it 269 */ 270 protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException { 271 LiveConnectFile file = getFileFromCache(fileInfo); 272 if (file == null) { 273 file = retrieveFile(fileInfo); 274 putFileInCache(file); 275 } 276 return file; 277 } 278 279 private Cache getCache() { 280 if (cache == null) { 281 cache = Framework.getService(CacheService.class).getCache(getCacheName()); 282 } 283 return cache; 284 } 285 286 protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException { 287 return getCredential(fileInfo.getUser()); 288 } 289 290 protected Credential getCredential(NuxeoOAuth2Token token) throws IOException { 291 return getCredential(token.getServiceLogin()); 292 } 293 294 public final synchronized Credential getCredential(String user) throws IOException { 295 // suspend current transaction and start a new one responsible for access token update 296 try { 297 return TransactionHelper.runInNewTransaction(() -> retrieveAndRefreshCredential(user)); 298 } catch (UncheckedIOException uioe) { 299 // re-throw IOException because methods implementing interface throwing IOException use this method 300 throw uioe.getCause(); 301 } 302 } 303 304 /** 305 * Declare this method as {@code private} because we don't want upper class to override it and as there's concurrent 306 * access on it, don't let anyone calling it. 307 */ 308 private Credential retrieveAndRefreshCredential(String user) { 309 try { 310 Credential credential = getCredentialFactory().build(user); 311 if (credential == null) { 312 String message = "No credentials found for user " + user + " and service " + blobProviderId; 313 // TODO NXP-23860 replace this log.error with a dedicated admin service to record missing credentials 314 log.error(message); 315 throw new IOException(message); 316 } 317 Long expiresInSeconds = credential.getExpiresInSeconds(); 318 if (expiresInSeconds != null && expiresInSeconds.longValue() <= 0) { 319 credential.refreshToken(); 320 } 321 return credential; 322 } catch (IOException e) { 323 throw new UncheckedIOException(e); 324 } 325 } 326 327 /** 328 * Should be overriden by subclasses needing another credential factory. 329 */ 330 protected CredentialFactory getCredentialFactory() { 331 return new OAuth2CredentialFactory(getOAuth2Provider()); 332 } 333 334 @SuppressWarnings("unchecked") 335 protected final <T extends Serializable> T getFromCache(String key) { 336 return (T) getCache().get(key); 337 } 338 339 protected final <T extends Serializable> void putInCache(String key, T object) { 340 getCache().put(key, object); 341 } 342 343 protected final void invalidateInCache(LiveConnectFileInfo fileInfo) { 344 getCache().invalidate(FILE_CACHE_PREFIX + buildBlobKey(fileInfo)); 345 } 346 347 protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) { 348 return getFromCache(FILE_CACHE_PREFIX + buildBlobKey(fileInfo)); 349 } 350 351 protected final void putFileInCache(LiveConnectFile file) { 352 putInCache(FILE_CACHE_PREFIX + buildBlobKey(file.getInfo()), file); 353 } 354 355 /** 356 * Parse a {@link URI}. 357 * 358 * @return the {@link URI} or null if it fails 359 */ 360 protected URI asURI(String link) { 361 try { 362 return new URI(link); 363 } catch (URISyntaxException e) { 364 log.error("Invalid URI: " + link, e); 365 return null; 366 } 367 } 368 369 protected abstract String getCacheName(); 370 371 protected abstract String getPageProviderNameForUpdate(); 372 373 /** 374 * Retrieves the file with API. 375 * 376 * @param fileInfo the file info 377 * @return the file retrieved from API 378 */ 379 protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException; 380 381}