001/*
002* (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003*
004* Licensed under the Apache License, Version 2.0 (the "License");
005* you may not use this file except in compliance with the License.
006* You may obtain a copy of the License at
007*
008*     http://www.apache.org/licenses/LICENSE-2.0
009*
010* Unless required by applicable law or agreed to in writing, software
011* distributed under the License is distributed on an "AS IS" BASIS,
012* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013* See the License for the specific language governing permissions and
014* limitations under the License.
015*
016* Contributors:
017*     bdelbosc
018*/
019
020package org.nuxeo.elasticsearch.bulk;
021
022import static org.nuxeo.elasticsearch.bulk.IndexAction.ACTION_FULL_NAME;
023
024import java.io.IOException;
025import java.io.Serializable;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Map;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.elasticsearch.action.bulk.BulkRequest;
033import org.elasticsearch.action.index.IndexRequest;
034import org.elasticsearch.common.bytes.BytesReference;
035import org.elasticsearch.common.io.stream.BytesStreamOutput;
036import org.elasticsearch.common.xcontent.XContentType;
037import org.elasticsearch.index.VersionType;
038import org.nuxeo.ecm.core.api.CoreSession;
039import org.nuxeo.ecm.core.api.DocumentModel;
040import org.nuxeo.ecm.core.api.DocumentModelList;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.bulk.BulkCodecs;
043import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
044import org.nuxeo.ecm.core.bulk.message.BulkStatus;
045import org.nuxeo.ecm.core.bulk.message.DataBucket;
046import org.nuxeo.elasticsearch.Timestamp;
047import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
048import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
049import org.nuxeo.lib.stream.computation.ComputationContext;
050import org.nuxeo.lib.stream.computation.Record;
051import org.nuxeo.runtime.api.Framework;
052
053/**
054 * Build elasticsearch requests to index documents.
055 *
056 * @since 10.3
057 */
058public class IndexRequestComputation extends AbstractBulkComputation {
059    private static final Log log = LogFactory.getLog(IndexRequestComputation.class);
060
061    // we want to avoid record bigger than 1MB because they requires specific configuration and impact performance
062    protected static final long MAX_RECORD_SIZE = 900_000;
063
064    protected static final String INDEX_OPTION = "indexName";
065
066    protected BulkRequest bulkRequest;
067
068    protected List<BulkRequest> bulkRequests = new ArrayList<>();
069
070    protected String bucketKey;
071
072    public IndexRequestComputation() {
073        super(ACTION_FULL_NAME, 1);
074    }
075
076    @Override
077    public void startBucket(String bucketKey) {
078        this.bucketKey = bucketKey;
079        bulkRequests.clear();
080        bulkRequest = new BulkRequest();
081    }
082
083    @Override
084    protected void compute(CoreSession session, List<String> documentIds, Map<String, Serializable> properties) {
085        long now = Timestamp.currentTimeMicros();
086        String indexName = getIndexName(session, properties);
087        DocumentModelList docs = loadDocuments(session, documentIds);
088        ElasticSearchIndexing esi = Framework.getService(ElasticSearchIndexing.class);
089        for (DocumentModel doc : docs) {
090            try {
091                append(new IndexRequest(indexName).id(doc.getId())
092                                                  .source(esi.source(doc), XContentType.JSON)
093                                                                         .versionType(VersionType.EXTERNAL)
094                                                                         .version(now));
095            } catch (IOException e) {
096                throw new NuxeoException("Cannot build source for document: " + doc.getId(), e);
097            }
098        }
099    }
100
101    protected void append(IndexRequest indexRequest) {
102        if (bulkRequest.estimatedSizeInBytes() + indexRequest.source().length() > MAX_RECORD_SIZE) {
103            if (bulkRequest.numberOfActions() > 0) {
104                // Create multiple elastic bulk requests when we exceed the record size
105                bulkRequests.add(bulkRequest);
106                bulkRequest = new BulkRequest();
107            }
108            if (indexRequest.source().length() > MAX_RECORD_SIZE) {
109                log.warn(String.format("Indexing request for doc: %s, is too large: %d, max record size: %d",
110                        indexRequest.id(), indexRequest.source().length(), MAX_RECORD_SIZE));
111            }
112        }
113        bulkRequest.add(indexRequest);
114    }
115
116    @Override
117    public void endBucket(ComputationContext context, BulkStatus delta) {
118        long bucketSize = delta.getProcessed();
119        bulkRequests.add(bulkRequest);
120        String commandId = getCurrentCommand().getId();
121        int i = 0;
122        int count = 0;
123        for (BulkRequest request : bulkRequests) {
124            DataBucket dataBucket = new DataBucket(commandId, request.numberOfActions(), toBytes(request));
125            // use distinct key to distribute the message evenly between partitions
126            String key = bucketKey + "-" + i++;
127            context.produceRecord(OUTPUT_1, Record.of(key, BulkCodecs.getDataBucketCodec().encode(dataBucket)));
128            count += request.numberOfActions();
129        }
130        if (count < bucketSize) {
131            log.warn(String.format("Command: %s offset: %s created %d documents out of %d, %d not accessible",
132                    commandId, context.getLastOffset(), count, bucketSize, bucketSize - count));
133            DataBucket dataBucket = new DataBucket(commandId, bucketSize - count, toBytes(new BulkRequest()));
134            context.produceRecord(OUTPUT_1,
135                    Record.of(bucketKey + "-missing", BulkCodecs.getDataBucketCodec().encode(dataBucket)));
136        }
137        bulkRequest = null;
138        bulkRequests.clear();
139    }
140
141    protected String getIndexName(CoreSession session, Map<String, Serializable> properties) {
142        if (properties.containsKey(INDEX_OPTION)) {
143            return (String) properties.get(INDEX_OPTION);
144        }
145        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
146        return esa.getWriteIndexName(esa.getIndexNameForRepository(session.getRepositoryName()));
147    }
148
149    protected byte[] toBytes(BulkRequest request) {
150        try (BytesStreamOutput out = new BytesStreamOutput()) {
151            request.writeTo(out);
152            return BytesReference.toBytes(out.bytes());
153        } catch (IOException e) {
154            throw new NuxeoException("Cannot write elasticsearch bulk request " + request, e);
155        }
156    }
157
158}