/*
 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Delprat Thierry
 *     Delbosc Benoit
 */

package org.nuxeo.elasticsearch.work;

import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.elasticsearch.Timestamp;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.elasticsearch.commands.IndexingCommand;
import org.nuxeo.elasticsearch.commands.IndexingCommand.Type;
import org.nuxeo.runtime.api.Framework;

import java.util.List;

import static org.nuxeo.elasticsearch.ElasticSearchConstants.REINDEX_USING_CHILDREN_TRAVERSAL_PROPERTY;

/**
 * Simple Indexing Worker.
 * <p>
 * Indexes its batch of commands non-recursively, then schedules one follow-up worker per command that still
 * needs its descendants (or direct children) processed.
 */
public class IndexingWorker extends AbstractIndexingWorker implements Work {

    private static final long serialVersionUID = -5141471452954319812L;

    /**
     * Creates a worker for the given commands.
     *
     * @param repositoryName name of the repository, passed through to the superclass constructor
     * @param cmds the indexing commands, passed through to the superclass constructor
     */
    public IndexingWorker(String repositoryName, List<IndexingCommand> cmds) {
        super(repositoryName, cmds);
    }

    /**
     * Returns the title of this work: a fixed prefix followed by {@link #getCmdsDigest()}.
     *
     * @return the work title
     */
    @Override
    public String getTitle() {
        return " ElasticSearch indexing for docs: " + getCmdsDigest();
    }

    /**
     * Tells whether a follow-up recursive worker must be scheduled for the given command.
     *
     * @param cmd the command to inspect
     * @return {@code true} if the command is flagged as recursive and its type is {@code INSERT}, {@code UPDATE},
     *         {@code UPDATE_SECURITY} or {@code UPDATE_DIRECT_CHILDREN}; {@code false} otherwise
     */
    protected boolean needRecurse(IndexingCommand cmd) {
        if (!cmd.isRecurse()) {
            return false;
        }
        switch (cmd.getType()) {
        case INSERT:
        case UPDATE:
        case UPDATE_SECURITY:
        case UPDATE_DIRECT_CHILDREN:
            return true;
        case DELETE:
            // recurse deletion is done atomically
            return false;
        default:
            // any other type never triggers a follow-up worker
            return false;
        }
    }

    /**
     * Stamps every command with the same order value, indexes the batch non-recursively, then schedules a
     * follow-up worker for each command for which {@link #needRecurse(IndexingCommand)} is {@code true}.
     *
     * @param esi the indexing service used for the non-recursive pass
     * @param cmds the commands to process
     */
    @Override
    protected void doIndexingWork(ElasticSearchIndexing esi, List<IndexingCommand> cmds) {
        // a single timestamp is shared by the whole batch
        long now = Timestamp.currentTimeMicros();
        for (IndexingCommand cmd : cmds) {
            cmd.setOrder(now);
        }
        esi.indexNonRecursive(cmds);
        WorkManager wm = Framework.getLocalService(WorkManager.class);
        for (IndexingCommand cmd : cmds) {
            if (needRecurse(cmd)) {
                wm.schedule(getWorker(cmd));
            }
        }
    }

    /**
     * Builds the follow-up worker for a command that needs recursion.
     * <p>
     * An {@code UPDATE_DIRECT_CHILDREN} command gets a {@link ScrollingIndexingWorker} selecting documents by
     * {@code ecm:parentId}. Any other command gets a {@link ChildrenIndexingWorker} when the framework property
     * {@code REINDEX_USING_CHILDREN_TRAVERSAL_PROPERTY} parses to {@code true} (it defaults to {@code "false"}),
     * and otherwise a {@link ScrollingIndexingWorker} selecting documents by {@code ecm:ancestorId}.
     *
     * @param cmd the command whose target document is the root of the recursion
     * @return the worker to schedule
     */
    private Work getWorker(IndexingCommand cmd) {
        Work ret;
        if (cmd.getType() == Type.UPDATE_DIRECT_CHILDREN) {
            ret = new ScrollingIndexingWorker(cmd.getRepositoryName(), String.format(
                    "SELECT ecm:uuid FROM Document WHERE ecm:parentId = '%s'", cmd.getTargetDocumentId()));
        } else {
            boolean useChildrenWorker = Boolean.parseBoolean(
                    Framework.getProperty(REINDEX_USING_CHILDREN_TRAVERSAL_PROPERTY, "false"));
            if (useChildrenWorker) {
                ret = new ChildrenIndexingWorker(cmd);
            } else {
                ret = new ScrollingIndexingWorker(cmd.getRepositoryName(), String.format(
                        "SELECT ecm:uuid FROM Document WHERE ecm:ancestorId = '%s'", cmd.getTargetDocumentId()));
            }
        }
        return ret;
    }

    /**
     * Returns the ids of all commands held by this worker, each one preceded by a single space.
     * <p>
     * NOTE(review): {@code cmds} is not declared in this class; presumably it is the list handed to the
     * superclass constructor - confirm against {@code AbstractIndexingWorker}.
     *
     * @return the concatenated ids, or an empty string when there is no command
     */
    public String getCmdsDigest() {
        // StringBuilder avoids re-copying the accumulated string on every iteration
        StringBuilder digest = new StringBuilder();
        for (IndexingCommand cmd : cmds) {
            digest.append(" ").append(cmd.getId());
        }
        return digest.toString();
    }
}