001/*
002 * (C) Copyright 2012-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
021import static org.nuxeo.ecm.core.api.event.CoreEventConstants.REPOSITORY_NAME;
022import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE;
023
024import java.io.Serializable;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Random;
031
032import javax.security.auth.login.LoginContext;
033import javax.security.auth.login.LoginException;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.common.logging.SequenceTracer;
038import org.nuxeo.common.utils.ExceptionUtils;
039import org.nuxeo.ecm.core.api.CloseableCoreSession;
040import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
041import org.nuxeo.ecm.core.api.CoreInstance;
042import org.nuxeo.ecm.core.api.CoreSession;
043import org.nuxeo.ecm.core.api.DocumentLocation;
044import org.nuxeo.ecm.core.api.IdRef;
045import org.nuxeo.ecm.core.api.NuxeoException;
046import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl;
047import org.nuxeo.ecm.core.event.Event;
048import org.nuxeo.ecm.core.event.EventContext;
049import org.nuxeo.ecm.core.event.EventService;
050import org.nuxeo.ecm.core.event.impl.EventContextImpl;
051import org.nuxeo.ecm.core.event.impl.EventImpl;
052import org.nuxeo.ecm.core.work.api.Work;
053import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
054import org.nuxeo.runtime.api.Framework;
055import org.nuxeo.runtime.transaction.TransactionHelper;
056
057/**
058 * A base implementation for a {@link Work} instance, dealing with most of the details around state change.
059 * <p>
060 * It also deals with transaction management, and prevents running work instances that are suspending.
061 * <p>
062 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is
063 * available.
064 * <p>
065 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its
066 * state and call {@link #suspended()}.
067 * <p>
068 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}.
069 *
070 * @since 5.6
071 */
072public abstract class AbstractWork implements Work {
073
074    private static final long serialVersionUID = 1L;
075
076    private static final Log log = LogFactory.getLog(AbstractWork.class);
077
078    protected static final Random RANDOM = new Random();
079
080    public static final String WORK_FAILED_EVENT = "workFailed";
081
082    public static final String WORK_INSTANCE = "workInstance";
083
084    public static final String FAILURE_MSG = "failureMsg";
085
086    public static final String FAILURE_EXCEPTION = "failureException";
087
088    protected String id;
089
090    /** Suspend requested by the work manager. */
091    protected transient volatile boolean suspending;
092
093    /** Suspend acknowledged by the work instance. */
094    protected transient volatile boolean suspended;
095
096    protected State state;
097
098    protected Progress progress;
099
100    /** Repository name for the Work instance, if relevant. */
101    protected String repositoryName;
102
103    /**
104     * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work
105     * instance will act.
106     * <p>
107     * Either docId or docIds is set. Not both.
108     */
109    protected String docId;
110
111    /**
112     * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work
113     * instance will act.
114     * <p>
115     * Either docId or docIds is set. Not both.
116     */
117    protected List<String> docIds;
118
119    /**
120     * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act.
121     */
122    protected boolean isTree;
123
124    /**
125     * The originating username to use when opening the {@link CoreSession}.
126     *
127     * @since 8.1
128     */
129    protected String originatingUsername;
130
131    protected String status;
132
133    protected long schedulingTime;
134
135    protected long startTime;
136
137    protected long completionTime;
138
139    protected transient CoreSession session;
140
141    protected transient LoginContext loginContext;
142
143    protected WorkSchedulePath schedulePath;
144
145    protected String callerThread;
146
147    /**
148     * Constructs a {@link Work} instance with a unique id.
149     */
150    public AbstractWork() {
151        // we user RANDOM to deal with these cases:
152        // - several calls in the time granularity of nanoTime()
153        // - several concurrent calls on different servers
154        this(System.nanoTime() + "." + (RANDOM.nextInt() & 0x7fffffff));
155        callerThread = SequenceTracer.getThreadName();
156    }
157
158    public AbstractWork(String id) {
159        this.id = id;
160        progress = PROGRESS_INDETERMINATE;
161        schedulingTime = System.currentTimeMillis();
162        callerThread = SequenceTracer.getThreadName();
163    }
164
165    @Override
166    public String getId() {
167        return id;
168    }
169
170    @Override
171    public WorkSchedulePath getSchedulePath() {
172        // schedulePath is transient so will become null after deserialization
173        return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath;
174    }
175
176    @Override
177    public void setSchedulePath(WorkSchedulePath path) {
178        schedulePath = path;
179    }
180
181    public void setDocument(String repositoryName, String docId, boolean isTree) {
182        this.repositoryName = repositoryName;
183        this.docId = docId;
184        docIds = null;
185        this.isTree = isTree;
186    }
187
188    public void setDocument(String repositoryName, String docId) {
189        setDocument(repositoryName, docId, false);
190    }
191
192    public void setDocuments(String repositoryName, List<String> docIds) {
193        this.repositoryName = repositoryName;
194        docId = null;
195        this.docIds = new ArrayList<>(docIds);
196    }
197
198    /**
199     * @since 8.1
200     */
201    public void setOriginatingUsername(String originatingUsername) {
202        this.originatingUsername = originatingUsername;
203    }
204
205    @Override
206    public void setWorkInstanceSuspending() {
207        suspending = true;
208    }
209
210    @Override
211    public boolean isSuspending() {
212        return suspending;
213    }
214
215    @Override
216    public void suspended() {
217        suspended = true;
218    }
219
220    @Override
221    public boolean isWorkInstanceSuspended() {
222        return suspended;
223    }
224
225    @Override
226    public void setWorkInstanceState(State state) {
227        this.state = state;
228        if (log.isTraceEnabled()) {
229            log.trace(this + " state=" + state);
230        }
231    }
232
233    @Override
234    public State getWorkInstanceState() {
235        return state;
236    }
237
238    @Override
239    public void setProgress(Progress progress) {
240        this.progress = progress;
241        if (log.isTraceEnabled()) {
242            log.trace(String.valueOf(this));
243        }
244    }
245
246    @Override
247    public Progress getProgress() {
248        return progress;
249    }
250
251    /**
252     * Sets a human-readable status for this work instance.
253     *
254     * @param status the status
255     */
256    public void setStatus(String status) {
257        this.status = status;
258    }
259
260    @Override
261    public String getStatus() {
262        return status;
263    }
264
265    /**
266     * May be called by implementing classes to open a session on the repository.
267     *
268     * @return the session (also available in {@code session} field)
269     * @deprecated since 8.1. Use {@link #openSystemSession()}.
270     */
271    @Deprecated
272    public CoreSession initSession() {
273        return initSession(repositoryName);
274    }
275
276    /**
277     * May be called by implementing classes to open a System session on the repository.
278     *
279     * @since 8.1
280     */
281    public void openSystemSession() {
282        session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername);
283    }
284
285    /**
286     * May be called by implementing classes to open a Use session on the repository.
287     * <p>
288     * It uses the set {@link #originatingUsername} to open the session.
289     *
290     * @since 8.1
291     */
292    public void openUserSession() {
293        if (originatingUsername == null) {
294            throw new IllegalStateException("Cannot open an user session without an originatingUsername");
295        }
296
297        try {
298            loginContext = Framework.loginAsUser(originatingUsername);
299        } catch (LoginException e) {
300            throw new NuxeoException(e);
301        }
302
303        session = CoreInstance.openCoreSession(repositoryName);
304    }
305
306    /**
307     * May be called by implementing classes to open a session on the given repository.
308     *
309     * @param repositoryName the repository name
310     * @return the session (also available in {@code session} field)
311     * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name,
312     *             otherwise use {@link CoreInstance#openCoreSessionSystem(String)}.
313     */
314    @Deprecated
315    public CoreSession initSession(String repositoryName) {
316        session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername);
317        return session;
318    }
319
320    /**
321     * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}.
322     *
323     * @since 5.8
324     */
325    public void closeSession() {
326        if (session != null) {
327            ((CloseableCoreSession) session).close();
328            session = null;
329        }
330    }
331
332    @Override
333    public void run() {
334        if (isSuspending()) {
335            // don't run anything if we're being started while a suspend
336            // has been requested
337            suspended();
338            return;
339        }
340        if (SequenceTracer.isEnabled()) {
341            SequenceTracer.startFrom(callerThread, "Work " + getTitleOr("unknown"), " #7acde9");
342        }
343        RuntimeException suppressed = null;
344        int retryCount = getRetryCount(); // may be 0
345        for (int i = 0; i <= retryCount; i++) {
346            if (i > 0) {
347                log.debug("Retrying work due to concurrent update (" + i + "): " + this);
348                log.trace("Concurrent update", suppressed);
349            }
350            if (ExceptionUtils.hasInterruptedCause(suppressed)) {
351                // if we're here suppressed != null so we destroy SequenceTracer
352                log.debug("No need to retry the work with id=" + getId() + ", work manager is shutting down");
353                break;
354            }
355            try {
356                runWorkWithTransaction();
357                SequenceTracer.stop("Work done " + (completionTime - startTime) + " ms");
358                return;
359            } catch (RuntimeException e) {
360                if (suppressed == null) {
361                    suppressed = e;
362                } else {
363                    suppressed.addSuppressed(e);
364                }
365            }
366        }
367
368        workFailed(suppressed);
369    }
370
371    /**
372     * Builds failure event properties. Work implementations can override this method to inject
373     * more event properties than the default.
374     * @since 10.1
375     */
376    public Map<String, Serializable> buildWorkFailureEventProps(RuntimeException exception) {
377
378        Map<String, Serializable> eventProps = new HashMap<>();
379        eventProps.put(WORK_INSTANCE, this);  // Work objects are serializable so send the whole thing
380
381        if (session != null) {
382            eventProps.put(REPOSITORY_NAME, session.getRepositoryName());
383        }
384
385        if (exception != null) {
386            eventProps.put(FAILURE_MSG, exception.getMessage());
387            eventProps.put(FAILURE_EXCEPTION, exception.getClass().getName());
388        }
389        return eventProps;
390    }
391
392    /**
393     * Called when the worker failed to run successfully even after retrying.
394     * @since 10.1
395     * @param exception the exception that occurred
396     */
397    public void workFailed(RuntimeException exception) {
398
399        EventService service = Framework.getService(EventService.class);
400        EventContext eventContext = new EventContextImpl(null, session != null ? session.getPrincipal() : null);
401        eventContext.setProperties(buildWorkFailureEventProps(exception));
402        Event event = new EventImpl(WORK_FAILED_EVENT, eventContext);
403        event.setIsCommitEvent(true);
404        service.fireEvent(event);
405
406        if (exception != null) {
407            String msg = "Work failed after " + getRetryCount() + " " + (getRetryCount() == 1 ? "retry" : "retries") + ", class="
408                    + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle();
409            SequenceTracer.destroy("Work failure " + (completionTime - startTime) + " ms");
410            // all retries have been done, throw the exception
411            throw new NuxeoException(msg, exception);
412        }
413    }
414
415    private String getTitleOr(String defaultTitle) {
416        try {
417            return getTitle();
418        } catch (Exception e) {
419            return defaultTitle;
420        }
421    }
422
423    /**
424     * Does work under a transaction.
425     *
426     * @since 5.9.4
427     */
428    protected void runWorkWithTransaction() {
429        TransactionHelper.startTransaction();
430        boolean ok = false;
431        Exception exc = null;
432        try {
433            WorkSchedulePath.handleEnter(this);
434            // --- do work
435            setStartTime();
436            work(); // may throw ConcurrentUpdateException
437            ok = true;
438            // --- end work
439        } catch (Exception e) {
440            exc = e;
441            if (e instanceof RuntimeException) {
442                throw (RuntimeException) e;
443            } else if (e instanceof InterruptedException) {
444                // restore interrupted status for the thread pool worker
445                Thread.currentThread().interrupt();
446            }
447            throw new RuntimeException(e);
448        } finally {
449            WorkSchedulePath.handleReturn();
450            try {
451                setCompletionTime();
452                cleanUp(ok, exc);
453            } finally {
454                if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
455                    if (!ok || isSuspending()) {
456                        log.trace(this + " is suspending, rollbacking");
457                        TransactionHelper.setTransactionRollbackOnly();
458                    }
459                    TransactionHelper.commitOrRollbackTransaction();
460                }
461            }
462        }
463    }
464
465    @Override
466    public abstract void work();
467
468    /**
469     * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions.
470     *
471     * @return 0 for no retry, or more if some retries are possible
472     * @see #work
473     * @since 5.8
474     */
475    public int getRetryCount() {
476        return 0;
477    }
478
479    /**
480     * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in
481     * error or was interrupted.
482     *
483     * @param ok {@code true} if the work completed normally
484     * @param e the exception, if available
485     */
486    @Override
487    public void cleanUp(boolean ok, Exception e) {
488        if (!ok) {
489            if (ExceptionUtils.hasInterruptedCause(e)) {
490                log.debug("Interrupted work: " + this);
491            } else {
492                if (!(e instanceof ConcurrentUpdateException)) {
493                    if (!isSuspending()) {
494                        log.error("Exception during work: " + this, e);
495                        if (WorkSchedulePath.isCaptureStackEnabled()) {
496                            WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack());
497                        }
498                    }
499                }
500            }
501        }
502        closeSession();
503
504        try {
505            // loginContext may be null in tests
506            if (loginContext != null) {
507                loginContext.logout();
508            }
509        } catch (LoginException le) {
510            throw new NuxeoException(le);
511        }
512    }
513
514    @Override
515    public String getOriginatingUsername() {
516        return originatingUsername;
517    }
518
519    @Override
520    public long getSchedulingTime() {
521        return schedulingTime;
522    }
523
524    @Override
525    public long getStartTime() {
526        return startTime;
527    }
528
529    @Override
530    public long getCompletionTime() {
531        return completionTime;
532    }
533
534    @Override
535    public void setStartTime() {
536        startTime = System.currentTimeMillis();
537    }
538
539    protected void setCompletionTime() {
540        completionTime = System.currentTimeMillis();
541    }
542
543    @Override
544    public String getCategory() {
545        return getClass().getSimpleName();
546    }
547
548    @Override
549    public String toString() {
550        StringBuilder buf = new StringBuilder();
551        buf.append(getClass().getSimpleName());
552        buf.append('(');
553        if (docId != null) {
554            buf.append(docId);
555            buf.append(", ");
556        } else if (docIds != null && docIds.size() > 0) {
557            buf.append(docIds.get(0));
558            buf.append("..., ");
559        }
560        buf.append(getSchedulePath().getParentPath());
561        buf.append(", ");
562        buf.append(getProgress());
563        buf.append(", ");
564        buf.append(getStatus());
565        buf.append(')');
566        return buf.toString();
567    }
568
569    @Override
570    public DocumentLocation getDocument() {
571        if (docId != null) {
572            return newDocumentLocation(docId);
573        }
574        return null;
575    }
576
577    @Override
578    public List<DocumentLocation> getDocuments() {
579        if (docIds != null) {
580            List<DocumentLocation> res = new ArrayList<>(docIds.size());
581            for (String docId : docIds) {
582                res.add(newDocumentLocation(docId));
583            }
584            return res;
585        }
586        if (docId != null) {
587            return Collections.singletonList(newDocumentLocation(docId));
588        }
589        return Collections.emptyList();
590    }
591
592    protected DocumentLocation newDocumentLocation(String docId) {
593        return new DocumentLocationImpl(repositoryName, new IdRef(docId));
594    }
595
596    @Override
597    public boolean isDocumentTree() {
598        return isTree;
599    }
600
601    /**
602     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
603     * running a long process.
604     */
605    public void commitOrRollbackTransaction() {
606        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
607            TransactionHelper.commitOrRollbackTransaction();
608        }
609    }
610
611    /**
612     * Starts a new transaction.
613     * <p>
614     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
615     * process.
616     *
617     * @return true if a new transaction was started
618     */
619    public boolean startTransaction() {
620        return TransactionHelper.startTransaction();
621    }
622
623    @Override
624    public boolean equals(Object other) {
625        if (!(other instanceof Work)) {
626            return false;
627        }
628        return ((Work) other).getId().equals(getId());
629    }
630
631    @Override
632    public int hashCode() {
633        return getId().hashCode();
634    }
635
636    @Override
637    public String getPartitionKey() {
638        if (docId != null) {
639            return docId;
640        } else if (docIds != null && !docIds.isEmpty()) {
641            return docIds.get(0);
642        }
643        return getId();
644    }
645}