001/*
002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 */
018package org.nuxeo.ecm.user.center.profile;
019
020import java.io.BufferedReader;
021import java.io.File;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.InputStreamReader;
025import java.io.Reader;
026import java.io.Serializable;
027import java.text.DateFormat;
028import java.text.ParseException;
029import java.text.SimpleDateFormat;
030import java.util.Arrays;
031import java.util.Calendar;
032import java.util.Date;
033import java.util.HashMap;
034import java.util.Map;
035
036import org.apache.commons.csv.CSVFormat;
037import org.apache.commons.csv.CSVParser;
038import org.apache.commons.csv.CSVRecord;
039import org.apache.commons.io.FilenameUtils;
040import org.apache.commons.lang3.StringUtils;
041import org.apache.commons.logging.Log;
042import org.apache.commons.logging.LogFactory;
043import org.nuxeo.common.annotation.Experimental;
044import org.nuxeo.ecm.core.api.CoreSession;
045import org.nuxeo.ecm.core.api.DocumentModel;
046import org.nuxeo.ecm.core.api.NuxeoException;
047import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
048import org.nuxeo.ecm.core.schema.DocumentType;
049import org.nuxeo.ecm.core.schema.SchemaManager;
050import org.nuxeo.ecm.core.schema.types.Field;
051import org.nuxeo.ecm.core.schema.types.ListType;
052import org.nuxeo.ecm.core.schema.types.SimpleTypeImpl;
053import org.nuxeo.ecm.core.schema.types.Type;
054import org.nuxeo.ecm.core.schema.types.primitives.BooleanType;
055import org.nuxeo.ecm.core.schema.types.primitives.DateType;
056import org.nuxeo.ecm.core.schema.types.primitives.DoubleType;
057import org.nuxeo.ecm.core.schema.types.primitives.IntegerType;
058import org.nuxeo.ecm.core.schema.types.primitives.LongType;
059import org.nuxeo.ecm.core.schema.types.primitives.StringType;
060import org.nuxeo.runtime.api.Framework;
061import org.nuxeo.runtime.transaction.TransactionHelper;
062
063/**
064 *
065 * @since 7.2
066 */
067@Experimental(comment="https://jira.nuxeo.com/browse/NXP-12200")
068public class UserProfileImporter {
069
070    private static final Log log = LogFactory.getLog(UserProfileImporter.class);
071
072    public static final String CONTENT_FILED_TYPE_NAME = "content";
073
074    public static final String USER_PROFILE_IMPORTER_USERNAME_COL = "username";
075
076    protected Character escapeCharacter = '\\';
077
078    protected ImporterConfig config;
079
080    protected String dataFileName;
081
082    protected transient DateFormat dateformat;
083
084    protected final Date startDate;
085
086    protected long totalRecords = 0;
087
088    protected long currentRecord = 0;
089
090    public static final String BLOB_FOLDER_PROPERTY = "nuxeo.csv.blobs.folder";
091
092    public UserProfileImporter() {
093        startDate = new Date();
094    }
095
096    public void doImport(CoreSession session) {
097        UserProfileService ups = Framework.getService(UserProfileService.class);
098
099        config = ups.getImporterConfig();
100        if (config == null) {
101            log.error("No importer configuration could be found");
102            return;
103        }
104
105        dataFileName = config.getDataFileName();
106        if (dataFileName == null) {
107            log.error("No importer dataFileName was supplied");
108            return;
109        }
110
111        InputStream is = getResourceAsStream(dataFileName);
112        if (is == null) {
113            log.error("Error locating CSV data file: " + dataFileName);
114            return;
115        }
116
117        Reader in = new BufferedReader(new InputStreamReader(is));
118        CSVParser parser = null;
119
120        try {
121            parser = CSVFormat.DEFAULT.withEscape(escapeCharacter).withHeader().parse(in);
122            doImport(session, parser, ups);
123        } catch (IOException e) {
124            log.error("Unable to read CSV file", e);
125        } finally {
126            if (parser != null) {
127                try {
128                    parser.close();
129                } catch (IOException e) {
130                    log.debug(e, e);
131                }
132            }
133        }
134
135    }
136
137    protected InputStream getResourceAsStream(String resource) {
138        InputStream is = getClass().getClassLoader().getResourceAsStream(resource);
139        if (is == null) {
140            is = Framework.getResourceLoader().getResourceAsStream(resource);
141            if (is == null) {
142                return null;
143            }
144        }
145        return is;
146    }
147
148    public void doImport(CoreSession session, CSVParser parser, UserProfileService userProfileService)
149            throws IOException {
150        log.info(String.format("Importing CSV file: %s", dataFileName));
151
152        DocumentType docType = Framework.getService(SchemaManager.class).getDocumentType(
153                UserProfileConstants.USER_PROFILE_DOCTYPE);
154        if (docType == null) {
155            log.error("The type " + UserProfileConstants.USER_PROFILE_DOCTYPE + " does not exist");
156            return;
157        }
158
159        Map<String, Integer> header = parser.getHeaderMap();
160
161        if (header == null) {
162            // empty file?
163            log.error("No header line, empty file?");
164            return;
165        }
166
167        // find the index for the required name and type values
168        Integer nameIndex = header.get(UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL);
169        if (nameIndex == null) {
170            log.error("Missing 'username' column");
171            return;
172        }
173
174        long docsUpdatedCount = 0;
175        totalRecords = parser.getRecordNumber();
176        try {
177            int batchSize = config.getBatchSize();
178            long lineNumber = 0;
179
180            for (CSVRecord record : parser.getRecords()) {
181                lineNumber++;
182                currentRecord = lineNumber;
183
184                try {
185                    if (importLine(record, lineNumber, nameIndex, docType, session, userProfileService, header)) {
186                        docsUpdatedCount++;
187                        if (docsUpdatedCount % batchSize == 0) {
188                            commitOrRollbackTransaction();
189                            startTransaction();
190                        }
191                    }
192                } catch (NuxeoException e) {
193                    // try next line
194                    Throwable unwrappedException = unwrapException(e);
195                    logImportError(lineNumber, "Error while importing line: %s", unwrappedException.getMessage());
196                    log.debug(unwrappedException, unwrappedException);
197                }
198            }
199
200            session.save();
201        } finally {
202            commitOrRollbackTransaction();
203            startTransaction();
204        }
205        log.info(String.format("Done importing %s entries from CSV file: %s", docsUpdatedCount, dataFileName));
206    }
207
208    /**
209     * Import a line from the CSV file.
210     *
211     * @return {@code true} if a document has been created or updated, {@code false} otherwise.
212     */
213    protected boolean importLine(CSVRecord record, final long lineNumber, Integer nameIndex, DocumentType docType,
214            CoreSession session, UserProfileService userProfileService, Map<String, Integer> headerValues)
215            {
216        final String name = record.get(nameIndex);
217        if (StringUtils.isBlank(name)) {
218            logImportError(lineNumber, "Missing 'name' value", "label.csv.importer.missingNameValue");
219            return false;
220        }
221
222        Map<String, Serializable> values = computePropertiesMap(lineNumber, docType, headerValues, record);
223        if (values == null) {
224            // skip this line
225            return false;
226        }
227
228        return updateDocument(lineNumber, name, docType, session, userProfileService, values);
229    }
230
231    protected Map<String, Serializable> computePropertiesMap(long lineNumber, DocumentType docType,
232            Map<String, Integer> headerValues, CSVRecord record) {
233
234        Map<String, Serializable> values = new HashMap<String, Serializable>();
235        for (String headerValue : headerValues.keySet()) {
236            String lineValue = record.get(headerValue);
237            lineValue = lineValue.trim();
238            String fieldName = headerValue;
239            if (!UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL.equals(headerValue)) {
240                if (!docType.hasField(fieldName)) {
241                    fieldName = fieldName.split(":")[1];
242                }
243                if (docType.hasField(fieldName) && !StringUtils.isBlank(lineValue)) {
244                    Serializable convertedValue = convertValue(docType, fieldName, headerValue, lineValue, lineNumber);
245                    if (convertedValue == null) {
246                        return null;
247                    }
248                    values.put(headerValue, convertedValue);
249                }
250            }
251        }
252        return values;
253    }
254
255    protected Serializable convertValue(DocumentType docType, String fieldName, String headerValue, String stringValue,
256            long lineNumber) {
257        if (docType.hasField(fieldName)) {
258            Field field = docType.getField(fieldName);
259            if (field != null) {
260                try {
261                    Serializable fieldValue = null;
262                    Type fieldType = field.getType();
263                    if (fieldType.isComplexType()) {
264                        if (fieldType.getName().equals(CONTENT_FILED_TYPE_NAME)) {
265                            String blobsFolderPath = Framework.getProperty(BLOB_FOLDER_PROPERTY);
266                            String path = FilenameUtils.normalize(blobsFolderPath + "/" + stringValue);
267                            File file = new File(path);
268                            if (file.exists()) {
269                                FileBlob blob = new FileBlob(file);
270                                blob.setFilename(file.getName());
271                                fieldValue = blob;
272                            } else {
273                                logImportError(lineNumber, "The file '%s' does not exist", stringValue);
274                                return null;
275                            }
276                        }
277                        // other types not supported
278                    } else {
279                        if (fieldType.isListType()) {
280                            Type listFieldType = ((ListType) fieldType).getFieldType();
281                            if (listFieldType.isSimpleType()) {
282                                /*
283                                 * Array.
284                                 */
285                                fieldValue = stringValue.split(config.getListSeparatorRegex());
286                            } else {
287                                /*
288                                 * Complex list.
289                                 */
290                                fieldValue = (Serializable) Arrays.asList(stringValue.split(config.getListSeparatorRegex()));
291                            }
292                        } else {
293                            /*
294                             * Primitive type.
295                             */
296                            Type type = field.getType();
297                            if (type instanceof SimpleTypeImpl) {
298                                type = type.getSuperType();
299                            }
300                            if (type.isSimpleType()) {
301                                if (type instanceof StringType) {
302                                    fieldValue = stringValue;
303                                } else if (type instanceof IntegerType) {
304                                    fieldValue = Integer.valueOf(stringValue);
305                                } else if (type instanceof LongType) {
306                                    fieldValue = Long.valueOf(stringValue);
307                                } else if (type instanceof DoubleType) {
308                                    fieldValue = Double.valueOf(stringValue);
309                                } else if (type instanceof BooleanType) {
310                                    fieldValue = Boolean.valueOf(stringValue);
311                                } else if (type instanceof DateType) {
312                                    fieldValue = getDateFormat().parse(stringValue);
313                                }
314                            }
315                        }
316                    }
317                    return fieldValue;
318                } catch (ParseException pe) {
319                    logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue);
320                    log.debug(pe, pe);
321                } catch (NumberFormatException nfe) {
322                    logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue);
323                    log.debug(nfe, nfe);
324                }
325            }
326        } else {
327            logImportError(lineNumber, "Field '%s' does not exist on type '%s'", headerValue, docType.getName());
328        }
329        return null;
330    }
331
332    protected DateFormat getDateFormat() {
333        // transient field so may become null
334        if (dateformat == null) {
335            dateformat = new SimpleDateFormat(config.getDateFormat());
336        }
337        return dateformat;
338    }
339
340    protected boolean updateDocument(long lineNumber, String name, DocumentType docType, CoreSession session,
341            UserProfileService userProfileService, Map<String, Serializable> properties) {
342
343        DocumentModel doc = userProfileService.getUserProfileDocument(name, session);
344        Calendar createdDate = (Calendar) doc.getPropertyValue("dc:created");
345        boolean isCreated = (createdDate.getTime().after(startDate));
346        if (!isCreated && !config.isUpdateExisting()) {
347            logImportInfo(lineNumber, "Document already exists for user: %s", name);
348            return false;
349        }
350
351        for (Map.Entry<String, Serializable> entry : properties.entrySet()) {
352            doc.setPropertyValue(entry.getKey(), entry.getValue());
353        }
354
355        try {
356            session.saveDocument(doc);
357        } catch (NuxeoException e) {
358            Throwable unwrappedException = unwrapException(e);
359            logImportError(lineNumber, "Unable to update document for user: %s: %s", name,
360                    unwrappedException.getMessage());
361            log.debug(unwrappedException, unwrappedException);
362            return false;
363        }
364        return true;
365    }
366
367    /**
368     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
369     * running a long process.
370     */
371    protected void commitOrRollbackTransaction() {
372        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
373            TransactionHelper.commitOrRollbackTransaction();
374        }
375    }
376
377    /**
378     * Starts a new transaction.
379     * <p>
380     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
381     * process.
382     *
383     * @return true if a new transaction was started
384     */
385    protected boolean startTransaction() {
386        return TransactionHelper.startTransaction();
387    }
388
389    protected void logImportError(long lineNumber, String message, String... params) {
390        String lineMessage = String.format("Line %d", lineNumber);
391        String errorMessage = String.format(message, (Object[]) params);
392        log.error(String.format("%s: %s", lineMessage, errorMessage));
393    }
394
395    protected void logImportInfo(long lineNumber, String message, String... params) {
396        String lineMessage = String.format("Line %d", lineNumber);
397        String infoMessage = String.format(message, (Object[]) params);
398        log.info(String.format("%s: %s", lineMessage, infoMessage));
399    }
400
401    public static Throwable unwrapException(Throwable t) {
402        Throwable cause = null;
403
404        if (t != null) {
405            cause = t.getCause();
406        }
407
408        if (cause == null) {
409            return t;
410        } else {
411            return unwrapException(cause);
412        }
413    }
414
415    public long getTotalRecords() {
416        return totalRecords;
417    }
418
419    public long getCurrentRecord() {
420        return currentRecord;
421    }
422
423}