/*
 * (C) Copyright 2012-2013 Nuxeo SA (http://nuxeo.com/) and contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the GNU Lesser General Public License
 * (LGPL) version 2.1 which accompanies this distribution, and is available at
 * http://www.gnu.org/licenses/lgpl-2.1.html
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * Contributors:
 */
package org.nuxeo.ecm.user.center.profile;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.Serializable;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.annotation.Experimental;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
import org.nuxeo.ecm.core.schema.DocumentType;
import org.nuxeo.ecm.core.schema.SchemaManager;
import org.nuxeo.ecm.core.schema.types.Field;
import org.nuxeo.ecm.core.schema.types.ListType;
import org.nuxeo.ecm.core.schema.types.SimpleTypeImpl;
import org.nuxeo.ecm.core.schema.types.Type;
import org.nuxeo.ecm.core.schema.types.primitives.BooleanType;
import org.nuxeo.ecm.core.schema.types.primitives.DateType;
import org.nuxeo.ecm.core.schema.types.primitives.DoubleType;
import org.nuxeo.ecm.core.schema.types.primitives.IntegerType;
import org.nuxeo.ecm.core.schema.types.primitives.LongType;
import org.nuxeo.ecm.core.schema.types.primitives.StringType;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.transaction.TransactionHelper;

/**
 * Imports user profile data from a CSV file into user profile documents.
 * <p>
 * The CSV file must have a header line containing a {@value #USER_PROFILE_IMPORTER_USERNAME_COL} column; every other
 * column whose name resolves to a field of the user profile document type is converted and written to the profile
 * document of the user named on that line.
 *
 * @since 7.2
 */
@Experimental(comment="https://jira.nuxeo.com/browse/NXP-12200")
public class UserProfileImporter {

    private static final Log log = LogFactory.getLog(UserProfileImporter.class);

    /** Name of the complex type treated as a blob. (Identifier spelling kept for compatibility.) */
    public static final String CONTENT_FILED_TYPE_NAME = "content";

    /** Name of the mandatory CSV column holding the user name. */
    public static final String USER_PROFILE_IMPORTER_USERNAME_COL = "username";

    /** Escape character handed to the CSV parser. */
    protected Character escapeCharacter = '\\';

    /** Importer configuration, fetched from the {@link UserProfileService} by {@link #doImport(CoreSession)}. */
    protected ImporterConfig config;

    /** Name of the CSV resource being imported, read from {@link #config}. */
    protected String dataFileName;

    /** Lazily created by {@link #getDateFormat()}. */
    protected transient DateFormat dateformat;

    /** Instant at which this importer was instantiated; used to tell freshly created profiles from older ones. */
    protected final Date startDate;

    /** Number of records found in the CSV file by the last import. */
    protected long totalRecords = 0;

    /** 1-based index of the record most recently handed to {@code importLine}. */
    protected long currentRecord = 0;

    /** Framework property naming the folder in which blob files referenced by the CSV are looked up. */
    public static final String BLOB_FOLDER_PROPERTY = "nuxeo.csv.blobs.folder";

    public UserProfileImporter() {
        startDate = new Date();
    }

    /**
     * Runs the import using the configuration registered on the {@link UserProfileService}.
     * <p>
     * Logs an error and returns without importing anything when no configuration, no data file name or no matching
     * resource can be found. I/O errors while reading the CSV are logged, not propagated.
     *
     * @param session the session used to fetch and save the user profile documents
     */
    public void doImport(CoreSession session) {
        UserProfileService ups = Framework.getLocalService(UserProfileService.class);

        config = ups.getImporterConfig();
        if (config == null) {
            log.error("No importer configuration could be found");
            return;
        }

        dataFileName = config.getDataFileName();
        if (dataFileName == null) {
            log.error("No importer dataFileName was supplied");
            return;
        }

        InputStream is = getResourceAsStream(dataFileName);
        if (is == null) {
            log.error("Error locating CSV data file: " + dataFileName);
            return;
        }

        // NOTE(review): the reader uses the platform default charset -- confirm whether UTF-8 should be forced.
        Reader in = new BufferedReader(new InputStreamReader(is));
        CSVParser parser = null;

        try {
            parser = CSVFormat.DEFAULT.withEscape(escapeCharacter).withHeader().parse(in);
            doImport(session, parser, ups);
        } catch (IOException e) {
            log.error("Unable to read CSV file", e);
        } finally {
            if (parser != null) {
                try {
                    parser.close();
                } catch (IOException e) {
                    log.debug(e, e);
                }
            }
            // Also close the reader explicitly: if parse() failed, no parser exists to release the stream.
            try {
                in.close();
            } catch (IOException e) {
                log.debug(e, e);
            }
        }

    }

    /**
     * Looks the resource up with this class' class loader first, then with the framework resource loader.
     *
     * @param resource the resource name
     * @return the opened stream, or {@code null} if neither loader finds the resource
     */
    protected InputStream getResourceAsStream(String resource) {
        InputStream is = getClass().getClassLoader().getResourceAsStream(resource);
        if (is == null) {
            is = Framework.getResourceLoader().getResourceAsStream(resource);
        }
        return is;
    }

    /**
     * Imports every record of the given parser.
     * <p>
     * The transaction is committed and restarted each time {@code batchSize} documents have been updated (when the
     * configured batch size is positive), and once more when the import ends, whether normally or not. A
     * {@link NuxeoException} raised by a line is logged and the import continues with the next line.
     *
     * @param session the session used to fetch and save the user profile documents
     * @param parser a CSV parser configured with a header line
     * @param userProfileService the service used to retrieve user profile documents
     * @throws IOException if the CSV records cannot be read
     */
    public void doImport(CoreSession session, CSVParser parser, UserProfileService userProfileService)
            throws IOException {
        log.info(String.format("Importing CSV file: %s", dataFileName));

        DocumentType docType = Framework.getLocalService(SchemaManager.class).getDocumentType(
                UserProfileConstants.USER_PROFILE_DOCTYPE);
        if (docType == null) {
            log.error("The type " + UserProfileConstants.USER_PROFILE_DOCTYPE + " does not exist");
            return;
        }

        Map<String, Integer> header = parser.getHeaderMap();

        if (header == null) {
            // empty file?
            log.error("No header line, empty file?");
            return;
        }

        // find the index for the required name and type values
        Integer nameIndex = header.get(UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL);
        if (nameIndex == null) {
            log.error("Missing 'username' column");
            return;
        }

        long docsUpdatedCount = 0;
        totalRecords = 0;
        try {
            int batchSize = config.getBatchSize();
            long lineNumber = 0;

            // The parser's record number is still 0 before anything has been read, so the total can only be
            // known once the records have actually been parsed.
            List<CSVRecord> records = parser.getRecords();
            totalRecords = records.size();

            for (CSVRecord record : records) {
                lineNumber++;
                currentRecord = lineNumber;

                try {
                    if (importLine(record, lineNumber, nameIndex, docType, session, userProfileService, header)) {
                        docsUpdatedCount++;
                        // A non-positive batch size disables intermediate commits instead of dividing by zero.
                        if (batchSize > 0 && docsUpdatedCount % batchSize == 0) {
                            commitOrRollbackTransaction();
                            startTransaction();
                        }
                    }
                } catch (NuxeoException e) {
                    // try next line
                    Throwable unwrappedException = unwrapException(e);
                    logImportError(lineNumber, "Error while importing line: %s", unwrappedException.getMessage());
                    log.debug(unwrappedException, unwrappedException);
                }
            }

            session.save();
        } finally {
            commitOrRollbackTransaction();
            startTransaction();
        }
        log.info(String.format("Done importing %s entries from CSV file: %s", docsUpdatedCount, dataFileName));
    }

    /**
     * Import a line from the CSV file.
     *
     * @param record the CSV record to import
     * @param lineNumber the 1-based record number, used in log messages
     * @param nameIndex index of the user name column in the record
     * @param docType the user profile document type
     * @param session the session used to save the document
     * @param userProfileService the service used to retrieve the user profile document
     * @param headerValues the CSV header map (column name to column index)
     * @return {@code true} if a document has been created or updated, {@code false} otherwise.
     */
    protected boolean importLine(CSVRecord record, final long lineNumber, Integer nameIndex, DocumentType docType,
            CoreSession session, UserProfileService userProfileService, Map<String, Integer> headerValues)
            {
        // A record may hold fewer values than the header; treat a missing user name cell like a blank one.
        final String name = nameIndex < record.size() ? record.get(nameIndex) : null;
        if (StringUtils.isBlank(name)) {
            logImportError(lineNumber, "Missing 'username' value");
            return false;
        }

        Map<String, Serializable> values = computePropertiesMap(lineNumber, docType, headerValues, record);
        if (values == null) {
            // skip this line
            return false;
        }

        return updateDocument(lineNumber, name, docType, session, userProfileService, values);
    }

    /**
     * Converts the cells of a record into property values, keyed by CSV column name.
     * <p>
     * The user name column, blank cells, cells missing from a short record and columns that do not resolve to a
     * field of {@code docType} are ignored. A column name that is not itself a field of the type is retried with
     * the part following its first {@code ':'}.
     *
     * @param lineNumber the 1-based record number, used in log messages
     * @param docType the user profile document type
     * @param headerValues the CSV header map (column name to column index)
     * @param record the CSV record to convert
     * @return the converted values, or {@code null} if a value could not be converted and the line must be skipped
     */
    protected Map<String, Serializable> computePropertiesMap(long lineNumber, DocumentType docType,
            Map<String, Integer> headerValues, CSVRecord record) {

        Map<String, Serializable> values = new HashMap<String, Serializable>();
        for (String headerValue : headerValues.keySet()) {
            if (UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL.equals(headerValue)) {
                continue;
            }
            // record.get(String) throws on a record shorter than the header; such cells are simply absent.
            if (!record.isSet(headerValue)) {
                continue;
            }
            String rawValue = record.get(headerValue);
            if (StringUtils.isBlank(rawValue)) {
                continue;
            }
            String lineValue = rawValue.trim();

            String fieldName = headerValue;
            if (!docType.hasField(fieldName)) {
                String[] parts = fieldName.split(":");
                if (parts.length < 2) {
                    // unknown column without a prefix: ignored, like unknown prefixed columns
                    continue;
                }
                fieldName = parts[1];
            }
            if (docType.hasField(fieldName)) {
                Serializable convertedValue = convertValue(docType, fieldName, headerValue, lineValue, lineNumber);
                if (convertedValue == null) {
                    return null;
                }
                values.put(headerValue, convertedValue);
            }
        }
        return values;
    }

    /**
     * Converts a CSV cell to a value suitable for the given field.
     * <p>
     * Supported targets: the {@value #CONTENT_FILED_TYPE_NAME} complex type (the cell is a file name resolved
     * against the {@value #BLOB_FOLDER_PROPERTY} folder), list types (the cell is split with the configured list
     * separator regex) and the string, integer, long, double, boolean and date simple types.
     *
     * @param docType the user profile document type
     * @param fieldName the name of the field on {@code docType}
     * @param headerValue the CSV column name, used in log messages
     * @param stringValue the trimmed, non-blank cell content
     * @param lineNumber the 1-based record number, used in log messages
     * @return the converted value, or {@code null} (after logging an error) if the value cannot be converted
     */
    protected Serializable convertValue(DocumentType docType, String fieldName, String headerValue, String stringValue,
            long lineNumber) {
        if (!docType.hasField(fieldName)) {
            logImportError(lineNumber, "Field '%s' does not exist on type '%s'", headerValue, docType.getName());
            return null;
        }
        Field field = docType.getField(fieldName);
        if (field == null) {
            return null;
        }
        try {
            Serializable fieldValue = null;
            Type fieldType = field.getType();
            if (fieldType.isComplexType()) {
                if (fieldType.getName().equals(CONTENT_FILED_TYPE_NAME)) {
                    String blobsFolderPath = Framework.getProperty(BLOB_FOLDER_PROPERTY);
                    // normalize() returns null for paths it cannot resolve (e.g. too many ".." segments)
                    String path = FilenameUtils.normalize(blobsFolderPath + "/" + stringValue);
                    File file = path == null ? null : new File(path);
                    if (file != null && file.exists()) {
                        FileBlob blob = new FileBlob(file);
                        blob.setFilename(file.getName());
                        fieldValue = blob;
                    } else {
                        logImportError(lineNumber, "The file '%s' does not exist", stringValue);
                        return null;
                    }
                }
                // other types not supported
            } else if (fieldType.isListType()) {
                Type listFieldType = ((ListType) fieldType).getFieldType();
                String[] items = stringValue.split(config.getListSeparatorRegex());
                if (listFieldType.isSimpleType()) {
                    // Array.
                    fieldValue = items;
                } else {
                    // Complex list.
                    fieldValue = (Serializable) Arrays.asList(items);
                }
            } else {
                // Primitive type.
                Type type = fieldType;
                if (type instanceof SimpleTypeImpl) {
                    // restricted simple type: dispatch on the type it derives from
                    type = type.getSuperType();
                }
                if (type.isSimpleType()) {
                    if (type instanceof StringType) {
                        fieldValue = stringValue;
                    } else if (type instanceof IntegerType) {
                        fieldValue = Integer.valueOf(stringValue);
                    } else if (type instanceof LongType) {
                        fieldValue = Long.valueOf(stringValue);
                    } else if (type instanceof DoubleType) {
                        fieldValue = Double.valueOf(stringValue);
                    } else if (type instanceof BooleanType) {
                        fieldValue = Boolean.valueOf(stringValue);
                    } else if (type instanceof DateType) {
                        fieldValue = getDateFormat().parse(stringValue);
                    }
                }
            }
            if (fieldValue == null) {
                // The caller skips the whole line on null; say why instead of dropping it silently.
                logImportError(lineNumber, "Unsupported type for field '%s'", headerValue);
            }
            return fieldValue;
        } catch (ParseException pe) {
            logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue);
            log.debug(pe, pe);
        } catch (NumberFormatException nfe) {
            logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue);
            log.debug(nfe, nfe);
        }
        return null;
    }

    /**
     * Returns the date format built from the configured pattern, creating it on first use.
     */
    protected DateFormat getDateFormat() {
        // transient field so may become null
        if (dateformat == null) {
            dateformat = new SimpleDateFormat(config.getDateFormat());
        }
        return dateformat;
    }

    /**
     * Writes the given properties to the profile document of a user and saves it.
     * <p>
     * A profile whose {@code dc:created} date is not after {@link #startDate} is considered pre-existing and is left
     * untouched unless the configuration allows updating existing documents.
     *
     * @param lineNumber the 1-based record number, used in log messages
     * @param name the user name
     * @param docType the user profile document type
     * @param session the session used to save the document
     * @param userProfileService the service used to retrieve the user profile document
     * @param properties the property values to set, keyed by property name
     * @return {@code true} if the document was saved, {@code false} if it was skipped or could not be saved
     */
    protected boolean updateDocument(long lineNumber, String name, DocumentType docType, CoreSession session,
            UserProfileService userProfileService, Map<String, Serializable> properties) {

        DocumentModel doc = userProfileService.getUserProfileDocument(name, session);
        // NOTE(review): assumes the service never returns null and that dc:created is always set -- confirm.
        Calendar createdDate = (Calendar) doc.getPropertyValue("dc:created");
        boolean isCreated = (createdDate.getTime().after(startDate));
        if (!isCreated && !config.isUpdateExisting()) {
            logImportInfo(lineNumber, "Document already exists for user: %s", name);
            return false;
        }

        for (Map.Entry<String, Serializable> entry : properties.entrySet()) {
            doc.setPropertyValue(entry.getKey(), entry.getValue());
        }

        try {
            session.saveDocument(doc);
        } catch (NuxeoException e) {
            Throwable unwrappedException = unwrapException(e);
            logImportError(lineNumber, "Unable to update document for user: %s: %s", name,
                    unwrappedException.getMessage());
            log.debug(unwrappedException, unwrappedException);
            return false;
        }
        return true;
    }

    /**
     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
     * running a long process.
     */
    protected void commitOrRollbackTransaction() {
        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
            TransactionHelper.commitOrRollbackTransaction();
        }
    }

    /**
     * Starts a new transaction.
     * <p>
     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
     * process.
     *
     * @return true if a new transaction was started
     */
    protected boolean startTransaction() {
        return TransactionHelper.startTransaction();
    }

    /**
     * Logs an error prefixed with the line number.
     *
     * @param lineNumber the 1-based record number
     * @param message a {@link String#format(String, Object...)} pattern
     * @param params the arguments for the pattern
     */
    protected void logImportError(long lineNumber, String message, String... params) {
        String lineMessage = String.format("Line %d", lineNumber);
        String errorMessage = String.format(message, (Object[]) params);
        log.error(String.format("%s: %s", lineMessage, errorMessage));
    }

    /**
     * Logs an informational message prefixed with the line number.
     *
     * @param lineNumber the 1-based record number
     * @param message a {@link String#format(String, Object...)} pattern
     * @param params the arguments for the pattern
     */
    protected void logImportInfo(long lineNumber, String message, String... params) {
        String lineMessage = String.format("Line %d", lineNumber);
        String infoMessage = String.format(message, (Object[]) params);
        log.info(String.format("%s: %s", lineMessage, infoMessage));
    }

    /**
     * Follows the cause chain of a throwable down to its root.
     *
     * @param t the throwable to unwrap, may be {@code null}
     * @return the deepest cause of {@code t}, {@code t} itself if it has no cause, or {@code null} if {@code t} is
     *         {@code null}
     */
    public static Throwable unwrapException(Throwable t) {
        Throwable root = t;
        while (root != null && root.getCause() != null) {
            root = root.getCause();
        }
        return root;
    }

    /**
     * @return the number of records found in the CSV file by the last import
     */
    public long getTotalRecords() {
        return totalRecords;
    }

    /**
     * @return the 1-based index of the record most recently processed
     */
    public long getCurrentRecord() {
        return currentRecord;
    }

}