001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Funsho David
018 */
019
020package org.nuxeo.ecm.core.bulk.message;
021
022import java.io.Serializable;
023import java.time.Instant;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.Map;
027
028import javax.validation.constraints.NotNull;
029
030import org.apache.avro.reflect.AvroDefault;
031import org.apache.avro.reflect.AvroEncode;
032import org.apache.avro.reflect.Nullable;
033import org.apache.commons.lang3.builder.EqualsBuilder;
034import org.apache.commons.lang3.builder.HashCodeBuilder;
035import org.apache.commons.lang3.builder.ToStringBuilder;
036import org.nuxeo.ecm.core.api.AsyncStatus;
037
038/**
039 * A message representing a command status or a change of status (delta).
040 *
041 * @since 10.2
042 */
043public class BulkStatus implements AsyncStatus<String> {
044
045    private static final long serialVersionUID = 20181021L;
046
047    /**
048     * Possible states of a bulk command:
049     */
050    public enum State {
051        /** The command or the status is unknown. */
052        UNKNOWN,
053
054        /** The command has been submitted to the service. */
055        SCHEDULED,
056
057        /** The scroller is running and materialize the document set. */
058        SCROLLING_RUNNING,
059
060        /** The scroller has terminated, the size of the document set is known and action is applied. */
061        RUNNING,
062
063        /** The action has been applied to the document set, the command is completed. */
064        COMPLETED,
065
066        /** The command has been aborted, the action might have been partially applied on the document set. */
067        ABORTED
068    }
069
070    protected String commandId;
071
072    @Nullable
073    protected String action;
074
075    @Nullable
076    protected String username;
077
078    protected boolean delta;
079
080    protected long errorCount;
081
082    @Nullable
083    protected String errorMessage;
084
085    @Nullable
086    protected Long processed;
087
088    @Nullable
089    protected State state;
090
091    @Nullable
092    protected Long submitTime;
093
094    @Nullable
095    protected Long scrollStartTime;
096
097    @Nullable
098    protected Long scrollEndTime;
099
100    @Nullable
101    protected Long processingStartTime;
102
103    @Nullable
104    protected Long processingEndTime;
105
106    @Nullable
107    protected Long completedTime;
108
109    @Nullable
110    protected Long total;
111
112    @Nullable
113    protected Long processingDurationMillis;
114
115    @AvroDefault("false")
116    protected boolean queryLimitReached;
117
118    @Nullable
119    @AvroEncode(using = MapAsJsonAsStringEncoding.class)
120    protected Map<String, Serializable> result = new HashMap<>();
121
122    protected BulkStatus() {
123        // Empty constructor for Avro decoder
124    }
125
126    public BulkStatus(@NotNull String commandId) {
127        this.commandId = commandId;
128    }
129
130    /**
131     * Creates a delta status for a command.
132     */
133    public static BulkStatus deltaOf(@NotNull String commandId) {
134        BulkStatus ret = new BulkStatus();
135        ret.setId(commandId);
136        ret.delta = true;
137        return ret;
138    }
139
140    /**
141     * Creates a delta status for a command.
142     */
143    public static BulkStatus unknownOf(@NotNull String commandId) {
144        BulkStatus ret = new BulkStatus();
145        ret.setId(commandId);
146        ret.delta = true;
147        ret.setState(State.UNKNOWN);
148        return ret;
149    }
150
151    /**
152     * Updates the status with the provided update.
153     *
154     * @since 10.3
155     */
156    public void merge(@NotNull BulkStatus update) {
157        if (!update.isDelta()) {
158            throw new IllegalArgumentException(
159                    String.format("Cannot merge an a full status: %s with %s", this, update));
160        }
161        if (!getId().equals(update.getId())) {
162            throw new IllegalArgumentException(
163                    String.format("Cannot merge different command: %s with %s", this, update));
164        }
165        if (update.getState() != null && getState() != State.ABORTED) {
166            setState(update.getState());
167        }
168        if (update.processed != null) {
169            setProcessed(getProcessed() + update.getProcessed());
170        }
171        if (update.scrollStartTime != null) {
172            scrollStartTime = update.scrollStartTime;
173        }
174        if (update.scrollEndTime != null) {
175            scrollEndTime = update.scrollEndTime;
176        }
177        if (update.submitTime != null) {
178            submitTime = update.submitTime;
179        }
180        if (update.processingStartTime != null
181                && (processingStartTime == null || update.processingStartTime < processingStartTime)) {
182            // we take the minimum
183            processingStartTime = update.processingStartTime;
184        }
185        if (update.processingEndTime != null
186                && (processingEndTime == null || update.processingEndTime > processingEndTime)) {
187            // we take the maximum
188            processingEndTime = update.processingEndTime;
189        }
190        if (update.processingStartTime != null && update.processingEndTime != null) {
191            long deltaDuration = update.processingEndTime - update.processingStartTime;
192            setProcessingDurationMillis(getProcessingDurationMillis() + deltaDuration);
193        }
194        if (update.completedTime != null) {
195            completedTime = update.completedTime;
196        }
197        if (update.total != null) {
198            setTotal(update.getTotal());
199        }
200        if (update.getAction() != null && getAction() == null) {
201            setAction(update.action);
202        }
203        if (update.getResult() != null) {
204            setResult(update.getResult());
205        }
206        if (update.getUsername() != null && getUsername() == null) {
207            setUsername(getUsername());
208        }
209        if (update.errorCount > 0) {
210            errorCount += update.errorCount;
211        }
212        if (update.errorMessage != null && errorMessage == null) {
213            errorMessage = update.errorMessage;
214        }
215        if (update.queryLimitReached) {
216            queryLimitReached = true;
217        }
218        checkForCompletedState();
219    }
220
221    protected void checkForCompletedState() {
222        if (!isDelta() && getTotal() > 0 && getProcessed() >= getTotal()) {
223            if (getState() != State.COMPLETED && getState() != State.ABORTED) {
224                setState(State.COMPLETED);
225                setCompletedTime(Instant.now());
226            }
227        }
228    }
229
230    @Override
231    public String getId() {
232        return commandId;
233    }
234
235    public void setId(String id) {
236        this.commandId = id;
237    }
238
239    /**
240     * Gets the state of the command.
241     */
242    public State getState() {
243        return state;
244    }
245
246    public void setState(State state) {
247        this.state = state;
248    }
249
250    /**
251     * Gets the time when the command was submitted to the Bulk service.
252     */
253    public Instant getSubmitTime() {
254        return submitTime == null ? null : Instant.ofEpochMilli(submitTime);
255    }
256
257    public void setSubmitTime(@NotNull Instant submitTime) {
258        this.submitTime = submitTime.toEpochMilli();
259    }
260
261    /**
262     * Gets the time when the scroll computation starts.
263     */
264    public Instant getScrollStartTime() {
265        return scrollStartTime == null ? null : Instant.ofEpochMilli(scrollStartTime);
266    }
267
268    public void setScrollStartTime(@NotNull Instant scrollStartTime) {
269        this.scrollStartTime = scrollStartTime.toEpochMilli();
270    }
271
272    /**
273     * Gets the time when the scrolling is completed.
274     */
275    public Instant getScrollEndTime() {
276        return scrollEndTime == null ? null : Instant.ofEpochMilli(scrollEndTime);
277    }
278
279    public void setScrollEndTime(@NotNull Instant scrollEndTime) {
280        this.scrollEndTime = scrollEndTime.toEpochMilli();
281    }
282
283    /**
284     * Gets the time when the action computation starts.
285     */
286    public Instant getProcessingStartTime() {
287        return processingStartTime == null ? null : Instant.ofEpochMilli(processingStartTime);
288    }
289
290    public void setProcessingStartTime(@NotNull Instant processingStartTime) {
291        this.processingStartTime = processingStartTime.toEpochMilli();
292    }
293
294    /**
295     * Gets the time when the last action computation has terminated.
296     */
297    public Instant getProcessingEndTime() {
298        return processingEndTime == null ? null : Instant.ofEpochMilli(processingEndTime);
299    }
300
301    public void setProcessingEndTime(@NotNull Instant processingEndTime) {
302        this.processingEndTime = processingEndTime.toEpochMilli();
303    }
304
305    /**
306     * Gets the time when the command has been detected as completed.
307     */
308    public Instant getCompletedTime() {
309        return completedTime == null ? null : Instant.ofEpochMilli(completedTime);
310    }
311
312    public void setCompletedTime(@NotNull Instant completedTime) {
313        this.completedTime = completedTime.toEpochMilli();
314    }
315
316    /**
317     * Returns true if the query used by the scroller has been limited.
318     *
319     * @since 11.4
320     */
321    public boolean isQueryLimitReached() {
322        return queryLimitReached;
323    }
324
325    /**
326     * @since 11.4
327     */
328    public void setQueryLimitReached(boolean queryLimitReached) {
329        this.queryLimitReached = queryLimitReached;
330    }
331
332    @Override
333    public boolean isCompleted() {
334        return getState() == State.COMPLETED;
335    }
336
337    /**
338     * For a full status returns the number of documents where the action has been applied so far.
339     */
340    public long getProcessed() {
341        if (processed == null) {
342            return 0;
343        }
344        return processed;
345    }
346
347    /**
348     * Sets number of processed documents. For a delta this is a relative value that is aggregated during
349     * {@link #merge(BulkStatus)} operation.
350     */
351    public void setProcessed(long processed) {
352        this.processed = processed;
353    }
354
355    /**
356     * Gets the total number of documents in the document set. Returns 0 when the scroll is not yet completed.
357     */
358    public long getTotal() {
359        if (total == null) {
360            return 0;
361        }
362        return total;
363    }
364
365    /**
366     * Sets the total number of documents in the document set
367     */
368    public void setTotal(long count) {
369        this.total = count;
370    }
371
372    /**
373     * Gets action result.
374     *
375     * @return the action result
376     * @since 10.3
377     */
378    public Map<String, Serializable> getResult() {
379        return Collections.unmodifiableMap(result);
380    }
381
382    /**
383     * Sets action result.
384     *
385     * @param result the action result
386     * @since 10.3
387     */
388    public void setResult(Map<String, Serializable> result) {
389        this.result = result;
390    }
391
392    /**
393     * This is an update of a status containing only partial information. For a delta the processing start and end time,
394     * and the processed count are also delta.
395     *
396     * @since 10.3
397     */
398    public boolean isDelta() {
399        return delta;
400    }
401
402    /**
403     * Gets the action name of the command.
404     */
405    public String getAction() {
406        return action;
407    }
408
409    public void setAction(String action) {
410        this.action = action;
411    }
412
413    /**
414     * Gets the username of the user running the command.
415     */
416    public String getUsername() {
417        return username;
418    }
419
420    public void setUsername(String username) {
421        this.username = username;
422    }
423
424    /**
425     * Gets the accumulated processing time in milliseconds.
426     */
427    public long getProcessingDurationMillis() {
428        if (processingDurationMillis == null) {
429            return 0;
430        }
431        return processingDurationMillis;
432    }
433
434    public void setProcessingDurationMillis(long processingDurationMillis) {
435        this.processingDurationMillis = processingDurationMillis;
436    }
437
438    public boolean hasError() {
439        return errorCount > 0;
440    }
441
442    /**
443     * Returns the first error message if any or null.
444     */
445    public String getErrorMessage() {
446        return errorMessage;
447    }
448
449    /**
450     * Returns the number of errors encountered
451     */
452    public long getErrorCount() {
453        return errorCount;
454    }
455
456    public void setErrorCount(long errorCount) {
457        this.errorCount = errorCount;
458    }
459
460    /**
461     * An error occurred during the processing
462     */
463    public void inError(String message) {
464        if (isDelta()) {
465            errorCount = 1;
466        } else {
467            errorCount++;
468        }
469        this.errorMessage = message;
470    }
471
472    @Override
473    public int hashCode() {
474        return HashCodeBuilder.reflectionHashCode(this);
475    }
476
477    @Override
478    public boolean equals(Object o) {
479        return EqualsBuilder.reflectionEquals(this, o);
480    }
481
482    @Override
483    public String toString() {
484        return ToStringBuilder.reflectionToString(this);
485    }
486
487}