001/*
002 * (C) Copyright 2012-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
021import static org.nuxeo.ecm.core.api.event.CoreEventConstants.REPOSITORY_NAME;
022import static org.nuxeo.ecm.core.work.WorkManagerImpl.DEAD_LETTER_QUEUE;
023import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE;
024
025import java.io.Serializable;
026import java.security.SecureRandom;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.Random;
033
034import javax.security.auth.login.LoginException;
035
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.nuxeo.common.utils.ExceptionUtils;
039import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
040import org.nuxeo.ecm.core.api.CoreInstance;
041import org.nuxeo.ecm.core.api.CoreSession;
042import org.nuxeo.ecm.core.api.DocumentLocation;
043import org.nuxeo.ecm.core.api.IdRef;
044import org.nuxeo.ecm.core.api.NuxeoException;
045import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl;
046import org.nuxeo.ecm.core.event.Event;
047import org.nuxeo.ecm.core.event.EventContext;
048import org.nuxeo.ecm.core.event.EventService;
049import org.nuxeo.ecm.core.event.impl.EventContextImpl;
050import org.nuxeo.ecm.core.event.impl.EventImpl;
051import org.nuxeo.ecm.core.work.api.Work;
052import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
053import org.nuxeo.lib.stream.computation.Record;
054import org.nuxeo.runtime.api.Framework;
055import org.nuxeo.runtime.api.login.NuxeoLoginContext;
056import org.nuxeo.runtime.metrics.MetricsService;
057import org.nuxeo.runtime.stream.StreamService;
058import org.nuxeo.runtime.transaction.TransactionHelper;
059
060import io.dropwizard.metrics5.MetricRegistry;
061import io.dropwizard.metrics5.SharedMetricRegistries;
062import io.opencensus.common.Scope;
063import io.opencensus.trace.AttributeValue;
064import io.opencensus.trace.BlankSpan;
065import io.opencensus.trace.Link;
066import io.opencensus.trace.Span;
067import io.opencensus.trace.SpanContext;
068import io.opencensus.trace.Status;
069import io.opencensus.trace.Tracer;
070import io.opencensus.trace.Tracing;
071import io.opencensus.trace.propagation.BinaryFormat;
072import io.opencensus.trace.propagation.SpanContextParseException;
073
074/**
075 * A base implementation for a {@link Work} instance, dealing with most of the details around state change.
076 * <p>
077 * It also deals with transaction management, and prevents running work instances that are suspending.
078 * <p>
079 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is
080 * available.
081 * <p>
082 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its
083 * state and call {@link #suspended()}.
084 * <p>
085 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}.
086 *
087 * @since 5.6
088 */
089public abstract class AbstractWork implements Work {
090
091    private static final long serialVersionUID = 1L;
092
093    private static final Log log = LogFactory.getLog(AbstractWork.class);
094
095    protected static final Random RANDOM = new SecureRandom();
096
097    public static final String WORK_FAILED_EVENT = "workFailed";
098
099    public static final String WORK_INSTANCE = "workInstance";
100
101    public static final String FAILURE_MSG = "failureMsg";
102
103    public static final String FAILURE_EXCEPTION = "failureException";
104
105    protected String id;
106
107    /** Suspend requested by the work manager. */
108    protected transient volatile boolean suspending;
109
110    /** Suspend acknowledged by the work instance. */
111    protected transient volatile boolean suspended;
112
113    protected State state;
114
115    protected Progress progress;
116
117    /** Repository name for the Work instance, if relevant. */
118    protected String repositoryName;
119
120    /**
121     * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work
122     * instance will act.
123     * <p>
124     * Either docId or docIds is set. Not both.
125     */
126    protected String docId;
127
128    /**
129     * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work
130     * instance will act.
131     * <p>
132     * Either docId or docIds is set. Not both.
133     */
134    protected List<String> docIds;
135
136    /**
137     * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act.
138     */
139    protected boolean isTree;
140
141    /**
142     * The originating username to use when opening the {@link CoreSession}.
143     *
144     * @since 8.1
145     */
146    protected String originatingUsername;
147
148    protected String status;
149
150    protected long schedulingTime;
151
152    protected long startTime;
153
154    protected long completionTime;
155
156    protected transient CoreSession session;
157
158    protected transient NuxeoLoginContext loginContext;
159
160    protected WorkSchedulePath schedulePath;
161
162    protected String callerThread;
163
164    // @since 11.1
165    public static final String GLOBAL_DLQ_COUNT_REGISTRY_NAME = MetricRegistry.name("nuxeo", "works", "dlq").getKey();
166
167    static {
168        // Initialize the metric so it is reported as 0 from start.
169        SharedMetricRegistries.getOrCreate(MetricsService.class.getName()).counter(GLOBAL_DLQ_COUNT_REGISTRY_NAME);
170    }
171
172    protected byte[] traceContext;
173
174    /**
175     * Constructs a {@link Work} instance with a unique id.
176     */
177    public AbstractWork() {
178        // we user RANDOM to deal with these cases:
179        // - several calls in the time granularity of nanoTime()
180        // - several concurrent calls on different servers
181        this(System.nanoTime() + "." + (RANDOM.nextInt() & 0x7fffffff));
182    }
183
184    public AbstractWork(String id) {
185        this.id = id;
186        progress = PROGRESS_INDETERMINATE;
187        schedulingTime = System.currentTimeMillis();
188        callerThread = Thread.currentThread().getName();
189        traceContext = Tracing.getPropagationComponent()
190                              .getBinaryFormat()
191                              .toByteArray(Tracing.getTracer().getCurrentSpan().getContext());
192    }
193
194    @Override
195    public String getId() {
196        return id;
197    }
198
199    @Override
200    public WorkSchedulePath getSchedulePath() {
201        // schedulePath is transient so will become null after deserialization
202        return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath;
203    }
204
205    @Override
206    public void setSchedulePath(WorkSchedulePath path) {
207        schedulePath = path;
208    }
209
210    public void setDocument(String repositoryName, String docId, boolean isTree) {
211        this.repositoryName = repositoryName;
212        this.docId = docId;
213        docIds = null;
214        this.isTree = isTree;
215    }
216
217    public void setDocument(String repositoryName, String docId) {
218        setDocument(repositoryName, docId, false);
219    }
220
221    public void setDocuments(String repositoryName, List<String> docIds) {
222        this.repositoryName = repositoryName;
223        docId = null;
224        this.docIds = new ArrayList<>(docIds);
225    }
226
227    /**
228     * @since 8.1
229     */
230    public void setOriginatingUsername(String originatingUsername) {
231        this.originatingUsername = originatingUsername;
232    }
233
234    @Override
235    public void setWorkInstanceSuspending() {
236        suspending = true;
237    }
238
239    @Override
240    public boolean isSuspending() {
241        return suspending;
242    }
243
244    @Override
245    public void suspended() {
246        suspended = true;
247    }
248
249    @Override
250    public boolean isWorkInstanceSuspended() {
251        return suspended;
252    }
253
254    @Override
255    public void setWorkInstanceState(State state) {
256        this.state = state;
257        if (log.isTraceEnabled()) {
258            log.trace(this + " state=" + state);
259        }
260    }
261
262    @Override
263    public State getWorkInstanceState() {
264        return state;
265    }
266
267    @Override
268    public void setProgress(Progress progress) {
269        this.progress = progress;
270        if (log.isTraceEnabled()) {
271            log.trace(String.valueOf(this));
272        }
273    }
274
275    @Override
276    public Progress getProgress() {
277        return progress;
278    }
279
280    /**
281     * Sets a human-readable status for this work instance.
282     *
283     * @param status the status
284     */
285    public void setStatus(String status) {
286        this.status = status;
287    }
288
289    @Override
290    public String getStatus() {
291        return status;
292    }
293
294    /**
295     * May be called by implementing classes to open a session on the repository.
296     *
297     * @return the session (also available in {@code session} field)
298     * @deprecated since 8.1. Use {@link #openSystemSession()}.
299     */
300    @Deprecated
301    public CoreSession initSession() {
302        return initSession(repositoryName);
303    }
304
305    /**
306     * May be called by implementing classes to open a System session on the repository.
307     *
308     * @since 8.1
309     */
310    public void openSystemSession() {
311        loginContext = Framework.loginSystem(originatingUsername);
312        session = CoreInstance.getCoreSessionSystem(repositoryName, originatingUsername);
313    }
314
315    /**
316     * May be called by implementing classes to open a Use session on the repository.
317     * <p>
318     * It uses the set {@link #originatingUsername} to open the session.
319     *
320     * @since 8.1
321     */
322    public void openUserSession() {
323        if (originatingUsername == null) {
324            throw new IllegalStateException("Cannot open an user session without an originatingUsername");
325        }
326
327        try {
328            loginContext = Framework.loginUser(originatingUsername);
329        } catch (LoginException e) {
330            throw new NuxeoException(e);
331        }
332
333        session = CoreInstance.getCoreSession(repositoryName);
334    }
335
336    /**
337     * May be called by implementing classes to open a session on the given repository.
338     *
339     * @param repositoryName the repository name
340     * @return the session (also available in {@code session} field)
341     * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name,
342     *             otherwise use {@link CoreInstance#getCoreSessionSystem(String)}.
343     */
344    @Deprecated
345    public CoreSession initSession(String repositoryName) {
346        session = CoreInstance.getCoreSessionSystem(repositoryName, originatingUsername);
347        return session;
348    }
349
350    /**
351     * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}.
352     *
353     * @since 5.8
354     */
355    public void closeSession() {
356        // loginContext may be null in tests
357        if (loginContext != null) {
358            loginContext.close();
359        }
360    }
361
362    @Override
363    public void run() {
364        if (isSuspending()) {
365            // don't run anything if we're being started while a suspend
366            // has been requested
367            suspended();
368            return;
369        }
370        Span span = getSpanFromContext(traceContext);
371        try (Scope scope = Tracing.getTracer().withSpan(span)) {
372            RuntimeException suppressed = null;
373            int retryCount = getRetryCount(); // may be 0
374            for (int i = 0; i <= retryCount; i++) {
375                if (i > 0) {
376                    span.addAnnotation("AbstractWork#run Retrying " + i);
377                    log.debug("Retrying work due to concurrent update (" + i + "): " + this);
378                    log.trace("Concurrent update", suppressed);
379                }
380                if (ExceptionUtils.hasInterruptedCause(suppressed)) {
381                    log.debug("No need to retry the work with id=" + getId() + ", work manager is shutting down");
382                    break;
383                }
384                try {
385                    runWorkWithTransaction();
386                    span.setStatus(Status.OK);
387                    return;
388                } catch (RuntimeException e) {
389                    span.addAnnotation("AbstractSession#run Failure: " + e.getMessage());
390                    span.setStatus(Status.UNKNOWN);
391                    if (suppressed == null) {
392                        suppressed = e;
393                    } else {
394                        suppressed.addSuppressed(e);
395                    }
396                }
397            }
398            workFailed(suppressed);
399        } finally {
400            span.end();
401        }
402    }
403
404    protected Span getSpanFromContext(byte[] traceContext) {
405        if (traceContext == null || traceContext.length == 0) {
406            return BlankSpan.INSTANCE;
407        }
408        Tracer tracer = Tracing.getTracer();
409        BinaryFormat binaryFormat = Tracing.getPropagationComponent().getBinaryFormat();
410        try {
411            // followsFrom relationship
412            SpanContext spanContext = binaryFormat.fromByteArray(traceContext);
413            Span span = tracer.spanBuilderWithRemoteParent("work/" + getClass().getSimpleName(), spanContext)
414                              .startSpan();
415            span.addLink(Link.fromSpanContext(spanContext, Link.Type.PARENT_LINKED_SPAN));
416            HashMap<String, AttributeValue> map = new HashMap<>();
417            map.put("tx.thread", AttributeValue.stringAttributeValue(Thread.currentThread().getName()));
418            map.put("work.id", AttributeValue.stringAttributeValue(getId()));
419            map.put("work.category", AttributeValue.stringAttributeValue(getCategory()));
420            String title = getTitle();
421            if (title != null) {
422                map.put("work.title", AttributeValue.stringAttributeValue(title));
423            }
424            map.put("work.parent_path", AttributeValue.stringAttributeValue(getSchedulePath().getParentPath()));
425            map.put("work.caller_thread", AttributeValue.stringAttributeValue(callerThread));
426            map.put("work.to_string", AttributeValue.stringAttributeValue(toString()));
427            if (docId != null) {
428                map.put("work.doc_id", AttributeValue.stringAttributeValue(docId));
429            }
430            if (docIds != null && !docIds.isEmpty()) {
431                map.put("work.doc_count", AttributeValue.longAttributeValue(docIds.size()));
432            }
433            span.putAttributes(map);
434            return span;
435        } catch (SpanContextParseException e) {
436            log.warn("No span context " + traceContext.length);
437            return BlankSpan.INSTANCE;
438        }
439    }
440
441    /**
442     * Builds failure event properties. Work implementations can override this method to inject
443     * more event properties than the default.
444     * @since 10.1
445     */
446    public Map<String, Serializable> buildWorkFailureEventProps(RuntimeException exception) {
447
448        Map<String, Serializable> eventProps = new HashMap<>();
449        eventProps.put(WORK_INSTANCE, this);  // Work objects are serializable so send the whole thing
450
451        if (session != null) {
452            eventProps.put(REPOSITORY_NAME, session.getRepositoryName());
453        }
454
455        if (exception != null) {
456            eventProps.put(FAILURE_MSG, exception.getMessage());
457            eventProps.put(FAILURE_EXCEPTION, exception.getClass().getName());
458        }
459        return eventProps;
460    }
461
462    /**
463     * Called when the worker failed to run successfully even after retrying.
464     * @since 10.1
465     * @param exception the exception that occurred
466     */
467    public void workFailed(RuntimeException exception) {
468        EventService service = Framework.getService(EventService.class);
469        EventContext eventContext = new EventContextImpl(null, session != null ? session.getPrincipal() : null);
470        eventContext.setProperties(buildWorkFailureEventProps(exception));
471        Event event = new EventImpl(WORK_FAILED_EVENT, eventContext);
472        event.setIsCommitEvent(true);
473        service.fireEvent(event);
474        if (exception != null) {
475            appendWorkToDeadLetterQueue();
476            String msg = "Work failed after " + getRetryCount() + " " + (getRetryCount() == 1 ? "retry" : "retries") + ", class="
477                    + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle();
478            // all retries have been done, throw the exception
479            throw new NuxeoException(msg, exception);
480        }
481    }
482
483    protected void appendWorkToDeadLetterQueue() {
484        if (!State.RUNNING.equals(getWorkInstanceState())) {
485            // DLQ is only for Works executed by a WorkManager, in this case they are in RUNNING state.
486            return;
487        }
488        try {
489            String key = getCategory() + ":" + getId();
490            StreamService service = Framework.getService(StreamService.class);
491            if (service != null) {
492                service.getLogManager()
493                       .getAppender(DEAD_LETTER_QUEUE)
494                       .append(key, Record.of(key, WorkComputation.serialize(this)));
495                MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
496                registry.counter(GLOBAL_DLQ_COUNT_REGISTRY_NAME).inc();
497            }
498        } catch (IllegalArgumentException e) {
499            log.debug("No default log manager, don't save work in failure to a dead letter queue");
500        } catch (Exception e) {
501            String message = "Failed to save work: " + getId() + " in dead letter queue";
502            if (ExceptionUtils.hasInterruptedCause(e)) {
503                // During hot reload or forced shutdown the StreamService might be unavailable
504                // using warn level to prevent CI build to fail
505                log.warn(message, e);
506            } else {
507                log.error(message, e);
508            }
509        }
510    }
511
512    private String getTitleOr(String defaultTitle) {
513        try {
514            return getTitle();
515        } catch (Exception e) {
516            return defaultTitle;
517        }
518    }
519
520    /**
521     * Does work under a transaction.
522     *
523     * @since 5.9.4
524     */
525    protected void runWorkWithTransaction() {
526        TransactionHelper.startTransaction();
527        boolean ok = false;
528        Exception exc = null;
529        try {
530            WorkSchedulePath.handleEnter(this);
531            // --- do work
532            setStartTime();
533            work(); // may throw ConcurrentUpdateException
534            if (isGroupJoin() && WorkStateHelper.removeGroupJoinWork(getPartitionKey())) {
535                if (log.isDebugEnabled()) {
536                    log.debug(String.format("Detecting GroupJoin %s completion Work: %s", getPartitionKey(), getId()));
537                }
538                onGroupJoinCompletion();
539            }
540            ok = true;
541            // --- end work
542        } catch (Exception e) {
543            exc = e;
544            if (e instanceof RuntimeException) {
545                throw (RuntimeException) e;
546            } else if (e instanceof InterruptedException) {
547                // restore interrupted status for the thread pool worker
548                Thread.currentThread().interrupt();
549            }
550            throw new RuntimeException(e);
551        } finally {
552            WorkSchedulePath.handleReturn();
553            try {
554                setCompletionTime();
555                cleanUp(ok, exc);
556            } finally {
557                if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
558                    if (!ok || isSuspending()) {
559                        log.trace(this + " is suspending, rollbacking");
560                        TransactionHelper.setTransactionRollbackOnly();
561                    }
562                    TransactionHelper.commitOrRollbackTransaction();
563                }
564            }
565        }
566    }
567
568    @Override
569    public abstract void work();
570
571    /**
572     * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions.
573     *
574     * @return 0 for no retry, or more if some retries are possible
575     * @see #work
576     * @since 5.8
577     */
578    public int getRetryCount() {
579        return 0;
580    }
581
582    /**
583     * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in
584     * error or was interrupted.
585     *
586     * @param ok {@code true} if the work completed normally
587     * @param e the exception, if available
588     */
589    @Override
590    public void cleanUp(boolean ok, Exception e) {
591        if (!ok) {
592            if (ExceptionUtils.hasInterruptedCause(e)) {
593                log.debug("Interrupted work: " + this);
594            } else {
595                if (!(e instanceof ConcurrentUpdateException)) {
596                    if (!isSuspending()) {
597                        log.error("Exception during work: " + this, e);
598                        if (WorkSchedulePath.isCaptureStackEnabled()) {
599                            WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack());
600                        }
601                    }
602                }
603            }
604        }
605        closeSession();
606    }
607
608    @Override
609    public String getOriginatingUsername() {
610        return originatingUsername;
611    }
612
613    @Override
614    public long getSchedulingTime() {
615        return schedulingTime;
616    }
617
618    @Override
619    public long getStartTime() {
620        return startTime;
621    }
622
623    @Override
624    public long getCompletionTime() {
625        return completionTime;
626    }
627
628    @Override
629    public void setStartTime() {
630        startTime = System.currentTimeMillis();
631    }
632
633    protected void setCompletionTime() {
634        completionTime = System.currentTimeMillis();
635    }
636
637    @Override
638    public String getCategory() {
639        return getClass().getSimpleName();
640    }
641
642    @Override
643    public String toString() {
644        StringBuilder sb = new StringBuilder();
645        sb.append(getClass().getSimpleName());
646        sb.append('(');
647        if (docId != null) {
648            sb.append(docId);
649            sb.append(", ");
650        } else if (docIds != null && docIds.size() > 0) {
651            sb.append(docIds.get(0));
652            sb.append("..., ");
653        }
654        sb.append(getSchedulePath().getParentPath());
655        sb.append(", ");
656        sb.append(getProgress());
657        sb.append(", ");
658        sb.append(getStatus());
659        sb.append(')');
660        return sb.toString();
661    }
662
663    @Override
664    public DocumentLocation getDocument() {
665        if (docId != null) {
666            return newDocumentLocation(docId);
667        }
668        return null;
669    }
670
671    @Override
672    public List<DocumentLocation> getDocuments() {
673        if (docIds != null) {
674            List<DocumentLocation> res = new ArrayList<>(docIds.size());
675            for (String docId : docIds) {
676                res.add(newDocumentLocation(docId));
677            }
678            return res;
679        }
680        if (docId != null) {
681            return Collections.singletonList(newDocumentLocation(docId));
682        }
683        return Collections.emptyList();
684    }
685
686    protected DocumentLocation newDocumentLocation(String docId) {
687        return new DocumentLocationImpl(repositoryName, new IdRef(docId));
688    }
689
690    @Override
691    public boolean isDocumentTree() {
692        return isTree;
693    }
694
695    /**
696     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
697     * running a long process.
698     */
699    public void commitOrRollbackTransaction() {
700        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
701            TransactionHelper.commitOrRollbackTransaction();
702        }
703    }
704
705    /**
706     * Starts a new transaction.
707     * <p>
708     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
709     * process.
710     *
711     * @return true if a new transaction was started
712     */
713    public boolean startTransaction() {
714        return TransactionHelper.startTransaction();
715    }
716
717    @Override
718    public boolean equals(Object other) {
719        if (!(other instanceof Work)) {
720            return false;
721        }
722        return ((Work) other).getId().equals(getId());
723    }
724
725    @Override
726    public int hashCode() {
727        return getId().hashCode();
728    }
729
730    @Override
731    public String getPartitionKey() {
732        if (docId != null) {
733            return docId;
734        } else if (docIds != null && !docIds.isEmpty()) {
735            return docIds.get(0);
736        }
737        return getId();
738    }
739}