/*
 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 */
package org.nuxeo.ecm.user.center.profile;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.Serializable;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.annotation.Experimental;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
import org.nuxeo.ecm.core.schema.DocumentType;
import org.nuxeo.ecm.core.schema.SchemaManager;
import org.nuxeo.ecm.core.schema.types.Field;
import org.nuxeo.ecm.core.schema.types.ListType;
import org.nuxeo.ecm.core.schema.types.SimpleTypeImpl;
import org.nuxeo.ecm.core.schema.types.Type;
import org.nuxeo.ecm.core.schema.types.primitives.BooleanType;
import org.nuxeo.ecm.core.schema.types.primitives.DateType;
import org.nuxeo.ecm.core.schema.types.primitives.DoubleType;
import org.nuxeo.ecm.core.schema.types.primitives.IntegerType;
import org.nuxeo.ecm.core.schema.types.primitives.LongType;
import org.nuxeo.ecm.core.schema.types.primitives.StringType;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.transaction.TransactionHelper;

/**
 * Imports user profile data from a CSV file into the user profile documents returned by the
 * {@link UserProfileService}.
 * <p>
 * The CSV file must have a header line containing at least the {@value #USER_PROFILE_IMPORTER_USERNAME_COL} column.
 * Every other column is matched against a field of the user profile document type; columns that match no field are
 * ignored.
 *
 * @since 7.2
 */
@Experimental(comment="https://jira.nuxeo.com/browse/NXP-12200")
public class UserProfileImporter {

    private static final Log log = LogFactory.getLog(UserProfileImporter.class);

    /** Name of the complex type whose values are read as files from the blobs folder. */
    public static final String CONTENT_FILED_TYPE_NAME = "content";

    /** Header of the mandatory CSV column holding the user name. */
    public static final String USER_PROFILE_IMPORTER_USERNAME_COL = "username";

    protected Character escapeCharacter = '\\';

    protected ImporterConfig config;

    protected String dataFileName;

    protected transient DateFormat dateformat;

    /** Instant at which this importer was instantiated; used to tell freshly created profiles from older ones. */
    protected final Date startDate;

    protected long totalRecords = 0;

    protected long currentRecord = 0;

    /** Framework property giving the folder under which blob files referenced by the CSV are looked up. */
    public static final String BLOB_FOLDER_PROPERTY = "nuxeo.csv.blobs.folder";

    public UserProfileImporter() {
        startDate = new Date();
    }

    /**
     * Runs the import using the importer configuration registered on the {@link UserProfileService}.
     * <p>
     * Logs an error and returns without importing anything when the configuration, the data file name or the data
     * file itself cannot be found, or when the file cannot be read.
     *
     * @param session the session used to fetch and save the user profile documents
     */
    public void doImport(CoreSession session) {
        UserProfileService ups = Framework.getService(UserProfileService.class);

        config = ups.getImporterConfig();
        if (config == null) {
            log.error("No importer configuration could be found");
            return;
        }

        dataFileName = config.getDataFileName();
        if (dataFileName == null) {
            log.error("No importer dataFileName was supplied");
            return;
        }

        try (InputStream is = getResourceAsStream(dataFileName)) {
            if (is == null) {
                log.error("Error locating CSV data file: " + dataFileName);
                return;
            }

            try (Reader in = new BufferedReader(new InputStreamReader(is));
                    CSVParser parser = CSVFormat.DEFAULT.withEscape(escapeCharacter).withHeader().parse(in)) {
                doImport(session, parser, ups);
            }
        } catch (IOException e) {
            log.error("Unable to read CSV file", e);
        }

    }

    /**
     * Looks up a resource, first through this class' class loader and then through the framework resource loader.
     *
     * @param resource the resource name
     * @return the opened stream, to be closed by the caller, or {@code null} if the resource was not found
     */
    @SuppressWarnings("resource") // closed by caller
    protected InputStream getResourceAsStream(String resource) {
        InputStream is = getClass().getClassLoader().getResourceAsStream(resource);
        if (is == null) {
            is = Framework.getResourceLoader().getResourceAsStream(resource);
        }
        return is;
    }

    /**
     * Imports every record of the given parser.
     * <p>
     * The transaction is committed and restarted each time {@code batchSize} documents have been updated, and once
     * more when the import ends. A line failing with a {@link NuxeoException} is logged and skipped.
     *
     * @param session the session used to fetch and save the user profile documents
     * @param parser the CSV parser, configured to read a header line
     * @param userProfileService the service providing the user profile documents
     * @throws IOException if the CSV records cannot be read
     */
    public void doImport(CoreSession session, CSVParser parser, UserProfileService userProfileService)
            throws IOException {
        log.info(String.format("Importing CSV file: %s", dataFileName));

        DocumentType docType = Framework.getService(SchemaManager.class).getDocumentType(
                UserProfileConstants.USER_PROFILE_DOCTYPE);
        if (docType == null) {
            log.error("The type " + UserProfileConstants.USER_PROFILE_DOCTYPE + " does not exist");
            return;
        }

        Map<String, Integer> header = parser.getHeaderMap();

        if (header == null) {
            // empty file?
            log.error("No header line, empty file?");
            return;
        }

        // find the index for the required name and type values
        Integer nameIndex = header.get(UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL);
        if (nameIndex == null) {
            log.error("Missing 'username' column");
            return;
        }

        long docsUpdatedCount = 0;
        try {
            int batchSize = config.getBatchSize();

            // The parser's record number is still 0 before any record has been read, so the total has to be taken
            // from the parsed list itself.
            List<CSVRecord> records = parser.getRecords();
            totalRecords = records.size();

            long lineNumber = 0;
            for (CSVRecord record : records) {
                lineNumber++;
                currentRecord = lineNumber;

                try {
                    if (importLine(record, lineNumber, nameIndex, docType, session, userProfileService, header)) {
                        docsUpdatedCount++;
                        // a non-positive batch size disables intermediate commits instead of dividing by zero
                        if (batchSize > 0 && docsUpdatedCount % batchSize == 0) {
                            commitOrRollbackTransaction();
                            startTransaction();
                        }
                    }
                } catch (NuxeoException e) {
                    // try next line
                    Throwable unwrappedException = unwrapException(e);
                    logImportError(lineNumber, "Error while importing line: %s", unwrappedException.getMessage());
                    log.debug(unwrappedException, unwrappedException);
                }
            }

            session.save();
        } finally {
            commitOrRollbackTransaction();
            startTransaction();
        }
        log.info(String.format("Done importing %s entries from CSV file: %s", docsUpdatedCount, dataFileName));
    }

    /**
     * Import a line from the CSV file.
     *
     * @return {@code true} if a document has been created or updated, {@code false} otherwise.
     */
    protected boolean importLine(CSVRecord record, final long lineNumber, Integer nameIndex, DocumentType docType,
            CoreSession session, UserProfileService userProfileService, Map<String, Integer> headerValues)
    {
        final String name = record.get(nameIndex);
        if (StringUtils.isBlank(name)) {
            logImportError(lineNumber, "Missing 'username' value");
            return false;
        }

        Map<String, Serializable> values = computePropertiesMap(lineNumber, docType, headerValues, record);
        if (values == null) {
            // skip this line
            return false;
        }

        return updateDocument(lineNumber, name, docType, session, userProfileService, values);
    }

    /**
     * Converts the values of a CSV record into document property values, keyed by CSV header.
     * <p>
     * The user name column, blank values and columns matching no field of the document type are left out.
     *
     * @return the converted values, or {@code null} if a value could not be converted and the line must be skipped
     */
    protected Map<String, Serializable> computePropertiesMap(long lineNumber, DocumentType docType,
            Map<String, Integer> headerValues, CSVRecord record) {

        Map<String, Serializable> values = new HashMap<>();
        for (String headerValue : headerValues.keySet()) {
            String lineValue = record.get(headerValue);
            lineValue = lineValue.trim();
            if (UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL.equals(headerValue)) {
                continue;
            }

            String fieldName = headerValue;
            if (!docType.hasField(fieldName)) {
                // The header may be prefixed ("prefix:name"): retry with the part following the first colon.
                // A header without any colon is left untouched and ends up ignored below.
                String[] parts = fieldName.split(":");
                if (parts.length > 1) {
                    fieldName = parts[1];
                }
            }
            if (docType.hasField(fieldName) && !StringUtils.isBlank(lineValue)) {
                Serializable convertedValue = convertValue(docType, fieldName, headerValue, lineValue, lineNumber);
                if (convertedValue == null) {
                    return null;
                }
                values.put(headerValue, convertedValue);
            }
        }
        return values;
    }

    /**
     * Converts a CSV string value into a value suited to the given field.
     * <p>
     * Supported field types are the {@value #CONTENT_FILED_TYPE_NAME} complex type (read as a file from the blobs
     * folder), lists (split with the configured separator regex) and the string, integer, long, double, boolean and
     * date simple types.
     *
     * @param docType the document type owning the field
     * @param fieldName the name of the field on the document type
     * @param headerValue the CSV header of the column, used in error messages
     * @param stringValue the raw CSV value
     * @param lineNumber the current line number, used in error messages
     * @return the converted value, or {@code null} if the value could not be converted; the reason is logged
     */
    protected Serializable convertValue(DocumentType docType, String fieldName, String headerValue, String stringValue,
            long lineNumber) {
        if (!docType.hasField(fieldName)) {
            logImportError(lineNumber, "Field '%s' does not exist on type '%s'", headerValue, docType.getName());
            return null;
        }
        Field field = docType.getField(fieldName);
        if (field == null) {
            return null;
        }

        Type fieldType = field.getType();
        Serializable fieldValue;
        try {
            if (fieldType.isComplexType()) {
                if (CONTENT_FILED_TYPE_NAME.equals(fieldType.getName())) {
                    // failures are logged by the helper
                    return convertBlobValue(stringValue, lineNumber);
                }
                // other complex types not supported
                fieldValue = null;
            } else if (fieldType.isListType()) {
                String[] items = stringValue.split(config.getListSeparatorRegex());
                Type listFieldType = ((ListType) fieldType).getFieldType();
                // a list of simple values is set as an array, a list of complex values as a list
                fieldValue = listFieldType.isSimpleType() ? items : (Serializable) Arrays.asList(items);
            } else {
                fieldValue = convertPrimitiveValue(fieldType, stringValue);
            }
        } catch (ParseException | NumberFormatException e) {
            logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue);
            log.debug(e, e);
            return null;
        }

        if (fieldValue == null) {
            // make the reason for skipping the line visible instead of dropping it silently
            logImportError(lineNumber, "Unsupported type '%s' for field '%s'", fieldType.getName(), headerValue);
        }
        return fieldValue;
    }

    /**
     * Builds a blob from the file named {@code stringValue} under the folder given by the
     * {@value #BLOB_FOLDER_PROPERTY} framework property.
     *
     * @return the blob, or {@code null} (after logging an error) if the file cannot be found
     */
    private Serializable convertBlobValue(String stringValue, long lineNumber) {
        String blobsFolderPath = Framework.getProperty(BLOB_FOLDER_PROPERTY);
        // normalize() returns null for an invalid path, e.g. one climbing above the root with ".."
        String path = FilenameUtils.normalize(blobsFolderPath + "/" + stringValue);
        File file = path == null ? null : new File(path);
        if (file == null || !file.exists()) {
            logImportError(lineNumber, "The file '%s' does not exist", stringValue);
            return null;
        }
        FileBlob blob = new FileBlob(file);
        blob.setFilename(file.getName());
        return blob;
    }

    /**
     * Converts a string into the Java value matching a simple field type.
     *
     * @return the converted value, or {@code null} if the type is not one of the supported simple types
     * @throws ParseException if the type is a date and the value does not match the configured date format
     * @throws NumberFormatException if the type is numeric and the value is not a valid number
     */
    private Serializable convertPrimitiveValue(Type fieldType, String stringValue) throws ParseException {
        Type type = fieldType;
        if (type instanceof SimpleTypeImpl) {
            // restricted simple type: look at the type it derives from
            type = type.getSuperType();
        }
        if (!type.isSimpleType()) {
            return null;
        }
        if (type instanceof StringType) {
            return stringValue;
        } else if (type instanceof IntegerType) {
            return Integer.valueOf(stringValue);
        } else if (type instanceof LongType) {
            return Long.valueOf(stringValue);
        } else if (type instanceof DoubleType) {
            return Double.valueOf(stringValue);
        } else if (type instanceof BooleanType) {
            return Boolean.valueOf(stringValue);
        } else if (type instanceof DateType) {
            return getDateFormat().parse(stringValue);
        }
        return null;
    }

    /**
     * Returns the date format built from the importer configuration, creating it on first use.
     */
    protected DateFormat getDateFormat() {
        // transient field so may become null
        if (dateformat == null) {
            dateformat = new SimpleDateFormat(config.getDateFormat());
        }
        return dateformat;
    }

    /**
     * Sets the given properties on the user profile document of a user and saves it.
     * <p>
     * A profile whose creation date is not after this importer's start date is considered as already existing, and
     * is only updated when the configuration allows updating existing documents.
     *
     * @return {@code true} if the document was saved, {@code false} if it was skipped or could not be saved
     */
    protected boolean updateDocument(long lineNumber, String name, DocumentType docType, CoreSession session,
            UserProfileService userProfileService, Map<String, Serializable> properties) {

        DocumentModel doc = userProfileService.getUserProfileDocument(name, session);
        Calendar createdDate = (Calendar) doc.getPropertyValue("dc:created");
        // NOTE(review): a profile without creation date is treated as pre-existing rather than failing the whole
        // import with a NullPointerException - confirm this is the expected behavior.
        boolean isCreated = createdDate != null && createdDate.getTime().after(startDate);
        if (!isCreated && !config.isUpdateExisting()) {
            logImportInfo(lineNumber, "Document already exists for user: %s", name);
            return false;
        }

        for (Map.Entry<String, Serializable> entry : properties.entrySet()) {
            doc.setPropertyValue(entry.getKey(), entry.getValue());
        }

        try {
            session.saveDocument(doc);
        } catch (NuxeoException e) {
            Throwable unwrappedException = unwrapException(e);
            logImportError(lineNumber, "Unable to update document for user: %s: %s", name,
                    unwrappedException.getMessage());
            log.debug(unwrappedException, unwrappedException);
            return false;
        }
        return true;
    }

    /**
     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
     * running a long process.
     */
    protected void commitOrRollbackTransaction() {
        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
            TransactionHelper.commitOrRollbackTransaction();
        }
    }

    /**
     * Starts a new transaction.
     * <p>
     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
     * process.
     *
     * @return true if a new transaction was started
     */
    protected boolean startTransaction() {
        return TransactionHelper.startTransaction();
    }

    /**
     * Logs an error prefixed with the line number.
     *
     * @param message a {@link String#format(String, Object...)} pattern
     * @param params the pattern arguments
     */
    protected void logImportError(long lineNumber, String message, String... params) {
        log.error(formatLineMessage(lineNumber, message, params));
    }

    /**
     * Logs an information message prefixed with the line number.
     *
     * @param message a {@link String#format(String, Object...)} pattern
     * @param params the pattern arguments
     */
    protected void logImportInfo(long lineNumber, String message, String... params) {
        log.info(formatLineMessage(lineNumber, message, params));
    }

    /** Formats {@code message} with {@code params} and prefixes the result with {@code "Line <n>: "}. */
    private static String formatLineMessage(long lineNumber, String message, String... params) {
        String lineMessage = String.format("Line %d", lineNumber);
        String formattedMessage = String.format(message, (Object[]) params);
        return String.format("%s: %s", lineMessage, formattedMessage);
    }

    /**
     * Returns the root cause of the given throwable.
     *
     * @return the deepest cause in the cause chain, {@code t} itself if it has no cause, or {@code null} if
     *         {@code t} is {@code null}
     */
    public static Throwable unwrapException(Throwable t) {
        Throwable root = t;
        while (root != null && root.getCause() != null) {
            root = root.getCause();
        }
        return root;
    }

    /**
     * Returns the number of records found in the CSV file, or 0 as long as the records have not been read.
     */
    public long getTotalRecords() {
        return totalRecords;
    }

    /**
     * Returns the 1-based number of the record being imported, or 0 if the import has not reached the first record.
     */
    public long getCurrentRecord() {
        return currentRecord;
    }

}