/*
 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Funsho David
 */

package org.nuxeo.ecm.core.bulk.message;

import java.io.Serializable;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import javax.validation.constraints.NotNull;

import org.apache.avro.reflect.AvroDefault;
import org.apache.avro.reflect.AvroEncode;
import org.apache.avro.reflect.Nullable;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.nuxeo.ecm.core.api.AsyncStatus;

/**
 * A message representing a command status or a change of status (delta).
 * <p>
 * A full status is created with {@link #BulkStatus(String)}; a delta is created with {@link #deltaOf(String)} and is
 * applied to a status with {@link #merge(BulkStatus)}.
 *
 * @since 10.2
 */
public class BulkStatus implements AsyncStatus<String> {

    private static final long serialVersionUID = 20181021L;

    /**
     * Possible states of a bulk command:
     */
    public enum State {
        /** The command or the status is unknown. */
        UNKNOWN,

        /** The command has been submitted to the service. */
        SCHEDULED,

        /** The scroller is running and materializes the document set. */
        SCROLLING_RUNNING,

        /** The scroller has terminated, the size of the document set is known and action is applied. */
        RUNNING,

        /** The action has been applied to the document set, the command is completed. */
        COMPLETED,

        /** The command has been aborted, the action might have been partially applied on the document set. */
        ABORTED
    }

    protected String commandId;

    @Nullable
    protected String action;

    @Nullable
    protected String username;

    protected boolean delta;

    protected long errorCount;

    @Nullable
    protected String errorMessage;

    @Nullable
    protected Long processed;

    @Nullable
    protected State state;

    // All the *Time fields below hold epoch milliseconds, they are exposed as Instant by the accessors.

    @Nullable
    protected Long submitTime;

    @Nullable
    protected Long scrollStartTime;

    @Nullable
    protected Long scrollEndTime;

    @Nullable
    protected Long processingStartTime;

    @Nullable
    protected Long processingEndTime;

    @Nullable
    protected Long completedTime;

    @Nullable
    protected Long total;

    @Nullable
    protected Long processingDurationMillis;

    @AvroDefault("false")
    protected boolean queryLimitReached;

    @Nullable
    @AvroEncode(using = MapAsJsonAsStringEncoding.class)
    protected Map<String, Serializable> result = new HashMap<>();

    protected BulkStatus() {
        // Empty constructor for Avro decoder
    }

    /**
     * Creates a full (non delta) status for a command.
     *
     * @param commandId the command identifier
     */
    public BulkStatus(@NotNull String commandId) {
        this.commandId = commandId;
    }

    /**
     * Creates a delta status for a command.
     *
     * @param commandId the command identifier
     * @return a new status flagged as delta, with no state set
     */
    public static BulkStatus deltaOf(@NotNull String commandId) {
        BulkStatus ret = new BulkStatus();
        ret.setId(commandId);
        ret.delta = true;
        return ret;
    }

    /**
     * Creates a delta status for a command with the state set to {@link State#UNKNOWN}.
     *
     * @param commandId the command identifier
     * @return a new status flagged as delta, in the {@code UNKNOWN} state
     */
    public static BulkStatus unknownOf(@NotNull String commandId) {
        BulkStatus ret = new BulkStatus();
        ret.setId(commandId);
        ret.delta = true;
        ret.setState(State.UNKNOWN);
        return ret;
    }

    /**
     * Updates the status with the provided update.
     * <p>
     * Merge rules, applied only for the values that are set on the update:
     * <ul>
     * <li>the state is replaced, unless this status is already {@link State#ABORTED}</li>
     * <li>the processed count, the error count and the processing duration are added to the current values</li>
     * <li>submit, scroll start, scroll end and completed times, the total and the result are replaced</li>
     * <li>the processing start time keeps the minimum and the processing end time keeps the maximum</li>
     * <li>action, username and error message are taken only when not already set on this status</li>
     * <li>the query limit reached flag can only be raised, never cleared</li>
     * </ul>
     * Once merged, the status is checked for completion, see {@link #checkForCompletedState()}.
     *
     * @param update the delta to apply, it must target the same command as this status
     * @throws IllegalArgumentException if the update is not a delta or targets another command
     * @since 10.3
     */
    public void merge(@NotNull BulkStatus update) {
        if (!update.isDelta()) {
            throw new IllegalArgumentException(
                    String.format("Cannot merge an a full status: %s with %s", this, update));
        }
        if (!getId().equals(update.getId())) {
            throw new IllegalArgumentException(
                    String.format("Cannot merge different command: %s with %s", this, update));
        }
        if (update.getState() != null && getState() != State.ABORTED) {
            setState(update.getState());
        }
        if (update.processed != null) {
            setProcessed(getProcessed() + update.getProcessed());
        }
        if (update.scrollStartTime != null) {
            scrollStartTime = update.scrollStartTime;
        }
        if (update.scrollEndTime != null) {
            scrollEndTime = update.scrollEndTime;
        }
        if (update.submitTime != null) {
            submitTime = update.submitTime;
        }
        if (update.processingStartTime != null
                && (processingStartTime == null || update.processingStartTime < processingStartTime)) {
            // we take the minimum
            processingStartTime = update.processingStartTime;
        }
        if (update.processingEndTime != null
                && (processingEndTime == null || update.processingEndTime > processingEndTime)) {
            // we take the maximum
            processingEndTime = update.processingEndTime;
        }
        if (update.processingStartTime != null && update.processingEndTime != null) {
            // the duration is the sum of the time spent by each delta, not the wall clock interval
            long deltaDuration = update.processingEndTime - update.processingStartTime;
            setProcessingDurationMillis(getProcessingDurationMillis() + deltaDuration);
        }
        if (update.completedTime != null) {
            completedTime = update.completedTime;
        }
        if (update.total != null) {
            setTotal(update.getTotal());
        }
        if (update.getAction() != null && getAction() == null) {
            setAction(update.action);
        }
        // test the field and not the getter, the getter never returns null
        if (update.result != null) {
            setResult(update.getResult());
        }
        if (update.getUsername() != null && getUsername() == null) {
            // take the username from the update, not from this status where it is known to be null
            setUsername(update.getUsername());
        }
        if (update.errorCount > 0) {
            errorCount += update.errorCount;
        }
        if (update.errorMessage != null && errorMessage == null) {
            errorMessage = update.errorMessage;
        }
        if (update.queryLimitReached) {
            queryLimitReached = true;
        }
        checkForCompletedState();
    }

    /**
     * Moves a full status to {@link State#COMPLETED} and sets the completed time to now, when the total is known
     * (greater than 0) and the processed count has reached it. Does nothing on a delta or when the state is already
     * {@link State#COMPLETED} or {@link State#ABORTED}.
     */
    protected void checkForCompletedState() {
        if (!isDelta() && getTotal() > 0 && getProcessed() >= getTotal()) {
            if (getState() != State.COMPLETED && getState() != State.ABORTED) {
                setState(State.COMPLETED);
                setCompletedTime(Instant.now());
            }
        }
    }

    /**
     * Gets the command identifier.
     */
    @Override
    public String getId() {
        return commandId;
    }

    public void setId(String id) {
        this.commandId = id;
    }

    /**
     * Gets the state of the command.
     */
    public State getState() {
        return state;
    }

    public void setState(State state) {
        this.state = state;
    }

    /**
     * Gets the time when the command was submitted to the Bulk service.
     *
     * @return the submit time, or {@code null} if not set
     */
    public Instant getSubmitTime() {
        return submitTime == null ? null : Instant.ofEpochMilli(submitTime);
    }

    public void setSubmitTime(@NotNull Instant submitTime) {
        this.submitTime = submitTime.toEpochMilli();
    }

    /**
     * Gets the time when the scroll computation starts.
     *
     * @return the scroll start time, or {@code null} if not set
     */
    public Instant getScrollStartTime() {
        return scrollStartTime == null ? null : Instant.ofEpochMilli(scrollStartTime);
    }

    public void setScrollStartTime(@NotNull Instant scrollStartTime) {
        this.scrollStartTime = scrollStartTime.toEpochMilli();
    }

    /**
     * Gets the time when the scrolling is completed.
     *
     * @return the scroll end time, or {@code null} if not set
     */
    public Instant getScrollEndTime() {
        return scrollEndTime == null ? null : Instant.ofEpochMilli(scrollEndTime);
    }

    public void setScrollEndTime(@NotNull Instant scrollEndTime) {
        this.scrollEndTime = scrollEndTime.toEpochMilli();
    }

    /**
     * Gets the time when the action computation starts.
     *
     * @return the processing start time, or {@code null} if not set
     */
    public Instant getProcessingStartTime() {
        return processingStartTime == null ? null : Instant.ofEpochMilli(processingStartTime);
    }

    public void setProcessingStartTime(@NotNull Instant processingStartTime) {
        this.processingStartTime = processingStartTime.toEpochMilli();
    }

    /**
     * Gets the time when the last action computation has terminated.
     *
     * @return the processing end time, or {@code null} if not set
     */
    public Instant getProcessingEndTime() {
        return processingEndTime == null ? null : Instant.ofEpochMilli(processingEndTime);
    }

    public void setProcessingEndTime(@NotNull Instant processingEndTime) {
        this.processingEndTime = processingEndTime.toEpochMilli();
    }

    /**
     * Gets the time when the command has been detected as completed.
     *
     * @return the completed time, or {@code null} if not set
     */
    public Instant getCompletedTime() {
        return completedTime == null ? null : Instant.ofEpochMilli(completedTime);
    }

    public void setCompletedTime(@NotNull Instant completedTime) {
        this.completedTime = completedTime.toEpochMilli();
    }

    /**
     * Returns true if the query used by the scroller has been limited.
     *
     * @since 11.4
     */
    public boolean isQueryLimitReached() {
        return queryLimitReached;
    }

    /**
     * @since 11.4
     */
    public void setQueryLimitReached(boolean queryLimitReached) {
        this.queryLimitReached = queryLimitReached;
    }

    /**
     * Returns true if the state of the command is {@link State#COMPLETED}.
     */
    @Override
    public boolean isCompleted() {
        return getState() == State.COMPLETED;
    }

    /**
     * For a full status returns the number of documents where the action has been applied so far.
     *
     * @return the processed count, 0 when not set
     */
    public long getProcessed() {
        if (processed == null) {
            return 0;
        }
        return processed;
    }

    /**
     * Sets number of processed documents. For a delta this is a relative value that is aggregated during
     * {@link #merge(BulkStatus)} operation.
     */
    public void setProcessed(long processed) {
        this.processed = processed;
    }

    /**
     * Gets the total number of documents in the document set. Returns 0 when the scroll is not yet completed.
     */
    public long getTotal() {
        if (total == null) {
            return 0;
        }
        return total;
    }

    /**
     * Sets the total number of documents in the document set
     */
    public void setTotal(long count) {
        this.total = count;
    }

    /**
     * Gets action result.
     *
     * @return an unmodifiable view of the action result, an empty map when there is no result, never {@code null}
     * @since 10.3
     */
    public Map<String, Serializable> getResult() {
        if (result == null) {
            // the field is nullable, Collections.unmodifiableMap(null) would throw a NullPointerException
            return Collections.emptyMap();
        }
        return Collections.unmodifiableMap(result);
    }

    /**
     * Sets action result.
     *
     * @param result the action result
     * @since 10.3
     */
    public void setResult(Map<String, Serializable> result) {
        this.result = result;
    }

    /**
     * This is an update of a status containing only partial information. For a delta the processing start and end time,
     * and the processed count are also delta.
     *
     * @since 10.3
     */
    public boolean isDelta() {
        return delta;
    }

    /**
     * Gets the action name of the command.
     */
    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    /**
     * Gets the username of the user running the command.
     */
    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    /**
     * Gets the accumulated processing time in milliseconds.
     *
     * @return the accumulated duration, 0 when not set
     */
    public long getProcessingDurationMillis() {
        if (processingDurationMillis == null) {
            return 0;
        }
        return processingDurationMillis;
    }

    public void setProcessingDurationMillis(long processingDurationMillis) {
        this.processingDurationMillis = processingDurationMillis;
    }

    /**
     * Returns true if at least one error has been counted.
     */
    public boolean hasError() {
        return errorCount > 0;
    }

    /**
     * Returns the first error message if any or null.
     */
    public String getErrorMessage() {
        return errorMessage;
    }

    /**
     * Returns the number of errors encountered
     */
    public long getErrorCount() {
        return errorCount;
    }

    public void setErrorCount(long errorCount) {
        this.errorCount = errorCount;
    }

    /**
     * An error occurred during the processing.
     * <p>
     * On a delta the error count is set to 1, on a full status it is incremented. The error message is replaced by the
     * given one in both cases.
     *
     * @param message the error message
     */
    public void inError(String message) {
        if (isDelta()) {
            errorCount = 1;
        } else {
            errorCount++;
        }
        this.errorMessage = message;
    }

    @Override
    public int hashCode() {
        return HashCodeBuilder.reflectionHashCode(this);
    }

    @Override
    public boolean equals(Object o) {
        return EqualsBuilder.reflectionEquals(this, o);
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }

}