/*
 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     pierre
 */
package org.nuxeo.ecm.core.bulk.action.computation;

import static org.nuxeo.ecm.core.bulk.action.computation.SortBlob.SORT_PARAMETER;
import static org.nuxeo.ecm.core.bulk.action.computation.ZipBlob.ZIP_PARAMETER;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
import org.nuxeo.ecm.core.bulk.BulkCodecs;
import org.nuxeo.ecm.core.bulk.BulkService;
import org.nuxeo.ecm.core.bulk.message.BulkCommand;
import org.nuxeo.ecm.core.bulk.message.DataBucket;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
import org.nuxeo.runtime.api.Framework;

/**
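 * Computation that accumulates the {@link DataBucket} chunks produced for a bulk command into a single transient
 * blob, then routes the result to the sort, zip or expose-blob stream depending on the command parameters.
 *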
 * @since 10.3
 */
public class MakeBlob extends AbstractTransientBlobComputation {

    private static final Logger log = LogManager.getLogger(MakeBlob.class);

    public static final String NAME = "bulk/makeBlob";

    protected static final long CHECK_DELAY_MS = 1000;

    protected static final String SORT_STREAM = OUTPUT_1;

    protected static final String ZIP_STREAM = OUTPUT_2;

    protected static final String EXPOSE_BLOB_STREAM = OUTPUT_3;

    protected static final int NB_OUTPUT_STREAMS = 3;

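    // Per-command bookkeeping, keyed by command id: counters tracks how many documents have been appended so far,
    // totals caches the expected total once it is known, and lastBuckets keeps the last bucket received so its
    // header and footer can be reused when the blob is finalized.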
    protected final Map<String, Long> counters = new HashMap<>();

    protected final Map<String, Long> totals = new HashMap<>();

    protected final Map<String, DataBucket> lastBuckets = new HashMap<>();

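    // When true, finishBlob emits its output record immediately through ComputationContextImpl instead of buffering
    // it until the next checkpoint (presumably for callers running the computation synchronously).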
    protected final boolean produceImmediate;

    public MakeBlob() {
        this(false);
    }

    public MakeBlob(boolean produceImmediate) {
        super(NAME, NB_OUTPUT_STREAMS);
        this.produceImmediate = produceImmediate;
    }

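    /**
     * Arms the periodic "check" timer so that commands whose total becomes known only after their records were
     * processed can still be finalized (see {@link #processTimer}).
     */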
    @Override
    public void init(ComputationContext context) {
        super.init(context);
        context.setTimer("check", System.currentTimeMillis() + CHECK_DELAY_MS);
    }

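    /**
     * Finalizes any command whose counter has reached a total that was not yet known when its records were
     * processed, then re-arms the timer.
     */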
    @Override
    public void processTimer(ComputationContext context, String key, long timestamp) {
        // This timer exists only to resolve the race condition where the total becomes known
        // after the records have already been processed
        List<String> commands = counters.keySet()
                                        .stream()
                                        .filter(commandId -> !totals.containsKey(commandId)
                                                && counters.get(commandId) >= getTotal(commandId))
                                        .collect(Collectors.toList());
        commands.forEach(commandId -> finishBlob(context, commandId));
        context.setTimer("check", System.currentTimeMillis() + CHECK_DELAY_MS);
    }

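    /**
     * Appends the incoming bucket data to the command's temporary file and finalizes the blob once the number of
     * appended documents reaches the command total.
     */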
    @Override
    public void processRecord(ComputationContext context, String documentIdsStreamName, Record record) {
        Codec<DataBucket> codec = BulkCodecs.getDataBucketCodec();
        DataBucket in = codec.decode(record.getData());
        String commandId = in.getCommandId();
        long nbDocuments = in.getCount();

        appendToFile(commandId, in.getData());

        if (counters.containsKey(commandId)) {
            counters.put(commandId, nbDocuments + counters.get(commandId));
        } else {
            counters.put(commandId, nbDocuments);
        }
        lastBuckets.put(commandId, in);
        if (counters.get(commandId) < getTotal(commandId)) {
            return;
        }
        finishBlob(context, commandId);
    }

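    /**
     * Returns the expected number of documents for the command, caching it once known, or {@link Long#MAX_VALUE}
     * while the total is still unknown.
     */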
    protected Long getTotal(String commandId) {
        if (!totals.containsKey(commandId)) {
            long total = Framework.getService(BulkService.class).getStatus(commandId).getTotal();
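            // a total of 0 means the total has not been computed yet: report it as unreachable so the
            // command is not finalized prematurely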
            if (total == 0) {
                return Long.MAX_VALUE;
            }
            totals.put(commandId, total);
        }
        return totals.get(commandId);
    }

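    /**
     * Appends the given bytes to the command's temporary file, creating the file on first use.
     */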
    protected Path appendToFile(String commandId, byte[] content) {
        Path path = createTemp(commandId);
        try (FileOutputStream stream = new FileOutputStream(path.toFile(), true)) {
            stream.write(content);
            stream.flush();
        } catch (IOException e) {
            log.error("Unable to write content", e);
        }
        return path;
    }

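    /**
     * Rewrites the file as header + existing content + footer, moving the original aside to a temporary file that is
     * deleted once the copy is complete.
     */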
    protected void appendHeaderFooterToFile(Path filePath, String commandId, byte[] header, byte[] footer) {
        if (header.length == 0 && footer.length == 0) {
            return;
        }
        try {
            Path tmpPath = Files.move(filePath, createTemp("tmp" + commandId), StandardCopyOption.REPLACE_EXISTING);
            try (InputStream is = Files.newInputStream(tmpPath);
                    FileOutputStream os = new FileOutputStream(filePath.toFile(), true)) {
                if (header.length > 0) {
                    os.write(header);
                }
                IOUtils.copy(is, os);
                if (footer.length > 0) {
                    os.write(footer);
                }
                os.flush();
            } finally {
                Files.delete(tmpPath);
            }
        } catch (IOException e) {
            log.error("Unable to append header and footer", e);
        }
    }

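    /**
     * Stores the command's temporary file as a blob in the named transient store, deletes the file and returns the
     * transient store key.
     */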
    protected String saveInTransientStore(String commandId, String storeName) {
        Path path = createTemp(commandId);
        storeBlob(new FileBlob(path.toFile()), commandId, storeName);
        try {
            Files.delete(path);
        } catch (IOException e) {
            log.error("Unable to delete file", e);
        }
        return getTransientStoreKey(commandId);
    }

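    /**
     * Selects the downstream stream from the command parameters: sort (the default), else zip when requested,
     * otherwise expose-blob.
     */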
    protected String getOutputStream(String commandId) {
        String outputStream = EXPOSE_BLOB_STREAM;
        BulkCommand command = Framework.getService(BulkService.class).getCommand(commandId);
        boolean sort = true;
        boolean zip = false;
        if (command != null) {
            if (command.getParam(SORT_PARAMETER) != null) {
                sort = command.getParam(SORT_PARAMETER);
            }
            if (command.getParam(ZIP_PARAMETER) != null) {
                zip = command.getParam(ZIP_PARAMETER);
            }
        }
        if (sort) {
            outputStream = SORT_STREAM;
        } else if (zip) {
            outputStream = ZIP_STREAM;
        }
        return outputStream;
    }

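    /**
     * Finalizes the blob for the command: appends the header and footer unless the record goes to the sort stream
     * (the sort computation rewrites the file downstream), stores the blob in the transient store and emits the
     * resulting {@link DataBucket}.
     */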
    protected void finishBlob(ComputationContext context, String commandId) {
        String outputStream = getOutputStream(commandId);
        DataBucket in = lastBuckets.get(commandId);
        if (!SORT_STREAM.equals(outputStream)) {
            appendHeaderFooterToFile(createTemp(commandId), commandId, in.getHeader(), in.getFooter());
        }

        String storeName = Framework.getService(BulkService.class).getStatus(commandId).getAction();
        String value = saveInTransientStore(commandId, storeName);
        DataBucket out = new DataBucket(commandId, totals.get(commandId), value, in.getHeaderAsString(),
                in.getFooterAsString());
        Codec<DataBucket> codec = BulkCodecs.getDataBucketCodec();

        if (produceImmediate) {
            ((ComputationContextImpl) context).produceRecordImmediate(outputStream,
                    Record.of(commandId, codec.encode(out)));
        } else {
            context.produceRecord(outputStream, Record.of(commandId, codec.encode(out)));
        }
        totals.remove(commandId);
        counters.remove(commandId);
        lastBuckets.remove(commandId);
        // checkpoint only when no other command is in progress
        if (counters.isEmpty()) {
            context.askForCheckpoint();
        }
    }

}