001/*
002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thomas Roger
018 *     Florent Guillaume
019 *     Julien Carsique
020 */
021package org.nuxeo.ecm.csv.core;
022
023import static java.nio.charset.StandardCharsets.UTF_8;
024import static org.nuxeo.ecm.csv.core.CSVImportLog.Status.ERROR;
025import static org.nuxeo.ecm.csv.core.Constants.CSV_NAME_COL;
026import static org.nuxeo.ecm.csv.core.Constants.CSV_TYPE_COL;
027
028import java.io.BufferedReader;
029import java.io.File;
030import java.io.IOException;
031import java.io.InputStream;
032import java.io.InputStreamReader;
033import java.io.Reader;
034import java.io.Serializable;
035import java.text.DateFormat;
036import java.text.ParseException;
037import java.util.ArrayList;
038import java.util.Arrays;
039import java.util.Collections;
040import java.util.Date;
041import java.util.HashMap;
042import java.util.List;
043import java.util.ListIterator;
044import java.util.Map;
045
046import org.apache.commons.csv.CSVFormat;
047import org.apache.commons.csv.CSVParser;
048import org.apache.commons.csv.CSVRecord;
049import org.apache.commons.io.FilenameUtils;
050import org.apache.commons.io.IOUtils;
051import org.apache.commons.io.input.BOMInputStream;
052import org.apache.commons.lang3.StringUtils;
053import org.apache.logging.log4j.LogManager;
054import org.apache.logging.log4j.Logger;
055import org.nuxeo.common.utils.ExceptionUtils;
056import org.nuxeo.common.utils.Path;
057import org.nuxeo.ecm.automation.AutomationService;
058import org.nuxeo.ecm.automation.OperationChain;
059import org.nuxeo.ecm.automation.OperationContext;
060import org.nuxeo.ecm.automation.core.operations.notification.MailTemplateHelper;
061import org.nuxeo.ecm.automation.core.operations.notification.SendMail;
062import org.nuxeo.ecm.automation.core.scripting.Expression;
063import org.nuxeo.ecm.automation.core.scripting.Scripting;
064import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
065import org.nuxeo.ecm.automation.core.util.StringList;
066import org.nuxeo.ecm.core.api.Blob;
067import org.nuxeo.ecm.core.api.Blobs;
068import org.nuxeo.ecm.core.api.DocumentModel;
069import org.nuxeo.ecm.core.api.DocumentRef;
070import org.nuxeo.ecm.core.api.NuxeoException;
071import org.nuxeo.ecm.core.api.NuxeoPrincipal;
072import org.nuxeo.ecm.core.api.PathRef;
073import org.nuxeo.ecm.core.query.sql.NXQL;
074import org.nuxeo.ecm.core.schema.DocumentType;
075import org.nuxeo.ecm.core.schema.SchemaManager;
076import org.nuxeo.ecm.core.schema.types.ComplexType;
077import org.nuxeo.ecm.core.schema.types.CompositeType;
078import org.nuxeo.ecm.core.schema.types.Field;
079import org.nuxeo.ecm.core.schema.types.ListType;
080import org.nuxeo.ecm.core.schema.types.SimpleTypeImpl;
081import org.nuxeo.ecm.core.schema.types.Type;
082import org.nuxeo.ecm.core.schema.types.primitives.BooleanType;
083import org.nuxeo.ecm.core.schema.types.primitives.DateType;
084import org.nuxeo.ecm.core.schema.types.primitives.DoubleType;
085import org.nuxeo.ecm.core.schema.types.primitives.IntegerType;
086import org.nuxeo.ecm.core.schema.types.primitives.LongType;
087import org.nuxeo.ecm.core.schema.types.primitives.StringType;
088import org.nuxeo.ecm.core.transientstore.api.TransientStore;
089import org.nuxeo.ecm.core.transientstore.work.TransientStoreWork;
090import org.nuxeo.ecm.core.work.api.WorkManager;
091import org.nuxeo.ecm.csv.core.CSVImportLog.Status;
092import org.nuxeo.ecm.platform.ec.notification.NotificationEventListener;
093import org.nuxeo.ecm.platform.ec.notification.service.NotificationServiceHelper;
094import org.nuxeo.ecm.platform.types.TypeManager;
095import org.nuxeo.ecm.platform.url.api.DocumentViewCodecManager;
096import org.nuxeo.ecm.platform.url.codec.api.DocumentViewCodec;
097import org.nuxeo.ecm.platform.usermanager.UserManager;
098import org.nuxeo.runtime.api.Framework;
099
100/**
101 * Work task to import form a CSV file. Because the file is read from the local filesystem, this must be executed in a
102 * local queue. Since NXP-15252 the CSV reader manages "records", not "lines".
103 *
104 * @since 5.7
105 */
106public class CSVImporterWork extends TransientStoreWork {
107
108    public static final String NUXEO_CSV_MAIL_TO = "nuxeo.csv.mail.to";
109
110    public static final String LABEL_CSV_IMPORTER_NOT_EXISTING_FIELD = "label.csv.importer.notExistingField";
111
112    public static final String LABEL_CSV_IMPORTER_CANNOT_CONVERT_FIELD_VALUE = "label.csv.importer.cannotConvertFieldValue";
113
114    public static final String LABEL_CSV_IMPORTER_NOT_EXISTING_FILE = "label.csv.importer.notExistingFile";
115
116    public static final String NUXEO_CSV_BLOBS_FOLDER = "nuxeo.csv.blobs.folder";
117
118    public static final String LABEL_CSV_IMPORTER_DOCUMENT_ALREADY_EXISTS = "label.csv.importer.documentAlreadyExists";
119
120    public static final String LABEL_CSV_IMPORTER_UNABLE_TO_UPDATE = "label.csv.importer.unableToUpdate";
121
122    public static final String LABEL_CSV_IMPORTER_DOCUMENT_UPDATED = "label.csv.importer.documentUpdated";
123
124    public static final String LABEL_CSV_IMPORTER_UNABLE_TO_CREATE = "label.csv.importer.unableToCreate";
125
126    public static final String LABEL_CSV_IMPORTER_PARENT_DOES_NOT_EXIST = "label.csv.importer.parentDoesNotExist";
127
128    public static final String LABEL_CSV_IMPORTER_DOCUMENT_CREATED = "label.csv.importer.documentCreated";
129
130    public static final String LABEL_CSV_IMPORTER_NOT_ALLOWED_SUB_TYPE = "label.csv.importer.notAllowedSubType";
131
132    public static final String LABEL_CSV_IMPORTER_UNABLE_TO_SAVE = "label.csv.importer.unableToSave";
133
134    public static final String LABEL_CSV_IMPORTER_ERROR_IMPORTING_LINE = "label.csv.importer.errorImportingLine";
135
136    public static final String LABEL_CSV_IMPORTER_NOT_EXISTING_TYPE = "label.csv.importer.notExistingType";
137
138    public static final String LABEL_CSV_IMPORTER_MISSING_TYPE_VALUE = "label.csv.importer.missingTypeValue";
139
140    public static final String LABEL_CSV_IMPORTER_MISSING_NAME_VALUE = "label.csv.importer.missingNameValue";
141
142    public static final String LABEL_CSV_IMPORTER_MISSING_NAME_COLUMN = "label.csv.importer.missingNameColumn";
143
144    public static final String LABEL_CSV_IMPORTER_EMPTY_FILE = "label.csv.importer.emptyFile";
145
146    public static final String LABEL_CSV_IMPORTER_ERROR_DURING_IMPORT = "label.csv.importer.errorDuringImport";
147
148    public static final String LABEL_CSV_IMPORTER_EMPTY_LINE = "label.csv.importer.emptyLine";
149
150    private static final long serialVersionUID = 1L;
151
152    private static final Logger log = LogManager.getLogger(CSVImporterWork.class);
153
154    private static final String TEMPLATE_IMPORT_RESULT = "templates/csvImportResult.ftl";
155
156    public static final String CATEGORY_CSV_IMPORTER = "csvImporter";
157
158    public static final String CONTENT_FILED_TYPE_NAME = "content";
159
160    private static final long COMPUTE_TOTAL_THRESHOLD_KB = 1000;
161
162    /**
163     * CSV headers that won't be checked if the field exists on the document type.
164     *
165     * @since 7.3
166     */
167    protected static final List<String> AUTHORIZED_HEADERS = Arrays.asList(NXQL.ECM_LIFECYCLESTATE, NXQL.ECM_UUID);
168
169    protected String parentPath;
170
171    protected String username;
172
173    protected CSVImporterOptions options;
174
175    protected boolean hasTypeColumn;
176
177    protected Date startDate;
178
179    protected ArrayList<CSVImportLog> importLogs = new ArrayList<>();
180
181    protected boolean computeTotal = false;
182
183    protected long total = -1L;
184
185    protected long docsCreatedCount;
186
187    public CSVImporterWork(String id) {
188        super(id);
189    }
190
191    public CSVImporterWork(String repositoryName, String parentPath, String username, Blob csvBlob,
192            CSVImporterOptions options) {
193        super(CSVImportId.create(repositoryName, parentPath, csvBlob));
194        getStore().putBlobs(id, Collections.singletonList(csvBlob));
195        setDocument(repositoryName, null);
196        setOriginatingUsername(username);
197        this.parentPath = parentPath;
198        this.username = username;
199        if (csvBlob.getLength() >= 0 && csvBlob.getLength() / 1024 < COMPUTE_TOTAL_THRESHOLD_KB) {
200            computeTotal = true;
201        }
202        this.options = options;
203        startDate = new Date();
204    }
205
206    @Override
207    public String getCategory() {
208        return CATEGORY_CSV_IMPORTER;
209    }
210
211    @Override
212    public String getTitle() {
213        return String.format("CSV import in '%s'", parentPath);
214    }
215
216    public List<CSVImportLog> getImportLogs() {
217        return new ArrayList<>(importLogs);
218    }
219
220    @Override
221    public void work() {
222        TransientStore store = getStore();
223        setStatus("Importing");
224        openUserSession();
225        CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader()
226                                               .withEscape(options.getEscapeCharacter())
227                                               .withCommentMarker(options.getCommentMarker());
228        try (Reader in = newReader(getBlob()); CSVParser parser = csvFormat.parse(in)) {
229            doImport(parser);
230        } catch (IOException e) {
231            logError(0, "Error while doing the import: %s", LABEL_CSV_IMPORTER_ERROR_DURING_IMPORT, e.getMessage());
232            log.debug(e, e);
233        }
234        store.putParameter(id, "logs", importLogs);
235        if (options.sendEmail()) {
236            setStatus("Sending email");
237            sendMail();
238        }
239        setStatus(null);
240    }
241
242    @Override
243    public void cleanUp(boolean ok, Exception e) {
244        try {
245            super.cleanUp(ok, e);
246        } finally {
247            getStore().putParameter(id, "status", new CSVImportStatus(CSVImportStatus.State.COMPLETED, total, total));
248        }
249    }
250
251    static final Serializable EMPTY_LOGS = new ArrayList<CSVImportLog>();
252
253    String launch() {
254        WorkManager works = Framework.getService(WorkManager.class);
255
256        TransientStore store = getStore();
257        store.putParameter(id, "logs", EMPTY_LOGS);
258        store.putParameter(id, "status", new CSVImportStatus(CSVImportStatus.State.SCHEDULED));
259        works.schedule(this);
260        return id;
261    }
262
263    static CSVImportStatus getStatus(String id) {
264        TransientStore store = getStore();
265        if (!store.exists(id)) {
266            return null;
267        }
268        return (CSVImportStatus) store.getParameter(id, "status");
269    }
270
271    @SuppressWarnings("unchecked")
272    static List<CSVImportLog> getLastImportLogs(String id) {
273        TransientStore store = getStore();
274        if (!store.exists(id)) {
275            return Collections.emptyList();
276        }
277        return (ArrayList<CSVImportLog>) store.getParameter(id, "logs");
278    }
279
280    /**
281     * @since 7.3
282     */
283    protected BufferedReader newReader(Blob blob) throws IOException {
284        return new BufferedReader(new InputStreamReader(new BOMInputStream(blob.getStream())));
285    }
286
287    protected void doImport(CSVParser parser) {
288        log.info("Importing CSV file: {}", () -> getBlob().getFilename());
289        Map<String, Integer> header = parser.getHeaderMap();
290        if (header == null) {
291            logError(0, "No header line, empty file?", LABEL_CSV_IMPORTER_EMPTY_FILE);
292            return;
293        }
294        if (!header.containsKey(CSV_NAME_COL)) {
295            logError(0, "Missing 'name' column", LABEL_CSV_IMPORTER_MISSING_NAME_COLUMN);
296            return;
297        }
298        hasTypeColumn = header.containsKey(CSV_TYPE_COL);
299
300        try {
301            int batchSize = options.getBatchSize();
302            Iterable<CSVRecord> it = parser;
303            if (computeTotal) {
304                try {
305                    List<CSVRecord> l = parser.getRecords();
306                    total = l.size();
307                    it = l;
308                } catch (IOException e) {
309                    log.warn("Could not compute total number of document to be imported");
310                }
311            }
312            for (CSVRecord record : it) {
313                if (record.size() == 0) {
314                    // empty record
315                    importLogs.add(new CSVImportLog(getLineNumber(record), Status.SKIPPED, "Empty record",
316                            LABEL_CSV_IMPORTER_EMPTY_LINE));
317                    continue;
318                }
319                try {
320                    if (importRecord(record, header)) {
321                        docsCreatedCount++;
322                        getStore().putParameter(id, "status",
323                                new CSVImportStatus(CSVImportStatus.State.RUNNING, docsCreatedCount, total));
324                        if (docsCreatedCount % batchSize == 0) {
325                            commitOrRollbackTransaction();
326                            startTransaction();
327                        }
328                    }
329                } catch (NuxeoException e) {
330                    // try next line
331                    Throwable unwrappedException = unwrapException(e);
332                    logError(getLineNumber(parser), "Error while importing line: %s",
333                            LABEL_CSV_IMPORTER_ERROR_IMPORTING_LINE, unwrappedException.getMessage());
334                    log.debug(unwrappedException, unwrappedException);
335                }
336            }
337
338            try {
339                session.save();
340            } catch (NuxeoException e) {
341                Throwable ue = unwrapException(e);
342                logError(getLineNumber(parser), "Unable to save: %s", LABEL_CSV_IMPORTER_UNABLE_TO_SAVE,
343                        ue.getMessage());
344                log.debug(ue, ue);
345            }
346        } finally {
347            commitOrRollbackTransaction();
348            startTransaction();
349        }
350        log.info("Done importing CSV file: {}", () -> getBlob().getFilename());
351    }
352
353    /**
354     * Import a line from the CSV file.
355     *
356     * @return {@code true} if a document has been created or updated, {@code false} otherwise.
357     * @since 6.0
358     */
359    protected boolean importRecord(CSVRecord record, Map<String, Integer> header) {
360        String name = record.get(CSV_NAME_COL);
361        if (StringUtils.isBlank(name)) {
362            log.debug("record.isSet={}", () -> record.isSet(CSV_NAME_COL));
363            logError(getLineNumber(record), "Missing 'name' value", LABEL_CSV_IMPORTER_MISSING_NAME_VALUE);
364            return false;
365        }
366
367        Path targetPath = new Path(parentPath).append(name);
368        name = targetPath.lastSegment();
369        String newParentPath = targetPath.removeLastSegments(1).toString();
370        boolean exists = options.getCSVImporterDocumentFactory().exists(session, newParentPath, name, null);
371
372        DocumentRef docRef = null;
373        String type = null;
374        if (exists) {
375            docRef = new PathRef(targetPath.toString());
376            type = session.getDocument(docRef).getType();
377        } else {
378            if (hasTypeColumn) {
379                type = record.get(CSV_TYPE_COL);
380            }
381            if (StringUtils.isBlank(type)) {
382                log.debug("record.isSet={}", () -> record.isSet(CSV_TYPE_COL));
383                logError(getLineNumber(record), "Missing 'type' value", LABEL_CSV_IMPORTER_MISSING_TYPE_VALUE);
384                return false;
385            }
386        }
387
388        DocumentType docType = Framework.getService(SchemaManager.class).getDocumentType(type);
389        if (docType == null) {
390            logError(getLineNumber(record), "The type '%s' does not exist", LABEL_CSV_IMPORTER_NOT_EXISTING_TYPE, type);
391            return false;
392        }
393        Map<String, Serializable> properties = computePropertiesMap(record, docType, header);
394        if (properties == null) {
395            // skip this line
396            return false;
397        }
398
399        long lineNumber = getLineNumber(record);
400        if (exists) {
401            return updateDocument(lineNumber, docRef, properties);
402        } else {
403            return createDocument(lineNumber, newParentPath, name, type, properties);
404        }
405    }
406
407    // our code expects line numbers to start at 1 for the header and 2 for the line after,
408    // but since commons-csv 1.5 record numbers restart at 1 on the line after the header
409    // thus we need to add 1
410    protected long getLineNumber(CSVRecord record) {
411        return record.getRecordNumber() + 1;
412    }
413
414    protected long getLineNumber(CSVParser parser) {
415        return parser.getRecordNumber() + 1;
416    }
417
418    /**
419     * @since 6.0
420     */
421    protected Map<String, Serializable> computePropertiesMap(CSVRecord record, CompositeType compositeType,
422            Map<String, Integer> header) {
423        Map<String, Serializable> values = new HashMap<>();
424        for (String headerValue : header.keySet()) {
425            String lineValue = record.get(headerValue);
426            lineValue = lineValue.trim();
427            String fieldName = headerValue;
428            if (!CSV_NAME_COL.equals(headerValue) && !CSV_TYPE_COL.equals(headerValue)) {
429                if (AUTHORIZED_HEADERS.contains(headerValue) && !StringUtils.isBlank(lineValue)) {
430                    values.put(headerValue, lineValue);
431                } else {
432                    if (!compositeType.hasField(fieldName)) {
433                        fieldName = fieldName.split(":")[1];
434                    }
435                    if (compositeType.hasField(fieldName) && !StringUtils.isBlank(lineValue)) {
436                        Serializable convertedValue = convertValue(compositeType, fieldName, headerValue, lineValue,
437                                getLineNumber(record));
438                        if (convertedValue == null) {
439                            return null;
440                        }
441                        values.put(headerValue, convertedValue);
442                    }
443                }
444            }
445        }
446        return values;
447    }
448
449    @SuppressWarnings("unchecked")
450    protected Serializable convertValue(CompositeType compositeType, String fieldName, String headerValue,
451            String stringValue, long lineNumber) {
452        if (compositeType.hasField(fieldName)) {
453            Field field = compositeType.getField(fieldName);
454            if (field != null) {
455                try {
456                    Serializable fieldValue = null;
457                    Type fieldType = field.getType();
458                    if (fieldType.isComplexType()) {
459                        if (fieldType.getName().equals(CONTENT_FILED_TYPE_NAME)) {
460                            fieldValue = (Serializable) createBlobFromFilePath(stringValue);
461                            if (fieldValue == null) {
462                                logError(lineNumber, "The file '%s' does not exist",
463                                        LABEL_CSV_IMPORTER_NOT_EXISTING_FILE, stringValue);
464                                return null;
465                            }
466                        } else {
467                            fieldValue = (Serializable) ComplexTypeJSONDecoder.decode((ComplexType) fieldType,
468                                    stringValue);
469                            replaceBlobs((Map<String, Object>) fieldValue);
470                        }
471                    } else {
472                        if (fieldType.isListType()) {
473                            Type listFieldType = ((ListType) fieldType).getFieldType();
474                            if (listFieldType.isSimpleType()) {
475                                /*
476                                 * Array.
477                                 */
478                                fieldValue = stringValue.split(options.getListSeparatorRegex());
479                            } else {
480                                /*
481                                 * Complex list.
482                                 */
483                                fieldValue = (Serializable) ComplexTypeJSONDecoder.decodeList((ListType) fieldType,
484                                        stringValue);
485                                replaceBlobs((List<Object>) fieldValue);
486                            }
487                        } else {
488                            /*
489                             * Primitive type.
490                             */
491                            Type type = field.getType();
492                            if (type instanceof SimpleTypeImpl) {
493                                type = type.getSuperType();
494                            }
495                            if (type.isSimpleType()) {
496                                if (type instanceof StringType) {
497                                    fieldValue = stringValue;
498                                } else if (type instanceof IntegerType) {
499                                    fieldValue = Integer.valueOf(stringValue);
500                                } else if (type instanceof LongType) {
501                                    fieldValue = Long.valueOf(stringValue);
502                                } else if (type instanceof DoubleType) {
503                                    fieldValue = Double.valueOf(stringValue);
504                                } else if (type instanceof BooleanType) {
505                                    fieldValue = Boolean.valueOf(stringValue);
506                                } else if (type instanceof DateType) {
507                                    DateFormat dateFormat = options.getDateFormat();
508                                    fieldValue = dateFormat != null ? dateFormat.parse(stringValue) : stringValue;
509                                }
510                            }
511                        }
512                    }
513                    return fieldValue;
514                } catch (ParseException | NumberFormatException | IOException e) {
515                    logError(lineNumber, "Unable to convert field '%s' with value '%s'",
516                            LABEL_CSV_IMPORTER_CANNOT_CONVERT_FIELD_VALUE, headerValue, stringValue);
517                    log.debug(e, e);
518                }
519            }
520        } else {
521            logError(lineNumber, "Field '%s' does not exist on type '%s'", LABEL_CSV_IMPORTER_NOT_EXISTING_FIELD,
522                    headerValue, compositeType.getName());
523        }
524        return null;
525    }
526
527    /**
528     * Creates a {@code Blob} from a relative file path. The File will be looked up in the folder registered by the
529     * {@code nuxeo.csv.blobs.folder} property.
530     *
531     * @since 9.3
532     */
533    protected Blob createBlobFromFilePath(String fileRelativePath) throws IOException {
534        String blobsFolderPath = Framework.getProperty(NUXEO_CSV_BLOBS_FOLDER);
535        String path = FilenameUtils.normalize(blobsFolderPath + "/" + fileRelativePath);
536        File file = new File(path);
537        if (file.exists()) {
538            return Blobs.createBlob(file, null, null, FilenameUtils.getName(fileRelativePath));
539        } else {
540            return null;
541        }
542    }
543
544    /**
545     * Creates a {@code Blob} from a {@code StringBlob}. Assume that the {@code StringBlob} content is the relative file
546     * path. The File will be looked up in the folder registered by the {@code nuxeo.csv.blobs.folder} property.
547     *
548     * @since 9.3
549     */
550    protected Blob createBlobFromStringBlob(Blob stringBlob) throws IOException {
551        String fileRelativePath = stringBlob.getString();
552        Blob blob = createBlobFromFilePath(fileRelativePath);
553        if (blob == null) {
554            throw new IOException(String.format("File %s does not exist", fileRelativePath));
555        }
556
557        blob.setMimeType(stringBlob.getMimeType());
558        blob.setEncoding(stringBlob.getEncoding());
559        String filename = stringBlob.getFilename();
560        if (filename != null) {
561            blob.setFilename(filename);
562        }
563        return blob;
564    }
565
566    /**
567     * Recursively replaces all {@code Blob}s with {@code Blob}s created from Files stored in the folder registered by
568     * the {@code nuxeo.csv.blobs.folder} property.
569     *
570     * @since 9.3
571     */
572    @SuppressWarnings("unchecked")
573    protected void replaceBlobs(Map<String, Object> map) throws IOException {
574        for (Map.Entry<String, Object> entry : map.entrySet()) {
575            Object value = entry.getValue();
576            if (value instanceof Blob) {
577                Blob blob = (Blob) value;
578                entry.setValue(createBlobFromStringBlob(blob));
579            } else if (value instanceof List) {
580                replaceBlobs((List<Object>) value);
581            } else if (value instanceof Map) {
582                replaceBlobs((Map<String, Object>) value);
583            }
584        }
585    }
586
587    /**
588     * Recursively replaces all {@code Blob}s with {@code Blob}s created from Files stored in the folder registered by
589     * the {@code nuxeo.csv.blobs.folder} property.
590     *
591     * @since 9.3
592     */
593    @SuppressWarnings("unchecked")
594    protected void replaceBlobs(List<Object> list) throws IOException {
595        for (ListIterator<Object> it = list.listIterator(); it.hasNext();) {
596            Object value = it.next();
597            if (value instanceof Blob) {
598                Blob blob = (Blob) value;
599                it.set(createBlobFromStringBlob(blob));
600            } else if (value instanceof List) {
601                replaceBlobs((List<Object>) value);
602            } else if (value instanceof Map) {
603                replaceBlobs((Map<String, Object>) value);
604            }
605        }
606    }
607
608    protected boolean createDocument(long lineNumber, String newParentPath, String name, String type,
609            Map<String, Serializable> properties) {
610        try {
611            DocumentRef parentRef = new PathRef(newParentPath);
612            if (session.exists(parentRef)) {
613                DocumentModel parent = session.getDocument(parentRef);
614
615                TypeManager typeManager = Framework.getService(TypeManager.class);
616                if (options.checkAllowedSubTypes() && !typeManager.isAllowedSubType(type, parent.getType())) {
617                    logError(lineNumber, "'%s' type is not allowed in '%s'", LABEL_CSV_IMPORTER_NOT_ALLOWED_SUB_TYPE,
618                            type, parent.getType());
619                } else {
620                    options.getCSVImporterDocumentFactory()
621                           .createDocument(session, newParentPath, name, type, properties);
622                    importLogs.add(new CSVImportLog(lineNumber, Status.SUCCESS, "Document created",
623                            LABEL_CSV_IMPORTER_DOCUMENT_CREATED));
624                    return true;
625                }
626            } else {
627                logError(lineNumber, "Parent document '%s' does not exist", LABEL_CSV_IMPORTER_PARENT_DOES_NOT_EXIST,
628                        newParentPath);
629            }
630        } catch (RuntimeException e) {
631            Throwable unwrappedException = unwrapException(e);
632            logError(lineNumber, "Unable to create document: %s", LABEL_CSV_IMPORTER_UNABLE_TO_CREATE,
633                    unwrappedException.getMessage());
634            log.debug(unwrappedException, unwrappedException);
635        }
636        return false;
637    }
638
639    protected boolean updateDocument(long lineNumber, DocumentRef docRef, Map<String, Serializable> properties) {
640        if (options.updateExisting()) {
641            try {
642                options.getCSVImporterDocumentFactory().updateDocument(session, docRef, properties);
643                importLogs.add(new CSVImportLog(lineNumber, Status.SUCCESS, "Document updated",
644                        LABEL_CSV_IMPORTER_DOCUMENT_UPDATED));
645                return true;
646            } catch (RuntimeException e) {
647                Throwable unwrappedException = unwrapException(e);
648                logError(lineNumber, "Unable to update document: %s", LABEL_CSV_IMPORTER_UNABLE_TO_UPDATE,
649                        unwrappedException.getMessage());
650                log.debug(unwrappedException, unwrappedException);
651            }
652        } else {
653            importLogs.add(new CSVImportLog(lineNumber, Status.SKIPPED, "Document already exists",
654                    LABEL_CSV_IMPORTER_DOCUMENT_ALREADY_EXISTS));
655        }
656        return false;
657    }
658
659    protected void logError(long lineNumber, String message, String localizedMessage, String... params) {
660        importLogs.add(new CSVImportLog(lineNumber, ERROR, String.format(message, (Object[]) params), localizedMessage,
661                params));
662        String lineMessage = String.format("Line %d", lineNumber);
663        String errorMessage = String.format(message, (Object[]) params);
664        log.error("{}: {}", lineMessage, errorMessage);
665        getStore().putParameter(id, "status",
666                new CSVImportStatus(CSVImportStatus.State.ERROR, docsCreatedCount, docsCreatedCount));
667    }
668
669    protected void sendMail() {
670        UserManager userManager = Framework.getService(UserManager.class);
671        NuxeoPrincipal principal = userManager.getPrincipal(username);
672        String email = principal.getEmail();
673        if (email == null) {
674            log.info("Not sending import result email to '{}', no email configured", username);
675            return;
676        }
677
678        try (OperationContext ctx = new OperationContext(session)) {
679            ctx.setInput(session.getRootDocument());
680
681            CSVImporter csvImporter = Framework.getService(CSVImporter.class);
682            List<CSVImportLog> importerLogs = csvImporter.getImportLogs(getId());
683            CSVImportResult importResult = CSVImportResult.fromImportLogs(importerLogs);
684            List<CSVImportLog> skippedAndErrorImportLogs = csvImporter.getImportLogs(getId(), Status.SKIPPED,
685                    Status.ERROR);
686            ctx.put("importResult", importResult);
687            ctx.put("skippedAndErrorImportLogs", skippedAndErrorImportLogs);
688            ctx.put("csvFilename", getBlob().getFilename());
689            ctx.put("startDate", DateFormat.getInstance().format(startDate));
690            ctx.put("username", username);
691
692            DocumentModel importFolder = session.getDocument(new PathRef(parentPath));
693            String importFolderUrl = getDocumentUrl(importFolder);
694            ctx.put("importFolderTitle", importFolder.getTitle());
695            ctx.put("importFolderUrl", importFolderUrl);
696            ctx.put("userUrl", getUserUrl());
697
698            StringList to = buildRecipientsList(email);
699            Expression from = Scripting.newExpression("Env[\"mail.from\"]");
700            String subject = "CSV Import result of " + getBlob().getFilename();
701            String message = loadTemplate(TEMPLATE_IMPORT_RESULT);
702
703            OperationChain chain = new OperationChain("SendMail");
704            chain.add(SendMail.ID)
705                 .set("from", from)
706                 .set("to", to)
707                 .set("HTML", true)
708                 .set("subject", subject)
709                 .set("message", message);
710            Framework.getService(AutomationService.class).run(ctx, chain);
711        } catch (Exception e) {
712            ExceptionUtils.checkInterrupt(e);
713            log.error("Unable to notify user '{}' for import result of '{}': {}", () -> username,
714                    () -> getBlob().getFilename(), e::getMessage);
715            log.debug(e, e);
716            throw ExceptionUtils.runtimeException(e);
717        }
718    }
719
720    /**
721     * @since 9.1
722     */
723    private Blob getBlob() {
724        return getStore().getBlobs(id).get(0);
725    }
726
727    protected String getDocumentUrl(DocumentModel doc) {
728        return MailTemplateHelper.getDocumentUrl(doc, null);
729    }
730
731    protected String getUserUrl() {
732        DocumentViewCodecManager codecService = Framework.getService(DocumentViewCodecManager.class);
733        DocumentViewCodec codec = codecService.getCodec(NotificationEventListener.NOTIFICATION_DOCUMENT_ID_CODEC_NAME);
734        boolean isNotificationCodec = codec != null;
735        boolean isJSFUI = isNotificationCodec
736                && NotificationEventListener.JSF_NOTIFICATION_DOCUMENT_ID_CODEC_PREFIX.equals(codec.getPrefix());
737        StringBuilder userUrl = new StringBuilder();
738        if (isNotificationCodec) {
739
740            userUrl.append(NotificationServiceHelper.getNotificationService().getServerUrlPrefix());
741            if (!isJSFUI) {
742                userUrl.append("ui/");
743                userUrl.append("#!/");
744            }
745            userUrl.append("user/").append(username);
746        }
747        return userUrl.toString();
748    }
749
750    protected StringList buildRecipientsList(String userEmail) {
751        String csvMailTo = Framework.getProperty(NUXEO_CSV_MAIL_TO);
752        if (StringUtils.isBlank(csvMailTo)) {
753            return new StringList(new String[] { userEmail });
754        } else {
755            return new StringList(new String[] { userEmail, csvMailTo });
756        }
757    }
758
759    private static String loadTemplate(String key) {
760        InputStream io = CSVImporterWork.class.getClassLoader().getResourceAsStream(key);
761        if (io != null) {
762            try {
763                return IOUtils.toString(io, UTF_8);
764            } catch (IOException e) {
765                // cannot happen
766                throw new NuxeoException(e);
767            } finally {
768                try {
769                    io.close();
770                } catch (IOException e) {
771                    // nothing to do
772                }
773            }
774        }
775        return null;
776    }
777
778    public static Throwable unwrapException(Throwable t) {
779        Throwable cause = null;
780        if (t != null) {
781            cause = t.getCause();
782        }
783        if (cause == null) {
784            return t;
785        } else {
786            return unwrapException(cause);
787        }
788    }
789
790}