001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * <a href="mailto:grenard@nuxeo.com">Guillaume Renard</a> 016 * 017 */ 018 019package org.nuxeo.ecm.liveconnect.update; 020 021import java.io.IOException; 022import java.io.Serializable; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027 028import org.nuxeo.ecm.core.api.CoreInstance; 029import org.nuxeo.ecm.core.api.CoreSession; 030import org.nuxeo.ecm.core.api.DocumentModel; 031import org.nuxeo.ecm.core.api.repository.RepositoryManager; 032import org.nuxeo.ecm.core.work.api.WorkManager; 033import org.nuxeo.ecm.liveconnect.update.listener.BlobProviderDocumentsUpdateListener; 034import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork; 035import org.nuxeo.ecm.platform.query.api.PageProvider; 036import org.nuxeo.ecm.platform.query.api.PageProviderService; 037import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider; 038import org.nuxeo.runtime.api.Framework; 039 040/** 041 * Interface to batch update documents provided by implementing provider. The method {@link #processDocumentsUpdate()} 042 * is called by {@link BlobProviderDocumentsUpdateListener}. 043 * <p> 044 * The implementation of {@link #processDocumentsUpdate()} must schedule a {@link BlobProviderDocumentsUpdateWork} with 045 * the document ids to be checked and updated if needed. 046 * <p> 047 * The @{link BlobProviderDocumentsUpdateWork} will then call the implementation of 048 * {@link #checkChangesAndUpdateBlob(List)}. 049 * <p> 050 * Note that it is recommended to schedule many workers dealing with a smaller amount of documents (using 051 * {@link #MAX_RESULT}) rather than a single one processing all document brought by the provider. 052 * 053 * @since 7.3 054 */ 055public interface BatchUpdateBlobProvider { 056 057 static final long MAX_RESULT = 50; 058 059 /** 060 * Check the given list of document for change and update if needed. Note that session.save still needs to be called 061 * on changed documents. 062 * 063 * @param documents to be checked for update 064 * @return the list of DocumentModel that have changed 065 * @throws IOException 066 */ 067 List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> doc); 068 069 String getPageProviderNameForUpdate(); 070 071 String getBlobProviderId(); 072 073 /** 074 * Trigger the documents update for the implementing providers. 075 */ 076 default void processDocumentsUpdate() { 077 final RepositoryManager repositoryManager = Framework.getLocalService(RepositoryManager.class); 078 final WorkManager workManager = Framework.getLocalService(WorkManager.class); 079 for (String repositoryName : repositoryManager.getRepositoryNames()) { 080 CoreSession session = null; 081 try { 082 session = CoreInstance.openCoreSessionSystem(repositoryName); 083 084 long offset = 0; 085 List<DocumentModel> nextDocumentsToBeUpdated; 086 PageProviderService ppService = Framework.getService(PageProviderService.class); 087 Map<String, Serializable> props = new HashMap<String, Serializable>(); 088 props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session); 089 @SuppressWarnings("unchecked") 090 PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider( 091 getPageProviderNameForUpdate(), null, null, null, props); 092 final long maxResult = pp.getPageSize(); 093 do { 094 pp.setCurrentPageOffset(offset); 095 pp.refresh(); 096 nextDocumentsToBeUpdated = pp.getCurrentPage(); 097 098 if (nextDocumentsToBeUpdated.isEmpty()) { 099 break; 100 } 101 List<String> docIds = new ArrayList<>(); 102 for (DocumentModel doc : nextDocumentsToBeUpdated) { 103 docIds.add(doc.getId()); 104 } 105 BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork( 106 getBlobProviderId() + ":" + repositoryName + ":" + offset, getBlobProviderId()); 107 work.setDocuments(repositoryName, docIds); 108 workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true); 109 offset += maxResult; 110 } while (nextDocumentsToBeUpdated.size() == maxResult); 111 112 } finally { 113 if (session != null) { 114 session.close(); 115 } 116 } 117 } 118 } 119 120}