001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * pierre 018 */ 019package org.nuxeo.ecm.platform.csv.export.computation; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022import static org.nuxeo.ecm.core.io.marshallers.csv.AbstractCSVWriter.TEXT_CSV_TYPE; 023import static org.nuxeo.ecm.platform.csv.export.io.DocumentModelCSVWriter.SCHEMAS_CTX_DATA; 024import static org.nuxeo.ecm.platform.csv.export.io.DocumentModelCSVWriter.XPATHS_CTX_DATA; 025import static org.nuxeo.ecm.platform.csv.export.io.DocumentPropertyCSVWriter.LANG_CTX_DATA; 026 027import java.io.IOException; 028import java.io.Serializable; 029import java.util.ArrayList; 030import java.util.Collections; 031import java.util.List; 032import java.util.Map; 033 034import org.apache.commons.csv.CSVFormat; 035import org.apache.logging.log4j.LogManager; 036import org.apache.logging.log4j.Logger; 037import org.nuxeo.ecm.core.api.CoreSession; 038import org.nuxeo.ecm.core.api.DocumentModelList; 039import org.nuxeo.ecm.core.bulk.BulkCodecs; 040import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation; 041import org.nuxeo.ecm.core.bulk.message.BulkBucket; 042import org.nuxeo.ecm.core.bulk.message.BulkCommand; 043import org.nuxeo.ecm.core.bulk.message.BulkStatus; 044import org.nuxeo.ecm.core.bulk.message.DataBucket; 045import 
org.nuxeo.ecm.core.io.marshallers.csv.OutputStreamWithCSVWriter;
import org.nuxeo.ecm.core.io.registry.MarshallerRegistry;
import org.nuxeo.ecm.core.io.registry.Writer;
import org.nuxeo.ecm.core.io.registry.context.RenderingContext;
import org.nuxeo.ecm.platform.csv.export.action.CSVExportAction;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;

/**
 * Computation of the CSV export bulk action: renders each bucket of documents as CSV.
 * <p>
 * Inputs:
 * <ul>
 * <li>i1: Reads {@link BulkBucket}</li>
 * </ul>
 * Outputs:
 * <ul>
 * <li>o1: Writes {@link org.nuxeo.lib.stream.computation.Record} containing an encoded {@link DataBucket} that
 * carries the csv header line and the csv data lines of the bucket</li>
 * </ul>
 *
 * @since 10.3
 */
public class CSVProjectionComputation extends AbstractBulkComputation {

    private static final Logger log = LogManager.getLogger(CSVProjectionComputation.class);

    public static final String PARAM_SCHEMAS = "schemas";

    public static final String PARAM_XPATHS = "xpaths";

    public static final String PARAM_LANG = "lang";

    /** CSV buffer of the current bucket; created in {@link #startBucket}, released in {@link #endBucket}. */
    protected OutputStreamWithCSVWriter out;

    /** Rendering context built from the current command parameters in {@link #startBucket}. */
    protected RenderingContext renderingCtx;

    public CSVProjectionComputation() {
        super(CSVExportAction.ACTION_FULL_NAME);
    }

    /**
     * Allocates a fresh CSV buffer and builds the rendering context from the current command parameters
     * ({@value #PARAM_SCHEMAS}, {@value #PARAM_XPATHS} and {@value #PARAM_LANG}).
     *
     * @param bucketKey the key of the bucket being started (unused here)
     */
    @Override
    public void startBucket(String bucketKey) {
        out = new OutputStreamWithCSVWriter();
        BulkCommand command = getCurrentCommand();
        renderingCtx = RenderingContext.CtxBuilder.get();
        renderingCtx.setParameterValues(SCHEMAS_CTX_DATA, getList(command.getParams().get(PARAM_SCHEMAS)));
        renderingCtx.setParameterValues(XPATHS_CTX_DATA, getList(command.getParams().get(PARAM_XPATHS)));
        renderingCtx.setParameterValues(LANG_CTX_DATA, getString(command.getParams().get(PARAM_LANG)));
    }

    /**
     * Loads the given documents and appends their CSV rendering to the bucket buffer.
     *
     * @param session the session used to load the documents
     * @param ids the ids of the documents to render
     * @param properties the bulk command properties (unused here)
     */
    @Override
    protected void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties) {
        DocumentModelList docs = loadDocuments(session, ids);
        MarshallerRegistry registry = Framework.getService(MarshallerRegistry.class);
        Writer<DocumentModelList> writer = registry.getWriter(renderingCtx, DocumentModelList.class, TEXT_CSV_TYPE);
        try {
            writer.write(docs, DocumentModelList.class, null, TEXT_CSV_TYPE, out);
        } catch (IOException e) {
            // NOTE(review): the failure is only logged, so these documents are missing from the export while the
            // bucket is still reported as processed; consider propagating the error instead.
            log.error("Unable to write documents", e);
        }
    }

    /**
     * Splits the buffered CSV into its header line and its data lines, emits them as a {@link DataBucket} record on
     * {@code OUTPUT_1} keyed by the command id, then releases the buffer.
     *
     * @param context the computation context used to produce the record
     * @param delta the bucket status, providing the command id and the processed count
     */
    @Override
    public void endBucket(ComputationContext context, BulkStatus delta) {
        String commandId = delta.getId();
        // The first record of the rendered csv is the header; split it from the data
        String csv = out.toString();
        String recordSeparator = CSVFormat.DEFAULT.getRecordSeparator();
        String header = getHeader(csv, recordSeparator);
        String data = getData(csv, recordSeparator);
        DataBucket dataBucket = new DataBucket(commandId, delta.getProcessed(), data.getBytes(UTF_8),
                header.getBytes(UTF_8), new byte[0]);
        Record record = Record.of(commandId, BulkCodecs.getDataBucketCodec().encode(dataBucket));
        context.produceRecord(OUTPUT_1, record);
        out = null;
    }

    /**
     * Returns the first record of the csv, including its record separator.
     * <p>
     * If the csv contains no record separator (for instance when nothing was rendered), the whole content, possibly
     * empty, is considered to be the header.
     *
     * @param csv the rendered csv
     * @param recordSeparator the separator ending each record
     * @return the header line
     */
    protected String getHeader(String csv, String recordSeparator) {
        int separatorIndex = csv.indexOf(recordSeparator);
        if (separatorIndex < 0) {
            // no complete record: avoid substring(0, -1 + length) which is wrong or out of bounds
            return csv;
        }
        return csv.substring(0, separatorIndex + recordSeparator.length());
    }

    /**
     * Returns everything following the first record of the csv.
     * <p>
     * If the csv contains no record separator, there is no data and an empty string is returned.
     *
     * @param csv the rendered csv
     * @param recordSeparator the separator ending each record
     * @return the data lines, possibly empty
     */
    protected String getData(String csv, String recordSeparator) {
        int separatorIndex = csv.indexOf(recordSeparator);
        if (separatorIndex < 0) {
            // no complete record: everything (if anything) belongs to the header
            return "";
        }
        return csv.substring(separatorIndex + recordSeparator.length());
    }

    /**
     * Converts a command parameter into a sorted list of strings.
     *
     * @param value the parameter value; expected to be a {@link List}
     * @return the {@code toString()} of each non-null element, sorted; an empty list if {@code value} is
     *         {@code null} or not a list
     */
    protected List<String> getList(Serializable value) {
        if (value == null) {
            return Collections.emptyList();
        }
        if (value instanceof List<?>) {
            List<?> objects = (List<?>) value;
            List<String> values = new ArrayList<>(objects.size());
            for (Object object : objects) {
                if (object != null) {
                    values.add(object.toString());
                }
            }
            Collections.sort(values);
            return values;
        } else {
            log.debug("Illegal parameter '{}'", value);
            return Collections.emptyList();
        }
    }

    /**
     * Returns the parameter as a string.
     *
     * @param value the parameter value
     * @return {@code value} if it is a {@link String}, {@code null} otherwise
     */
    protected String getString(Serializable value) {
        if (value instanceof String) {
            return (String) value;
        }
        return null;
    }

}