001/*
002 * (C) Copyright 2012-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 */
018package org.nuxeo.ecm.user.center.profile;
019
020import java.io.BufferedReader;
021import java.io.File;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.InputStreamReader;
025import java.io.Reader;
026import java.io.Serializable;
027import java.text.DateFormat;
028import java.text.ParseException;
029import java.text.SimpleDateFormat;
030import java.util.Arrays;
031import java.util.Calendar;
032import java.util.Date;
033import java.util.HashMap;
034import java.util.Map;
035
036import org.apache.commons.csv.CSVFormat;
037import org.apache.commons.csv.CSVParser;
038import org.apache.commons.csv.CSVRecord;
039import org.apache.commons.io.FilenameUtils;
040import org.apache.commons.lang.StringUtils;
041import org.apache.commons.logging.Log;
042import org.apache.commons.logging.LogFactory;
043import org.nuxeo.common.annotation.Experimental;
044import org.nuxeo.ecm.core.api.CoreSession;
045import org.nuxeo.ecm.core.api.DocumentModel;
046import org.nuxeo.ecm.core.api.NuxeoException;
047import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
048import org.nuxeo.ecm.core.schema.DocumentType;
049import org.nuxeo.ecm.core.schema.SchemaManager;
050import org.nuxeo.ecm.core.schema.types.Field;
051import org.nuxeo.ecm.core.schema.types.ListType;
052import org.nuxeo.ecm.core.schema.types.SimpleTypeImpl;
053import org.nuxeo.ecm.core.schema.types.Type;
054import org.nuxeo.ecm.core.schema.types.primitives.BooleanType;
055import org.nuxeo.ecm.core.schema.types.primitives.DateType;
056import org.nuxeo.ecm.core.schema.types.primitives.DoubleType;
057import org.nuxeo.ecm.core.schema.types.primitives.IntegerType;
058import org.nuxeo.ecm.core.schema.types.primitives.LongType;
059import org.nuxeo.ecm.core.schema.types.primitives.StringType;
060import org.nuxeo.runtime.api.Framework;
061import org.nuxeo.runtime.transaction.TransactionHelper;
062
063/**
064 *
065 * @since 7.2
066 */
067@Experimental(comment="https://jira.nuxeo.com/browse/NXP-12200")
068public class UserProfileImporter {
069
070    private static final Log log = LogFactory.getLog(UserProfileImporter.class);
071
072    public static final String CONTENT_FILED_TYPE_NAME = "content";
073
074    public static final String USER_PROFILE_IMPORTER_USERNAME_COL = "username";
075
076    protected Character escapeCharacter = '\\';
077
078    protected ImporterConfig config;
079
080    protected String dataFileName;
081
082    protected transient DateFormat dateformat;
083
084    protected final Date startDate;
085
086    protected long totalRecords = 0;
087
088    protected long currentRecord = 0;
089
090    public static final String BLOB_FOLDER_PROPERTY = "nuxeo.csv.blobs.folder";
091
092    public UserProfileImporter() {
093        startDate = new Date();
094    }
095
096    public void doImport(CoreSession session) {
097        UserProfileService ups = Framework.getLocalService(UserProfileService.class);
098
099        config = ups.getImporterConfig();
100        if (config == null) {
101            log.error("No importer configuration could be found");
102            return;
103        }
104
105        dataFileName = config.getDataFileName();
106        if (dataFileName == null) {
107            log.error("No importer dataFileName was supplied");
108            return;
109        }
110
111        InputStream is = getResourceAsStream(dataFileName);
112        if (is == null) {
113            log.error("Error locating CSV data file: " + dataFileName);
114            return;
115        }
116
117        Reader in = new BufferedReader(new InputStreamReader(is));
118        CSVParser parser = null;
119
120        try {
121            parser = CSVFormat.DEFAULT.withEscape(escapeCharacter).withHeader().parse(in);
122            doImport(session, parser, ups);
123        } catch (IOException e) {
124            log.error("Unable to read CSV file", e);
125        } finally {
126            if (parser != null) {
127                try {
128                    parser.close();
129                } catch (IOException e) {
130                    log.debug(e, e);
131                }
132            }
133        }
134
135    }
136
137    protected InputStream getResourceAsStream(String resource) {
138        InputStream is = getClass().getClassLoader().getResourceAsStream(resource);
139        if (is == null) {
140            is = Framework.getResourceLoader().getResourceAsStream(resource);
141            if (is == null) {
142                return null;
143            }
144        }
145        return is;
146    }
147
148    public void doImport(CoreSession session, CSVParser parser, UserProfileService userProfileService)
149            throws IOException {
150        log.info(String.format("Importing CSV file: %s", dataFileName));
151
152        DocumentType docType = Framework.getLocalService(SchemaManager.class).getDocumentType(
153                UserProfileConstants.USER_PROFILE_DOCTYPE);
154        if (docType == null) {
155            log.error("The type " + UserProfileConstants.USER_PROFILE_DOCTYPE + " does not exist");
156            return;
157        }
158
159        Map<String, Integer> header = parser.getHeaderMap();
160
161        if (header == null) {
162            // empty file?
163            log.error("No header line, empty file?");
164            return;
165        }
166
167        // find the index for the required name and type values
168        Integer nameIndex = header.get(UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL);
169        if (nameIndex == null) {
170            log.error("Missing 'username' column");
171            return;
172        }
173
174        long docsUpdatedCount = 0;
175        totalRecords = parser.getRecordNumber();
176        try {
177            int batchSize = config.getBatchSize();
178            long lineNumber = 0;
179
180            for (CSVRecord record : parser.getRecords()) {
181                lineNumber++;
182                currentRecord = lineNumber;
183
184                try {
185                    if (importLine(record, lineNumber, nameIndex, docType, session, userProfileService, header)) {
186                        docsUpdatedCount++;
187                        if (docsUpdatedCount % batchSize == 0) {
188                            commitOrRollbackTransaction();
189                            startTransaction();
190                        }
191                    }
192                } catch (NuxeoException e) {
193                    // try next line
194                    Throwable unwrappedException = unwrapException(e);
195                    logImportError(lineNumber, "Error while importing line: %s", unwrappedException.getMessage());
196                    log.debug(unwrappedException, unwrappedException);
197                }
198            }
199
200            session.save();
201        } finally {
202            commitOrRollbackTransaction();
203            startTransaction();
204        }
205        log.info(String.format("Done importing %s entries from CSV file: %s", docsUpdatedCount, dataFileName));
206    }
207
208    /**
209     * Import a line from the CSV file.
210     *
211     * @param userProfileService
212     * @param docType
213     * @param session
214     * @return {@code true} if a document has been created or updated, {@code false} otherwise.
215     */
216    protected boolean importLine(CSVRecord record, final long lineNumber, Integer nameIndex, DocumentType docType,
217            CoreSession session, UserProfileService userProfileService, Map<String, Integer> headerValues)
218            {
219        final String name = record.get(nameIndex);
220        if (StringUtils.isBlank(name)) {
221            logImportError(lineNumber, "Missing 'name' value", "label.csv.importer.missingNameValue");
222            return false;
223        }
224
225        Map<String, Serializable> values = computePropertiesMap(lineNumber, docType, headerValues, record);
226        if (values == null) {
227            // skip this line
228            return false;
229        }
230
231        return updateDocument(lineNumber, name, docType, session, userProfileService, values);
232    }
233
234    protected Map<String, Serializable> computePropertiesMap(long lineNumber, DocumentType docType,
235            Map<String, Integer> headerValues, CSVRecord record) {
236
237        Map<String, Serializable> values = new HashMap<String, Serializable>();
238        for (String headerValue : headerValues.keySet()) {
239            String lineValue = record.get(headerValue);
240            lineValue = lineValue.trim();
241            String fieldName = headerValue;
242            if (!UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL.equals(headerValue)) {
243                if (!docType.hasField(fieldName)) {
244                    fieldName = fieldName.split(":")[1];
245                }
246                if (docType.hasField(fieldName) && !StringUtils.isBlank(lineValue)) {
247                    Serializable convertedValue = convertValue(docType, fieldName, headerValue, lineValue, lineNumber);
248                    if (convertedValue == null) {
249                        return null;
250                    }
251                    values.put(headerValue, convertedValue);
252                }
253            }
254        }
255        return values;
256    }
257
258    protected Serializable convertValue(DocumentType docType, String fieldName, String headerValue, String stringValue,
259            long lineNumber) {
260        if (docType.hasField(fieldName)) {
261            Field field = docType.getField(fieldName);
262            if (field != null) {
263                try {
264                    Serializable fieldValue = null;
265                    Type fieldType = field.getType();
266                    if (fieldType.isComplexType()) {
267                        if (fieldType.getName().equals(CONTENT_FILED_TYPE_NAME)) {
268                            String blobsFolderPath = Framework.getProperty(BLOB_FOLDER_PROPERTY);
269                            String path = FilenameUtils.normalize(blobsFolderPath + "/" + stringValue);
270                            File file = new File(path);
271                            if (file.exists()) {
272                                FileBlob blob = new FileBlob(file);
273                                blob.setFilename(file.getName());
274                                fieldValue = blob;
275                            } else {
276                                logImportError(lineNumber, "The file '%s' does not exist", stringValue);
277                                return null;
278                            }
279                        }
280                        // other types not supported
281                    } else {
282                        if (fieldType.isListType()) {
283                            Type listFieldType = ((ListType) fieldType).getFieldType();
284                            if (listFieldType.isSimpleType()) {
285                                /*
286                                 * Array.
287                                 */
288                                fieldValue = stringValue.split(config.getListSeparatorRegex());
289                            } else {
290                                /*
291                                 * Complex list.
292                                 */
293                                fieldValue = (Serializable) Arrays.asList(stringValue.split(config.getListSeparatorRegex()));
294                            }
295                        } else {
296                            /*
297                             * Primitive type.
298                             */
299                            Type type = field.getType();
300                            if (type instanceof SimpleTypeImpl) {
301                                type = type.getSuperType();
302                            }
303                            if (type.isSimpleType()) {
304                                if (type instanceof StringType) {
305                                    fieldValue = stringValue;
306                                } else if (type instanceof IntegerType) {
307                                    fieldValue = Integer.valueOf(stringValue);
308                                } else if (type instanceof LongType) {
309                                    fieldValue = Long.valueOf(stringValue);
310                                } else if (type instanceof DoubleType) {
311                                    fieldValue = Double.valueOf(stringValue);
312                                } else if (type instanceof BooleanType) {
313                                    fieldValue = Boolean.valueOf(stringValue);
314                                } else if (type instanceof DateType) {
315                                    fieldValue = getDateFormat().parse(stringValue);
316                                }
317                            }
318                        }
319                    }
320                    return fieldValue;
321                } catch (ParseException pe) {
322                    logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue);
323                    log.debug(pe, pe);
324                } catch (NumberFormatException nfe) {
325                    logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue);
326                    log.debug(nfe, nfe);
327                }
328            }
329        } else {
330            logImportError(lineNumber, "Field '%s' does not exist on type '%s'", headerValue, docType.getName());
331        }
332        return null;
333    }
334
335    protected DateFormat getDateFormat() {
336        // transient field so may become null
337        if (dateformat == null) {
338            dateformat = new SimpleDateFormat(config.getDateFormat());
339        }
340        return dateformat;
341    }
342
343    protected boolean updateDocument(long lineNumber, String name, DocumentType docType, CoreSession session,
344            UserProfileService userProfileService, Map<String, Serializable> properties) {
345
346        DocumentModel doc = userProfileService.getUserProfileDocument(name, session);
347        Calendar createdDate = (Calendar) doc.getPropertyValue("dc:created");
348        boolean isCreated = (createdDate.getTime().after(startDate));
349        if (!isCreated && !config.isUpdateExisting()) {
350            logImportInfo(lineNumber, "Document already exists for user: %s", name);
351            return false;
352        }
353
354        for (Map.Entry<String, Serializable> entry : properties.entrySet()) {
355            doc.setPropertyValue(entry.getKey(), entry.getValue());
356        }
357
358        try {
359            session.saveDocument(doc);
360        } catch (NuxeoException e) {
361            Throwable unwrappedException = unwrapException(e);
362            logImportError(lineNumber, "Unable to update document for user: %s: %s", name,
363                    unwrappedException.getMessage());
364            log.debug(unwrappedException, unwrappedException);
365            return false;
366        }
367        return true;
368    }
369
370    /**
371     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
372     * running a long process.
373     */
374    protected void commitOrRollbackTransaction() {
375        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
376            TransactionHelper.commitOrRollbackTransaction();
377        }
378    }
379
380    /**
381     * Starts a new transaction.
382     * <p>
383     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
384     * process.
385     *
386     * @return true if a new transaction was started
387     */
388    protected boolean startTransaction() {
389        return TransactionHelper.startTransaction();
390    }
391
392    protected void logImportError(long lineNumber, String message, String... params) {
393        String lineMessage = String.format("Line %d", lineNumber);
394        String errorMessage = String.format(message, (Object[]) params);
395        log.error(String.format("%s: %s", lineMessage, errorMessage));
396    }
397
398    protected void logImportInfo(long lineNumber, String message, String... params) {
399        String lineMessage = String.format("Line %d", lineNumber);
400        String infoMessage = String.format(message, (Object[]) params);
401        log.info(String.format("%s: %s", lineMessage, infoMessage));
402    }
403
404    public static Throwable unwrapException(Throwable t) {
405        Throwable cause = null;
406
407        if (t != null) {
408            cause = t.getCause();
409        }
410
411        if (cause == null) {
412            return t;
413        } else {
414            return unwrapException(cause);
415        }
416    }
417
418    public long getTotalRecords() {
419        return totalRecords;
420    }
421
422    public long getCurrentRecord() {
423        return currentRecord;
424    }
425
426}