001/* 002* (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003* 004* Licensed under the Apache License, Version 2.0 (the "License"); 005* you may not use this file except in compliance with the License. 006* You may obtain a copy of the License at 007* 008* http://www.apache.org/licenses/LICENSE-2.0 009* 010* Unless required by applicable law or agreed to in writing, software 011* distributed under the License is distributed on an "AS IS" BASIS, 012* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013* See the License for the specific language governing permissions and 014* limitations under the License. 015* 016* Contributors: 017* bdelbosc 018*/ 019 020package org.nuxeo.elasticsearch.bulk; 021 022import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE; 023import static org.nuxeo.elasticsearch.bulk.IndexAction.ACTION_NAME; 024 025import java.io.IOException; 026import java.io.Serializable; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.Map; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.elasticsearch.action.bulk.BulkRequest; 034import org.elasticsearch.action.index.IndexRequest; 035import org.elasticsearch.common.bytes.BytesReference; 036import org.elasticsearch.common.io.stream.BytesStreamOutput; 037import org.elasticsearch.common.xcontent.XContentType; 038import org.elasticsearch.index.VersionType; 039import org.nuxeo.ecm.core.api.CoreSession; 040import org.nuxeo.ecm.core.api.DocumentModel; 041import org.nuxeo.ecm.core.api.DocumentModelList; 042import org.nuxeo.ecm.core.api.NuxeoException; 043import org.nuxeo.ecm.core.bulk.BulkCodecs; 044import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation; 045import org.nuxeo.ecm.core.bulk.message.BulkStatus; 046import org.nuxeo.ecm.core.bulk.message.DataBucket; 047import org.nuxeo.elasticsearch.Timestamp; 048import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 049import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 050import org.nuxeo.lib.stream.computation.ComputationContext; 051import org.nuxeo.lib.stream.computation.Record; 052import org.nuxeo.runtime.api.Framework; 053 054/** 055 * Build elasticsearch requests to index documents. 056 * 057 * @since 10.3 058 */ 059public class IndexRequestComputation extends AbstractBulkComputation { 060 private static final Log log = LogFactory.getLog(IndexRequestComputation.class); 061 062 // we want to avoid record bigger than 1MB because they requires specific configuration and impact performance 063 protected static final long MAX_RECORD_SIZE = 900_000; 064 065 protected static final String INDEX_OPTION = "indexName"; 066 067 protected BulkRequest bulkRequest; 068 069 protected List<BulkRequest> bulkRequests = new ArrayList<>(); 070 071 protected String bucketKey; 072 073 public IndexRequestComputation() { 074 super(ACTION_NAME, 1); 075 } 076 077 @Override 078 public void startBucket(String bucketKey) { 079 this.bucketKey = bucketKey; 080 bulkRequests.clear(); 081 bulkRequest = new BulkRequest(); 082 } 083 084 @Override 085 protected void compute(CoreSession session, List<String> documentIds, Map<String, Serializable> properties) { 086 long now = Timestamp.currentTimeMicros(); 087 String indexName = getIndexName(session, properties); 088 DocumentModelList docs = loadDocuments(session, documentIds); 089 ElasticSearchIndexing esi = Framework.getService(ElasticSearchIndexing.class); 090 for (DocumentModel doc : docs) { 091 try { 092 append(new IndexRequest(indexName, DOC_TYPE, doc.getId()).source(esi.source(doc), XContentType.JSON) 093 .versionType(VersionType.EXTERNAL) 094 .version(now)); 095 } catch (IOException e) { 096 throw new NuxeoException("Cannot build source for document: " + doc.getId(), e); 097 } 098 } 099 } 100 101 protected void append(IndexRequest indexRequest) { 102 if (bulkRequest.estimatedSizeInBytes() + indexRequest.source().length() > MAX_RECORD_SIZE) { 103 if (bulkRequest.numberOfActions() > 0) { 104 // Create multiple elastic bulk requests when we exceed the record size 105 bulkRequests.add(bulkRequest); 106 bulkRequest = new BulkRequest(); 107 } 108 if (indexRequest.source().length() > MAX_RECORD_SIZE) { 109 log.warn(String.format("Indexing request for doc: %s, is too large: %d, max record size: %d", 110 indexRequest.id(), indexRequest.source().length(), MAX_RECORD_SIZE)); 111 } 112 } 113 bulkRequest.add(indexRequest); 114 } 115 116 @Override 117 public void endBucket(ComputationContext context, BulkStatus delta) { 118 long bucketSize = delta.getProcessed(); 119 bulkRequests.add(bulkRequest); 120 String commandId = getCurrentCommand().getId(); 121 int i = 0; 122 int count = 0; 123 for (BulkRequest request : bulkRequests) { 124 DataBucket dataBucket = new DataBucket(commandId, request.numberOfActions(), toBytes(request)); 125 // use distinct key to distribute the message evenly between partitions 126 String key = bucketKey + "-" + i++; 127 context.produceRecord(OUTPUT_1, Record.of(key, BulkCodecs.getDataBucketCodec().encode(dataBucket))); 128 count += request.numberOfActions(); 129 } 130 if (count < bucketSize) { 131 log.warn(String.format("Command: %s offset: %s created %d documents out of %d, %d not accessible", 132 commandId, context.getLastOffset(), count, bucketSize, bucketSize - count)); 133 DataBucket dataBucket = new DataBucket(commandId, bucketSize - count, toBytes(new BulkRequest())); 134 context.produceRecord(OUTPUT_1, 135 Record.of(bucketKey + "-missing", BulkCodecs.getDataBucketCodec().encode(dataBucket))); 136 } 137 bulkRequest = null; 138 bulkRequests.clear(); 139 } 140 141 protected String getIndexName(CoreSession session, Map<String, Serializable> properties) { 142 if (properties.containsKey(INDEX_OPTION)) { 143 return (String) properties.get(INDEX_OPTION); 144 } 145 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 146 return esa.getWriteIndexName(esa.getIndexNameForRepository(session.getRepositoryName())); 147 } 148 149 protected byte[] toBytes(BulkRequest request) { 150 BytesStreamOutput out = new BytesStreamOutput(); 151 try { 152 request.writeTo(out); 153 return BytesReference.toBytes(out.bytes()); 154 } catch (IOException e) { 155 throw new NuxeoException("Cannot write elasticsearch bulk request " + request, e); 156 } 157 } 158 159}