001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * pierre 018 */ 019package org.nuxeo.ecm.platform.csv.export.computation; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022import static org.nuxeo.ecm.core.io.marshallers.csv.AbstractCSVWriter.TEXT_CSV_TYPE; 023import static org.nuxeo.ecm.platform.csv.export.io.DocumentModelCSVWriter.SCHEMAS_CTX_DATA; 024import static org.nuxeo.ecm.platform.csv.export.io.DocumentModelCSVWriter.XPATHS_CTX_DATA; 025import static org.nuxeo.ecm.platform.csv.export.io.DocumentPropertyCSVWriter.LANG_CTX_DATA; 026 027import java.io.IOException; 028import java.io.Serializable; 029import java.util.ArrayList; 030import java.util.Collections; 031import java.util.List; 032import java.util.Map; 033 034import org.apache.commons.csv.CSVFormat; 035import org.apache.logging.log4j.LogManager; 036import org.apache.logging.log4j.Logger; 037import org.nuxeo.ecm.core.api.CoreSession; 038import org.nuxeo.ecm.core.api.DocumentModelList; 039import org.nuxeo.ecm.core.bulk.BulkCodecs; 040import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation; 041import org.nuxeo.ecm.core.bulk.message.BulkCommand; 042import org.nuxeo.ecm.core.bulk.message.BulkStatus; 043import org.nuxeo.ecm.core.bulk.message.DataBucket; 044import org.nuxeo.ecm.core.io.marshallers.csv.OutputStreamWithCSVWriter; 045import org.nuxeo.ecm.core.io.registry.MarshallerRegistry; 046import org.nuxeo.ecm.core.io.registry.Writer; 047import org.nuxeo.ecm.core.io.registry.context.RenderingContext; 048import org.nuxeo.ecm.platform.csv.export.action.CSVExportAction; 049import org.nuxeo.lib.stream.computation.ComputationContext; 050import org.nuxeo.lib.stream.computation.Record; 051import org.nuxeo.runtime.api.Framework; 052 053/** 054 * </ul> 055 * Outputs 056 * <ul> 057 * <li>o1: Writes {@link org.nuxeo.lib.stream.computation.Record} containing csv lines</li> 058 * </ul> 059 * 060 * @since 10.3 061 */ 062public class CSVProjectionComputation extends AbstractBulkComputation { 063 064 private static final Logger log = LogManager.getLogger(CSVProjectionComputation.class); 065 066 public static final String PARAM_SCHEMAS = "schemas"; 067 068 public static final String PARAM_XPATHS = "xpaths"; 069 070 public static final String PARAM_LANG = "lang"; 071 072 protected OutputStreamWithCSVWriter out; 073 074 protected RenderingContext renderingCtx; 075 076 public CSVProjectionComputation() { 077 super(CSVExportAction.ACTION_NAME); 078 } 079 080 @Override 081 public void startBucket(String bucketKey) { 082 out = new OutputStreamWithCSVWriter(); 083 BulkCommand command = getCurrentCommand(); 084 renderingCtx = RenderingContext.CtxBuilder.get(); 085 renderingCtx.setParameterValues(SCHEMAS_CTX_DATA, getList(command.getParams().get(PARAM_SCHEMAS))); 086 renderingCtx.setParameterValues(XPATHS_CTX_DATA, getList(command.getParams().get(PARAM_XPATHS))); 087 renderingCtx.setParameterValues(LANG_CTX_DATA, getString(command.getParams().get(PARAM_LANG))); 088 } 089 090 @Override 091 protected void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties) { 092 DocumentModelList docs = loadDocuments(session, ids); 093 MarshallerRegistry registry = Framework.getService(MarshallerRegistry.class); 094 Writer<DocumentModelList> writer = registry.getWriter(renderingCtx, DocumentModelList.class, TEXT_CSV_TYPE); 095 try { 096 writer.write(docs, DocumentModelList.class, null, TEXT_CSV_TYPE, out); 097 } catch (IOException e) { 098 log.error("Unable to write documents", e); 099 } 100 } 101 102 @Override 103 public void endBucket(ComputationContext context, BulkStatus delta) { 104 String commandId = delta.getId(); 105 // Extract header from data 106 String csv = out.toString(); 107 String recordSeparator = CSVFormat.DEFAULT.getRecordSeparator(); 108 String header = getHeader(csv, recordSeparator); 109 String data = getData(csv, recordSeparator); 110 DataBucket dataBucket = new DataBucket(commandId, delta.getProcessed(), data.getBytes(UTF_8), 111 header.getBytes(UTF_8), new byte[0]); 112 Record record = Record.of(commandId, BulkCodecs.getDataBucketCodec().encode(dataBucket)); 113 context.produceRecord(OUTPUT_1, record); 114 out = null; 115 } 116 117 protected String getHeader(String csv, String recordSeparator) { 118 return csv.substring(0, csv.indexOf(recordSeparator) + recordSeparator.length()); 119 } 120 121 protected String getData(String csv, String recordSeparator) { 122 return csv.substring(csv.indexOf(recordSeparator) + recordSeparator.length()); 123 } 124 125 protected List<String> getList(Serializable value) { 126 if (value == null) { 127 return Collections.emptyList(); 128 } 129 if (value instanceof List<?>) { 130 List<?> objects = (List<?>) value; 131 List<String> values = new ArrayList<>(objects.size()); 132 for (Object object : objects) { 133 if (object != null) { 134 values.add(object.toString()); 135 } 136 } 137 Collections.sort(values); 138 return values; 139 } else { 140 log.debug("Illegal parameter '{}'", value); 141 return Collections.emptyList(); 142 } 143 } 144 145 protected String getString(Serializable value) { 146 if (value instanceof String) { 147 return (String) value; 148 } 149 return null; 150 } 151 152}