001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.bulk;
020
021import static org.nuxeo.elasticsearch.bulk.IndexAction.INDEX_UPDATE_ALIAS_PARAM;
022import static org.nuxeo.elasticsearch.bulk.IndexAction.REFRESH_INDEX_PARAM;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.nuxeo.ecm.core.bulk.BulkCodecs;
027import org.nuxeo.ecm.core.bulk.BulkService;
028import org.nuxeo.ecm.core.bulk.message.BulkCommand;
029import org.nuxeo.ecm.core.bulk.message.BulkStatus;
030import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
031import org.nuxeo.lib.stream.codec.Codec;
032import org.nuxeo.lib.stream.computation.AbstractComputation;
033import org.nuxeo.lib.stream.computation.ComputationContext;
034import org.nuxeo.lib.stream.computation.Record;
035import org.nuxeo.runtime.api.Framework;
036
037/**
038 * On indexing completion, do extra tasks like refresh or update index alias.
039 *
040 * @since 10.3
041 */
042public class IndexCompletionComputation extends AbstractComputation {
043    private static final Log log = LogFactory.getLog(IndexCompletionComputation.class);
044
045    public static final String NAME = "indexCompletion";
046
047    protected Codec<BulkStatus> codec;
048
049    public IndexCompletionComputation() {
050        super(NAME, 1, 0);
051    }
052
053    @Override
054    public void init(ComputationContext context) {
055        super.init(context);
056        this.codec = BulkCodecs.getStatusCodec();
057    }
058
059    @Override
060    public void processRecord(ComputationContext context, String inputStream, Record record) {
061        BulkStatus status = codec.decode(record.getData());
062        if (IndexAction.ACTION_NAME.equals(status.getAction())
063                && BulkStatus.State.COMPLETED.equals(status.getState())) {
064            logIndexing(status);
065            BulkService bulkService = Framework.getService(BulkService.class);
066            BulkCommand command = bulkService.getCommand(status.getId());
067            refreshIndexIfNeeded(command);
068            updateAliasIfNeeded(command);
069        }
070        context.askForCheckpoint();
071    }
072
073    protected void refreshIndexIfNeeded(BulkCommand command) {
074        Boolean refresh = command.getParam(REFRESH_INDEX_PARAM);
075        if (Boolean.TRUE.equals(refresh)) {
076            log.warn("Refresh index requested by command: " + command.getId());
077            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
078            esa.refreshRepositoryIndex(command.getRepository());
079        }
080    }
081
082    protected void updateAliasIfNeeded(BulkCommand command) {
083        Boolean updateAlias = command.getParam(INDEX_UPDATE_ALIAS_PARAM);
084        if (Boolean.TRUE.equals(updateAlias)) {
085            log.warn("Update alias requested by command: " + command.getId());
086            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
087            esa.syncSearchAndWriteAlias(esa.getIndexNameForRepository(command.getRepository()));
088        }
089    }
090
091    protected void logIndexing(BulkStatus status) {
092        long elapsed = status.getCompletedTime().toEpochMilli() - status.getSubmitTime().toEpochMilli();
093        long wait = status.getScrollStartTime().toEpochMilli() - status.getSubmitTime().toEpochMilli();
094        long scroll = status.getScrollEndTime().toEpochMilli() - status.getScrollStartTime().toEpochMilli();
095        double rate = 1000.0 * status.getTotal() / (elapsed);
096        log.warn(String.format(
097                "Index command: %s completed: %d docs in %.2fs (wait: %.2fs, scroll: %.2fs) rate: %.2f docs/s",
098                status.getId(), status.getTotal(), elapsed / 1000.0, wait / 1000.0, scroll / 1000.0, rate));
099    }
100
101}