001/* 002 * (C) Copyright 2015-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.liveconnect.core; 020 021import java.io.IOException; 022import java.io.Serializable; 023import java.net.URI; 024import java.net.URISyntaxException; 025import java.util.ArrayList; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.stream.Collectors; 030 031import org.apache.commons.lang3.StringUtils; 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.nuxeo.ecm.core.api.Blob; 035import org.nuxeo.ecm.core.api.CoreInstance; 036import org.nuxeo.ecm.core.api.CoreSession; 037import org.nuxeo.ecm.core.api.DocumentModel; 038import org.nuxeo.ecm.core.api.repository.RepositoryManager; 039import org.nuxeo.ecm.core.blob.AbstractBlobProvider; 040import org.nuxeo.ecm.core.blob.BlobManager.BlobInfo; 041import org.nuxeo.ecm.core.blob.BlobProvider; 042import org.nuxeo.ecm.core.blob.ManagedBlob; 043import org.nuxeo.ecm.core.blob.SimpleManagedBlob; 044import org.nuxeo.ecm.core.cache.Cache; 045import org.nuxeo.ecm.core.cache.CacheService; 046import org.nuxeo.ecm.core.model.Document; 047import org.nuxeo.ecm.core.work.api.WorkManager; 048import org.nuxeo.ecm.liveconnect.update.BatchUpdateBlobProvider; 049import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork; 050import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProvider; 051import org.nuxeo.ecm.platform.oauth2.providers.OAuth2ServiceProviderRegistry; 052import org.nuxeo.ecm.platform.oauth2.tokens.NuxeoOAuth2Token; 053import org.nuxeo.ecm.platform.query.api.PageProvider; 054import org.nuxeo.ecm.platform.query.api.PageProviderService; 055import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider; 056import org.nuxeo.runtime.api.Framework; 057 058import com.google.api.client.auth.oauth2.Credential; 059import com.google.common.base.Splitter; 060 061/** 062 * Basic implementation of {@link BlobProvider} for live connect. 063 * 064 * @param <O> The OAuth2 service provider type. 065 * @since 8.1 066 */ 067public abstract class AbstractLiveConnectBlobProvider<O extends OAuth2ServiceProvider> extends AbstractBlobProvider 068 implements LiveConnectBlobProvider<O>, BatchUpdateBlobProvider { 069 070 private static final Log log = LogFactory.getLog(AbstractLiveConnectBlobProvider.class); 071 072 private static final String FILE_CACHE_PREFIX = "liveconnect_file_"; 073 074 private static final char BLOB_KEY_SEPARATOR = ':'; 075 076 /** Resource cache */ 077 private Cache cache; 078 079 /** 080 * Should be overriden by subclasses needing something different. 081 */ 082 @Override 083 public void close() { 084 } 085 086 /** 087 * Should be overriden by subclasses needing something different. 088 */ 089 @Override 090 public Blob readBlob(BlobInfo blobInfo) throws IOException { 091 return new SimpleManagedBlob(blobInfo); 092 } 093 094 /** 095 * Should be overriden by subclasses needing something different. 096 */ 097 @Override 098 public String writeBlob(Blob blob, Document doc) throws IOException { 099 throw new UnsupportedOperationException("Writing a blob to live connect service is not supported"); 100 } 101 102 /** 103 * Should be overriden by subclasses needing something different. 104 */ 105 @Override 106 public boolean isVersion(ManagedBlob blob) { 107 return toFileInfo(blob).getRevisionId().isPresent(); 108 } 109 110 @Override 111 public List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> docs) { 112 List<DocumentModel> changedDocuments = new ArrayList<>(); 113 for (DocumentModel doc : docs) { 114 final SimpleManagedBlob blob = (SimpleManagedBlob) doc.getProperty("content").getValue(); 115 if (blob == null || isVersion(blob)) { 116 continue; 117 } 118 LiveConnectFileInfo fileInfo = toFileInfo(blob); 119 try { 120 LiveConnectFile file = retrieveFile(fileInfo); 121 putFileInCache(file); 122 if (hasChanged(blob, file)) { 123 if (log.isTraceEnabled()) { 124 log.trace("Updating blob=" + blob.key); 125 } 126 doc.setPropertyValue("content", toBlob(file)); 127 changedDocuments.add(doc); 128 } 129 } catch (IOException e) { 130 log.error("Could not update document=" + fileInfo, e); 131 } 132 133 } 134 return changedDocuments; 135 } 136 137 @Override 138 public void processDocumentsUpdate() { 139 final RepositoryManager repositoryManager = Framework.getLocalService(RepositoryManager.class); 140 final WorkManager workManager = Framework.getLocalService(WorkManager.class); 141 for (String repositoryName : repositoryManager.getRepositoryNames()) { 142 try (CoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) { 143 144 long offset = 0; 145 List<DocumentModel> nextDocumentsToBeUpdated; 146 PageProviderService ppService = Framework.getService(PageProviderService.class); 147 Map<String, Serializable> props = new HashMap<>(); 148 props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session); 149 @SuppressWarnings("unchecked") 150 PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider( 151 getPageProviderNameForUpdate(), null, null, null, props); 152 final long maxResult = pp.getPageSize(); 153 do { 154 pp.setCurrentPageOffset(offset); 155 pp.refresh(); 156 nextDocumentsToBeUpdated = pp.getCurrentPage(); 157 158 if (nextDocumentsToBeUpdated.isEmpty()) { 159 break; 160 } 161 List<String> docIds = nextDocumentsToBeUpdated.stream().map(DocumentModel::getId) 162 .collect(Collectors.toList()); 163 BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork(buildWorkId( 164 repositoryName, offset), blobProviderId); 165 work.setDocuments(repositoryName, docIds); 166 workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true); 167 offset += maxResult; 168 } while (nextDocumentsToBeUpdated.size() == maxResult); 169 170 } 171 } 172 } 173 174 private String buildWorkId(String repositoryName, long offset) { 175 return blobProviderId + ':' + repositoryName + ':' + offset; 176 } 177 178 /** 179 * Should be overriden by subclasses wanting to rely on a different fields. 180 */ 181 protected boolean hasChanged(SimpleManagedBlob blob, LiveConnectFile file) { 182 return StringUtils.isBlank(blob.getDigest()) || !blob.getDigest().equals(file.getDigest()); 183 } 184 185 @Override 186 @SuppressWarnings("unchecked") 187 public O getOAuth2Provider() { 188 return (O) Framework.getLocalService(OAuth2ServiceProviderRegistry.class).getProvider(blobProviderId); 189 } 190 191 @Override 192 public SimpleManagedBlob toBlob(LiveConnectFileInfo fileInfo) throws IOException { 193 LiveConnectFile file = getFile(fileInfo); 194 return toBlob(file); 195 } 196 197 protected SimpleManagedBlob toBlob(LiveConnectFile file) { 198 BlobInfo blobInfo = new BlobInfo(); 199 blobInfo.key = buildBlobKey(file.getInfo()); 200 blobInfo.mimeType = file.getMimeType(); 201 blobInfo.encoding = file.getEncoding(); 202 blobInfo.filename = file.getFilename().replace('/', '-'); 203 blobInfo.length = file.getFileSize(); 204 blobInfo.digest = file.getDigest(); 205 return new SimpleManagedBlob(blobInfo); 206 } 207 208 protected String buildBlobKey(LiveConnectFileInfo fileInfo) { 209 StringBuilder key = new StringBuilder(blobProviderId); 210 key.append(BLOB_KEY_SEPARATOR); 211 key.append(fileInfo.getUser()); 212 key.append(BLOB_KEY_SEPARATOR); 213 key.append(fileInfo.getFileId()); 214 if (fileInfo.getRevisionId().isPresent()) { 215 key.append(BLOB_KEY_SEPARATOR); 216 key.append(fileInfo.getRevisionId().get()); 217 } 218 return key.toString(); 219 } 220 221 protected LiveConnectFileInfo toFileInfo(ManagedBlob blob) { 222 String key = blob.getKey(); 223 List<String> keyParts = Splitter.on(BLOB_KEY_SEPARATOR).splitToList(key); 224 // According to buildBlobKey we have : 225 // 0 - blobProviderId 226 // 1 - userId 227 // 2 - fileId 228 // 3 - revisionId (optional) 229 if (keyParts.size() < 3 || keyParts.size() > 4) { 230 throw new IllegalArgumentException("The key doesn't have a valid format=" + key); 231 } 232 return new LiveConnectFileInfo(keyParts.get(1), keyParts.get(2), keyParts.size() == 4 ? keyParts.get(3) : null); 233 } 234 235 /** 236 * Returns the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it. 237 * 238 * @param fileInfo the file info 239 * @return the {@link LiveConnectFile} from cache, if it doesn't exist retrieves it with API and cache it 240 */ 241 protected LiveConnectFile getFile(LiveConnectFileInfo fileInfo) throws IOException { 242 LiveConnectFile file = getFileFromCache(fileInfo); 243 if (file == null) { 244 file = retrieveFile(fileInfo); 245 putFileInCache(file); 246 } 247 return file; 248 } 249 250 private Cache getCache() { 251 if (cache == null) { 252 cache = Framework.getService(CacheService.class).getCache(getCacheName()); 253 } 254 return cache; 255 } 256 257 protected Credential getCredential(LiveConnectFileInfo fileInfo) throws IOException { 258 return getCredential(fileInfo.getUser()); 259 } 260 261 protected Credential getCredential(NuxeoOAuth2Token token) throws IOException { 262 return getCredential(token.getServiceLogin()); 263 } 264 265 public final Credential getCredential(String user) throws IOException { 266 Credential credential = getCredentialFactory().build(user); 267 Long expiresInSeconds = credential.getExpiresInSeconds(); 268 if (expiresInSeconds != null && expiresInSeconds <= 0) { 269 credential.refreshToken(); 270 } 271 return credential; 272 } 273 274 /** 275 * Should be overriden by subclasses needing another credential factory. 276 */ 277 protected CredentialFactory getCredentialFactory() { 278 return new OAuth2CredentialFactory(getOAuth2Provider()); 279 } 280 281 @SuppressWarnings("unchecked") 282 protected final <T extends Serializable> T getFromCache(String key) { 283 return (T) getCache().get(key); 284 } 285 286 protected final <T extends Serializable> void putInCache(String key, T object) { 287 getCache().put(key, object); 288 } 289 290 protected final void invalidateInCache(String key) { 291 getCache().invalidate(key); 292 } 293 294 protected final LiveConnectFile getFileFromCache(LiveConnectFileInfo fileInfo) { 295 return getFromCache(FILE_CACHE_PREFIX + fileInfo.getFileId()); 296 } 297 298 protected final void putFileInCache(LiveConnectFile file) { 299 putInCache(FILE_CACHE_PREFIX + file.getInfo().getFileId(), file); 300 } 301 302 /** 303 * Parse a {@link URI}. 304 * 305 * @return the {@link URI} or null if it fails 306 */ 307 protected URI asURI(String link) { 308 try { 309 return new URI(link); 310 } catch (URISyntaxException e) { 311 log.error("Invalid URI: " + link, e); 312 return null; 313 } 314 } 315 316 protected abstract String getCacheName(); 317 318 protected abstract String getPageProviderNameForUpdate(); 319 320 /** 321 * Retrieves the file with API. 322 * 323 * @param fileInfo the file info 324 * @return the file retrieved from API 325 */ 326 protected abstract LiveConnectFile retrieveFile(LiveConnectFileInfo fileInfo) throws IOException; 327 328}