/*
 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.elasticsearch.bulk;

import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.DONE_STREAM;
import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM;
import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1;
import static org.nuxeo.lib.stream.computation.AbstractComputation.OUTPUT_1;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.stream.StreamProcessorTopology;

/**
 * A Nuxeo Bulk Action to index documents. It decouples the document extraction to build the elasticsearch request
 * and the indexing.
 *
 * @since 10.3
 */
public class IndexAction implements StreamProcessorTopology {

    /** Name of this action; also used as the name of the stream read by the first computation. */
    public static final String ACTION_NAME = "index";

    /** Option key for the first sizing argument handed to {@link BulkIndexComputation} (a size in bytes). */
    public static final String ES_BULK_SIZE_OPTION = "esBulkSizeBytes";

    /** Value used when {@link #ES_BULK_SIZE_OPTION} is not set: 5 * 1024 * 1024. */
    public static final int ES_BULK_SIZE_DEFAULT = 5_242_880;

    /** Option key for the second sizing argument handed to {@link BulkIndexComputation} (a number of actions). */
    public static final String ES_BULK_ACTION_OPTION = "esBulkActions";

    /** Value used when {@link #ES_BULK_ACTION_OPTION} is not set. */
    public static final int ES_BULK_ACTION_DEFAULT = 1_000;

    /** Option key for the flush interval, in seconds, handed to {@link BulkIndexComputation}. */
    public static final String BULK_FLUSH_INTERVAL_OPTION = "flushIntervalSeconds";

    /** Value used when {@link #BULK_FLUSH_INTERVAL_OPTION} is not set. */
    public static final int BULK_FLUSH_INTERVAL_DEFAULT = 10;

    // NOTE(review): the two parameter names below are not read in this class; presumably they are consumed by the
    // computations wired in getTopology -- confirm against their sources.
    public static final String INDEX_UPDATE_ALIAS_PARAM = "updateAlias";

    public static final String REFRESH_INDEX_PARAM = "refresh";

    /**
     * Builds the topology of the index action as a chain of three computations, each port being bound to a stream
     * with the {@code port:stream} notation:
     * <ol>
     * <li>{@code IndexRequestComputation}: input from the {@link #ACTION_NAME} stream, output to the stream named
     * {@code BulkIndexComputation.NAME};</li>
     * <li>{@code BulkIndexComputation}: input from the stream named {@code BulkIndexComputation.NAME}, output to
     * {@code STATUS_STREAM};</li>
     * <li>{@code IndexCompletionComputation}: input from {@code DONE_STREAM}, no output.</li>
     * </ol>
     *
     * @param options the processor options; missing entries fall back to the {@code *_DEFAULT} constants
     * @return the topology describing the three computations
     * @throws NumberFormatException if one of the numeric options is not a valid integer
     */
    @Override
    public Topology getTopology(Map<String, String> options) {
        int esBulkSize = getOptionAsInteger(options, ES_BULK_SIZE_OPTION, ES_BULK_SIZE_DEFAULT);
        int esBulkActions = getOptionAsInteger(options, ES_BULK_ACTION_OPTION, ES_BULK_ACTION_DEFAULT);
        int esBulkFlushInterval = getOptionAsInteger(options, BULK_FLUSH_INTERVAL_OPTION,
                BULK_FLUSH_INTERVAL_DEFAULT);
        return Topology.builder()
                       .addComputation(IndexRequestComputation::new,
                               Arrays.asList(INPUT_1 + ":" + ACTION_NAME, //
                                       OUTPUT_1 + ":" + BulkIndexComputation.NAME))
                       .addComputation(
                               () -> new BulkIndexComputation(esBulkSize, esBulkActions, esBulkFlushInterval),
                               Arrays.asList(INPUT_1 + ":" + BulkIndexComputation.NAME, //
                                       OUTPUT_1 + ":" + STATUS_STREAM))
                       .addComputation(IndexCompletionComputation::new,
                               Collections.singletonList(INPUT_1 + ":" + DONE_STREAM))
                       .build();
    }

    /**
     * Reads an integer option.
     * <p>
     * Surrounding whitespace in the configured value is ignored. A {@code null} map or a missing key yields the
     * default value.
     *
     * @param options the option map, may be {@code null}
     * @param option the key to look up
     * @param defaultValue the value returned when the option is not set
     * @return the parsed option value, or {@code defaultValue} when the option is absent
     * @throws NumberFormatException if the option is set but is not a valid integer; the message names the
     *             offending option
     */
    public static int getOptionAsInteger(Map<String, String> options, String option, int defaultValue) {
        String value = options == null ? null : options.get(option);
        if (value == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            // Same exception type as before, but tell the operator which option is misconfigured.
            NumberFormatException detailed = new NumberFormatException(
                    "Invalid integer value for option '" + option + "': '" + value + "'");
            detailed.initCause(e);
            throw detailed;
        }
    }

}