001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * pierre 018 */ 019package org.nuxeo.ecm.core.bulk.action.computation; 020 021import static org.nuxeo.ecm.core.bulk.action.computation.ZipBlob.ZIP_PARAMETER; 022 023import java.io.FileOutputStream; 024import java.io.IOException; 025import java.io.InputStream; 026import java.nio.file.Files; 027import java.nio.file.Path; 028 029import org.apache.commons.io.IOUtils; 030import org.apache.logging.log4j.LogManager; 031import org.apache.logging.log4j.Logger; 032import org.nuxeo.ecm.core.api.Blob; 033import org.nuxeo.ecm.core.api.impl.blob.FileBlob; 034import org.nuxeo.ecm.core.bulk.BulkCodecs; 035import org.nuxeo.ecm.core.bulk.BulkService; 036import org.nuxeo.ecm.core.bulk.message.BulkCommand; 037import org.nuxeo.ecm.core.bulk.message.DataBucket; 038import org.nuxeo.lib.stream.codec.Codec; 039import org.nuxeo.lib.stream.computation.ComputationContext; 040import org.nuxeo.lib.stream.computation.Record; 041import org.nuxeo.runtime.api.Framework; 042 043import com.google.code.externalsorting.ExternalSort; 044 045/** 046 * @since 10.3 047 */ 048public class SortBlob extends AbstractTransientBlobComputation { 049 050 private static final Logger log = LogManager.getLogger(SortBlob.class); 051 052 public static final String NAME = "bulk/sortBlob"; 053 054 public static final String SORT_PARAMETER = "sort"; 055 056 protected static final String ZIP_STREAM = OUTPUT_1; 057 058 protected static final String EXPOSE_BLOB_STREAM = OUTPUT_2; 059 060 protected static final int NB_OUTPUT_STREAMS = 2; 061 062 public SortBlob() { 063 super(NAME, NB_OUTPUT_STREAMS); 064 } 065 066 @Override 067 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 068 Codec<DataBucket> codec = BulkCodecs.getDataBucketCodec(); 069 DataBucket in = codec.decode(record.getData()); 070 071 String commandId = in.getCommandId(); 072 String storeName = Framework.getService(BulkService.class).getStatus(commandId).getAction(); 073 Blob tmpBlob = getBlob(in.getDataAsString(), storeName); 074 tmpBlob = sort(tmpBlob, commandId); 075 076 // Create a new file to add header and footer 077 Path path = createTemp(commandId); 078 try (InputStream is = tmpBlob.getStream(); FileOutputStream os = new FileOutputStream(path.toFile(), true)) { 079 os.write(in.getHeader()); 080 IOUtils.copy(is, os); 081 os.write(in.getFooter()); 082 os.flush(); 083 } catch (IOException e) { 084 log.error("Unable to copy header/footer", e); 085 } 086 try { 087 Files.delete(tmpBlob.getFile().toPath()); 088 } catch (IOException e) { 089 log.error("Unable to delete tmp file", e); 090 } 091 092 storeBlob(new FileBlob(path.toFile()), commandId, storeName); 093 094 BulkCommand command = Framework.getService(BulkService.class).getCommand(commandId); 095 boolean zip = command.getParam(ZIP_PARAMETER) != null ? command.getParam(ZIP_PARAMETER) : false; 096 String outputStream = zip ? ZIP_STREAM : EXPOSE_BLOB_STREAM; 097 098 DataBucket out = new DataBucket(commandId, in.getCount(), getTransientStoreKey(commandId)); 099 context.produceRecord(outputStream, Record.of(commandId, codec.encode(out))); 100 context.askForCheckpoint(); 101 } 102 103 protected Blob sort(Blob blob, String commandId) { 104 try { 105 Path temp = createTemp("tmp" + commandId); 106 try (var cFile = blob.getCloseableFile()) { 107 ExternalSort.sort(cFile.getFile(), temp.toFile()); 108 } 109 return new FileBlob(temp.toFile()); 110 } catch (IOException e) { 111 log.error("Unable to sort blob", e); 112 return blob; 113 } 114 } 115 116}