001/* 002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 */ 018package org.nuxeo.ecm.user.center.profile; 019 020import java.io.BufferedReader; 021import java.io.File; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.InputStreamReader; 025import java.io.Reader; 026import java.io.Serializable; 027import java.text.DateFormat; 028import java.text.ParseException; 029import java.text.SimpleDateFormat; 030import java.util.Arrays; 031import java.util.Calendar; 032import java.util.Date; 033import java.util.HashMap; 034import java.util.Map; 035 036import org.apache.commons.csv.CSVFormat; 037import org.apache.commons.csv.CSVParser; 038import org.apache.commons.csv.CSVRecord; 039import org.apache.commons.io.FilenameUtils; 040import org.apache.commons.lang3.StringUtils; 041import org.apache.commons.logging.Log; 042import org.apache.commons.logging.LogFactory; 043import org.nuxeo.common.annotation.Experimental; 044import org.nuxeo.ecm.core.api.CoreSession; 045import org.nuxeo.ecm.core.api.DocumentModel; 046import org.nuxeo.ecm.core.api.NuxeoException; 047import org.nuxeo.ecm.core.api.impl.blob.FileBlob; 048import org.nuxeo.ecm.core.schema.DocumentType; 049import org.nuxeo.ecm.core.schema.SchemaManager; 050import org.nuxeo.ecm.core.schema.types.Field; 051import org.nuxeo.ecm.core.schema.types.ListType; 052import org.nuxeo.ecm.core.schema.types.SimpleTypeImpl; 053import org.nuxeo.ecm.core.schema.types.Type; 054import org.nuxeo.ecm.core.schema.types.primitives.BooleanType; 055import org.nuxeo.ecm.core.schema.types.primitives.DateType; 056import org.nuxeo.ecm.core.schema.types.primitives.DoubleType; 057import org.nuxeo.ecm.core.schema.types.primitives.IntegerType; 058import org.nuxeo.ecm.core.schema.types.primitives.LongType; 059import org.nuxeo.ecm.core.schema.types.primitives.StringType; 060import org.nuxeo.runtime.api.Framework; 061import org.nuxeo.runtime.transaction.TransactionHelper; 062 063/** 064 * 065 * @since 7.2 066 */ 067@Experimental(comment="https://jira.nuxeo.com/browse/NXP-12200") 068public class UserProfileImporter { 069 070 private static final Log log = LogFactory.getLog(UserProfileImporter.class); 071 072 public static final String CONTENT_FILED_TYPE_NAME = "content"; 073 074 public static final String USER_PROFILE_IMPORTER_USERNAME_COL = "username"; 075 076 protected Character escapeCharacter = '\\'; 077 078 protected ImporterConfig config; 079 080 protected String dataFileName; 081 082 protected transient DateFormat dateformat; 083 084 protected final Date startDate; 085 086 protected long totalRecords = 0; 087 088 protected long currentRecord = 0; 089 090 public static final String BLOB_FOLDER_PROPERTY = "nuxeo.csv.blobs.folder"; 091 092 public UserProfileImporter() { 093 startDate = new Date(); 094 } 095 096 public void doImport(CoreSession session) { 097 UserProfileService ups = Framework.getService(UserProfileService.class); 098 099 config = ups.getImporterConfig(); 100 if (config == null) { 101 log.error("No importer configuration could be found"); 102 return; 103 } 104 105 dataFileName = config.getDataFileName(); 106 if (dataFileName == null) { 107 log.error("No importer dataFileName was supplied"); 108 return; 109 } 110 111 InputStream is = getResourceAsStream(dataFileName); 112 if (is == null) { 113 log.error("Error locating CSV data file: " + dataFileName); 114 return; 115 } 116 117 Reader in = new BufferedReader(new InputStreamReader(is)); 118 CSVParser parser = null; 119 120 try { 121 parser = CSVFormat.DEFAULT.withEscape(escapeCharacter).withHeader().parse(in); 122 doImport(session, parser, ups); 123 } catch (IOException e) { 124 log.error("Unable to read CSV file", e); 125 } finally { 126 if (parser != null) { 127 try { 128 parser.close(); 129 } catch (IOException e) { 130 log.debug(e, e); 131 } 132 } 133 } 134 135 } 136 137 protected InputStream getResourceAsStream(String resource) { 138 InputStream is = getClass().getClassLoader().getResourceAsStream(resource); 139 if (is == null) { 140 is = Framework.getResourceLoader().getResourceAsStream(resource); 141 if (is == null) { 142 return null; 143 } 144 } 145 return is; 146 } 147 148 public void doImport(CoreSession session, CSVParser parser, UserProfileService userProfileService) 149 throws IOException { 150 log.info(String.format("Importing CSV file: %s", dataFileName)); 151 152 DocumentType docType = Framework.getService(SchemaManager.class).getDocumentType( 153 UserProfileConstants.USER_PROFILE_DOCTYPE); 154 if (docType == null) { 155 log.error("The type " + UserProfileConstants.USER_PROFILE_DOCTYPE + " does not exist"); 156 return; 157 } 158 159 Map<String, Integer> header = parser.getHeaderMap(); 160 161 if (header == null) { 162 // empty file? 163 log.error("No header line, empty file?"); 164 return; 165 } 166 167 // find the index for the required name and type values 168 Integer nameIndex = header.get(UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL); 169 if (nameIndex == null) { 170 log.error("Missing 'username' column"); 171 return; 172 } 173 174 long docsUpdatedCount = 0; 175 totalRecords = parser.getRecordNumber(); 176 try { 177 int batchSize = config.getBatchSize(); 178 long lineNumber = 0; 179 180 for (CSVRecord record : parser.getRecords()) { 181 lineNumber++; 182 currentRecord = lineNumber; 183 184 try { 185 if (importLine(record, lineNumber, nameIndex, docType, session, userProfileService, header)) { 186 docsUpdatedCount++; 187 if (docsUpdatedCount % batchSize == 0) { 188 commitOrRollbackTransaction(); 189 startTransaction(); 190 } 191 } 192 } catch (NuxeoException e) { 193 // try next line 194 Throwable unwrappedException = unwrapException(e); 195 logImportError(lineNumber, "Error while importing line: %s", unwrappedException.getMessage()); 196 log.debug(unwrappedException, unwrappedException); 197 } 198 } 199 200 session.save(); 201 } finally { 202 commitOrRollbackTransaction(); 203 startTransaction(); 204 } 205 log.info(String.format("Done importing %s entries from CSV file: %s", docsUpdatedCount, dataFileName)); 206 } 207 208 /** 209 * Import a line from the CSV file. 210 * 211 * @return {@code true} if a document has been created or updated, {@code false} otherwise. 212 */ 213 protected boolean importLine(CSVRecord record, final long lineNumber, Integer nameIndex, DocumentType docType, 214 CoreSession session, UserProfileService userProfileService, Map<String, Integer> headerValues) 215 { 216 final String name = record.get(nameIndex); 217 if (StringUtils.isBlank(name)) { 218 logImportError(lineNumber, "Missing 'name' value", "label.csv.importer.missingNameValue"); 219 return false; 220 } 221 222 Map<String, Serializable> values = computePropertiesMap(lineNumber, docType, headerValues, record); 223 if (values == null) { 224 // skip this line 225 return false; 226 } 227 228 return updateDocument(lineNumber, name, docType, session, userProfileService, values); 229 } 230 231 protected Map<String, Serializable> computePropertiesMap(long lineNumber, DocumentType docType, 232 Map<String, Integer> headerValues, CSVRecord record) { 233 234 Map<String, Serializable> values = new HashMap<String, Serializable>(); 235 for (String headerValue : headerValues.keySet()) { 236 String lineValue = record.get(headerValue); 237 lineValue = lineValue.trim(); 238 String fieldName = headerValue; 239 if (!UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL.equals(headerValue)) { 240 if (!docType.hasField(fieldName)) { 241 fieldName = fieldName.split(":")[1]; 242 } 243 if (docType.hasField(fieldName) && !StringUtils.isBlank(lineValue)) { 244 Serializable convertedValue = convertValue(docType, fieldName, headerValue, lineValue, lineNumber); 245 if (convertedValue == null) { 246 return null; 247 } 248 values.put(headerValue, convertedValue); 249 } 250 } 251 } 252 return values; 253 } 254 255 protected Serializable convertValue(DocumentType docType, String fieldName, String headerValue, String stringValue, 256 long lineNumber) { 257 if (docType.hasField(fieldName)) { 258 Field field = docType.getField(fieldName); 259 if (field != null) { 260 try { 261 Serializable fieldValue = null; 262 Type fieldType = field.getType(); 263 if (fieldType.isComplexType()) { 264 if (fieldType.getName().equals(CONTENT_FILED_TYPE_NAME)) { 265 String blobsFolderPath = Framework.getProperty(BLOB_FOLDER_PROPERTY); 266 String path = FilenameUtils.normalize(blobsFolderPath + "/" + stringValue); 267 File file = new File(path); 268 if (file.exists()) { 269 FileBlob blob = new FileBlob(file); 270 blob.setFilename(file.getName()); 271 fieldValue = blob; 272 } else { 273 logImportError(lineNumber, "The file '%s' does not exist", stringValue); 274 return null; 275 } 276 } 277 // other types not supported 278 } else { 279 if (fieldType.isListType()) { 280 Type listFieldType = ((ListType) fieldType).getFieldType(); 281 if (listFieldType.isSimpleType()) { 282 /* 283 * Array. 284 */ 285 fieldValue = stringValue.split(config.getListSeparatorRegex()); 286 } else { 287 /* 288 * Complex list. 289 */ 290 fieldValue = (Serializable) Arrays.asList(stringValue.split(config.getListSeparatorRegex())); 291 } 292 } else { 293 /* 294 * Primitive type. 295 */ 296 Type type = field.getType(); 297 if (type instanceof SimpleTypeImpl) { 298 type = type.getSuperType(); 299 } 300 if (type.isSimpleType()) { 301 if (type instanceof StringType) { 302 fieldValue = stringValue; 303 } else if (type instanceof IntegerType) { 304 fieldValue = Integer.valueOf(stringValue); 305 } else if (type instanceof LongType) { 306 fieldValue = Long.valueOf(stringValue); 307 } else if (type instanceof DoubleType) { 308 fieldValue = Double.valueOf(stringValue); 309 } else if (type instanceof BooleanType) { 310 fieldValue = Boolean.valueOf(stringValue); 311 } else if (type instanceof DateType) { 312 fieldValue = getDateFormat().parse(stringValue); 313 } 314 } 315 } 316 } 317 return fieldValue; 318 } catch (ParseException pe) { 319 logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue); 320 log.debug(pe, pe); 321 } catch (NumberFormatException nfe) { 322 logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue); 323 log.debug(nfe, nfe); 324 } 325 } 326 } else { 327 logImportError(lineNumber, "Field '%s' does not exist on type '%s'", headerValue, docType.getName()); 328 } 329 return null; 330 } 331 332 protected DateFormat getDateFormat() { 333 // transient field so may become null 334 if (dateformat == null) { 335 dateformat = new SimpleDateFormat(config.getDateFormat()); 336 } 337 return dateformat; 338 } 339 340 protected boolean updateDocument(long lineNumber, String name, DocumentType docType, CoreSession session, 341 UserProfileService userProfileService, Map<String, Serializable> properties) { 342 343 DocumentModel doc = userProfileService.getUserProfileDocument(name, session); 344 Calendar createdDate = (Calendar) doc.getPropertyValue("dc:created"); 345 boolean isCreated = (createdDate.getTime().after(startDate)); 346 if (!isCreated && !config.isUpdateExisting()) { 347 logImportInfo(lineNumber, "Document already exists for user: %s", name); 348 return false; 349 } 350 351 for (Map.Entry<String, Serializable> entry : properties.entrySet()) { 352 doc.setPropertyValue(entry.getKey(), entry.getValue()); 353 } 354 355 try { 356 session.saveDocument(doc); 357 } catch (NuxeoException e) { 358 Throwable unwrappedException = unwrapException(e); 359 logImportError(lineNumber, "Unable to update document for user: %s: %s", name, 360 unwrappedException.getMessage()); 361 log.debug(unwrappedException, unwrappedException); 362 return false; 363 } 364 return true; 365 } 366 367 /** 368 * Releases the transaction resources by committing the existing transaction (if any). This is recommended before 369 * running a long process. 370 */ 371 protected void commitOrRollbackTransaction() { 372 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 373 TransactionHelper.commitOrRollbackTransaction(); 374 } 375 } 376 377 /** 378 * Starts a new transaction. 379 * <p> 380 * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long 381 * process. 382 * 383 * @return true if a new transaction was started 384 */ 385 protected boolean startTransaction() { 386 return TransactionHelper.startTransaction(); 387 } 388 389 protected void logImportError(long lineNumber, String message, String... params) { 390 String lineMessage = String.format("Line %d", lineNumber); 391 String errorMessage = String.format(message, (Object[]) params); 392 log.error(String.format("%s: %s", lineMessage, errorMessage)); 393 } 394 395 protected void logImportInfo(long lineNumber, String message, String... params) { 396 String lineMessage = String.format("Line %d", lineNumber); 397 String infoMessage = String.format(message, (Object[]) params); 398 log.info(String.format("%s: %s", lineMessage, infoMessage)); 399 } 400 401 public static Throwable unwrapException(Throwable t) { 402 Throwable cause = null; 403 404 if (t != null) { 405 cause = t.getCause(); 406 } 407 408 if (cause == null) { 409 return t; 410 } else { 411 return unwrapException(cause); 412 } 413 } 414 415 public long getTotalRecords() { 416 return totalRecords; 417 } 418 419 public long getCurrentRecord() { 420 return currentRecord; 421 } 422 423}