/*
 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */

package org.nuxeo.elasticsearch.bulk;

import static org.nuxeo.elasticsearch.bulk.IndexAction.ACTION_FULL_NAME;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentModelList;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.bulk.BulkCodecs;
import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.ecm.core.bulk.message.DataBucket;
import org.nuxeo.elasticsearch.Timestamp;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;

/**
 * Builds Elasticsearch index requests for the documents of a bucket and emits them, grouped into one or more
 * serialized bulk requests, as {@link DataBucket} records.
 *
 * @since 10.3
 */
public class IndexRequestComputation extends AbstractBulkComputation {
    private static final Log log = LogFactory.getLog(IndexRequestComputation.class);

    // records bigger than 1MB require specific configuration and impact performance, so stay below that limit
    protected static final long MAX_RECORD_SIZE = 900_000;

    /** Name of the command property that, when present, supplies the index name to use. */
    protected static final String INDEX_OPTION = "indexName";

    /** Bulk request currently being filled. */
    protected BulkRequest bulkRequest;

    /** Bulk requests already closed for the current bucket because the size limit was reached. */
    protected List<BulkRequest> bulkRequests = new ArrayList<>();

    /** Key of the bucket being processed, used as prefix for the keys of the produced records. */
    protected String bucketKey;

    public IndexRequestComputation() {
        super(ACTION_FULL_NAME, 1);
    }

    /**
     * Remembers the bucket key and resets the accumulated bulk requests.
     *
     * @param bucketKey the key of the bucket that starts
     */
    @Override
    public void startBucket(String bucketKey) {
        this.bucketKey = bucketKey;
        bulkRequests.clear();
        bulkRequest = new BulkRequest();
    }

    /**
     * Appends one index request per document returned by {@code loadDocuments}. All requests of a call share the
     * same external version, taken from {@link Timestamp#currentTimeMicros()} once at the beginning.
     *
     * @param session the session used to load the documents and to resolve the default index name
     * @param documentIds the ids of the documents to index
     * @param properties the command properties, may contain {@link #INDEX_OPTION}
     * @throws NuxeoException if the source of a document cannot be built
     */
    @Override
    protected void compute(CoreSession session, List<String> documentIds, Map<String, Serializable> properties) {
        long version = Timestamp.currentTimeMicros();
        String targetIndex = getIndexName(session, properties);
        DocumentModelList documents = loadDocuments(session, documentIds);
        ElasticSearchIndexing indexing = Framework.getService(ElasticSearchIndexing.class);
        for (DocumentModel document : documents) {
            IndexRequest request;
            try {
                request = newIndexRequest(indexing, targetIndex, document, version);
            } catch (IOException e) {
                throw new NuxeoException("Cannot build source for document: " + document.getId(), e);
            }
            append(request);
        }
    }

    /** Creates the externally versioned JSON index request for a single document. */
    private IndexRequest newIndexRequest(ElasticSearchIndexing indexing, String targetIndex, DocumentModel document,
            long version) throws IOException {
        return new IndexRequest(targetIndex).id(document.getId())
                                            .source(indexing.source(document), XContentType.JSON)
                                            .versionType(VersionType.EXTERNAL)
                                            .version(version);
    }

    /**
     * Adds the index request to the current bulk request, first closing the current bulk request and opening a new
     * one when the addition would exceed {@link #MAX_RECORD_SIZE}. A request whose source alone exceeds the limit is
     * still added, after a warning is logged.
     *
     * @param indexRequest the request to add
     */
    protected void append(IndexRequest indexRequest) {
        int sourceLength = indexRequest.source().length();
        boolean overflows = bulkRequest.estimatedSizeInBytes() + sourceLength > MAX_RECORD_SIZE;
        if (overflows) {
            if (bulkRequest.numberOfActions() > 0) {
                // split into multiple elastic bulk requests when the record size is exceeded
                bulkRequests.add(bulkRequest);
                bulkRequest = new BulkRequest();
            }
            if (sourceLength > MAX_RECORD_SIZE) {
                log.warn(String.format("Indexing request for doc: %s, is too large: %d, max record size: %d",
                        indexRequest.id(), sourceLength, MAX_RECORD_SIZE));
            }
        }
        bulkRequest.add(indexRequest);
    }

    /**
     * Produces one record per accumulated bulk request, then resets the accumulation state. When fewer actions were
     * built than {@code delta.getProcessed()} reports, a warning is logged and an extra record holding an empty bulk
     * request is produced with the difference as its count.
     *
     * @param context the context used to produce the records
     * @param delta the status delta whose processed count is the expected bucket size
     */
    @Override
    public void endBucket(ComputationContext context, BulkStatus delta) {
        long expected = delta.getProcessed();
        bulkRequests.add(bulkRequest);
        String commandId = getCurrentCommand().getId();
        int built = 0;
        for (int index = 0; index < bulkRequests.size(); index++) {
            BulkRequest request = bulkRequests.get(index);
            int actions = request.numberOfActions();
            DataBucket bucket = new DataBucket(commandId, actions, toBytes(request));
            // use distinct key to distribute the message evenly between partitions
            emit(context, bucketKey + "-" + index, bucket);
            built += actions;
        }
        if (built < expected) {
            long missing = expected - built;
            log.warn(String.format("Command: %s offset: %s created %d documents out of %d, %d not accessible",
                    commandId, context.getLastOffset(), built, expected, missing));
            DataBucket bucket = new DataBucket(commandId, missing, toBytes(new BulkRequest()));
            emit(context, bucketKey + "-missing", bucket);
        }
        bulkRequest = null;
        bulkRequests.clear();
    }

    /** Encodes the data bucket with the data bucket codec and produces it on the first output under the given key. */
    private void emit(ComputationContext context, String key, DataBucket bucket) {
        context.produceRecord(OUTPUT_1, Record.of(key, BulkCodecs.getDataBucketCodec().encode(bucket)));
    }

    /**
     * Returns the value of the {@link #INDEX_OPTION} property when the properties contain that key, otherwise the
     * write index name that {@link ElasticSearchAdmin} gives for the index of the session's repository.
     *
     * @param session the session providing the repository name
     * @param properties the command properties
     * @return the name of the index to write to
     */
    protected String getIndexName(CoreSession session, Map<String, Serializable> properties) {
        if (!properties.containsKey(INDEX_OPTION)) {
            ElasticSearchAdmin admin = Framework.getService(ElasticSearchAdmin.class);
            String repositoryIndex = admin.getIndexNameForRepository(session.getRepositoryName());
            return admin.getWriteIndexName(repositoryIndex);
        }
        return (String) properties.get(INDEX_OPTION);
    }

    /**
     * Serializes the bulk request into a byte array.
     *
     * @param request the bulk request to serialize
     * @return the bytes written by the request
     * @throws NuxeoException if the request cannot be written
     */
    protected byte[] toBytes(BulkRequest request) {
        try (BytesStreamOutput stream = new BytesStreamOutput()) {
            request.writeTo(stream);
            BytesReference written = stream.bytes();
            return BytesReference.toBytes(written);
        } catch (IOException e) {
            throw new NuxeoException("Cannot write elasticsearch bulk request " + request, e);
        }
    }

}