001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Delprat Thierry
018 *     Delbosc Benoit
019 */
020
021package org.nuxeo.elasticsearch.work;
022
023import org.nuxeo.ecm.core.work.api.Work;
024import org.nuxeo.ecm.core.work.api.WorkManager;
025import org.nuxeo.elasticsearch.Timestamp;
026import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
027import org.nuxeo.elasticsearch.commands.IndexingCommand;
028import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
029import org.nuxeo.runtime.api.Framework;
030
031import java.util.List;
032
033import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_USING_CHILDREN_TRAVERSAL_PROPERTY;
034
035/**
036 * Simple Indexing Worker
037 */
038public class IndexingWorker extends AbstractIndexingWorker implements Work {
039
040    private static final long serialVersionUID = -5141471452954319812L;
041
042    public IndexingWorker(String repositoryName, List<IndexingCommand> cmds) {
043        super(repositoryName, cmds);
044    }
045
046    @Override
047    public String getTitle() {
048        return " ElasticSearch indexing for docs: " + getCmdsDigest();
049    }
050
051    protected boolean needRecurse(IndexingCommand cmd) {
052        if (cmd.isRecurse()) {
053            switch (cmd.getType()) {
054                case INSERT:
055                case UPDATE:
056                case UPDATE_SECURITY:
057                case UPDATE_DIRECT_CHILDREN:
058                    return true;
059                case DELETE:
060                    // recurse deletion is done atomically
061                    return false;
062            }
063        }
064        return false;
065    }
066
067    @Override
068    protected void doIndexingWork(ElasticSearchIndexing esi, List<IndexingCommand> cmds) {
069        long now = Timestamp.currentTimeMicros();
070        for (IndexingCommand cmd : cmds) {
071            cmd.setOrder(now);
072        }
073        esi.indexNonRecursive(cmds);
074        WorkManager wm = Framework.getLocalService(WorkManager.class);
075        for (IndexingCommand cmd : cmds) {
076            if (needRecurse(cmd)) {
077                wm.schedule(getWorker(cmd));
078            }
079        }
080    }
081
082    private Work getWorker(IndexingCommand cmd) {
083        Work ret;
084        if (cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) {
085            ret = new ScrollingIndexingWorker(cmd.getRepositoryName(), String.format(
086                    "SELECT ecm:uuid FROM Document WHERE ecm:parentId = '%s'", cmd.getTargetDocumentId()));
087        } else {
088            boolean useChildrenWorker = Boolean.parseBoolean(Framework.getProperty(REINDEX_USING_CHILDREN_TRAVERSAL_PROPERTY,
089                    "false"));
090            if (useChildrenWorker) {
091                ret = new ChildrenIndexingWorker(cmd);
092            } else {
093                ret = new ScrollingIndexingWorker(cmd.getRepositoryName(), String.format(
094                        "SELECT ecm:uuid FROM Document WHERE ecm:ancestorId = '%s'", cmd.getTargetDocumentId()));
095            }
096        }
097        return ret;
098    }
099
100    public String getCmdsDigest() {
101        String ret = "";
102        for (IndexingCommand cmd : cmds) {
103            ret += " " + cmd.getId();
104        }
105        return ret;
106    }
107}