001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Nuxeo
016 */
017
018package org.nuxeo.elasticsearch.work;
019
020import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_BUCKET_WRITE_PROPERTY;
021
022import java.util.ArrayList;
023import java.util.List;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.nuxeo.ecm.core.api.CoreSession;
028import org.nuxeo.ecm.core.api.DocumentLocation;
029import org.nuxeo.ecm.core.api.DocumentModel;
030import org.nuxeo.ecm.core.query.sql.NXQL;
031import org.nuxeo.ecm.core.work.api.Work;
032import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
033import org.nuxeo.elasticsearch.commands.IndexingCommand;
034import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
035import org.nuxeo.runtime.api.Framework;
036
037/**
038 * œ Worker to index a bucket of documents
039 *
040 * @since 7.1
041 */
042public class BucketIndexingWorker extends BaseIndexingWorker implements Work {
043    private static final Log log = LogFactory.getLog(BucketIndexingWorker.class);
044
045    private static final long serialVersionUID = -4665673026513796882L;
046
047    private static final String DEFAULT_BUCKET_SIZE = "50";
048
049    private final boolean warnAtEnd;
050
051    private final int documentCount;
052
053    public BucketIndexingWorker(String repositoryName, List<String> docIds, boolean warnAtEnd) {
054        setDocuments(repositoryName, docIds);
055        documentCount = docIds.size();
056        this.warnAtEnd = warnAtEnd;
057    }
058
059    @Override
060    public String getTitle() {
061        return " ElasticSearch bucket indexer size " + documentCount;
062    }
063
064    @Override
065    protected void doWork() {
066        ElasticSearchIndexing esi = Framework.getLocalService(ElasticSearchIndexing.class);
067        CoreSession session = initSession(repositoryName);
068        int bucketSize = Math.min(documentCount, getBucketSize());
069        List<String> ids = new ArrayList<>(bucketSize);
070        for (DocumentLocation doc : getDocuments()) {
071            ids.add(doc.getIdRef().value);
072            if ((ids.size() % bucketSize) == 0) {
073                esi.indexNonRecursive(getIndexingCommands(session, ids));
074                ids.clear();
075            }
076        }
077        if (!ids.isEmpty()) {
078            esi.indexNonRecursive(getIndexingCommands(session, ids));
079            ids.clear();
080        }
081        if (warnAtEnd) {
082            log.warn(String.format("Re-indexing job: %s completed.", getSchedulePath().getParentPath()));
083        }
084    }
085
086    private List<IndexingCommand> getIndexingCommands(CoreSession session, List<String> ids) {
087        List<IndexingCommand> ret = new ArrayList<>(ids.size());
088        for (DocumentModel doc : fetchDocuments(session, ids)) {
089            IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, false);
090            ret.add(cmd);
091        }
092        return ret;
093    }
094
095    private List<DocumentModel> fetchDocuments(CoreSession session, List<String> ids) {
096        StringBuilder sb = new StringBuilder();
097        sb.append("SELECT * FROM Document, Relation WHERE ecm:uuid IN (");
098        for (int i = 0; i < ids.size(); i++) {
099            sb.append(NXQL.escapeString(ids.get(i)));
100            if (i < ids.size() - 1) {
101                sb.append(", ");
102            }
103        }
104        sb.append(")");
105        // read invalidation
106        session.save();
107        return session.query(sb.toString());
108    }
109
110    protected int getBucketSize() {
111        String value = Framework.getProperty(REINDEX_BUCKET_WRITE_PROPERTY, DEFAULT_BUCKET_SIZE);
112        return Integer.parseInt(value);
113    }
114
115}