001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.elasticsearch.bulk; 020 021import static org.nuxeo.elasticsearch.bulk.IndexAction.INDEX_UPDATE_ALIAS_PARAM; 022import static org.nuxeo.elasticsearch.bulk.IndexAction.REFRESH_INDEX_PARAM; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.nuxeo.ecm.core.bulk.BulkCodecs; 027import org.nuxeo.ecm.core.bulk.BulkService; 028import org.nuxeo.ecm.core.bulk.message.BulkCommand; 029import org.nuxeo.ecm.core.bulk.message.BulkStatus; 030import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 031import org.nuxeo.lib.stream.codec.Codec; 032import org.nuxeo.lib.stream.computation.AbstractComputation; 033import org.nuxeo.lib.stream.computation.ComputationContext; 034import org.nuxeo.lib.stream.computation.Record; 035import org.nuxeo.runtime.api.Framework; 036 037/** 038 * On indexing completion, do extra tasks like refresh or update index alias. 039 * 040 * @since 10.3 041 */ 042public class IndexCompletionComputation extends AbstractComputation { 043 private static final Log log = LogFactory.getLog(IndexCompletionComputation.class); 044 045 public static final String NAME = "indexCompletion"; 046 047 protected Codec<BulkStatus> codec; 048 049 public IndexCompletionComputation() { 050 super(NAME, 1, 0); 051 } 052 053 @Override 054 public void init(ComputationContext context) { 055 super.init(context); 056 this.codec = BulkCodecs.getStatusCodec(); 057 } 058 059 @Override 060 public void processRecord(ComputationContext context, String inputStream, Record record) { 061 BulkStatus status = codec.decode(record.getData()); 062 if (IndexAction.ACTION_NAME.equals(status.getAction()) 063 && BulkStatus.State.COMPLETED.equals(status.getState())) { 064 logIndexing(status); 065 BulkService bulkService = Framework.getService(BulkService.class); 066 BulkCommand command = bulkService.getCommand(status.getId()); 067 refreshIndexIfNeeded(command); 068 updateAliasIfNeeded(command); 069 } 070 context.askForCheckpoint(); 071 } 072 073 protected void refreshIndexIfNeeded(BulkCommand command) { 074 Boolean refresh = command.getParam(REFRESH_INDEX_PARAM); 075 if (Boolean.TRUE.equals(refresh)) { 076 log.warn("Refresh index requested by command: " + command.getId()); 077 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 078 esa.refreshRepositoryIndex(command.getRepository()); 079 } 080 } 081 082 protected void updateAliasIfNeeded(BulkCommand command) { 083 Boolean updateAlias = command.getParam(INDEX_UPDATE_ALIAS_PARAM); 084 if (Boolean.TRUE.equals(updateAlias)) { 085 log.warn("Update alias requested by command: " + command.getId()); 086 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 087 esa.syncSearchAndWriteAlias(esa.getIndexNameForRepository(command.getRepository())); 088 } 089 } 090 091 protected void logIndexing(BulkStatus status) { 092 long elapsed = status.getCompletedTime().toEpochMilli() - status.getSubmitTime().toEpochMilli(); 093 long wait = status.getScrollStartTime().toEpochMilli() - status.getSubmitTime().toEpochMilli(); 094 long scroll = status.getScrollEndTime().toEpochMilli() - status.getScrollStartTime().toEpochMilli(); 095 double rate = 1000.0 * status.getTotal() / (elapsed); 096 log.warn(String.format( 097 "Index command: %s completed: %d docs in %.2fs (wait: %.2fs, scroll: %.2fs) rate: %.2f docs/s", 098 status.getId(), status.getTotal(), elapsed / 1000.0, wait / 1000.0, scroll / 1000.0, rate)); 099 } 100 101}