001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     pierre
018 */
019package org.nuxeo.ecm.core.bulk.action.computation;
020
021import static org.nuxeo.ecm.core.bulk.action.computation.ZipBlob.ZIP_PARAMETER;
022
023import java.io.FileOutputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.nio.file.Files;
027import java.nio.file.Path;
028
029import org.apache.commons.io.IOUtils;
030import org.apache.logging.log4j.LogManager;
031import org.apache.logging.log4j.Logger;
032import org.nuxeo.ecm.core.api.Blob;
033import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
034import org.nuxeo.ecm.core.bulk.BulkCodecs;
035import org.nuxeo.ecm.core.bulk.BulkService;
036import org.nuxeo.ecm.core.bulk.message.BulkCommand;
037import org.nuxeo.ecm.core.bulk.message.DataBucket;
038import org.nuxeo.lib.stream.codec.Codec;
039import org.nuxeo.lib.stream.computation.ComputationContext;
040import org.nuxeo.lib.stream.computation.Record;
041import org.nuxeo.runtime.api.Framework;
042
043import com.google.code.externalsorting.ExternalSort;
044
045/**
046 * @since 10.3
047 */
048public class SortBlob extends AbstractTransientBlobComputation {
049
050    private static final Logger log = LogManager.getLogger(SortBlob.class);
051
052    public static final String NAME = "bulk/sortBlob";
053
054    public static final String SORT_PARAMETER = "sort";
055
056    protected static final String ZIP_STREAM = OUTPUT_1;
057
058    protected static final String EXPOSE_BLOB_STREAM = OUTPUT_2;
059
060    protected static final int NB_OUTPUT_STREAMS = 2;
061
062    public SortBlob() {
063        super(NAME, NB_OUTPUT_STREAMS);
064    }
065
066    @Override
067    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
068        Codec<DataBucket> codec = BulkCodecs.getDataBucketCodec();
069        DataBucket in = codec.decode(record.getData());
070
071        String commandId = in.getCommandId();
072        String storeName = Framework.getService(BulkService.class).getStatus(commandId).getAction();
073        Blob tmpBlob = getBlob(in.getDataAsString(), storeName);
074        tmpBlob = sort(tmpBlob, commandId);
075
076        // Create a new file to add header and footer
077        Path path = createTemp(commandId);
078        try (InputStream is = tmpBlob.getStream(); FileOutputStream os = new FileOutputStream(path.toFile(), true)) {
079            os.write(in.getHeader());
080            IOUtils.copy(is, os);
081            os.write(in.getFooter());
082            os.flush();
083        } catch (IOException e) {
084            log.error("Unable to copy header/footer", e);
085        }
086        try {
087            Files.delete(tmpBlob.getFile().toPath());
088        } catch (IOException e) {
089            log.error("Unable to delete tmp file", e);
090        }
091
092        storeBlob(new FileBlob(path.toFile()), commandId, storeName);
093
094        BulkCommand command = Framework.getService(BulkService.class).getCommand(commandId);
095        boolean zip = command.getParam(ZIP_PARAMETER) != null ? command.getParam(ZIP_PARAMETER) : false;
096        String outputStream = zip ? ZIP_STREAM : EXPOSE_BLOB_STREAM;
097
098        DataBucket out = new DataBucket(commandId, in.getCount(), getTransientStoreKey(commandId));
099        context.produceRecord(outputStream, Record.of(commandId, codec.encode(out)));
100        context.askForCheckpoint();
101    }
102
103    protected Blob sort(Blob blob, String commandId) {
104        try {
105            Path temp = createTemp("tmp" + commandId);
106            try (var cFile = blob.getCloseableFile()) {
107                ExternalSort.sort(cFile.getFile(), temp.toFile());
108            }
109            return new FileBlob(temp.toFile());
110        } catch (IOException e) {
111            log.error("Unable to sort blob", e);
112            return blob;
113        }
114    }
115
116}