/*
 * (C) Copyright 2012-2013 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 */
package org.nuxeo.ecm.user.center.profile;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.Serializable;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.annotation.Experimental;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
import org.nuxeo.ecm.core.schema.DocumentType;
import org.nuxeo.ecm.core.schema.SchemaManager;
import org.nuxeo.ecm.core.schema.types.Field;
import org.nuxeo.ecm.core.schema.types.ListType;
import org.nuxeo.ecm.core.schema.types.SimpleTypeImpl;
import org.nuxeo.ecm.core.schema.types.Type;
import org.nuxeo.ecm.core.schema.types.primitives.BooleanType;
import org.nuxeo.ecm.core.schema.types.primitives.DateType;
import org.nuxeo.ecm.core.schema.types.primitives.DoubleType;
import org.nuxeo.ecm.core.schema.types.primitives.IntegerType;
import org.nuxeo.ecm.core.schema.types.primitives.LongType;
import org.nuxeo.ecm.core.schema.types.primitives.StringType;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.transaction.TransactionHelper;

/**
 * Imports user profile properties from a CSV file.
 * <p>
 * The CSV file must have a header line containing a {@code username} column; every other column is matched against a
 * field of the user profile document type. The data file name, batch size, date format, list separator and
 * "update existing" flag are read from the {@link ImporterConfig} returned by the {@link UserProfileService}.
 *
 * @since 7.2
 */
@Experimental(comment="https://jira.nuxeo.com/browse/NXP-12200")
public class UserProfileImporter {

    private static final Log log = LogFactory.getLog(UserProfileImporter.class);

    /** Name of the complex type whose values are read as blobs from the blobs folder. */
    public static final String CONTENT_FILED_TYPE_NAME = "content";

    /** Header of the mandatory CSV column identifying the user. */
    public static final String USER_PROFILE_IMPORTER_USERNAME_COL = "username";

    protected Character escapeCharacter = '\\';

    protected ImporterConfig config;

    protected String dataFileName;

    protected transient DateFormat dateformat;

    /** Instant at which this importer was created; used to tell freshly created profiles from existing ones. */
    protected final Date startDate;

    protected long totalRecords = 0;

    protected long currentRecord = 0;

    /** Framework property holding the folder against which blob file names found in the CSV are resolved. */
    public static final String BLOB_FOLDER_PROPERTY = "nuxeo.csv.blobs.folder";

    public UserProfileImporter() {
        startDate = new Date();
    }

    /**
     * Runs the import described by the importer configuration of the {@link UserProfileService}.
     * <p>
     * Logs an error and returns without importing anything when the configuration, the data file name or the data
     * file itself cannot be found. An {@link IOException} raised while reading the CSV is logged, not rethrown.
     *
     * @param session the session used to fetch and save the user profile documents
     */
    public void doImport(CoreSession session) {
        UserProfileService ups = Framework.getLocalService(UserProfileService.class);

        config = ups.getImporterConfig();
        if (config == null) {
            log.error("No importer configuration could be found");
            return;
        }

        dataFileName = config.getDataFileName();
        if (dataFileName == null) {
            log.error("No importer dataFileName was supplied");
            return;
        }

        InputStream is = getResourceAsStream(dataFileName);
        if (is == null) {
            log.error("Error locating CSV data file: " + dataFileName);
            return;
        }

        // NOTE(review): the reader uses the platform default charset - confirm this is intended.
        Reader in = new BufferedReader(new InputStreamReader(is));
        CSVParser parser = null;

        try {
            parser = CSVFormat.DEFAULT.withEscape(escapeCharacter).withHeader().parse(in);
            doImport(session, parser, ups);
        } catch (IOException e) {
            log.error("Unable to read CSV file", e);
        } finally {
            try {
                if (parser != null) {
                    // closing the parser also closes the reader it wraps
                    parser.close();
                } else {
                    // the parser could not be built (e.g. header unreadable): release the stream ourselves
                    in.close();
                }
            } catch (IOException e) {
                log.debug(e, e);
            }
        }

    }

    /**
     * Locates a resource, first through the class loader of this class, then through the Framework resource loader.
     *
     * @param resource the resource name
     * @return the opened stream, or {@code null} if neither loader finds the resource
     */
    protected InputStream getResourceAsStream(String resource) {
        InputStream is = getClass().getClassLoader().getResourceAsStream(resource);
        if (is == null) {
            is = Framework.getResourceLoader().getResourceAsStream(resource);
        }
        return is;
    }

    /**
     * Imports every record of the given parser.
     * <p>
     * The transaction is committed and restarted each time {@code batchSize} documents have been updated, and once
     * more when the import ends (normally or not). A {@link NuxeoException} raised for a line is logged and the import
     * continues with the next line. Reads the {@link #config} field, which is set by {@link #doImport(CoreSession)}.
     *
     * @param session the session used to fetch and save the user profile documents
     * @param parser a CSV parser configured with a header line
     * @param userProfileService the service giving access to the user profile documents
     * @throws IOException if the CSV records cannot be read
     */
    public void doImport(CoreSession session, CSVParser parser, UserProfileService userProfileService)
            throws IOException {
        log.info(String.format("Importing CSV file: %s", dataFileName));

        DocumentType docType = Framework.getLocalService(SchemaManager.class).getDocumentType(
                UserProfileConstants.USER_PROFILE_DOCTYPE);
        if (docType == null) {
            log.error("The type " + UserProfileConstants.USER_PROFILE_DOCTYPE + " does not exist");
            return;
        }

        Map<String, Integer> header = parser.getHeaderMap();

        if (header == null) {
            // empty file?
            log.error("No header line, empty file?");
            return;
        }

        // find the index for the required name and type values
        Integer nameIndex = header.get(UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL);
        if (nameIndex == null) {
            log.error("Missing 'username' column");
            return;
        }

        long docsUpdatedCount = 0;
        try {
            int batchSize = config.getBatchSize();
            long lineNumber = 0;

            // The record number of the parser is only meaningful once records have been read, so load the records
            // first and take the total from them.
            List<CSVRecord> records = parser.getRecords();
            totalRecords = records.size();

            for (CSVRecord record : records) {
                lineNumber++;
                currentRecord = lineNumber;

                try {
                    if (importLine(record, lineNumber, nameIndex, docType, session, userProfileService, header)) {
                        docsUpdatedCount++;
                        // a non-positive batch size disables intermediate commits instead of dividing by zero
                        if (batchSize > 0 && docsUpdatedCount % batchSize == 0) {
                            commitOrRollbackTransaction();
                            startTransaction();
                        }
                    }
                } catch (NuxeoException e) {
                    // try next line
                    Throwable unwrappedException = unwrapException(e);
                    logImportError(lineNumber, "Error while importing line: %s", unwrappedException.getMessage());
                    log.debug(unwrappedException, unwrappedException);
                }
            }

            session.save();
        } finally {
            commitOrRollbackTransaction();
            startTransaction();
        }
        log.info(String.format("Done importing %s entries from CSV file: %s", docsUpdatedCount, dataFileName));
    }

    /**
     * Import a line from the CSV file.
     *
     * @param record the CSV record to import
     * @param lineNumber the 1-based record number, used in log messages
     * @param nameIndex the index of the {@code username} column
     * @param docType the user profile document type
     * @param session the session used to save the document
     * @param userProfileService the service giving access to the user profile documents
     * @param headerValues the CSV header map (column name to column index)
     * @return {@code true} if a document has been created or updated, {@code false} otherwise.
     */
    protected boolean importLine(CSVRecord record, final long lineNumber, Integer nameIndex, DocumentType docType,
            CoreSession session, UserProfileService userProfileService, Map<String, Integer> headerValues)
    {
        // a record shorter than the header has no username value at all: skip it rather than fail the whole import
        final String name = nameIndex < record.size() ? record.get(nameIndex) : null;
        if (StringUtils.isBlank(name)) {
            logImportError(lineNumber, "Missing 'username' value");
            return false;
        }

        Map<String, Serializable> values = computePropertiesMap(lineNumber, docType, headerValues, record);
        if (values == null) {
            // skip this line
            return false;
        }

        return updateDocument(lineNumber, name, docType, session, userProfileService, values);
    }

    /**
     * Converts the values of a record into document property values, keyed by CSV column name.
     * <p>
     * The {@code username} column, columns with no value in the record, blank values and columns matching no field of
     * the document type are ignored. A column name that is not a field of the type is retried with the part following
     * its first colon.
     *
     * @param lineNumber the 1-based record number, used in log messages
     * @param docType the user profile document type
     * @param headerValues the CSV header map (column name to column index)
     * @param record the CSV record to convert
     * @return the property values, or {@code null} if a value could not be converted and the line must be skipped
     */
    protected Map<String, Serializable> computePropertiesMap(long lineNumber, DocumentType docType,
            Map<String, Integer> headerValues, CSVRecord record) {

        Map<String, Serializable> values = new HashMap<String, Serializable>();
        for (String headerValue : headerValues.keySet()) {
            if (UserProfileImporter.USER_PROFILE_IMPORTER_USERNAME_COL.equals(headerValue)) {
                continue;
            }
            if (!record.isSet(headerValue)) {
                // record shorter than the header: nothing to import for this column
                continue;
            }
            String lineValue = record.get(headerValue);
            if (lineValue == null) {
                continue;
            }
            lineValue = lineValue.trim();

            String fieldName = headerValue;
            if (!docType.hasField(fieldName)) {
                String[] nameParts = fieldName.split(":");
                if (nameParts.length < 2) {
                    // unknown column without a prefix to strip: ignore it like any other unknown column
                    continue;
                }
                fieldName = nameParts[1];
            }
            if (docType.hasField(fieldName) && !StringUtils.isBlank(lineValue)) {
                Serializable convertedValue = convertValue(docType, fieldName, headerValue, lineValue, lineNumber);
                if (convertedValue == null) {
                    return null;
                }
                values.put(headerValue, convertedValue);
            }
        }
        return values;
    }

    /**
     * Converts a CSV string value to a value suitable for the given field.
     * <p>
     * Supported fields are: the {@code content} complex type (the value is a file name resolved against the folder
     * given by the {@link #BLOB_FOLDER_PROPERTY} property), list types (the value is split with the configured list
     * separator regex) and the string, integer, long, double, boolean and date simple types. Every failure is logged
     * with the line number.
     *
     * @param docType the user profile document type
     * @param fieldName the name of the field on the document type
     * @param headerValue the CSV column name, used in log messages
     * @param stringValue the raw CSV value
     * @param lineNumber the 1-based record number, used in log messages
     * @return the converted value, or {@code null} if the field does not exist, its type is not supported, the
     *         referenced file does not exist or the value cannot be parsed
     */
    protected Serializable convertValue(DocumentType docType, String fieldName, String headerValue, String stringValue,
            long lineNumber) {
        if (docType.hasField(fieldName)) {
            Field field = docType.getField(fieldName);
            if (field != null) {
                try {
                    Serializable fieldValue = null;
                    Type fieldType = field.getType();
                    if (fieldType.isComplexType()) {
                        if (fieldType.getName().equals(CONTENT_FILED_TYPE_NAME)) {
                            String blobsFolderPath = Framework.getProperty(BLOB_FOLDER_PROPERTY);
                            String path = FilenameUtils.normalize(blobsFolderPath + "/" + stringValue);
                            File file = new File(path);
                            if (file.exists()) {
                                FileBlob blob = new FileBlob(file);
                                blob.setFilename(file.getName());
                                fieldValue = blob;
                            } else {
                                logImportError(lineNumber, "The file '%s' does not exist", stringValue);
                                return null;
                            }
                        }
                        // other types not supported
                    } else {
                        if (fieldType.isListType()) {
                            Type listFieldType = ((ListType) fieldType).getFieldType();
                            if (listFieldType.isSimpleType()) {
                                /*
                                 * Array.
                                 */
                                fieldValue = stringValue.split(config.getListSeparatorRegex());
                            } else {
                                /*
                                 * Complex list.
                                 */
                                fieldValue = (Serializable) Arrays.asList(stringValue.split(config.getListSeparatorRegex()));
                            }
                        } else {
                            /*
                             * Primitive type.
                             */
                            Type type = field.getType();
                            if (type instanceof SimpleTypeImpl) {
                                type = type.getSuperType();
                            }
                            if (type.isSimpleType()) {
                                if (type instanceof StringType) {
                                    fieldValue = stringValue;
                                } else if (type instanceof IntegerType) {
                                    fieldValue = Integer.valueOf(stringValue);
                                } else if (type instanceof LongType) {
                                    fieldValue = Long.valueOf(stringValue);
                                } else if (type instanceof DoubleType) {
                                    fieldValue = Double.valueOf(stringValue);
                                } else if (type instanceof BooleanType) {
                                    fieldValue = Boolean.valueOf(stringValue);
                                } else if (type instanceof DateType) {
                                    fieldValue = getDateFormat().parse(stringValue);
                                }
                            }
                        }
                    }
                    if (fieldValue == null) {
                        // no branch above produced a value: say so instead of silently dropping the line
                        logImportError(lineNumber, "Unsupported type '%s' for field '%s'", fieldType.getName(),
                                headerValue);
                    }
                    return fieldValue;
                } catch (ParseException pe) {
                    logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue);
                    log.debug(pe, pe);
                } catch (NumberFormatException nfe) {
                    logImportError(lineNumber, "Unable to convert field '%s' with value '%s'", headerValue, stringValue);
                    log.debug(nfe, nfe);
                }
            }
        } else {
            logImportError(lineNumber, "Field '%s' does not exist on type '%s'", headerValue, docType.getName());
        }
        return null;
    }

    /**
     * Returns the date format built from the configured pattern, creating it on first use.
     */
    protected DateFormat getDateFormat() {
        // transient field so may become null
        if (dateformat == null) {
            dateformat = new SimpleDateFormat(config.getDateFormat());
        }
        return dateformat;
    }

    /**
     * Sets the given properties on the profile document of a user and saves it.
     * <p>
     * A profile whose {@code dc:created} date is not after the creation of this importer is considered as already
     * existing, and is left untouched unless the configuration allows updating existing documents.
     *
     * @param lineNumber the 1-based record number, used in log messages
     * @param name the user name
     * @param docType the user profile document type
     * @param session the session used to save the document
     * @param userProfileService the service giving access to the user profile documents
     * @param properties the property values to set, keyed by property name
     * @return {@code true} if the document was saved, {@code false} if it was skipped or could not be saved
     */
    protected boolean updateDocument(long lineNumber, String name, DocumentType docType, CoreSession session,
            UserProfileService userProfileService, Map<String, Serializable> properties) {

        DocumentModel doc = userProfileService.getUserProfileDocument(name, session);
        // NOTE(review): assumes dc:created is always set on the profile document - confirm.
        Calendar createdDate = (Calendar) doc.getPropertyValue("dc:created");
        boolean isCreated = (createdDate.getTime().after(startDate));
        if (!isCreated && !config.isUpdateExisting()) {
            logImportInfo(lineNumber, "Document already exists for user: %s", name);
            return false;
        }

        for (Map.Entry<String, Serializable> entry : properties.entrySet()) {
            doc.setPropertyValue(entry.getKey(), entry.getValue());
        }

        try {
            session.saveDocument(doc);
        } catch (NuxeoException e) {
            Throwable unwrappedException = unwrapException(e);
            logImportError(lineNumber, "Unable to update document for user: %s: %s", name,
                    unwrappedException.getMessage());
            log.debug(unwrappedException, unwrappedException);
            return false;
        }
        return true;
    }

    /**
     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
     * running a long process.
     */
    protected void commitOrRollbackTransaction() {
        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
            TransactionHelper.commitOrRollbackTransaction();
        }
    }

    /**
     * Starts a new transaction.
     * <p>
     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
     * process.
     *
     * @return true if a new transaction was started
     */
    protected boolean startTransaction() {
        return TransactionHelper.startTransaction();
    }

    /**
     * Logs an error message prefixed with the line number.
     *
     * @param lineNumber the 1-based record number
     * @param message a {@link String#format(String, Object...)} pattern
     * @param params the arguments of the pattern
     */
    protected void logImportError(long lineNumber, String message, String... params) {
        String lineMessage = String.format("Line %d", lineNumber);
        String errorMessage = String.format(message, (Object[]) params);
        log.error(String.format("%s: %s", lineMessage, errorMessage));
    }

    /**
     * Logs an informational message prefixed with the line number.
     *
     * @param lineNumber the 1-based record number
     * @param message a {@link String#format(String, Object...)} pattern
     * @param params the arguments of the pattern
     */
    protected void logImportInfo(long lineNumber, String message, String... params) {
        String lineMessage = String.format("Line %d", lineNumber);
        String infoMessage = String.format(message, (Object[]) params);
        log.info(String.format("%s: %s", lineMessage, infoMessage));
    }

    /**
     * Returns the innermost cause of the given throwable.
     *
     * @param t the throwable to unwrap, may be {@code null}
     * @return the last throwable of the cause chain, {@code t} itself if it has no cause, or {@code null} if
     *         {@code t} is {@code null}
     */
    public static Throwable unwrapException(Throwable t) {
        Throwable root = t;
        while (root != null && root.getCause() != null) {
            root = root.getCause();
        }
        return root;
    }

    /**
     * Returns the number of records read from the CSV file by the last import.
     */
    public long getTotalRecords() {
        return totalRecords;
    }

    /**
     * Returns the 1-based number of the record being (or last) imported.
     */
    public long getCurrentRecord() {
        return currentRecord;
    }

}