001/*
002* (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003*
004* Licensed under the Apache License, Version 2.0 (the "License");
005* you may not use this file except in compliance with the License.
006* You may obtain a copy of the License at
007*
008*     http://www.apache.org/licenses/LICENSE-2.0
009*
010* Unless required by applicable law or agreed to in writing, software
011* distributed under the License is distributed on an "AS IS" BASIS,
012* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013* See the License for the specific language governing permissions and
014* limitations under the License.
015*
016* Contributors:
017*     bdelbosc
018*/
019
020package org.nuxeo.elasticsearch.bulk;
021
022import static org.nuxeo.elasticsearch.ElasticSearchConstants.DOC_TYPE;
023import static org.nuxeo.elasticsearch.bulk.IndexAction.ACTION_NAME;
024
025import java.io.IOException;
026import java.io.Serializable;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Map;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.elasticsearch.action.bulk.BulkRequest;
034import org.elasticsearch.action.index.IndexRequest;
035import org.elasticsearch.common.bytes.BytesReference;
036import org.elasticsearch.common.io.stream.BytesStreamOutput;
037import org.elasticsearch.common.xcontent.XContentType;
038import org.elasticsearch.index.VersionType;
039import org.nuxeo.ecm.core.api.CoreSession;
040import org.nuxeo.ecm.core.api.DocumentModel;
041import org.nuxeo.ecm.core.api.DocumentModelList;
042import org.nuxeo.ecm.core.api.NuxeoException;
043import org.nuxeo.ecm.core.bulk.BulkCodecs;
044import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
045import org.nuxeo.ecm.core.bulk.message.BulkStatus;
046import org.nuxeo.ecm.core.bulk.message.DataBucket;
047import org.nuxeo.elasticsearch.Timestamp;
048import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
049import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
050import org.nuxeo.lib.stream.computation.ComputationContext;
051import org.nuxeo.lib.stream.computation.Record;
052import org.nuxeo.runtime.api.Framework;
053
054/**
055 * Build elasticsearch requests to index documents.
056 *
057 * @since 10.3
058 */
059public class IndexRequestComputation extends AbstractBulkComputation {
060    private static final Log log = LogFactory.getLog(IndexRequestComputation.class);
061
062    // we want to avoid record bigger than 1MB because they requires specific configuration and impact performance
063    protected static final long MAX_RECORD_SIZE = 900_000;
064
065    protected static final String INDEX_OPTION = "indexName";
066
067    protected BulkRequest bulkRequest;
068
069    protected List<BulkRequest> bulkRequests = new ArrayList<>();
070
071    protected String bucketKey;
072
073    public IndexRequestComputation() {
074        super(ACTION_NAME, 1);
075    }
076
077    @Override
078    public void startBucket(String bucketKey) {
079        this.bucketKey = bucketKey;
080        bulkRequests.clear();
081        bulkRequest = new BulkRequest();
082    }
083
084    @Override
085    protected void compute(CoreSession session, List<String> documentIds, Map<String, Serializable> properties) {
086        long now = Timestamp.currentTimeMicros();
087        String indexName = getIndexName(session, properties);
088        DocumentModelList docs = loadDocuments(session, documentIds);
089        ElasticSearchIndexing esi = Framework.getService(ElasticSearchIndexing.class);
090        for (DocumentModel doc : docs) {
091            try {
092                append(new IndexRequest(indexName, DOC_TYPE, doc.getId()).source(esi.source(doc), XContentType.JSON)
093                                                                         .versionType(VersionType.EXTERNAL)
094                                                                         .version(now));
095            } catch (IOException e) {
096                throw new NuxeoException("Cannot build source for document: " + doc.getId(), e);
097            }
098        }
099    }
100
101    protected void append(IndexRequest indexRequest) {
102        if (bulkRequest.estimatedSizeInBytes() + indexRequest.source().length() > MAX_RECORD_SIZE) {
103            if (bulkRequest.numberOfActions() > 0) {
104                // Create multiple elastic bulk requests when we exceed the record size
105                bulkRequests.add(bulkRequest);
106                bulkRequest = new BulkRequest();
107            }
108            if (indexRequest.source().length() > MAX_RECORD_SIZE) {
109                log.warn(String.format("Indexing request for doc: %s, is too large: %d, max record size: %d",
110                        indexRequest.id(), indexRequest.source().length(), MAX_RECORD_SIZE));
111            }
112        }
113        bulkRequest.add(indexRequest);
114    }
115
116    @Override
117    public void endBucket(ComputationContext context, BulkStatus delta) {
118        long bucketSize = delta.getProcessed();
119        bulkRequests.add(bulkRequest);
120        String commandId = getCurrentCommand().getId();
121        int i = 0;
122        int count = 0;
123        for (BulkRequest request : bulkRequests) {
124            DataBucket dataBucket = new DataBucket(commandId, request.numberOfActions(), toBytes(request));
125            // use distinct key to distribute the message evenly between partitions
126            String key = bucketKey + "-" + i++;
127            context.produceRecord(OUTPUT_1, Record.of(key, BulkCodecs.getDataBucketCodec().encode(dataBucket)));
128            count += request.numberOfActions();
129        }
130        if (count < bucketSize) {
131            log.warn(String.format("Command: %s offset: %s created %d documents out of %d, %d not accessible",
132                    commandId, context.getLastOffset(), count, bucketSize, bucketSize - count));
133            DataBucket dataBucket = new DataBucket(commandId, bucketSize - count, toBytes(new BulkRequest()));
134            context.produceRecord(OUTPUT_1,
135                    Record.of(bucketKey + "-missing", BulkCodecs.getDataBucketCodec().encode(dataBucket)));
136        }
137        bulkRequest = null;
138        bulkRequests.clear();
139    }
140
141    protected String getIndexName(CoreSession session, Map<String, Serializable> properties) {
142        if (properties.containsKey(INDEX_OPTION)) {
143            return (String) properties.get(INDEX_OPTION);
144        }
145        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
146        return esa.getWriteIndexName(esa.getIndexNameForRepository(session.getRepositoryName()));
147    }
148
149    protected byte[] toBytes(BulkRequest request) {
150        BytesStreamOutput out = new BytesStreamOutput();
151        try {
152            request.writeTo(out);
153            return BytesReference.toBytes(out.bytes());
154        } catch (IOException e) {
155            throw new NuxeoException("Cannot write elasticsearch bulk request " + request, e);
156        }
157    }
158
159}