001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 */ 019 020package org.nuxeo.ecm.core.bulk.message; 021 022import java.io.Serializable; 023import java.time.Instant; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.Map; 027 028import javax.validation.constraints.NotNull; 029 030import org.apache.avro.reflect.AvroEncode; 031import org.apache.avro.reflect.Nullable; 032import org.apache.commons.lang3.builder.EqualsBuilder; 033import org.apache.commons.lang3.builder.HashCodeBuilder; 034import org.apache.commons.lang3.builder.ToStringBuilder; 035import org.nuxeo.ecm.core.api.AsyncStatus; 036 037/** 038 * A message representing a command status or a change of status (delta). 039 * 040 * @since 10.2 041 */ 042public class BulkStatus implements AsyncStatus<String> { 043 044 private static final long serialVersionUID = 20181021L; 045 046 /** 047 * Possible states of a bulk command: 048 */ 049 public enum State { 050 /** The command or the status is unknown. */ 051 UNKNOWN, 052 053 /** The command has been submitted to the service. */ 054 SCHEDULED, 055 056 /** The scroller is running and materialize the document set. */ 057 SCROLLING_RUNNING, 058 059 /** The scroller has terminated, the size of the document set is known and action is applied. */ 060 RUNNING, 061 062 /** The action has been applied to the document set, the command is completed. */ 063 COMPLETED, 064 065 /** The command has been aborted, the action might have been partially applied on the document set. */ 066 ABORTED 067 } 068 069 protected String commandId; 070 071 @Nullable 072 protected String action; 073 074 @Nullable 075 protected String username; 076 077 protected boolean delta; 078 079 protected long errorCount; 080 081 @Nullable 082 protected String errorMessage; 083 084 @Nullable 085 protected Long processed; 086 087 @Nullable 088 protected State state; 089 090 @Nullable 091 protected Long submitTime; 092 093 @Nullable 094 protected Long scrollStartTime; 095 096 @Nullable 097 protected Long scrollEndTime; 098 099 @Nullable 100 protected Long processingStartTime; 101 102 @Nullable 103 protected Long processingEndTime; 104 105 @Nullable 106 protected Long completedTime; 107 108 @Nullable 109 protected Long total; 110 111 @Nullable 112 protected Long processingDurationMillis; 113 114 @Nullable 115 @AvroEncode(using = MapAsJsonAsStringEncoding.class) 116 protected Map<String, Serializable> result = new HashMap<>(); 117 118 protected BulkStatus() { 119 // Empty constructor for Avro decoder 120 } 121 122 public BulkStatus(@NotNull String commandId) { 123 this.commandId = commandId; 124 } 125 126 /** 127 * Creates a delta status for a command. 128 */ 129 public static BulkStatus deltaOf(@NotNull String commandId) { 130 BulkStatus ret = new BulkStatus(); 131 ret.setId(commandId); 132 ret.delta = true; 133 return ret; 134 } 135 136 /** 137 * Creates a delta status for a command. 138 */ 139 public static BulkStatus unknownOf(@NotNull String commandId) { 140 BulkStatus ret = new BulkStatus(); 141 ret.setId(commandId); 142 ret.delta = true; 143 ret.setState(State.UNKNOWN); 144 return ret; 145 } 146 147 /** 148 * Updates the status with the provided update. 149 * 150 * @since 10.3 151 */ 152 public void merge(@NotNull BulkStatus update) { 153 if (!update.isDelta()) { 154 throw new IllegalArgumentException( 155 String.format("Cannot merge an a full status: %s with %s", this, update)); 156 } 157 if (!getId().equals(update.getId())) { 158 throw new IllegalArgumentException( 159 String.format("Cannot merge different command: %s with %s", this, update)); 160 } 161 if (update.getState() != null && getState() != State.ABORTED) { 162 setState(update.getState()); 163 } 164 if (update.processed != null) { 165 setProcessed(getProcessed() + update.getProcessed()); 166 } 167 if (update.scrollStartTime != null) { 168 scrollStartTime = update.scrollStartTime; 169 } 170 if (update.scrollEndTime != null) { 171 scrollEndTime = update.scrollEndTime; 172 } 173 if (update.submitTime != null) { 174 submitTime = update.submitTime; 175 } 176 if (update.processingStartTime != null 177 && (processingStartTime == null || update.processingStartTime < processingStartTime)) { 178 // we take the minimum 179 processingStartTime = update.processingStartTime; 180 } 181 if (update.processingEndTime != null 182 && (processingEndTime == null || update.processingEndTime > processingEndTime)) { 183 // we take the maximum 184 processingEndTime = update.processingEndTime; 185 } 186 if (update.processingStartTime != null && update.processingEndTime != null) { 187 long deltaDuration = update.processingEndTime - update.processingStartTime; 188 setProcessingDurationMillis(getProcessingDurationMillis() + deltaDuration); 189 } 190 if (update.completedTime != null) { 191 completedTime = update.completedTime; 192 } 193 if (update.total != null) { 194 setTotal(update.getTotal()); 195 } 196 if (update.getAction() != null && getAction() == null) { 197 setAction(update.action); 198 } 199 if (update.getResult() != null) { 200 setResult(update.getResult()); 201 } 202 if (update.getUsername() != null && getUsername() == null) { 203 setUsername(getUsername()); 204 } 205 if (update.errorCount > 0) { 206 errorCount++; 207 } 208 if (update.errorMessage != null && errorMessage == null) { 209 errorMessage = update.errorMessage; 210 } 211 checkForCompletedState(); 212 } 213 214 protected void checkForCompletedState() { 215 if (!isDelta() && getTotal() > 0 && getProcessed() >= getTotal()) { 216 if (getState() != State.COMPLETED && getState() != State.ABORTED) { 217 setState(State.COMPLETED); 218 setCompletedTime(Instant.now()); 219 } 220 } 221 } 222 223 @Override 224 public String getId() { 225 return commandId; 226 } 227 228 public void setId(String id) { 229 this.commandId = id; 230 } 231 232 /** 233 * Gets the state of the command. 234 */ 235 public State getState() { 236 return state; 237 } 238 239 public void setState(State state) { 240 this.state = state; 241 } 242 243 /** 244 * Gets the time when the command was submitted to the Bulk service. 245 */ 246 public Instant getSubmitTime() { 247 return submitTime == null ? null : Instant.ofEpochMilli(submitTime); 248 } 249 250 public void setSubmitTime(@NotNull Instant submitTime) { 251 this.submitTime = submitTime.toEpochMilli(); 252 } 253 254 /** 255 * Gets the time when the scroll computation starts. 256 */ 257 public Instant getScrollStartTime() { 258 return scrollStartTime == null ? null : Instant.ofEpochMilli(scrollStartTime); 259 } 260 261 public void setScrollStartTime(@NotNull Instant scrollStartTime) { 262 this.scrollStartTime = scrollStartTime.toEpochMilli(); 263 } 264 265 /** 266 * Gets the time when the scrolling is completed. 267 */ 268 public Instant getScrollEndTime() { 269 return scrollEndTime == null ? null : Instant.ofEpochMilli(scrollEndTime); 270 } 271 272 public void setScrollEndTime(@NotNull Instant scrollEndTime) { 273 this.scrollEndTime = scrollEndTime.toEpochMilli(); 274 } 275 276 /** 277 * Gets the time when the action computation starts. 278 */ 279 public Instant getProcessingStartTime() { 280 return processingStartTime == null ? null : Instant.ofEpochMilli(processingStartTime); 281 } 282 283 public void setProcessingStartTime(@NotNull Instant processingStartTime) { 284 this.processingStartTime = processingStartTime.toEpochMilli(); 285 } 286 287 /** 288 * Gets the time when the last action computation has terminated. 289 */ 290 public Instant getProcessingEndTime() { 291 return processingEndTime == null ? null : Instant.ofEpochMilli(processingEndTime); 292 } 293 294 public void setProcessingEndTime(@NotNull Instant processingEndTime) { 295 this.processingEndTime = processingEndTime.toEpochMilli(); 296 } 297 298 /** 299 * Gets the time when the command has been detected as completed. 300 */ 301 public Instant getCompletedTime() { 302 return completedTime == null ? null : Instant.ofEpochMilli(completedTime); 303 } 304 305 public void setCompletedTime(@NotNull Instant completedTime) { 306 this.completedTime = completedTime.toEpochMilli(); 307 } 308 309 @Override 310 public boolean isCompleted() { 311 return getState() == State.COMPLETED; 312 } 313 314 /** 315 * For a full status returns the number of documents where the action has been applied so far. 316 */ 317 public long getProcessed() { 318 if (processed == null) { 319 return 0; 320 } 321 return processed; 322 } 323 324 /** 325 * Sets number of processed documents. For a delta this is a relative value that is aggregated during 326 * {@link #merge(BulkStatus)} operation. 327 */ 328 public void setProcessed(long processed) { 329 this.processed = processed; 330 } 331 332 /** 333 * Gets the total number of documents in the document set. Returns 0 when the scroll is not yet completed. 334 */ 335 public long getTotal() { 336 if (total == null) { 337 return 0; 338 } 339 return total; 340 } 341 342 /** 343 * Sets the total number of documents in the document set 344 */ 345 public void setTotal(long count) { 346 this.total = count; 347 } 348 349 /** 350 * Gets action result. 351 * 352 * @return the action result 353 * @since 10.3 354 */ 355 public Map<String, Serializable> getResult() { 356 return Collections.unmodifiableMap(result); 357 } 358 359 /** 360 * Sets action result. 361 * 362 * @param result the action result 363 * @since 10.3 364 */ 365 public void setResult(Map<String, Serializable> result) { 366 this.result = result; 367 } 368 369 /** 370 * This is an update of a status containing only partial information. For a delta the processing start and end time, 371 * and the processed count are also delta. 372 * 373 * @since 10.3 374 */ 375 public boolean isDelta() { 376 return delta; 377 } 378 379 /** 380 * Gets the action name of the command. 381 */ 382 public String getAction() { 383 return action; 384 } 385 386 public void setAction(String action) { 387 this.action = action; 388 } 389 390 /** 391 * Gets the username of the user running the command. 392 */ 393 public String getUsername() { 394 return username; 395 } 396 397 public void setUsername(String username) { 398 this.username = username; 399 } 400 401 /** 402 * Gets the accumulated processing time in milliseconds. 403 */ 404 public long getProcessingDurationMillis() { 405 if (processingDurationMillis == null) { 406 return 0; 407 } 408 return processingDurationMillis; 409 } 410 411 public void setProcessingDurationMillis(long processingDurationMillis) { 412 this.processingDurationMillis = processingDurationMillis; 413 } 414 415 public boolean hasError() { 416 return errorCount > 0; 417 } 418 419 /** 420 * Returns the first error message if any or null. 421 */ 422 public String getErrorMessage() { 423 return errorMessage; 424 } 425 426 /** 427 * Returns the number of errors encountered 428 */ 429 public long getErrorCount() { 430 return errorCount; 431 } 432 433 public void setErrorCount(long errorCount) { 434 this.errorCount = errorCount; 435 } 436 437 /** 438 * An error occurred during the processing 439 */ 440 public void inError(String message) { 441 if (isDelta()) { 442 errorCount = 1; 443 } else { 444 errorCount++; 445 } 446 this.errorMessage = message; 447 } 448 449 @Override 450 public int hashCode() { 451 return HashCodeBuilder.reflectionHashCode(this); 452 } 453 454 @Override 455 public boolean equals(Object o) { 456 return EqualsBuilder.reflectionEquals(this, o); 457 } 458 459 @Override 460 public String toString() { 461 return ToStringBuilder.reflectionToString(this); 462 } 463 464}