001/*
002 * (C) Copyright 2012-2013 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 */
016package org.nuxeo.ecm.user.center.profile;
017
018import java.io.BufferedReader;
019import java.io.File;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InputStreamReader;
023import java.io.Reader;
024import java.io.Serializable;
025import java.text.DateFormat;
026import java.text.ParseException;
027import java.text.SimpleDateFormat;
028import java.util.Arrays;
029import java.util.Calendar;
030import java.util.Date;
031import java.util.HashMap;
032import java.util.Map;
033
034import org.apache.commons.csv.CSVFormat;
035import org.apache.commons.csv.CSVParser;
036import org.apache.commons.csv.CSVRecord;
037import org.apache.commons.io.FilenameUtils;
038import org.apache.commons.lang.StringUtils;
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.common.annotation.Experimental;
042import org.nuxeo.ecm.core.api.CoreSession;
043import org.nuxeo.ecm.core.api.DocumentModel;
044import org.nuxeo.ecm.core.api.NuxeoException;
045import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
046import org.nuxeo.ecm.core.schema.DocumentType;
047import org.nuxeo.ecm.core.schema.SchemaManager;
048import org.nuxeo.ecm.core.schema.types.Field;
049import org.nuxeo.ecm.core.schema.types.ListType;
050import org.nuxeo.ecm.core.schema.types.SimpleTypeImpl;
051import org.nuxeo.ecm.core.schema.types.Type;
052import org.nuxeo.ecm.core.schema.types.primitives.BooleanType;
053import org.nuxeo.ecm.core.schema.types.primitives.DateType;
054import org.nuxeo.ecm.core.schema.types.primitives.DoubleType;
055import org.nuxeo.ecm.core.schema.types.primitives.IntegerType;
056import org.nuxeo.ecm.core.schema.types.primitives.LongType;
057import org.nuxeo.ecm.core.schema.types.primitives.StringType;
058import org.nuxeo.runtime.api.Framework;
059import org.nuxeo.runtime.transaction.TransactionHelper;
060
061/**
062 *
063 * @since 7.2
064 */
065@Experimental(comment="https://jira.nuxeo.com/browse/NXP-12200")
066public class UserProfileImporter {
067
068    private static final Log log = LogFactory.getLog(UserProfileImporter.class);
069
070    public static final String CONTENT_FILED_TYPE_NAME = "content";
071
072    public static final String USER_PROFILE_IMPORTER_USERNAME_COL = "username";
073
074    protected Character escapeCharacter = '\\';
075
076    protected ImporterConfig config;
077
078    protected String dataFileName;
079
080    protected transient DateFormat dateformat;
081
082    protected final Date startDate;
083
084    protected long totalRecords = 0;
085
086    protected long currentRecord = 0;
087
088    public static final String BLOB_FOLDER_PROPERTY = "nuxeo.csv.blobs.folder";
089
090    public UserProfileImporter() {
091        startDate = new Date();
092    }
093
094    public void doImport(CoreSession session) {
095        UserProfileService ups = Framework.getLocalService(UserProfileService.class);
096
097        config = ups.getImporterConfig();
098        if (config == null) {
099            log.error("No importer configuration could be found");
100            return;
101        }
102
103        dataFileName = config.getDataFileName();
104        if (dataFileName == null) {
105            log.error("No importer dataFileName was supplied");
106            return;
107        }
108
109        InputStream is = getResourceAsStream(dataFileName);
110        if (is == null) {
111            log.error("Error locating CSV data file: " + dataFileName);
112            return;
113        }
114
115        Reader in = new BufferedReader(new InputStreamReader(is));
116        CSVParser parser = null;
117
118        try {
119            parser = CSVFormat.DEFAULT.withEscape(escapeCharacter).withHeader().parse(in);
120            doImport(session, parser, ups);
121        } catch (IOException e) {
122            log.error("Unable to read CSV file", e);
123        } finally {
124            if (parser != null) {
125                try {
126                    parser.close();
127                } catch (IOException e) {
128                    log.debug(e, e);
129                }
130            }
131        }
132
133    }
134
135    protected InputStream getResourceAsStream(String resource) {
136        InputStream is = getClass().getClassLoader().getResourceAsStream(resource);
137        if (is == null) {
138            is = Framework.getResourceLoader().getResourceAsStream(resource);
139            if (is == null) {
140                return null;
141            }
142        }
143        return is;
144    }
145
146    public void doImport(CoreSession session, CSVParser parser, UserProfileService userProfileService)
147            throws IOException {
148        log.info(String.format("Importing CSV file: %s", dataFileName));
149
150        DocumentType docType = Framework.getLocalService(SchemaManager.class).getDocumentType(
151                UserProfileConstants.USER_PROFILE_DOCTYPE);
152        if (docType == null) {
153            log.error("The type " + UserProfileConstants.USER_PROFILE_DOCTYPE + " does not exist");
154            return;
155        }
156
157        Map<String, Integer> header = parser.getHeaderMap();
158
159        if (header == null) {
160            // empty file?
161            log.error("No header line, empty file?");
162            return;
163        }
164
165        // find the index for the required name and type values
166        Integer nameIndex = header.get(UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL);
167        if (nameIndex == null) {
168            log.error("Missing 'username' column");
169            return;
170        }
171
172        long docsUpdatedCount = 0;
173        totalRecords = parser.getRecordNumber();
174        try {
175            int batchSize = config.getBatchSize();
176            long lineNumber = 0;
177
178            for (CSVRecord record : parser.getRecords()) {
179                lineNumber++;
180                currentRecord = lineNumber;
181
182                try {
183                    if (importLine(record, lineNumber, nameIndex, docType, session, userProfileService, header)) {
184                        docsUpdatedCount++;
185                        if (docsUpdatedCount % batchSize == 0) {
186                            commitOrRollbackTransaction();
187                            startTransaction();
188                        }
189                    }
190                } catch (NuxeoException e) {
191                    // try next line
192                    Throwable unwrappedException = unwrapException(e);
193                    logImportError(lineNumber, "Error while importing line: %s", unwrappedException.getMessage());
194                    log.debug(unwrappedException, unwrappedException);
195                }
196            }
197
198            session.save();
199        } finally {
200            commitOrRollbackTransaction();
201            startTransaction();
202        }
203        log.info(String.format("Done importing %s entries from CSV file: %s", docsUpdatedCount, dataFileName));
204    }
205
206    /**
207     * Import a line from the CSV file.
208     *
209     * @param userProfileService
210     * @param docType
211     * @param session
212     * @return {@code true} if a document has been created or updated, {@code false} otherwise.
213     */
214    protected boolean importLine(CSVRecord record, final long lineNumber, Integer nameIndex, DocumentType docType,
215            CoreSession session, UserProfileService userProfileService, Map<String, Integer> headerValues)
216            {
217        final String name = record.get(nameIndex);
218        if (StringUtils.isBlank(name)) {
219            logImportError(lineNumber, "Missing 'name' value", "label.csv.importer.missingNameValue");
220            return false;
221        }
222
223        Map<String, Serializable> values = computePropertiesMap(lineNumber, docType, headerValues, record);
224        if (values == null) {
225            // skip this line
226            return false;
227        }
228
229        return updateDocument(lineNumber, name, docType, session, userProfileService, values);
230    }
231
232    protected Map<String, Serializable> computePropertiesMap(long lineNumber, DocumentType docType,
233            Map<String, Integer> headerValues, CSVRecord record) {
234
235        Map<String, Serializable> values = new HashMap<String, Serializable>();
236        for (String headerValue : headerValues.keySet()) {
237            String lineValue = record.get(headerValue);
238            lineValue = lineValue.trim();
239            String fieldName = headerValue;
240            if (!UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL.equals(headerValue)) {
241                if (!docType.hasField(fieldName)) {
242                    fieldName = fieldName.split(":")[1];
243                }
244                if (docType.hasField(fieldName) && !StringUtils.isBlank(lineValue)) {
245                    Serializable convertedValue = convertValue(docType, fieldName, headerValue, lineValue, lineNumber);
246                    if (convertedValue == null) {
247                        return null;
248                    }
249                    values.put(headerValue, convertedValue);
250                }
251            }
252        }
253        return values;
254    }
255
256    protected Serializable convertValue(DocumentType docType, String fieldName, String headerValue, String stringValue,
257            long lineNumber) {
258        if (docType.hasField(fieldName)) {
259            Field field = docType.getField(fieldName);
260            if (field != null) {
261                try {
262                    Serializable fieldValue = null;
263                    Type fieldType = field.getType();
264                    if (fieldType.isComplexType()) {
265                        if (fieldType.getName().equals(CONTENT_FILED_TYPE_NAME)) {
266                            String blobsFolderPath = Framework.getProperty(BLOB_FOLDER_PROPERTY);
267                            String path = FilenameUtils.normalize(blobsFolderPath + "/" + stringValue);
268                            File file = new File(path);
269                            if (file.exists()) {
270                                FileBlob blob = new FileBlob(file);
271                                blob.setFilename(file.getName());
272                                fieldValue = blob;
273                            } else {
274                                logImportError(lineNumber, "The file '%s' does not exist", stringValue);
275                                return null;
276                            }
277                        }
278                        // other types not supported
279                    } else {
280                        if (fieldType.isListType()) {
281                            Type listFieldType = ((ListType) fieldType).getFieldType();
282                            if (listFieldType.isSimpleType()) {
283                                /*
284                                 * Array.
285                                 */
286                                fieldValue = stringValue.split(config.getListSeparatorRegex());
287                            } else {
288                                /*
289                                 * Complex list.
290                                 */
291                                fieldValue = (Serializable) Arrays.asList(stringValue.split(config.getListSeparatorRegex()));
292                            }
293                        } else {
294                            /*
295                             * Primitive type.
296                             */
297                            Type type = field.getType();
298                            if (type instanceof SimpleTypeImpl) {
299                                type = type.getSuperType();
300                            }
301                            if (type.isSimpleType()) {
302                                if (type instanceof StringType) {
303                                    fieldValue = stringValue;
304                                } else if (type instanceof IntegerType) {
305                                    fieldValue = Integer.valueOf(stringValue);
306                                } else if (type instanceof LongType) {
307                                    fieldValue = Long.valueOf(stringValue);
308                                } else if (type instanceof DoubleType) {
309                                    fieldValue = Double.valueOf(stringValue);
310                                } else if (type instanceof BooleanType) {
311                                    fieldValue = Boolean.valueOf(stringValue);
312                                } else if (type instanceof DateType) {
313                                    fieldValue = getDateFormat().parse(stringValue);
314                                }
315                            }
316                        }
317                    }
318                    return fieldValue;
319                } catch (ParseException pe) {
320                    logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue);
321                    log.debug(pe, pe);
322                } catch (NumberFormatException nfe) {
323                    logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue);
324                    log.debug(nfe, nfe);
325                }
326            }
327        } else {
328            logImportError(lineNumber, "Field '%s' does not exist on type '%s'", headerValue, docType.getName());
329        }
330        return null;
331    }
332
333    protected DateFormat getDateFormat() {
334        // transient field so may become null
335        if (dateformat == null) {
336            dateformat = new SimpleDateFormat(config.getDateFormat());
337        }
338        return dateformat;
339    }
340
341    protected boolean updateDocument(long lineNumber, String name, DocumentType docType, CoreSession session,
342            UserProfileService userProfileService, Map<String, Serializable> properties) {
343
344        DocumentModel doc = userProfileService.getUserProfileDocument(name, session);
345        Calendar createdDate = (Calendar) doc.getPropertyValue("dc:created");
346        boolean isCreated = (createdDate.getTime().after(startDate));
347        if (!isCreated && !config.isUpdateExisting()) {
348            logImportInfo(lineNumber, "Document already exists for user: %s", name);
349            return false;
350        }
351
352        for (Map.Entry<String, Serializable> entry : properties.entrySet()) {
353            doc.setPropertyValue(entry.getKey(), entry.getValue());
354        }
355
356        try {
357            session.saveDocument(doc);
358        } catch (NuxeoException e) {
359            Throwable unwrappedException = unwrapException(e);
360            logImportError(lineNumber, "Unable to update document for user: %s: %s", name,
361                    unwrappedException.getMessage());
362            log.debug(unwrappedException, unwrappedException);
363            return false;
364        }
365        return true;
366    }
367
368    /**
369     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
370     * running a long process.
371     */
372    protected void commitOrRollbackTransaction() {
373        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
374            TransactionHelper.commitOrRollbackTransaction();
375        }
376    }
377
378    /**
379     * Starts a new transaction.
380     * <p>
381     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
382     * process.
383     *
384     * @return true if a new transaction was started
385     */
386    protected boolean startTransaction() {
387        return TransactionHelper.startTransaction();
388    }
389
390    protected void logImportError(long lineNumber, String message, String... params) {
391        String lineMessage = String.format("Line %d", lineNumber);
392        String errorMessage = String.format(message, (Object[]) params);
393        log.error(String.format("%s: %s", lineMessage, errorMessage));
394    }
395
396    protected void logImportInfo(long lineNumber, String message, String... params) {
397        String lineMessage = String.format("Line %d", lineNumber);
398        String infoMessage = String.format(message, (Object[]) params);
399        log.info(String.format("%s: %s", lineMessage, infoMessage));
400    }
401
402    public static Throwable unwrapException(Throwable t) {
403        Throwable cause = null;
404
405        if (t != null) {
406            cause = t.getCause();
407        }
408
409        if (cause == null) {
410            return t;
411        } else {
412            return unwrapException(cause);
413        }
414    }
415
416    public long getTotalRecords() {
417        return totalRecords;
418    }
419
420    public long getCurrentRecord() {
421        return currentRecord;
422    }
423
424}