001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     <a href="mailto:grenard@nuxeo.com">Guillaume Renard</a>
016 *
017 */
018
019package org.nuxeo.ecm.liveconnect.update;
020
021import java.io.IOException;
022import java.io.Serializable;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027
028import org.nuxeo.ecm.core.api.CoreInstance;
029import org.nuxeo.ecm.core.api.CoreSession;
030import org.nuxeo.ecm.core.api.DocumentModel;
031import org.nuxeo.ecm.core.api.repository.RepositoryManager;
032import org.nuxeo.ecm.core.work.api.WorkManager;
033import org.nuxeo.ecm.liveconnect.update.listener.BlobProviderDocumentsUpdateListener;
034import org.nuxeo.ecm.liveconnect.update.worker.BlobProviderDocumentsUpdateWork;
035import org.nuxeo.ecm.platform.query.api.PageProvider;
036import org.nuxeo.ecm.platform.query.api.PageProviderService;
037import org.nuxeo.ecm.platform.query.nxql.CoreQueryDocumentPageProvider;
038import org.nuxeo.runtime.api.Framework;
039
040/**
041 * Interface to batch update documents provided by implementing provider. The method {@link #processDocumentsUpdate()}
042 * is called by {@link BlobProviderDocumentsUpdateListener}.
043 * <p>
044 * The implementation of {@link #processDocumentsUpdate()} must schedule a {@link BlobProviderDocumentsUpdateWork} with
045 * the document ids to be checked and updated if needed.
046 * <p>
047 * The @{link BlobProviderDocumentsUpdateWork} will then call the implementation of
048 * {@link #checkChangesAndUpdateBlob(List)}.
049 * <p>
050 * Note that it is recommended to schedule many workers dealing with a smaller amount of documents (using
051 * {@link #MAX_RESULT}) rather than a single one processing all document brought by the provider.
052 *
053 * @since 7.3
054 */
055public interface BatchUpdateBlobProvider {
056
057    static final long MAX_RESULT = 50;
058
059    /**
060     * Check the given list of document for change and update if needed. Note that session.save still needs to be called
061     * on changed documents.
062     *
063     * @param documents to be checked for update
064     * @return the list of DocumentModel that have changed
065     * @throws IOException
066     */
067    List<DocumentModel> checkChangesAndUpdateBlob(List<DocumentModel> doc);
068
069    String getPageProviderNameForUpdate();
070
071    String getBlobProviderId();
072
073    /**
074     * Trigger the documents update for the implementing providers.
075     */
076    default void processDocumentsUpdate() {
077        final RepositoryManager repositoryManager = Framework.getLocalService(RepositoryManager.class);
078        final WorkManager workManager = Framework.getLocalService(WorkManager.class);
079        for (String repositoryName : repositoryManager.getRepositoryNames()) {
080            CoreSession session = null;
081            try {
082                session = CoreInstance.openCoreSessionSystem(repositoryName);
083
084                long offset = 0;
085                List<DocumentModel> nextDocumentsToBeUpdated;
086                PageProviderService ppService = Framework.getService(PageProviderService.class);
087                Map<String, Serializable> props = new HashMap<String, Serializable>();
088                props.put(CoreQueryDocumentPageProvider.CORE_SESSION_PROPERTY, (Serializable) session);
089                @SuppressWarnings("unchecked")
090                PageProvider<DocumentModel> pp = (PageProvider<DocumentModel>) ppService.getPageProvider(
091                        getPageProviderNameForUpdate(), null, null, null, props);
092                final long maxResult = pp.getPageSize();
093                do {
094                    pp.setCurrentPageOffset(offset);
095                    pp.refresh();
096                    nextDocumentsToBeUpdated = pp.getCurrentPage();
097
098                    if (nextDocumentsToBeUpdated.isEmpty()) {
099                        break;
100                    }
101                    List<String> docIds = new ArrayList<>();
102                    for (DocumentModel doc : nextDocumentsToBeUpdated) {
103                        docIds.add(doc.getId());
104                    }
105                    BlobProviderDocumentsUpdateWork work = new BlobProviderDocumentsUpdateWork(
106                            getBlobProviderId() + ":" + repositoryName + ":" + offset, getBlobProviderId());
107                    work.setDocuments(repositoryName, docIds);
108                    workManager.schedule(work, WorkManager.Scheduling.IF_NOT_SCHEDULED, true);
109                    offset += maxResult;
110                } while (nextDocumentsToBeUpdated.size() == maxResult);
111
112            } finally {
113                if (session != null) {
114                    session.close();
115                }
116            }
117        }
118    }
119
120}