001/*
002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thomas Roger
018 *     Florent Guillaume
019 *     Julien Carsique
020 */
021package org.nuxeo.ecm.csv.core;
022
023import static java.nio.charset.StandardCharsets.UTF_8;
024import static org.nuxeo.ecm.csv.core.CSVImportLog.Status.ERROR;
025import static org.nuxeo.ecm.csv.core.Constants.CSV_NAME_COL;
026import static org.nuxeo.ecm.csv.core.Constants.CSV_TYPE_COL;
027
028import java.io.BufferedReader;
029import java.io.File;
030import java.io.IOException;
031import java.io.InputStream;
032import java.io.InputStreamReader;
033import java.io.Reader;
034import java.io.Serializable;
035import java.text.DateFormat;
036import java.text.ParseException;
037import java.text.SimpleDateFormat;
038import java.util.ArrayList;
039import java.util.Arrays;
040import java.util.Collections;
041import java.util.Date;
042import java.util.HashMap;
043import java.util.List;
044import java.util.ListIterator;
045import java.util.Map;
046
047import org.apache.commons.csv.CSVFormat;
048import org.apache.commons.csv.CSVParser;
049import org.apache.commons.csv.CSVRecord;
050import org.apache.commons.io.FilenameUtils;
051import org.apache.commons.io.IOUtils;
052import org.apache.commons.io.input.BOMInputStream;
053import org.apache.commons.lang3.StringUtils;
054import org.apache.commons.logging.Log;
055import org.apache.commons.logging.LogFactory;
056import org.nuxeo.common.utils.ExceptionUtils;
057import org.nuxeo.common.utils.Path;
058import org.nuxeo.ecm.automation.AutomationService;
059import org.nuxeo.ecm.automation.OperationChain;
060import org.nuxeo.ecm.automation.OperationContext;
061import org.nuxeo.ecm.automation.core.operations.notification.MailTemplateHelper;
062import org.nuxeo.ecm.automation.core.operations.notification.SendMail;
063import org.nuxeo.ecm.automation.core.scripting.Expression;
064import org.nuxeo.ecm.automation.core.scripting.Scripting;
065import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder;
066import org.nuxeo.ecm.automation.core.util.StringList;
067import org.nuxeo.ecm.core.api.Blob;
068import org.nuxeo.ecm.core.api.Blobs;
069import org.nuxeo.ecm.core.api.DocumentModel;
070import org.nuxeo.ecm.core.api.DocumentRef;
071import org.nuxeo.ecm.core.api.NuxeoException;
072import org.nuxeo.ecm.core.api.NuxeoPrincipal;
073import org.nuxeo.ecm.core.api.PathRef;
074import org.nuxeo.ecm.core.query.sql.NXQL;
075import org.nuxeo.ecm.core.schema.DocumentType;
076import org.nuxeo.ecm.core.schema.SchemaManager;
077import org.nuxeo.ecm.core.schema.types.ComplexType;
078import org.nuxeo.ecm.core.schema.types.CompositeType;
079import org.nuxeo.ecm.core.schema.types.Field;
080import org.nuxeo.ecm.core.schema.types.ListType;
081import org.nuxeo.ecm.core.schema.types.SimpleTypeImpl;
082import org.nuxeo.ecm.core.schema.types.Type;
083import org.nuxeo.ecm.core.schema.types.primitives.BooleanType;
084import org.nuxeo.ecm.core.schema.types.primitives.DateType;
085import org.nuxeo.ecm.core.schema.types.primitives.DoubleType;
086import org.nuxeo.ecm.core.schema.types.primitives.IntegerType;
087import org.nuxeo.ecm.core.schema.types.primitives.LongType;
088import org.nuxeo.ecm.core.schema.types.primitives.StringType;
089import org.nuxeo.ecm.core.transientstore.api.TransientStore;
090import org.nuxeo.ecm.core.transientstore.work.TransientStoreWork;
091import org.nuxeo.ecm.core.work.api.WorkManager;
092import org.nuxeo.ecm.csv.core.CSVImportLog.Status;
093import org.nuxeo.ecm.platform.ec.notification.NotificationEventListener;
094import org.nuxeo.ecm.platform.ec.notification.service.NotificationServiceHelper;
095import org.nuxeo.ecm.platform.types.TypeManager;
096import org.nuxeo.ecm.platform.url.api.DocumentViewCodecManager;
097import org.nuxeo.ecm.platform.url.codec.api.DocumentViewCodec;
098import org.nuxeo.ecm.platform.usermanager.UserManager;
099import org.nuxeo.runtime.api.Framework;
100
101/**
 * Work task to import from a CSV file. Because the file is read from the local filesystem, this must be executed in a
103 * local queue. Since NXP-15252 the CSV reader manages "records", not "lines".
104 *
105 * @since 5.7
106 */
107public class CSVImporterWork extends TransientStoreWork {
108
109    public static final String NUXEO_CSV_MAIL_TO = "nuxeo.csv.mail.to";
110
111    public static final String LABEL_CSV_IMPORTER_NOT_EXISTING_FIELD = "label.csv.importer.notExistingField";
112
113    public static final String LABEL_CSV_IMPORTER_CANNOT_CONVERT_FIELD_VALUE = "label.csv.importer.cannotConvertFieldValue";
114
115    public static final String LABEL_CSV_IMPORTER_NOT_EXISTING_FILE = "label.csv.importer.notExistingFile";
116
117    public static final String NUXEO_CSV_BLOBS_FOLDER = "nuxeo.csv.blobs.folder";
118
119    public static final String LABEL_CSV_IMPORTER_DOCUMENT_ALREADY_EXISTS = "label.csv.importer.documentAlreadyExists";
120
121    public static final String LABEL_CSV_IMPORTER_UNABLE_TO_UPDATE = "label.csv.importer.unableToUpdate";
122
123    public static final String LABEL_CSV_IMPORTER_DOCUMENT_UPDATED = "label.csv.importer.documentUpdated";
124
125    public static final String LABEL_CSV_IMPORTER_UNABLE_TO_CREATE = "label.csv.importer.unableToCreate";
126
127    public static final String LABEL_CSV_IMPORTER_PARENT_DOES_NOT_EXIST = "label.csv.importer.parentDoesNotExist";
128
129    public static final String LABEL_CSV_IMPORTER_DOCUMENT_CREATED = "label.csv.importer.documentCreated";
130
131    public static final String LABEL_CSV_IMPORTER_NOT_ALLOWED_SUB_TYPE = "label.csv.importer.notAllowedSubType";
132
133    public static final String LABEL_CSV_IMPORTER_UNABLE_TO_SAVE = "label.csv.importer.unableToSave";
134
135    public static final String LABEL_CSV_IMPORTER_ERROR_IMPORTING_LINE = "label.csv.importer.errorImportingLine";
136
137    public static final String LABEL_CSV_IMPORTER_NOT_EXISTING_TYPE = "label.csv.importer.notExistingType";
138
139    public static final String LABEL_CSV_IMPORTER_MISSING_TYPE_VALUE = "label.csv.importer.missingTypeValue";
140
141    public static final String LABEL_CSV_IMPORTER_MISSING_NAME_VALUE = "label.csv.importer.missingNameValue";
142
143    public static final String LABEL_CSV_IMPORTER_MISSING_NAME_COLUMN = "label.csv.importer.missingNameColumn";
144
145    public static final String LABEL_CSV_IMPORTER_EMPTY_FILE = "label.csv.importer.emptyFile";
146
147    public static final String LABEL_CSV_IMPORTER_ERROR_DURING_IMPORT = "label.csv.importer.errorDuringImport";
148
149    public static final String LABEL_CSV_IMPORTER_EMPTY_LINE = "label.csv.importer.emptyLine";
150
151    private static final long serialVersionUID = 1L;
152
153    private static final Log log = LogFactory.getLog(CSVImporterWork.class);
154
155    private static final String TEMPLATE_IMPORT_RESULT = "templates/csvImportResult.ftl";
156
157    public static final String CATEGORY_CSV_IMPORTER = "csvImporter";
158
159    public static final String CONTENT_FILED_TYPE_NAME = "content";
160
161    private static final long COMPUTE_TOTAL_THRESHOLD_KB = 1000;
162
163    /**
164     * CSV headers that won't be checked if the field exists on the document type.
165     *
166     * @since 7.3
167     */
168    protected static final List<String> AUTHORIZED_HEADERS = Arrays.asList(NXQL.ECM_LIFECYCLESTATE, NXQL.ECM_UUID);
169
170    protected String parentPath;
171
172    protected String username;
173
174    protected CSVImporterOptions options;
175
176    protected transient DateFormat dateformat;
177
178    protected boolean hasTypeColumn;
179
180    protected Date startDate;
181
182    protected ArrayList<CSVImportLog> importLogs = new ArrayList<>();
183
184    protected boolean computeTotal = false;
185
186    protected long total = -1L;
187
188    protected long docsCreatedCount;
189
190    public CSVImporterWork(String id) {
191        super(id);
192    }
193
194    public CSVImporterWork(String repositoryName, String parentPath, String username, Blob csvBlob,
195            CSVImporterOptions options) {
196        super(CSVImportId.create(repositoryName, parentPath, csvBlob));
197        getStore().putBlobs(id, Collections.singletonList(csvBlob));
198        setDocument(repositoryName, null);
199        setOriginatingUsername(username);
200        this.parentPath = parentPath;
201        this.username = username;
202        if (csvBlob.getLength() >= 0 && csvBlob.getLength() / 1024 < COMPUTE_TOTAL_THRESHOLD_KB) {
203            computeTotal = true;
204        }
205        this.options = options;
206        startDate = new Date();
207    }
208
209    @Override
210    public String getCategory() {
211        return CATEGORY_CSV_IMPORTER;
212    }
213
214    @Override
215    public String getTitle() {
216        return String.format("CSV import in '%s'", parentPath);
217    }
218
219    public List<CSVImportLog> getImportLogs() {
220        return new ArrayList<>(importLogs);
221    }
222
223    @Override
224    public void work() {
225        TransientStore store = getStore();
226        setStatus("Importing");
227        openUserSession();
228        CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader().withEscape(options.getEscapeCharacter()).withCommentMarker(
229                options.getCommentMarker());
230        try (Reader in = newReader(getBlob()); CSVParser parser = csvFormat.parse(in)) {
231            doImport(parser);
232        } catch (IOException e) {
233            logError(0, "Error while doing the import: %s", LABEL_CSV_IMPORTER_ERROR_DURING_IMPORT, e.getMessage());
234            log.debug(e, e);
235        }
236        store.putParameter(id, "logs", importLogs);
237        if (options.sendEmail()) {
238            setStatus("Sending email");
239            sendMail();
240        }
241        setStatus(null);
242    }
243
244    @Override
245    public void cleanUp(boolean ok, Exception e) {
246        try {
247            super.cleanUp(ok, e);
248        } finally {
249            getStore().putParameter(id, "status", new CSVImportStatus(CSVImportStatus.State.COMPLETED, total, total));
250        }
251    }
252
253    static final Serializable EMPTY_LOGS = new ArrayList<CSVImportLog>();
254
255    String launch() {
256        WorkManager works = Framework.getService(WorkManager.class);
257
258        TransientStore store = getStore();
259        store.putParameter(id, "logs", EMPTY_LOGS);
260        store.putParameter(id, "status", new CSVImportStatus(CSVImportStatus.State.SCHEDULED));
261        works.schedule(this);
262        return id;
263    }
264
265    static CSVImportStatus getStatus(String id) {
266        TransientStore store = getStore();
267        if (!store.exists(id)) {
268            return null;
269        }
270        return (CSVImportStatus) store.getParameter(id, "status");
271    }
272
273    @SuppressWarnings("unchecked")
274    static List<CSVImportLog> getLastImportLogs(String id) {
275        TransientStore store = getStore();
276        if (!store.exists(id)) {
277            return Collections.emptyList();
278        }
279        return (ArrayList<CSVImportLog>) store.getParameter(id, "logs");
280    }
281
282    /**
283     * @since 7.3
284     */
285    protected BufferedReader newReader(Blob blob) throws IOException {
286        return new BufferedReader(new InputStreamReader(new BOMInputStream(blob.getStream())));
287    }
288
289    protected void doImport(CSVParser parser) {
290        log.info(String.format("Importing CSV file: %s", getBlob().getFilename()));
291        Map<String, Integer> header = parser.getHeaderMap();
292        if (header == null) {
293            logError(0, "No header line, empty file?", LABEL_CSV_IMPORTER_EMPTY_FILE);
294            return;
295        }
296        if (!header.containsKey(CSV_NAME_COL)) {
297            logError(0, "Missing 'name' column", LABEL_CSV_IMPORTER_MISSING_NAME_COLUMN);
298            return;
299        }
300        hasTypeColumn = header.containsKey(CSV_TYPE_COL);
301
302        try {
303            int batchSize = options.getBatchSize();
304            Iterable<CSVRecord> it = parser;
305            if (computeTotal) {
306                try {
307                    List<CSVRecord> l = parser.getRecords();
308                    total = l.size();
309                    it = l;
310                } catch (IOException e) {
311                    log.warn("Could not compute total number of document to be imported");
312                }
313            }
314            for (CSVRecord record : it) {
315                if (record.size() == 0) {
316                    // empty record
317                    importLogs.add(new CSVImportLog(getLineNumber(record), Status.SKIPPED, "Empty record",
318                            LABEL_CSV_IMPORTER_EMPTY_LINE));
319                    continue;
320                }
321                try {
322                    if (importRecord(record, header)) {
323                        docsCreatedCount++;
324                        getStore().putParameter(id, "status",
325                                new CSVImportStatus(CSVImportStatus.State.RUNNING, docsCreatedCount, total));
326                        if (docsCreatedCount % batchSize == 0) {
327                            commitOrRollbackTransaction();
328                            startTransaction();
329                        }
330                    }
331                } catch (NuxeoException e) {
332                    // try next line
333                    Throwable unwrappedException = unwrapException(e);
334                    logError(getLineNumber(parser), "Error while importing line: %s",
335                            LABEL_CSV_IMPORTER_ERROR_IMPORTING_LINE, unwrappedException.getMessage());
336                    log.debug(unwrappedException, unwrappedException);
337                }
338            }
339
340            try {
341                session.save();
342            } catch (NuxeoException e) {
343                Throwable ue = unwrapException(e);
344                logError(getLineNumber(parser), "Unable to save: %s", LABEL_CSV_IMPORTER_UNABLE_TO_SAVE,
345                        ue.getMessage());
346                log.debug(ue, ue);
347            }
348        } finally {
349            commitOrRollbackTransaction();
350            startTransaction();
351        }
352        log.info(String.format("Done importing CSV file: %s", getBlob().getFilename()));
353    }
354
355    /**
356     * Import a line from the CSV file.
357     *
358     * @return {@code true} if a document has been created or updated, {@code false} otherwise.
359     * @since 6.0
360     */
361    protected boolean importRecord(CSVRecord record, Map<String, Integer> header) {
362        String name = record.get(CSV_NAME_COL);
363        if (StringUtils.isBlank(name)) {
364            log.debug("record.isSet=" + record.isSet(CSV_NAME_COL));
365            logError(getLineNumber(record), "Missing 'name' value", LABEL_CSV_IMPORTER_MISSING_NAME_VALUE);
366            return false;
367        }
368
369        Path targetPath = new Path(parentPath).append(name);
370        name = targetPath.lastSegment();
371        String newParentPath = targetPath.removeLastSegments(1).toString();
372        boolean exists = options.getCSVImporterDocumentFactory().exists(session, newParentPath, name, null);
373
374        DocumentRef docRef = null;
375        String type = null;
376        if (exists) {
377            docRef = new PathRef(targetPath.toString());
378            type = session.getDocument(docRef).getType();
379        } else {
380            if (hasTypeColumn) {
381                type = record.get(CSV_TYPE_COL);
382            }
383            if (StringUtils.isBlank(type)) {
384                log.debug("record.isSet=" + record.isSet(CSV_TYPE_COL));
385                logError(getLineNumber(record), "Missing 'type' value", LABEL_CSV_IMPORTER_MISSING_TYPE_VALUE);
386                return false;
387            }
388        }
389
390        DocumentType docType = Framework.getService(SchemaManager.class).getDocumentType(type);
391        if (docType == null) {
392            logError(getLineNumber(record), "The type '%s' does not exist", LABEL_CSV_IMPORTER_NOT_EXISTING_TYPE, type);
393            return false;
394        }
395        Map<String, Serializable> properties = computePropertiesMap(record, docType, header);
396        if (properties == null) {
397            // skip this line
398            return false;
399        }
400
401        long lineNumber = getLineNumber(record);
402        if (exists) {
403            return updateDocument(lineNumber, docRef, properties);
404        } else {
405            return createDocument(lineNumber, newParentPath, name, type, properties);
406        }
407    }
408
409    // our code expects line numbers to start at 1 for the header and 2 for the line after,
410    // but since commons-csv 1.5 record numbers restart at 1 on the line after the header
411    // thus we need to add 1
412    protected long getLineNumber(CSVRecord record) {
413        return record.getRecordNumber() + 1;
414    }
415
416    protected long getLineNumber(CSVParser parser) {
417        return parser.getRecordNumber() + 1;
418    }
419
420    /**
421     * @since 6.0
422     */
423    protected Map<String, Serializable> computePropertiesMap(CSVRecord record, CompositeType compositeType,
424            Map<String, Integer> header) {
425        Map<String, Serializable> values = new HashMap<>();
426        for (String headerValue : header.keySet()) {
427            String lineValue = record.get(headerValue);
428            lineValue = lineValue.trim();
429            String fieldName = headerValue;
430            if (!CSV_NAME_COL.equals(headerValue) && !CSV_TYPE_COL.equals(headerValue)) {
431                if (AUTHORIZED_HEADERS.contains(headerValue) && !StringUtils.isBlank(lineValue)) {
432                    values.put(headerValue, lineValue);
433                } else {
434                    if (!compositeType.hasField(fieldName)) {
435                        fieldName = fieldName.split(":")[1];
436                    }
437                    if (compositeType.hasField(fieldName) && !StringUtils.isBlank(lineValue)) {
438                        Serializable convertedValue = convertValue(compositeType, fieldName, headerValue, lineValue,
439                                getLineNumber(record));
440                        if (convertedValue == null) {
441                            return null;
442                        }
443                        values.put(headerValue, convertedValue);
444                    }
445                }
446            }
447        }
448        return values;
449    }
450
451    @SuppressWarnings("unchecked")
452    protected Serializable convertValue(CompositeType compositeType, String fieldName, String headerValue,
453            String stringValue, long lineNumber) {
454        if (compositeType.hasField(fieldName)) {
455            Field field = compositeType.getField(fieldName);
456            if (field != null) {
457                try {
458                    Serializable fieldValue = null;
459                    Type fieldType = field.getType();
460                    if (fieldType.isComplexType()) {
461                        if (fieldType.getName().equals(CONTENT_FILED_TYPE_NAME)) {
462                            fieldValue = (Serializable) createBlobFromFilePath(stringValue);
463                            if (fieldValue == null) {
464                                logError(lineNumber, "The file '%s' does not exist",
465                                        LABEL_CSV_IMPORTER_NOT_EXISTING_FILE, stringValue);
466                                return null;
467                            }
468                        } else {
469                            fieldValue = (Serializable) ComplexTypeJSONDecoder.decode((ComplexType) fieldType,
470                                    stringValue);
471                            replaceBlobs((Map<String, Object>) fieldValue);
472                        }
473                    } else {
474                        if (fieldType.isListType()) {
475                            Type listFieldType = ((ListType) fieldType).getFieldType();
476                            if (listFieldType.isSimpleType()) {
477                                /*
478                                 * Array.
479                                 */
480                                fieldValue = stringValue.split(options.getListSeparatorRegex());
481                            } else {
482                                /*
483                                 * Complex list.
484                                 */
485                                fieldValue = (Serializable) ComplexTypeJSONDecoder.decodeList((ListType) fieldType,
486                                        stringValue);
487                                replaceBlobs((List<Object>) fieldValue);
488                            }
489                        } else {
490                            /*
491                             * Primitive type.
492                             */
493                            Type type = field.getType();
494                            if (type instanceof SimpleTypeImpl) {
495                                type = type.getSuperType();
496                            }
497                            if (type.isSimpleType()) {
498                                if (type instanceof StringType) {
499                                    fieldValue = stringValue;
500                                } else if (type instanceof IntegerType) {
501                                    fieldValue = Integer.valueOf(stringValue);
502                                } else if (type instanceof LongType) {
503                                    fieldValue = Long.valueOf(stringValue);
504                                } else if (type instanceof DoubleType) {
505                                    fieldValue = Double.valueOf(stringValue);
506                                } else if (type instanceof BooleanType) {
507                                    fieldValue = Boolean.valueOf(stringValue);
508                                } else if (type instanceof DateType) {
509                                    fieldValue = getDateFormat().parse(stringValue);
510                                }
511                            }
512                        }
513                    }
514                    return fieldValue;
515                } catch (ParseException | NumberFormatException | IOException e) {
516                    logError(lineNumber, "Unable to convert field '%s' with value '%s'",
517                            LABEL_CSV_IMPORTER_CANNOT_CONVERT_FIELD_VALUE, headerValue, stringValue);
518                    log.debug(e, e);
519                }
520            }
521        } else {
522            logError(lineNumber, "Field '%s' does not exist on type '%s'", LABEL_CSV_IMPORTER_NOT_EXISTING_FIELD,
523                    headerValue, compositeType.getName());
524        }
525        return null;
526    }
527
528    /**
529     * Creates a {@code Blob} from a relative file path. The File will be looked up in the folder registered by the
530     * {@code nuxeo.csv.blobs.folder} property.
531     *
532     * @since 9.3
533     */
534    protected Blob createBlobFromFilePath(String fileRelativePath) throws IOException {
535        String blobsFolderPath = Framework.getProperty(NUXEO_CSV_BLOBS_FOLDER);
536        String path = FilenameUtils.normalize(blobsFolderPath + "/" + fileRelativePath);
537        File file = new File(path);
538        if (file.exists()) {
539            return Blobs.createBlob(file, null, null, FilenameUtils.getName(fileRelativePath));
540        } else {
541            return null;
542        }
543    }
544
545    /**
546     * Creates a {@code Blob} from a {@code StringBlob}. Assume that the {@code StringBlob} content is the relative file
547     * path. The File will be looked up in the folder registered by the {@code nuxeo.csv.blobs.folder} property.
548     *
549     * @since 9.3
550     */
551    protected Blob createBlobFromStringBlob(Blob stringBlob) throws IOException {
552        String fileRelativePath = stringBlob.getString();
553        Blob blob = createBlobFromFilePath(fileRelativePath);
554        if (blob == null) {
555            throw new IOException(String.format("File %s does not exist", fileRelativePath));
556        }
557
558        blob.setMimeType(stringBlob.getMimeType());
559        blob.setEncoding(stringBlob.getEncoding());
560        String filename = stringBlob.getFilename();
561        if (filename != null) {
562            blob.setFilename(filename);
563        }
564        return blob;
565    }
566
567    /**
568     * Recursively replaces all {@code Blob}s with {@code Blob}s created from Files stored in the folder registered by
569     * the {@code nuxeo.csv.blobs.folder} property.
570     *
571     * @since 9.3
572     */
573    @SuppressWarnings("unchecked")
574    protected void replaceBlobs(Map<String, Object> map) throws IOException {
575        for (Map.Entry<String, Object> entry : map.entrySet()) {
576            Object value = entry.getValue();
577            if (value instanceof Blob) {
578                Blob blob = (Blob) value;
579                entry.setValue(createBlobFromStringBlob(blob));
580            } else if (value instanceof List) {
581                replaceBlobs((List<Object>) value);
582            } else if (value instanceof Map) {
583                replaceBlobs((Map<String, Object>) value);
584            }
585        }
586    }
587
588    /**
589     * Recursively replaces all {@code Blob}s with {@code Blob}s created from Files stored in the folder registered by
590     * the {@code nuxeo.csv.blobs.folder} property.
591     *
592     * @since 9.3
593     */
594    @SuppressWarnings("unchecked")
595    protected void replaceBlobs(List<Object> list) throws IOException {
596        for (ListIterator<Object> it = list.listIterator(); it.hasNext();) {
597            Object value = it.next();
598            if (value instanceof Blob) {
599                Blob blob = (Blob) value;
600                it.set(createBlobFromStringBlob(blob));
601            } else if (value instanceof List) {
602                replaceBlobs((List<Object>) value);
603            } else if (value instanceof Map) {
604                replaceBlobs((Map<String, Object>) value);
605            }
606        }
607    }
608
609    protected DateFormat getDateFormat() {
610        // transient field so may become null
611        if (dateformat == null) {
612            dateformat = new SimpleDateFormat(options.getDateFormat());
613        }
614        return dateformat;
615    }
616
617    protected boolean createDocument(long lineNumber, String newParentPath, String name, String type,
618            Map<String, Serializable> properties) {
619        try {
620            DocumentRef parentRef = new PathRef(newParentPath);
621            if (session.exists(parentRef)) {
622                DocumentModel parent = session.getDocument(parentRef);
623
624                TypeManager typeManager = Framework.getService(TypeManager.class);
625                if (options.checkAllowedSubTypes() && !typeManager.isAllowedSubType(type, parent.getType())) {
626                    logError(lineNumber, "'%s' type is not allowed in '%s'", LABEL_CSV_IMPORTER_NOT_ALLOWED_SUB_TYPE,
627                            type, parent.getType());
628                } else {
629                    options.getCSVImporterDocumentFactory().createDocument(session, newParentPath, name, type,
630                            properties);
631                    importLogs.add(new CSVImportLog(lineNumber, Status.SUCCESS, "Document created",
632                            LABEL_CSV_IMPORTER_DOCUMENT_CREATED));
633                    return true;
634                }
635            } else {
636                logError(lineNumber, "Parent document '%s' does not exist", LABEL_CSV_IMPORTER_PARENT_DOES_NOT_EXIST,
637                        newParentPath);
638            }
639        } catch (RuntimeException e) {
640            Throwable unwrappedException = unwrapException(e);
641            logError(lineNumber, "Unable to create document: %s", LABEL_CSV_IMPORTER_UNABLE_TO_CREATE,
642                    unwrappedException.getMessage());
643            log.debug(unwrappedException, unwrappedException);
644        }
645        return false;
646    }
647
648    protected boolean updateDocument(long lineNumber, DocumentRef docRef, Map<String, Serializable> properties) {
649        if (options.updateExisting()) {
650            try {
651                options.getCSVImporterDocumentFactory().updateDocument(session, docRef, properties);
652                importLogs.add(new CSVImportLog(lineNumber, Status.SUCCESS, "Document updated",
653                        LABEL_CSV_IMPORTER_DOCUMENT_UPDATED));
654                return true;
655            } catch (RuntimeException e) {
656                Throwable unwrappedException = unwrapException(e);
657                logError(lineNumber, "Unable to update document: %s", LABEL_CSV_IMPORTER_UNABLE_TO_UPDATE,
658                        unwrappedException.getMessage());
659                log.debug(unwrappedException, unwrappedException);
660            }
661        } else {
662            importLogs.add(new CSVImportLog(lineNumber, Status.SKIPPED, "Document already exists",
663                    LABEL_CSV_IMPORTER_DOCUMENT_ALREADY_EXISTS));
664        }
665        return false;
666    }
667
668    protected void logError(long lineNumber, String message, String localizedMessage, String... params) {
669        importLogs.add(new CSVImportLog(lineNumber, ERROR, String.format(message, (Object[]) params), localizedMessage,
670                params));
671        String lineMessage = String.format("Line %d", lineNumber);
672        String errorMessage = String.format(message, (Object[]) params);
673        log.error(String.format("%s: %s", lineMessage, errorMessage));
674        getStore().putParameter(id, "status",
675                new CSVImportStatus(CSVImportStatus.State.ERROR, docsCreatedCount, docsCreatedCount));
676    }
677
678    protected void sendMail() {
679        UserManager userManager = Framework.getService(UserManager.class);
680        NuxeoPrincipal principal = userManager.getPrincipal(username);
681        String email = principal.getEmail();
682        if (email == null) {
683            log.info(String.format("Not sending import result email to '%s', no email configured", username));
684            return;
685        }
686
687        try (OperationContext ctx = new OperationContext(session)) {
688            ctx.setInput(session.getRootDocument());
689
690            CSVImporter csvImporter = Framework.getService(CSVImporter.class);
691            List<CSVImportLog> importerLogs = csvImporter.getImportLogs(getId());
692            CSVImportResult importResult = CSVImportResult.fromImportLogs(importerLogs);
693            List<CSVImportLog> skippedAndErrorImportLogs = csvImporter.getImportLogs(getId(), Status.SKIPPED,
694                    Status.ERROR);
695            ctx.put("importResult", importResult);
696            ctx.put("skippedAndErrorImportLogs", skippedAndErrorImportLogs);
697            ctx.put("csvFilename", getBlob().getFilename());
698            ctx.put("startDate", DateFormat.getInstance().format(startDate));
699            ctx.put("username", username);
700
701            DocumentModel importFolder = session.getDocument(new PathRef(parentPath));
702            String importFolderUrl = getDocumentUrl(importFolder);
703            ctx.put("importFolderTitle", importFolder.getTitle());
704            ctx.put("importFolderUrl", importFolderUrl);
705            ctx.put("userUrl", getUserUrl());
706
707            StringList to = buildRecipientsList(email);
708            Expression from = Scripting.newExpression("Env[\"mail.from\"]");
709            String subject = "CSV Import result of " + getBlob().getFilename();
710            String message = loadTemplate(TEMPLATE_IMPORT_RESULT);
711
712            OperationChain chain = new OperationChain("SendMail");
713            chain.add(SendMail.ID)
714                 .set("from", from)
715                 .set("to", to)
716                 .set("HTML", true)
717                 .set("subject", subject)
718                 .set("message", message);
719            Framework.getService(AutomationService.class).run(ctx, chain);
720        } catch (Exception e) {
721            ExceptionUtils.checkInterrupt(e);
722            log.error(String.format("Unable to notify user '%s' for import result of '%s': %s", username,
723                    getBlob().getFilename(), e.getMessage()));
724            log.debug(e, e);
725            throw ExceptionUtils.runtimeException(e);
726        }
727    }
728
729    /**
730     * @since 9.1
731     */
732    private Blob getBlob() {
733        return getStore().getBlobs(id).get(0);
734    }
735
736    protected String getDocumentUrl(DocumentModel doc) {
737        return MailTemplateHelper.getDocumentUrl(doc, null);
738    }
739
740    protected String getUserUrl() {
741        DocumentViewCodecManager codecService = Framework.getService(DocumentViewCodecManager.class);
742        DocumentViewCodec codec = codecService.getCodec(NotificationEventListener.NOTIFICATION_DOCUMENT_ID_CODEC_NAME);
743        boolean isNotificationCodec = codec != null;
744        boolean isJSFUI = isNotificationCodec
745                && NotificationEventListener.JSF_NOTIFICATION_DOCUMENT_ID_CODEC_PREFIX.equals(codec.getPrefix());
746        StringBuilder userUrl = new StringBuilder();
747        if (isNotificationCodec) {
748
749            userUrl.append(NotificationServiceHelper.getNotificationService().getServerUrlPrefix());
750            if (!isJSFUI) {
751                userUrl.append("ui/");
752                userUrl.append("#!/");
753            }
754            userUrl.append("user/").append(username);
755        }
756        return userUrl.toString();
757    }
758
759    protected StringList buildRecipientsList(String userEmail) {
760        String csvMailTo = Framework.getProperty(NUXEO_CSV_MAIL_TO);
761        if (StringUtils.isBlank(csvMailTo)) {
762            return new StringList(new String[] { userEmail });
763        } else {
764            return new StringList(new String[] { userEmail, csvMailTo });
765        }
766    }
767
768    private static String loadTemplate(String key) {
769        InputStream io = CSVImporterWork.class.getClassLoader().getResourceAsStream(key);
770        if (io != null) {
771            try {
772                return IOUtils.toString(io, UTF_8);
773            } catch (IOException e) {
774                // cannot happen
775                throw new NuxeoException(e);
776            } finally {
777                try {
778                    io.close();
779                } catch (IOException e) {
780                    // nothing to do
781                }
782            }
783        }
784        return null;
785    }
786
787    public static Throwable unwrapException(Throwable t) {
788        Throwable cause = null;
789        if (t != null) {
790            cause = t.getCause();
791        }
792        if (cause == null) {
793            return t;
794        } else {
795            return unwrapException(cause);
796        }
797    }
798
799}