/*
 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     pierre
 */
package org.nuxeo.ecm.core.bulk.action.computation;

import static org.nuxeo.ecm.core.bulk.action.computation.SortBlob.SORT_PARAMETER;
import static org.nuxeo.ecm.core.bulk.action.computation.ZipBlob.ZIP_PARAMETER;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
import org.nuxeo.ecm.core.bulk.BulkCodecs;
import org.nuxeo.ecm.core.bulk.BulkService;
import org.nuxeo.ecm.core.bulk.message.BulkCommand;
import org.nuxeo.ecm.core.bulk.message.DataBucket;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
import org.nuxeo.runtime.api.Framework;

/**
 * Computation that appends the data of each incoming {@link DataBucket} to a per-command temporary file and, once
 * the number of documents counted for a command reaches the command total, stores that file as a blob and emits a
 * new {@link DataBucket} carrying the store key on one of three output streams (sort, zip or expose blob).
 *
 * @since 10.3
 */
public class MakeBlob extends AbstractTransientBlobComputation {

    private static final Logger log = LogManager.getLogger(MakeBlob.class);

    public static final String NAME = "makeBlob";

    /** Delay, in milliseconds, between two runs of the "check" timer. */
    protected static final long CHECK_DELAY_MS = 1000;

    protected static final String SORT_STREAM = OUTPUT_1;

    protected static final String ZIP_STREAM = OUTPUT_2;

    protected static final String EXPOSE_BLOB_STREAM = OUTPUT_3;

    protected static final int NB_OUTPUT_STREAMS = 3;

    /** Number of documents counted so far, keyed by command id. */
    protected final Map<String, Long> counters = new HashMap<>();

    /** Non-zero command totals already fetched from the {@link BulkService}, keyed by command id. */
    protected final Map<String, Long> totals = new HashMap<>();

    /** Most recent bucket received for each command id; its header and footer are reused when finishing. */
    protected final Map<String, DataBucket> lastBuckets = new HashMap<>();

    /** When {@code true}, the resulting record is produced through {@code produceRecordImmediate}. */
    protected final boolean produceImmediate;

    /**
     * Creates a computation that does not produce its records immediately.
     */
    public MakeBlob() {
        this(false);
    }

    /**
     * @param produceImmediate whether the final record must be sent with
     *            {@link ComputationContextImpl#produceRecordImmediate} instead of
     *            {@link ComputationContext#produceRecord}
     */
    public MakeBlob(boolean produceImmediate) {
        super(NAME, NB_OUTPUT_STREAMS);
        this.produceImmediate = produceImmediate;
    }

    /**
     * Initializes the parent computation and arms the first "check" timer.
     */
    @Override
    public void init(ComputationContext context) {
        super.init(context);
        context.setTimer("check", System.currentTimeMillis() + CHECK_DELAY_MS);
    }

    /**
     * Finishes every command whose total was not cached yet but whose counter has now reached it, then re-arms the
     * timer.
     */
    @Override
    public void processTimer(ComputationContext context, String key, long timestamp) {
        // This timer only exists to solve the race condition that happens when the total becomes known
        // after the records have already been processed.
        // The ids are collected first because finishBlob removes entries from the counters map.
        List<String> completed = counters.entrySet()
                                         .stream()
                                         .filter(entry -> !totals.containsKey(entry.getKey()))
                                         .filter(entry -> entry.getValue() >= getTotal(entry.getKey()))
                                         .map(Map.Entry::getKey)
                                         .collect(Collectors.toList());
        for (String commandId : completed) {
            finishBlob(context, commandId);
        }
        context.setTimer("check", System.currentTimeMillis() + CHECK_DELAY_MS);
    }

    /**
     * Decodes the record as a {@link DataBucket}, appends its data to the command file, updates the command counter
     * and finishes the blob when the counter has reached the command total.
     */
    @Override
    public void processRecord(ComputationContext context, String documentIdsStreamName, Record record) {
        Codec<DataBucket> bucketCodec = BulkCodecs.getDataBucketCodec();
        DataBucket bucket = bucketCodec.decode(record.getData());
        String commandId = bucket.getCommandId();
        long bucketCount = bucket.getCount();

        appendToFile(commandId, bucket.getData());

        // starts the counter at bucketCount, or adds bucketCount to the existing value
        counters.merge(commandId, bucketCount, Long::sum);
        lastBuckets.put(commandId, bucket);

        if (counters.get(commandId) >= getTotal(commandId)) {
            finishBlob(context, commandId);
        }
    }

    /**
     * Returns the total for the given command, reading it from the {@link BulkService} status when it is not cached.
     *
     * @return the cached or freshly fetched total, or {@link Long#MAX_VALUE} (without caching) while the status
     *         reports a total of 0
     */
    protected Long getTotal(String commandId) {
        if (totals.containsKey(commandId)) {
            return totals.get(commandId);
        }
        long fetched = Framework.getService(BulkService.class).getStatus(commandId).getTotal();
        if (fetched == 0) {
            // not cached, so the status is read again on the next call
            return Long.MAX_VALUE;
        }
        totals.put(commandId, fetched);
        return totals.get(commandId);
    }

    /**
     * Appends the given bytes to the file returned by {@code createTemp(commandId)}. An {@link IOException} is
     * logged and not rethrown.
     *
     * @return the path of the file that was written to
     */
    protected Path appendToFile(String commandId, byte[] content) {
        Path target = createTemp(commandId);
        // second constructor argument opens the file in append mode
        try (FileOutputStream out = new FileOutputStream(target.toFile(), true)) {
            out.write(content);
            out.flush();
        } catch (IOException e) {
            log.error("Unable to write content", e);
        }
        return target;
    }

    /**
     * Rewrites the given file as header + previous content + footer. Does nothing when both arrays are empty. An
     * {@link IOException} is logged and not rethrown.
     */
    protected void appendHeaderFooterToFile(Path filePath, String commandId, byte[] header, byte[] footer) {
        if (header.length == 0 && footer.length == 0) {
            return;
        }
        try {
            // move the current content aside, then rebuild the file at its original location
            Path backup = Files.move(filePath, createTemp("tmp" + commandId), StandardCopyOption.REPLACE_EXISTING);
            try (InputStream source = Files.newInputStream(backup);
                    FileOutputStream sink = new FileOutputStream(filePath.toFile(), true)) {
                if (header.length > 0) {
                    sink.write(header);
                }
                IOUtils.copy(source, sink);
                if (footer.length > 0) {
                    sink.write(footer);
                }
                sink.flush();
            } finally {
                Files.delete(backup);
            }
        } catch (IOException e) {
            log.error("Unable to append header and footer", e);
        }
    }

    /**
     * Passes the command file to {@code storeBlob} as a {@link FileBlob}, then deletes the file. A failed deletion
     * is logged and not rethrown.
     *
     * @return the value of {@code getTransientStoreKey(commandId)}
     */
    protected String saveInTransientStore(String commandId, String storeName) {
        Path blobFile = createTemp(commandId);
        storeBlob(new FileBlob(blobFile.toFile()), commandId, storeName);
        try {
            Files.delete(blobFile);
        } catch (IOException e) {
            log.error("Unable to delete file", e);
        }
        return getTransientStoreKey(commandId);
    }

    /**
     * Chooses the output stream for the given command from its sort and zip parameters. Sort defaults to
     * {@code true} and zip to {@code false} when the command or the parameter is missing; sort takes precedence
     * over zip.
     *
     * @return {@link #SORT_STREAM}, {@link #ZIP_STREAM} or {@link #EXPOSE_BLOB_STREAM}
     */
    protected String getOutputStream(String commandId) {
        BulkCommand command = Framework.getService(BulkService.class).getCommand(commandId);
        boolean sort = true;
        boolean zip = false;
        if (command != null) {
            if (command.getParam(SORT_PARAMETER) != null) {
                sort = command.getParam(SORT_PARAMETER);
            }
            if (command.getParam(ZIP_PARAMETER) != null) {
                zip = command.getParam(ZIP_PARAMETER);
            }
        }
        if (sort) {
            return SORT_STREAM;
        }
        return zip ? ZIP_STREAM : EXPOSE_BLOB_STREAM;
    }

    /**
     * Finalizes the file of the given command: adds header and footer unless the output is the sort stream, stores
     * the file as a blob, produces the resulting {@link DataBucket} on the selected output stream and forgets the
     * command state.
     */
    protected void finishBlob(ComputationContext context, String commandId) {
        String targetStream = getOutputStream(commandId);
        DataBucket last = lastBuckets.get(commandId);
        if (!SORT_STREAM.equals(targetStream)) {
            appendHeaderFooterToFile(createTemp(commandId), commandId, last.getHeader(), last.getFooter());
        }

        // the action name of the command status is used as the store name
        String storeName = Framework.getService(BulkService.class).getStatus(commandId).getAction();
        String storeKey = saveInTransientStore(commandId, storeName);
        DataBucket result = new DataBucket(commandId, totals.get(commandId), storeKey, last.getHeaderAsString(),
                last.getFooterAsString());
        Codec<DataBucket> bucketCodec = BulkCodecs.getDataBucketCodec();

        if (produceImmediate) {
            ((ComputationContextImpl) context).produceRecordImmediate(targetStream,
                    Record.of(commandId, bucketCodec.encode(result)));
        } else {
            context.produceRecord(targetStream, Record.of(commandId, bucketCodec.encode(result)));
        }

        totals.remove(commandId);
        counters.remove(commandId);
        lastBuckets.remove(commandId);
        // we checkpoint only if there is not another command in progress
        if (counters.isEmpty()) {
            context.askForCheckpoint();
        }
    }

}