001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     pierre
018 */
019package org.nuxeo.ecm.platform.csv.export.computation;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022import static org.nuxeo.ecm.core.io.marshallers.csv.AbstractCSVWriter.TEXT_CSV_TYPE;
023import static org.nuxeo.ecm.platform.csv.export.io.DocumentModelCSVWriter.SCHEMAS_CTX_DATA;
024import static org.nuxeo.ecm.platform.csv.export.io.DocumentModelCSVWriter.XPATHS_CTX_DATA;
025import static org.nuxeo.ecm.platform.csv.export.io.DocumentPropertyCSVWriter.LANG_CTX_DATA;
026
027import java.io.IOException;
028import java.io.Serializable;
029import java.util.ArrayList;
030import java.util.Collections;
031import java.util.List;
032import java.util.Map;
033
034import org.apache.commons.csv.CSVFormat;
035import org.apache.logging.log4j.LogManager;
036import org.apache.logging.log4j.Logger;
037import org.nuxeo.ecm.core.api.CoreSession;
038import org.nuxeo.ecm.core.api.DocumentModelList;
039import org.nuxeo.ecm.core.bulk.BulkCodecs;
040import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
041import org.nuxeo.ecm.core.bulk.message.BulkCommand;
042import org.nuxeo.ecm.core.bulk.message.BulkStatus;
043import org.nuxeo.ecm.core.bulk.message.DataBucket;
044import org.nuxeo.ecm.core.io.marshallers.csv.OutputStreamWithCSVWriter;
045import org.nuxeo.ecm.core.io.registry.MarshallerRegistry;
046import org.nuxeo.ecm.core.io.registry.Writer;
047import org.nuxeo.ecm.core.io.registry.context.RenderingContext;
048import org.nuxeo.ecm.platform.csv.export.action.CSVExportAction;
049import org.nuxeo.lib.stream.computation.ComputationContext;
050import org.nuxeo.lib.stream.computation.Record;
051import org.nuxeo.runtime.api.Framework;
052
053/**
054 * </ul>
055 * Outputs
056 * <ul>
057 * <li>o1: Writes {@link org.nuxeo.lib.stream.computation.Record} containing csv lines</li>
058 * </ul>
059 *
060 * @since 10.3
061 */
062public class CSVProjectionComputation extends AbstractBulkComputation {
063
064    private static final Logger log = LogManager.getLogger(CSVProjectionComputation.class);
065
066    public static final String PARAM_SCHEMAS = "schemas";
067
068    public static final String PARAM_XPATHS = "xpaths";
069
070    public static final String PARAM_LANG = "lang";
071
072    protected OutputStreamWithCSVWriter out;
073
074    protected RenderingContext renderingCtx;
075
076    public CSVProjectionComputation() {
077        super(CSVExportAction.ACTION_NAME);
078    }
079
080    @Override
081    public void startBucket(String bucketKey) {
082        out = new OutputStreamWithCSVWriter();
083        BulkCommand command = getCurrentCommand();
084        renderingCtx = RenderingContext.CtxBuilder.get();
085        renderingCtx.setParameterValues(SCHEMAS_CTX_DATA, getList(command.getParams().get(PARAM_SCHEMAS)));
086        renderingCtx.setParameterValues(XPATHS_CTX_DATA, getList(command.getParams().get(PARAM_XPATHS)));
087        renderingCtx.setParameterValues(LANG_CTX_DATA, getString(command.getParams().get(PARAM_LANG)));
088    }
089
090    @Override
091    protected void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties) {
092        DocumentModelList docs = loadDocuments(session, ids);
093        MarshallerRegistry registry = Framework.getService(MarshallerRegistry.class);
094        Writer<DocumentModelList> writer = registry.getWriter(renderingCtx, DocumentModelList.class, TEXT_CSV_TYPE);
095        try {
096            writer.write(docs, DocumentModelList.class, null, TEXT_CSV_TYPE, out);
097        } catch (IOException e) {
098            log.error("Unable to write documents", e);
099        }
100    }
101
102    @Override
103    public void endBucket(ComputationContext context, BulkStatus delta) {
104        String commandId = delta.getId();
105        // Extract header from data
106        String csv = out.toString();
107        String recordSeparator = CSVFormat.DEFAULT.getRecordSeparator();
108        String header = getHeader(csv, recordSeparator);
109        String data = getData(csv, recordSeparator);
110        DataBucket dataBucket = new DataBucket(commandId, delta.getProcessed(), data.getBytes(UTF_8),
111                header.getBytes(UTF_8), new byte[0]);
112        Record record = Record.of(commandId, BulkCodecs.getDataBucketCodec().encode(dataBucket));
113        context.produceRecord(OUTPUT_1, record);
114        out = null;
115    }
116
117    protected String getHeader(String csv, String recordSeparator) {
118        return csv.substring(0, csv.indexOf(recordSeparator) + recordSeparator.length());
119    }
120
121    protected String getData(String csv, String recordSeparator) {
122        return csv.substring(csv.indexOf(recordSeparator) + recordSeparator.length());
123    }
124
125    protected List<String> getList(Serializable value) {
126        if (value == null) {
127            return Collections.emptyList();
128        }
129        if (value instanceof List<?>) {
130            List<?> objects = (List<?>) value;
131            List<String> values = new ArrayList<>(objects.size());
132            for (Object object : objects) {
133                if (object != null) {
134                    values.add(object.toString());
135                }
136            }
137            Collections.sort(values);
138            return values;
139        } else {
140            log.debug("Illegal parameter '{}'", value);
141            return Collections.emptyList();
142        }
143    }
144
145    protected String getString(Serializable value) {
146        if (value instanceof String) {
147            return (String) value;
148        }
149        return null;
150    }
151
152}