001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Funsho David
018 */
019
020package org.nuxeo.ecm.core.bulk.message;
021
022import java.io.Serializable;
023import java.time.Instant;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.Map;
027
028import javax.validation.constraints.NotNull;
029
030import org.apache.avro.reflect.AvroEncode;
031import org.apache.avro.reflect.Nullable;
032import org.apache.commons.lang3.builder.EqualsBuilder;
033import org.apache.commons.lang3.builder.HashCodeBuilder;
034import org.apache.commons.lang3.builder.ToStringBuilder;
035import org.nuxeo.ecm.core.api.AsyncStatus;
036
037/**
038 * A message representing a command status or a change of status (delta).
039 *
040 * @since 10.2
041 */
042public class BulkStatus implements AsyncStatus<String> {
043
044    private static final long serialVersionUID = 20181021L;
045
046    /**
047     * Possible states of a bulk command:
048     */
049    public enum State {
050        /** The command or the status is unknown. */
051        UNKNOWN,
052
053        /** The command has been submitted to the service. */
054        SCHEDULED,
055
056        /** The scroller is running and materialize the document set. */
057        SCROLLING_RUNNING,
058
059        /** The scroller has terminated, the size of the document set is known and action is applied. */
060        RUNNING,
061
062        /** The action has been applied to the document set, the command is completed. */
063        COMPLETED,
064
065        /** The command has been aborted, the action might have been partially applied on the document set. */
066        ABORTED
067    }
068
069    protected String commandId;
070
071    @Nullable
072    protected String action;
073
074    @Nullable
075    protected String username;
076
077    protected boolean delta;
078
079    protected long errorCount;
080
081    @Nullable
082    protected String errorMessage;
083
084    @Nullable
085    protected Long processed;
086
087    @Nullable
088    protected State state;
089
090    @Nullable
091    protected Long submitTime;
092
093    @Nullable
094    protected Long scrollStartTime;
095
096    @Nullable
097    protected Long scrollEndTime;
098
099    @Nullable
100    protected Long processingStartTime;
101
102    @Nullable
103    protected Long processingEndTime;
104
105    @Nullable
106    protected Long completedTime;
107
108    @Nullable
109    protected Long total;
110
111    @Nullable
112    protected Long processingDurationMillis;
113
114    @Nullable
115    @AvroEncode(using = MapAsJsonAsStringEncoding.class)
116    protected Map<String, Serializable> result = new HashMap<>();
117
118    protected BulkStatus() {
119        // Empty constructor for Avro decoder
120    }
121
122    public BulkStatus(@NotNull String commandId) {
123        this.commandId = commandId;
124    }
125
126    /**
127     * Creates a delta status for a command.
128     */
129    public static BulkStatus deltaOf(@NotNull String commandId) {
130        BulkStatus ret = new BulkStatus();
131        ret.setId(commandId);
132        ret.delta = true;
133        return ret;
134    }
135
136    /**
137     * Creates a delta status for a command.
138     */
139    public static BulkStatus unknownOf(@NotNull String commandId) {
140        BulkStatus ret = new BulkStatus();
141        ret.setId(commandId);
142        ret.delta = true;
143        ret.setState(State.UNKNOWN);
144        return ret;
145    }
146
147    /**
148     * Updates the status with the provided update.
149     *
150     * @since 10.3
151     */
152    public void merge(@NotNull BulkStatus update) {
153        if (!update.isDelta()) {
154            throw new IllegalArgumentException(
155                    String.format("Cannot merge an a full status: %s with %s", this, update));
156        }
157        if (!getId().equals(update.getId())) {
158            throw new IllegalArgumentException(
159                    String.format("Cannot merge different command: %s with %s", this, update));
160        }
161        if (update.getState() != null && getState() != State.ABORTED) {
162            setState(update.getState());
163        }
164        if (update.processed != null) {
165            setProcessed(getProcessed() + update.getProcessed());
166        }
167        if (update.scrollStartTime != null) {
168            scrollStartTime = update.scrollStartTime;
169        }
170        if (update.scrollEndTime != null) {
171            scrollEndTime = update.scrollEndTime;
172        }
173        if (update.submitTime != null) {
174            submitTime = update.submitTime;
175        }
176        if (update.processingStartTime != null
177                && (processingStartTime == null || update.processingStartTime < processingStartTime)) {
178            // we take the minimum
179            processingStartTime = update.processingStartTime;
180        }
181        if (update.processingEndTime != null
182                && (processingEndTime == null || update.processingEndTime > processingEndTime)) {
183            // we take the maximum
184            processingEndTime = update.processingEndTime;
185        }
186        if (update.processingStartTime != null && update.processingEndTime != null) {
187            long deltaDuration = update.processingEndTime - update.processingStartTime;
188            setProcessingDurationMillis(getProcessingDurationMillis() + deltaDuration);
189        }
190        if (update.completedTime != null) {
191            completedTime = update.completedTime;
192        }
193        if (update.total != null) {
194            setTotal(update.getTotal());
195        }
196        if (update.getAction() != null && getAction() == null) {
197            setAction(update.action);
198        }
199        if (update.getResult() != null) {
200            setResult(update.getResult());
201        }
202        if (update.getUsername() != null && getUsername() == null) {
203            setUsername(getUsername());
204        }
205        if (update.errorCount > 0) {
206            errorCount++;
207        }
208        if (update.errorMessage != null && errorMessage == null) {
209            errorMessage = update.errorMessage;
210        }
211        checkForCompletedState();
212    }
213
214    protected void checkForCompletedState() {
215        if (!isDelta() && getTotal() > 0 && getProcessed() >= getTotal()) {
216            if (getState() != State.COMPLETED && getState() != State.ABORTED) {
217                setState(State.COMPLETED);
218                setCompletedTime(Instant.now());
219            }
220        }
221    }
222
223    @Override
224    public String getId() {
225        return commandId;
226    }
227
228    public void setId(String id) {
229        this.commandId = id;
230    }
231
232    /**
233     * Gets the state of the command.
234     */
235    public State getState() {
236        return state;
237    }
238
239    public void setState(State state) {
240        this.state = state;
241    }
242
243    /**
244     * Gets the time when the command was submitted to the Bulk service.
245     */
246    public Instant getSubmitTime() {
247        return submitTime == null ? null : Instant.ofEpochMilli(submitTime);
248    }
249
250    public void setSubmitTime(@NotNull Instant submitTime) {
251        this.submitTime = submitTime.toEpochMilli();
252    }
253
254    /**
255     * Gets the time when the scroll computation starts.
256     */
257    public Instant getScrollStartTime() {
258        return scrollStartTime == null ? null : Instant.ofEpochMilli(scrollStartTime);
259    }
260
261    public void setScrollStartTime(@NotNull Instant scrollStartTime) {
262        this.scrollStartTime = scrollStartTime.toEpochMilli();
263    }
264
265    /**
266     * Gets the time when the scrolling is completed.
267     */
268    public Instant getScrollEndTime() {
269        return scrollEndTime == null ? null : Instant.ofEpochMilli(scrollEndTime);
270    }
271
272    public void setScrollEndTime(@NotNull Instant scrollEndTime) {
273        this.scrollEndTime = scrollEndTime.toEpochMilli();
274    }
275
276    /**
277     * Gets the time when the action computation starts.
278     */
279    public Instant getProcessingStartTime() {
280        return processingStartTime == null ? null : Instant.ofEpochMilli(processingStartTime);
281    }
282
283    public void setProcessingStartTime(@NotNull Instant processingStartTime) {
284        this.processingStartTime = processingStartTime.toEpochMilli();
285    }
286
287    /**
288     * Gets the time when the last action computation has terminated.
289     */
290    public Instant getProcessingEndTime() {
291        return processingEndTime == null ? null : Instant.ofEpochMilli(processingEndTime);
292    }
293
294    public void setProcessingEndTime(@NotNull Instant processingEndTime) {
295        this.processingEndTime = processingEndTime.toEpochMilli();
296    }
297
298    /**
299     * Gets the time when the command has been detected as completed.
300     */
301    public Instant getCompletedTime() {
302        return completedTime == null ? null : Instant.ofEpochMilli(completedTime);
303    }
304
305    public void setCompletedTime(@NotNull Instant completedTime) {
306        this.completedTime = completedTime.toEpochMilli();
307    }
308
309    @Override
310    public boolean isCompleted() {
311        return getState() == State.COMPLETED;
312    }
313
314    /**
315     * For a full status returns the number of documents where the action has been applied so far.
316     */
317    public long getProcessed() {
318        if (processed == null) {
319            return 0;
320        }
321        return processed;
322    }
323
324    /**
325     * Sets number of processed documents. For a delta this is a relative value that is aggregated during
326     * {@link #merge(BulkStatus)} operation.
327     */
328    public void setProcessed(long processed) {
329        this.processed = processed;
330    }
331
332    /**
333     * Gets the total number of documents in the document set. Returns 0 when the scroll is not yet completed.
334     */
335    public long getTotal() {
336        if (total == null) {
337            return 0;
338        }
339        return total;
340    }
341
342    /**
343     * Sets the total number of documents in the document set
344     */
345    public void setTotal(long count) {
346        this.total = count;
347    }
348
349    /**
350     * Gets action result.
351     *
352     * @return the action result
353     * @since 10.3
354     */
355    public Map<String, Serializable> getResult() {
356        return Collections.unmodifiableMap(result);
357    }
358
359    /**
360     * Sets action result.
361     *
362     * @param result the action result
363     * @since 10.3
364     */
365    public void setResult(Map<String, Serializable> result) {
366        this.result = result;
367    }
368
369    /**
370     * This is an update of a status containing only partial information. For a delta the processing start and end time,
371     * and the processed count are also delta.
372     *
373     * @since 10.3
374     */
375    public boolean isDelta() {
376        return delta;
377    }
378
379    /**
380     * Gets the action name of the command.
381     */
382    public String getAction() {
383        return action;
384    }
385
386    public void setAction(String action) {
387        this.action = action;
388    }
389
390    /**
391     * Gets the username of the user running the command.
392     */
393    public String getUsername() {
394        return username;
395    }
396
397    public void setUsername(String username) {
398        this.username = username;
399    }
400
401    /**
402     * Gets the accumulated processing time in milliseconds.
403     */
404    public long getProcessingDurationMillis() {
405        if (processingDurationMillis == null) {
406            return 0;
407        }
408        return processingDurationMillis;
409    }
410
411    public void setProcessingDurationMillis(long processingDurationMillis) {
412        this.processingDurationMillis = processingDurationMillis;
413    }
414
415    public boolean hasError() {
416        return errorCount > 0;
417    }
418
419    /**
420     * Returns the first error message if any or null.
421     */
422    public String getErrorMessage() {
423        return errorMessage;
424    }
425
426    /**
427     * Returns the number of errors encountered
428     */
429    public long getErrorCount() {
430        return errorCount;
431    }
432
433    public void setErrorCount(long errorCount) {
434        this.errorCount = errorCount;
435    }
436
437    /**
438     * An error occurred during the processing
439     */
440    public void inError(String message) {
441        if (isDelta()) {
442            errorCount = 1;
443        } else {
444            errorCount++;
445        }
446        this.errorMessage = message;
447    }
448
449    @Override
450    public int hashCode() {
451        return HashCodeBuilder.reflectionHashCode(this);
452    }
453
454    @Override
455    public boolean equals(Object o) {
456        return EqualsBuilder.reflectionEquals(this, o);
457    }
458
459    @Override
460    public String toString() {
461        return ToStringBuilder.reflectionToString(this);
462    }
463
464}