001/*
002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 */
018package org.nuxeo.ecm.user.center.profile;
019
020import java.io.BufferedReader;
021import java.io.File;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.InputStreamReader;
025import java.io.Reader;
026import java.io.Serializable;
027import java.text.DateFormat;
028import java.text.ParseException;
029import java.text.SimpleDateFormat;
030import java.util.Arrays;
031import java.util.Calendar;
032import java.util.Date;
033import java.util.HashMap;
034import java.util.Map;
035
036import org.apache.commons.csv.CSVFormat;
037import org.apache.commons.csv.CSVParser;
038import org.apache.commons.csv.CSVRecord;
039import org.apache.commons.io.FilenameUtils;
040import org.apache.commons.lang3.StringUtils;
041import org.apache.commons.logging.Log;
042import org.apache.commons.logging.LogFactory;
043import org.nuxeo.common.annotation.Experimental;
044import org.nuxeo.ecm.core.api.CoreSession;
045import org.nuxeo.ecm.core.api.DocumentModel;
046import org.nuxeo.ecm.core.api.NuxeoException;
047import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
048import org.nuxeo.ecm.core.schema.DocumentType;
049import org.nuxeo.ecm.core.schema.SchemaManager;
050import org.nuxeo.ecm.core.schema.types.Field;
051import org.nuxeo.ecm.core.schema.types.ListType;
052import org.nuxeo.ecm.core.schema.types.SimpleTypeImpl;
053import org.nuxeo.ecm.core.schema.types.Type;
054import org.nuxeo.ecm.core.schema.types.primitives.BooleanType;
055import org.nuxeo.ecm.core.schema.types.primitives.DateType;
056import org.nuxeo.ecm.core.schema.types.primitives.DoubleType;
057import org.nuxeo.ecm.core.schema.types.primitives.IntegerType;
058import org.nuxeo.ecm.core.schema.types.primitives.LongType;
059import org.nuxeo.ecm.core.schema.types.primitives.StringType;
060import org.nuxeo.runtime.api.Framework;
061import org.nuxeo.runtime.transaction.TransactionHelper;
062
063/**
064 *
065 * @since 7.2
066 */
067@Experimental(comment="https://jira.nuxeo.com/browse/NXP-12200")
068public class UserProfileImporter {
069
070    private static final Log log = LogFactory.getLog(UserProfileImporter.class);
071
072    public static final String CONTENT_FILED_TYPE_NAME = "content";
073
074    public static final String USER_PROFILE_IMPORTER_USERNAME_COL = "username";
075
076    protected Character escapeCharacter = '\\';
077
078    protected ImporterConfig config;
079
080    protected String dataFileName;
081
082    protected transient DateFormat dateformat;
083
084    protected final Date startDate;
085
086    protected long totalRecords = 0;
087
088    protected long currentRecord = 0;
089
090    public static final String BLOB_FOLDER_PROPERTY = "nuxeo.csv.blobs.folder";
091
092    public UserProfileImporter() {
093        startDate = new Date();
094    }
095
096    public void doImport(CoreSession session) {
097        UserProfileService ups = Framework.getService(UserProfileService.class);
098
099        config = ups.getImporterConfig();
100        if (config == null) {
101            log.error("No importer configuration could be found");
102            return;
103        }
104
105        dataFileName = config.getDataFileName();
106        if (dataFileName == null) {
107            log.error("No importer dataFileName was supplied");
108            return;
109        }
110
111        try (InputStream is = getResourceAsStream(dataFileName)) {
112            if (is == null) {
113                log.error("Error locating CSV data file: " + dataFileName);
114                return;
115            }
116
117            try (Reader in = new BufferedReader(new InputStreamReader(is));
118                    CSVParser parser = CSVFormat.DEFAULT.withEscape(escapeCharacter).withHeader().parse(in)) {
119                doImport(session, parser, ups);
120            }
121        } catch (IOException e) {
122            log.error("Unable to read CSV file", e);
123        }
124
125    }
126
127    @SuppressWarnings("resource") // closed by caller
128    protected InputStream getResourceAsStream(String resource) {
129        InputStream is = getClass().getClassLoader().getResourceAsStream(resource);
130        if (is == null) {
131            is = Framework.getResourceLoader().getResourceAsStream(resource);
132            if (is == null) {
133                return null;
134            }
135        }
136        return is;
137    }
138
139    public void doImport(CoreSession session, CSVParser parser, UserProfileService userProfileService)
140            throws IOException {
141        log.info(String.format("Importing CSV file: %s", dataFileName));
142
143        DocumentType docType = Framework.getService(SchemaManager.class).getDocumentType(
144                UserProfileConstants.USER_PROFILE_DOCTYPE);
145        if (docType == null) {
146            log.error("The type " + UserProfileConstants.USER_PROFILE_DOCTYPE + " does not exist");
147            return;
148        }
149
150        Map<String, Integer> header = parser.getHeaderMap();
151
152        if (header == null) {
153            // empty file?
154            log.error("No header line, empty file?");
155            return;
156        }
157
158        // find the index for the required name and type values
159        Integer nameIndex = header.get(UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL);
160        if (nameIndex == null) {
161            log.error("Missing 'username' column");
162            return;
163        }
164
165        long docsUpdatedCount = 0;
166        totalRecords = parser.getRecordNumber();
167        try {
168            int batchSize = config.getBatchSize();
169            long lineNumber = 0;
170
171            for (CSVRecord record : parser.getRecords()) {
172                lineNumber++;
173                currentRecord = lineNumber;
174
175                try {
176                    if (importLine(record, lineNumber, nameIndex, docType, session, userProfileService, header)) {
177                        docsUpdatedCount++;
178                        if (docsUpdatedCount % batchSize == 0) {
179                            commitOrRollbackTransaction();
180                            startTransaction();
181                        }
182                    }
183                } catch (NuxeoException e) {
184                    // try next line
185                    Throwable unwrappedException = unwrapException(e);
186                    logImportError(lineNumber, "Error while importing line: %s", unwrappedException.getMessage());
187                    log.debug(unwrappedException, unwrappedException);
188                }
189            }
190
191            session.save();
192        } finally {
193            commitOrRollbackTransaction();
194            startTransaction();
195        }
196        log.info(String.format("Done importing %s entries from CSV file: %s", docsUpdatedCount, dataFileName));
197    }
198
199    /**
200     * Import a line from the CSV file.
201     *
202     * @return {@code true} if a document has been created or updated, {@code false} otherwise.
203     */
204    protected boolean importLine(CSVRecord record, final long lineNumber, Integer nameIndex, DocumentType docType,
205            CoreSession session, UserProfileService userProfileService, Map<String, Integer> headerValues)
206            {
207        final String name = record.get(nameIndex);
208        if (StringUtils.isBlank(name)) {
209            logImportError(lineNumber, "Missing 'name' value", "label.csv.importer.missingNameValue");
210            return false;
211        }
212
213        Map<String, Serializable> values = computePropertiesMap(lineNumber, docType, headerValues, record);
214        if (values == null) {
215            // skip this line
216            return false;
217        }
218
219        return updateDocument(lineNumber, name, docType, session, userProfileService, values);
220    }
221
222    protected Map<String, Serializable> computePropertiesMap(long lineNumber, DocumentType docType,
223            Map<String, Integer> headerValues, CSVRecord record) {
224
225        Map<String, Serializable> values = new HashMap<>();
226        for (String headerValue : headerValues.keySet()) {
227            String lineValue = record.get(headerValue);
228            lineValue = lineValue.trim();
229            String fieldName = headerValue;
230            if (!UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL.equals(headerValue)) {
231                if (!docType.hasField(fieldName)) {
232                    fieldName = fieldName.split(":")[1];
233                }
234                if (docType.hasField(fieldName) && !StringUtils.isBlank(lineValue)) {
235                    Serializable convertedValue = convertValue(docType, fieldName, headerValue, lineValue, lineNumber);
236                    if (convertedValue == null) {
237                        return null;
238                    }
239                    values.put(headerValue, convertedValue);
240                }
241            }
242        }
243        return values;
244    }
245
246    protected Serializable convertValue(DocumentType docType, String fieldName, String headerValue, String stringValue,
247            long lineNumber) {
248        if (docType.hasField(fieldName)) {
249            Field field = docType.getField(fieldName);
250            if (field != null) {
251                try {
252                    Serializable fieldValue = null;
253                    Type fieldType = field.getType();
254                    if (fieldType.isComplexType()) {
255                        if (fieldType.getName().equals(CONTENT_FILED_TYPE_NAME)) {
256                            String blobsFolderPath = Framework.getProperty(BLOB_FOLDER_PROPERTY);
257                            String path = FilenameUtils.normalize(blobsFolderPath + "/" + stringValue);
258                            File file = new File(path);
259                            if (file.exists()) {
260                                FileBlob blob = new FileBlob(file);
261                                blob.setFilename(file.getName());
262                                fieldValue = blob;
263                            } else {
264                                logImportError(lineNumber, "The file '%s' does not exist", stringValue);
265                                return null;
266                            }
267                        }
268                        // other types not supported
269                    } else {
270                        if (fieldType.isListType()) {
271                            Type listFieldType = ((ListType) fieldType).getFieldType();
272                            if (listFieldType.isSimpleType()) {
273                                /*
274                                 * Array.
275                                 */
276                                fieldValue = stringValue.split(config.getListSeparatorRegex());
277                            } else {
278                                /*
279                                 * Complex list.
280                                 */
281                                fieldValue = (Serializable) Arrays.asList(stringValue.split(config.getListSeparatorRegex()));
282                            }
283                        } else {
284                            /*
285                             * Primitive type.
286                             */
287                            Type type = field.getType();
288                            if (type instanceof SimpleTypeImpl) {
289                                type = type.getSuperType();
290                            }
291                            if (type.isSimpleType()) {
292                                if (type instanceof StringType) {
293                                    fieldValue = stringValue;
294                                } else if (type instanceof IntegerType) {
295                                    fieldValue = Integer.valueOf(stringValue);
296                                } else if (type instanceof LongType) {
297                                    fieldValue = Long.valueOf(stringValue);
298                                } else if (type instanceof DoubleType) {
299                                    fieldValue = Double.valueOf(stringValue);
300                                } else if (type instanceof BooleanType) {
301                                    fieldValue = Boolean.valueOf(stringValue);
302                                } else if (type instanceof DateType) {
303                                    fieldValue = getDateFormat().parse(stringValue);
304                                }
305                            }
306                        }
307                    }
308                    return fieldValue;
309                } catch (ParseException pe) {
310                    logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue);
311                    log.debug(pe, pe);
312                } catch (NumberFormatException nfe) {
313                    logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue);
314                    log.debug(nfe, nfe);
315                }
316            }
317        } else {
318            logImportError(lineNumber, "Field '%s' does not exist on type '%s'", headerValue, docType.getName());
319        }
320        return null;
321    }
322
323    protected DateFormat getDateFormat() {
324        // transient field so may become null
325        if (dateformat == null) {
326            dateformat = new SimpleDateFormat(config.getDateFormat());
327        }
328        return dateformat;
329    }
330
331    protected boolean updateDocument(long lineNumber, String name, DocumentType docType, CoreSession session,
332            UserProfileService userProfileService, Map<String, Serializable> properties) {
333
334        DocumentModel doc = userProfileService.getUserProfileDocument(name, session);
335        Calendar createdDate = (Calendar) doc.getPropertyValue("dc:created");
336        boolean isCreated = (createdDate.getTime().after(startDate));
337        if (!isCreated && !config.isUpdateExisting()) {
338            logImportInfo(lineNumber, "Document already exists for user: %s", name);
339            return false;
340        }
341
342        for (Map.Entry<String, Serializable> entry : properties.entrySet()) {
343            doc.setPropertyValue(entry.getKey(), entry.getValue());
344        }
345
346        try {
347            session.saveDocument(doc);
348        } catch (NuxeoException e) {
349            Throwable unwrappedException = unwrapException(e);
350            logImportError(lineNumber, "Unable to update document for user: %s: %s", name,
351                    unwrappedException.getMessage());
352            log.debug(unwrappedException, unwrappedException);
353            return false;
354        }
355        return true;
356    }
357
358    /**
359     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
360     * running a long process.
361     */
362    protected void commitOrRollbackTransaction() {
363        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
364            TransactionHelper.commitOrRollbackTransaction();
365        }
366    }
367
368    /**
369     * Starts a new transaction.
370     * <p>
371     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
372     * process.
373     *
374     * @return true if a new transaction was started
375     */
376    protected boolean startTransaction() {
377        return TransactionHelper.startTransaction();
378    }
379
380    protected void logImportError(long lineNumber, String message, String... params) {
381        String lineMessage = String.format("Line %d", lineNumber);
382        String errorMessage = String.format(message, (Object[]) params);
383        log.error(String.format("%s: %s", lineMessage, errorMessage));
384    }
385
386    protected void logImportInfo(long lineNumber, String message, String... params) {
387        String lineMessage = String.format("Line %d", lineNumber);
388        String infoMessage = String.format(message, (Object[]) params);
389        log.info(String.format("%s: %s", lineMessage, infoMessage));
390    }
391
392    public static Throwable unwrapException(Throwable t) {
393        Throwable cause = null;
394
395        if (t != null) {
396            cause = t.getCause();
397        }
398
399        if (cause == null) {
400            return t;
401        } else {
402            return unwrapException(cause);
403        }
404    }
405
406    public long getTotalRecords() {
407        return totalRecords;
408    }
409
410    public long getCurrentRecord() {
411        return currentRecord;
412    }
413
414}