001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     pierre
018 */
019package org.nuxeo.ecm.platform.csv.export.computation;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022import static org.nuxeo.ecm.core.io.marshallers.csv.AbstractCSVWriter.TEXT_CSV_TYPE;
023import static org.nuxeo.ecm.platform.csv.export.io.DocumentModelCSVWriter.SCHEMAS_CTX_DATA;
024import static org.nuxeo.ecm.platform.csv.export.io.DocumentModelCSVWriter.XPATHS_CTX_DATA;
025import static org.nuxeo.ecm.platform.csv.export.io.DocumentPropertyCSVWriter.LANG_CTX_DATA;
026
027import java.io.IOException;
028import java.io.Serializable;
029import java.util.ArrayList;
030import java.util.Collections;
031import java.util.List;
032import java.util.Map;
033
034import org.apache.commons.csv.CSVFormat;
035import org.apache.logging.log4j.LogManager;
036import org.apache.logging.log4j.Logger;
037import org.nuxeo.ecm.core.api.CoreSession;
038import org.nuxeo.ecm.core.api.DocumentModelList;
039import org.nuxeo.ecm.core.bulk.BulkCodecs;
040import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
041import org.nuxeo.ecm.core.bulk.message.BulkBucket;
042import org.nuxeo.ecm.core.bulk.message.BulkCommand;
043import org.nuxeo.ecm.core.bulk.message.BulkStatus;
044import org.nuxeo.ecm.core.bulk.message.DataBucket;
045import org.nuxeo.ecm.core.io.marshallers.csv.OutputStreamWithCSVWriter;
046import org.nuxeo.ecm.core.io.registry.MarshallerRegistry;
047import org.nuxeo.ecm.core.io.registry.Writer;
048import org.nuxeo.ecm.core.io.registry.context.RenderingContext;
049import org.nuxeo.ecm.platform.csv.export.action.CSVExportAction;
050import org.nuxeo.lib.stream.computation.ComputationContext;
051import org.nuxeo.lib.stream.computation.Record;
052import org.nuxeo.runtime.api.Framework;
053
054/**
055 * Inputs:
056 * <ul>
057 * <li>i1: Reads {@link BulkBucket}</li>
058 * </ul>
059 * Outputs
060 * <ul>
061 * <li>o1: Writes {@link org.nuxeo.lib.stream.computation.Record} containing csv lines</li>
062 * </ul>
063 *
064 * @since 10.3
065 */
066public class CSVProjectionComputation extends AbstractBulkComputation {
067
068    private static final Logger log = LogManager.getLogger(CSVProjectionComputation.class);
069
070    public static final String PARAM_SCHEMAS = "schemas";
071
072    public static final String PARAM_XPATHS = "xpaths";
073
074    public static final String PARAM_LANG = "lang";
075
076    protected OutputStreamWithCSVWriter out;
077
078    protected RenderingContext renderingCtx;
079
080    public CSVProjectionComputation() {
081        super(CSVExportAction.ACTION_FULL_NAME);
082    }
083
084    @Override
085    public void startBucket(String bucketKey) {
086        out = new OutputStreamWithCSVWriter();
087        BulkCommand command = getCurrentCommand();
088        renderingCtx = RenderingContext.CtxBuilder.get();
089        renderingCtx.setParameterValues(SCHEMAS_CTX_DATA, getList(command.getParams().get(PARAM_SCHEMAS)));
090        renderingCtx.setParameterValues(XPATHS_CTX_DATA, getList(command.getParams().get(PARAM_XPATHS)));
091        renderingCtx.setParameterValues(LANG_CTX_DATA, getString(command.getParams().get(PARAM_LANG)));
092    }
093
094    @Override
095    protected void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties) {
096        DocumentModelList docs = loadDocuments(session, ids);
097        MarshallerRegistry registry = Framework.getService(MarshallerRegistry.class);
098        Writer<DocumentModelList> writer = registry.getWriter(renderingCtx, DocumentModelList.class, TEXT_CSV_TYPE);
099        try {
100            writer.write(docs, DocumentModelList.class, null, TEXT_CSV_TYPE, out);
101        } catch (IOException e) {
102            log.error("Unable to write documents", e);
103        }
104    }
105
106    @Override
107    public void endBucket(ComputationContext context, BulkStatus delta) {
108        String commandId = delta.getId();
109        // Extract header from data
110        String csv = out.toString();
111        String recordSeparator = CSVFormat.DEFAULT.getRecordSeparator();
112        String header = getHeader(csv, recordSeparator);
113        String data = getData(csv, recordSeparator);
114        DataBucket dataBucket = new DataBucket(commandId, delta.getProcessed(), data.getBytes(UTF_8),
115                header.getBytes(UTF_8), new byte[0]);
116        Record record = Record.of(commandId, BulkCodecs.getDataBucketCodec().encode(dataBucket));
117        context.produceRecord(OUTPUT_1, record);
118        out = null;
119    }
120
121    protected String getHeader(String csv, String recordSeparator) {
122        return csv.substring(0, csv.indexOf(recordSeparator) + recordSeparator.length());
123    }
124
125    protected String getData(String csv, String recordSeparator) {
126        return csv.substring(csv.indexOf(recordSeparator) + recordSeparator.length());
127    }
128
129    protected List<String> getList(Serializable value) {
130        if (value == null) {
131            return Collections.emptyList();
132        }
133        if (value instanceof List<?>) {
134            List<?> objects = (List<?>) value;
135            List<String> values = new ArrayList<>(objects.size());
136            for (Object object : objects) {
137                if (object != null) {
138                    values.add(object.toString());
139                }
140            }
141            Collections.sort(values);
142            return values;
143        } else {
144            log.debug("Illegal parameter '{}'", value);
145            return Collections.emptyList();
146        }
147    }
148
149    protected String getString(Serializable value) {
150        if (value instanceof String) {
151            return (String) value;
152        }
153        return null;
154    }
155
156}