/*
 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.elasticsearch.bulk;

import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.DONE_STREAM;
import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM;
import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1;
import static org.nuxeo.lib.stream.computation.AbstractComputation.OUTPUT_1;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.stream.StreamProcessorTopology;

/**
 * A Nuxeo Bulk Action to index documents. It decouples the document extraction to build the elasticsearch request and
 * the indexing.
 *
 * @since 10.3
 */
public class IndexAction implements StreamProcessorTopology {

    /** Short name of this bulk action. */
    public static final String ACTION_NAME = "index";

    /**
     * Action name prefixed with {@code bulk/}; used as the input stream of the first computation.
     *
     * @since 11.1
     */
    public static final String ACTION_FULL_NAME = "bulk/" + ACTION_NAME;

    /** Name of the topology option read into the first constructor argument of {@link BulkIndexComputation}. */
    public static final String ES_BULK_SIZE_OPTION = "esBulkSizeBytes";

    /** Value used when {@link #ES_BULK_SIZE_OPTION} is not set (5 * 1024 * 1024). */
    public static final int ES_BULK_SIZE_DEFAULT = 5_242_880;

    /** Name of the topology option read into the second constructor argument of {@link BulkIndexComputation}. */
    public static final String ES_BULK_ACTION_OPTION = "esBulkActions";

    /** Value used when {@link #ES_BULK_ACTION_OPTION} is not set. */
    public static final int ES_BULK_ACTION_DEFAULT = 1_000;

    /** Name of the topology option read into the third constructor argument of {@link BulkIndexComputation}. */
    public static final String BULK_FLUSH_INTERVAL_OPTION = "flushIntervalSeconds";

    /** Value used when {@link #BULK_FLUSH_INTERVAL_OPTION} is not set. */
    public static final int BULK_FLUSH_INTERVAL_DEFAULT = 10;

    // Not read by this class; presumably a parameter name of the bulk command, consumed elsewhere -- TODO confirm.
    public static final String INDEX_UPDATE_ALIAS_PARAM = "updateAlias";

    // Not read by this class; presumably a parameter name of the bulk command, consumed elsewhere -- TODO confirm.
    public static final String REFRESH_INDEX_PARAM = "refresh";

    /**
     * Builds the topology of the index action, made of three computations:
     * <ol>
     * <li>{@code IndexRequestComputation}: {@code INPUT_1} mapped to {@link #ACTION_FULL_NAME}, {@code OUTPUT_1}
     * mapped to {@code BulkIndexComputation.NAME};</li>
     * <li>{@code BulkIndexComputation}: {@code INPUT_1} mapped to {@code BulkIndexComputation.NAME},
     * {@code OUTPUT_1} mapped to {@code STATUS_STREAM};</li>
     * <li>{@code IndexCompletionComputation}: {@code INPUT_1} mapped to {@code DONE_STREAM}.</li>
     * </ol>
     *
     * @param options the topology options; missing entries fall back to the {@code *_DEFAULT} constants
     * @return the topology wiring the three computations
     * @throws NumberFormatException if one of the integer options is set but is not a valid integer
     */
    @Override
    public Topology getTopology(Map<String, String> options) {
        int esBulkSize = getOptionAsInteger(options, ES_BULK_SIZE_OPTION, ES_BULK_SIZE_DEFAULT);
        int esBulkActions = getOptionAsInteger(options, ES_BULK_ACTION_OPTION, ES_BULK_ACTION_DEFAULT);
        int esBulkFlushInterval = getOptionAsInteger(options, BULK_FLUSH_INTERVAL_OPTION, BULK_FLUSH_INTERVAL_DEFAULT);
        return Topology.builder()
                       .addComputation(IndexRequestComputation::new,
                               Arrays.asList(INPUT_1 + ":" + ACTION_FULL_NAME, //
                                       OUTPUT_1 + ":" + BulkIndexComputation.NAME))
                       .addComputation(
                               () -> new BulkIndexComputation(esBulkSize, esBulkActions, esBulkFlushInterval),
                               Arrays.asList(INPUT_1 + ":" + BulkIndexComputation.NAME, //
                                       OUTPUT_1 + ":" + STATUS_STREAM))
                       .addComputation(IndexCompletionComputation::new,
                               Collections.singletonList(INPUT_1 + ":" + DONE_STREAM))
                       .build();
    }

    /**
     * Reads an integer option.
     * <p>
     * Leading and trailing whitespace around the value is ignored.
     *
     * @param options the option map, must not be {@code null}
     * @param option the name of the option to read
     * @param defaultValue the value returned when the option is absent (mapped to {@code null})
     * @return the parsed option value, or {@code defaultValue} when the option is absent
     * @throws NumberFormatException if the option is present but is not a valid integer; the message names the
     *             option and the offending value
     */
    public static int getOptionAsInteger(Map<String, String> options, String option, int defaultValue) {
        String value = options.get(option);
        if (value == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            // Same exception type as before, but the message now tells which option is misconfigured.
            NumberFormatException detailed = new NumberFormatException(
                    "Invalid integer value '" + value + "' for option '" + option + "'");
            detailed.initCause(e);
            throw detailed;
        }
    }

}