001/*
002 * (C) Copyright 2012-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
021import static org.nuxeo.ecm.core.api.event.CoreEventConstants.REPOSITORY_NAME;
022import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE;
023
024import java.io.Serializable;
025import java.security.SecureRandom;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.Random;
032
033import javax.security.auth.login.LoginContext;
034import javax.security.auth.login.LoginException;
035
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.nuxeo.common.logging.SequenceTracer;
039import org.nuxeo.common.utils.ExceptionUtils;
040import org.nuxeo.ecm.core.api.CloseableCoreSession;
041import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
042import org.nuxeo.ecm.core.api.CoreInstance;
043import org.nuxeo.ecm.core.api.CoreSession;
044import org.nuxeo.ecm.core.api.DocumentLocation;
045import org.nuxeo.ecm.core.api.IdRef;
046import org.nuxeo.ecm.core.api.NuxeoException;
047import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl;
048import org.nuxeo.ecm.core.event.Event;
049import org.nuxeo.ecm.core.event.EventContext;
050import org.nuxeo.ecm.core.event.EventService;
051import org.nuxeo.ecm.core.event.impl.EventContextImpl;
052import org.nuxeo.ecm.core.event.impl.EventImpl;
053import org.nuxeo.ecm.core.work.api.Work;
054import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
055import org.nuxeo.runtime.api.Framework;
056import org.nuxeo.runtime.transaction.TransactionHelper;
057
058/**
059 * A base implementation for a {@link Work} instance, dealing with most of the details around state change.
060 * <p>
061 * It also deals with transaction management, and prevents running work instances that are suspending.
062 * <p>
063 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is
064 * available.
065 * <p>
066 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its
067 * state and call {@link #suspended()}.
068 * <p>
069 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}.
070 *
071 * @since 5.6
072 */
073public abstract class AbstractWork implements Work {
074
075    private static final long serialVersionUID = 1L;
076
077    private static final Log log = LogFactory.getLog(AbstractWork.class);
078
079    protected static final Random RANDOM = new SecureRandom();
080
081    public static final String WORK_FAILED_EVENT = "workFailed";
082
083    public static final String WORK_INSTANCE = "workInstance";
084
085    public static final String FAILURE_MSG = "failureMsg";
086
087    public static final String FAILURE_EXCEPTION = "failureException";
088
089    protected String id;
090
091    /** Suspend requested by the work manager. */
092    protected transient volatile boolean suspending;
093
094    /** Suspend acknowledged by the work instance. */
095    protected transient volatile boolean suspended;
096
097    protected State state;
098
099    protected Progress progress;
100
101    /** Repository name for the Work instance, if relevant. */
102    protected String repositoryName;
103
104    /**
105     * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work
106     * instance will act.
107     * <p>
108     * Either docId or docIds is set. Not both.
109     */
110    protected String docId;
111
112    /**
113     * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work
114     * instance will act.
115     * <p>
116     * Either docId or docIds is set. Not both.
117     */
118    protected List<String> docIds;
119
120    /**
121     * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act.
122     */
123    protected boolean isTree;
124
125    /**
126     * The originating username to use when opening the {@link CoreSession}.
127     *
128     * @since 8.1
129     */
130    protected String originatingUsername;
131
132    protected String status;
133
134    protected long schedulingTime;
135
136    protected long startTime;
137
138    protected long completionTime;
139
140    protected transient CoreSession session;
141
142    protected transient LoginContext loginContext;
143
144    protected WorkSchedulePath schedulePath;
145
146    protected String callerThread;
147
148    /**
149     * Constructs a {@link Work} instance with a unique id.
150     */
151    public AbstractWork() {
152        // we user RANDOM to deal with these cases:
153        // - several calls in the time granularity of nanoTime()
154        // - several concurrent calls on different servers
155        this(System.nanoTime() + "." + (RANDOM.nextInt() & 0x7fffffff));
156        callerThread = SequenceTracer.getThreadName();
157    }
158
159    public AbstractWork(String id) {
160        this.id = id;
161        progress = PROGRESS_INDETERMINATE;
162        schedulingTime = System.currentTimeMillis();
163        callerThread = SequenceTracer.getThreadName();
164    }
165
166    @Override
167    public String getId() {
168        return id;
169    }
170
171    @Override
172    public WorkSchedulePath getSchedulePath() {
173        // schedulePath is transient so will become null after deserialization
174        return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath;
175    }
176
177    @Override
178    public void setSchedulePath(WorkSchedulePath path) {
179        schedulePath = path;
180    }
181
182    public void setDocument(String repositoryName, String docId, boolean isTree) {
183        this.repositoryName = repositoryName;
184        this.docId = docId;
185        docIds = null;
186        this.isTree = isTree;
187    }
188
189    public void setDocument(String repositoryName, String docId) {
190        setDocument(repositoryName, docId, false);
191    }
192
193    public void setDocuments(String repositoryName, List<String> docIds) {
194        this.repositoryName = repositoryName;
195        docId = null;
196        this.docIds = new ArrayList<>(docIds);
197    }
198
199    /**
200     * @since 8.1
201     */
202    public void setOriginatingUsername(String originatingUsername) {
203        this.originatingUsername = originatingUsername;
204    }
205
206    @Override
207    public void setWorkInstanceSuspending() {
208        suspending = true;
209    }
210
211    @Override
212    public boolean isSuspending() {
213        return suspending;
214    }
215
216    @Override
217    public void suspended() {
218        suspended = true;
219    }
220
221    @Override
222    public boolean isWorkInstanceSuspended() {
223        return suspended;
224    }
225
226    @Override
227    public void setWorkInstanceState(State state) {
228        this.state = state;
229        if (log.isTraceEnabled()) {
230            log.trace(this + " state=" + state);
231        }
232    }
233
234    @Override
235    public State getWorkInstanceState() {
236        return state;
237    }
238
239    @Override
240    public void setProgress(Progress progress) {
241        this.progress = progress;
242        if (log.isTraceEnabled()) {
243            log.trace(String.valueOf(this));
244        }
245    }
246
247    @Override
248    public Progress getProgress() {
249        return progress;
250    }
251
252    /**
253     * Sets a human-readable status for this work instance.
254     *
255     * @param status the status
256     */
257    public void setStatus(String status) {
258        this.status = status;
259    }
260
261    @Override
262    public String getStatus() {
263        return status;
264    }
265
266    /**
267     * May be called by implementing classes to open a session on the repository.
268     *
269     * @return the session (also available in {@code session} field)
270     * @deprecated since 8.1. Use {@link #openSystemSession()}.
271     */
272    @Deprecated
273    public CoreSession initSession() {
274        return initSession(repositoryName);
275    }
276
277    /**
278     * May be called by implementing classes to open a System session on the repository.
279     *
280     * @since 8.1
281     */
282    public void openSystemSession() {
283        session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername);
284    }
285
286    /**
287     * May be called by implementing classes to open a Use session on the repository.
288     * <p>
289     * It uses the set {@link #originatingUsername} to open the session.
290     *
291     * @since 8.1
292     */
293    public void openUserSession() {
294        if (originatingUsername == null) {
295            throw new IllegalStateException("Cannot open an user session without an originatingUsername");
296        }
297
298        try {
299            loginContext = Framework.loginAsUser(originatingUsername);
300        } catch (LoginException e) {
301            throw new NuxeoException(e);
302        }
303
304        session = CoreInstance.openCoreSession(repositoryName);
305    }
306
307    /**
308     * May be called by implementing classes to open a session on the given repository.
309     *
310     * @param repositoryName the repository name
311     * @return the session (also available in {@code session} field)
312     * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name,
313     *             otherwise use {@link CoreInstance#openCoreSessionSystem(String)}.
314     */
315    @Deprecated
316    public CoreSession initSession(String repositoryName) {
317        session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername);
318        return session;
319    }
320
321    /**
322     * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}.
323     *
324     * @since 5.8
325     */
326    public void closeSession() {
327        if (session != null) {
328            ((CloseableCoreSession) session).close();
329            session = null;
330        }
331    }
332
333    @Override
334    public void run() {
335        if (isSuspending()) {
336            // don't run anything if we're being started while a suspend
337            // has been requested
338            suspended();
339            return;
340        }
341        if (SequenceTracer.isEnabled()) {
342            SequenceTracer.startFrom(callerThread, "Work " + getTitleOr("unknown"), " #7acde9");
343        }
344        RuntimeException suppressed = null;
345        int retryCount = getRetryCount(); // may be 0
346        for (int i = 0; i <= retryCount; i++) {
347            if (i > 0) {
348                log.debug("Retrying work due to concurrent update (" + i + "): " + this);
349                log.trace("Concurrent update", suppressed);
350            }
351            if (ExceptionUtils.hasInterruptedCause(suppressed)) {
352                // if we're here suppressed != null so we destroy SequenceTracer
353                log.debug("No need to retry the work with id=" + getId() + ", work manager is shutting down");
354                break;
355            }
356            try {
357                runWorkWithTransaction();
358                SequenceTracer.stop("Work done " + (completionTime - startTime) + " ms");
359                return;
360            } catch (RuntimeException e) {
361                if (suppressed == null) {
362                    suppressed = e;
363                } else {
364                    suppressed.addSuppressed(e);
365                }
366            }
367        }
368
369        workFailed(suppressed);
370    }
371
372    /**
373     * Builds failure event properties. Work implementations can override this method to inject
374     * more event properties than the default.
375     * @since 10.1
376     */
377    public Map<String, Serializable> buildWorkFailureEventProps(RuntimeException exception) {
378
379        Map<String, Serializable> eventProps = new HashMap<>();
380        eventProps.put(WORK_INSTANCE, this);  // Work objects are serializable so send the whole thing
381
382        if (session != null) {
383            eventProps.put(REPOSITORY_NAME, session.getRepositoryName());
384        }
385
386        if (exception != null) {
387            eventProps.put(FAILURE_MSG, exception.getMessage());
388            eventProps.put(FAILURE_EXCEPTION, exception.getClass().getName());
389        }
390        return eventProps;
391    }
392
393    /**
394     * Called when the worker failed to run successfully even after retrying.
395     * @since 10.1
396     * @param exception the exception that occurred
397     */
398    public void workFailed(RuntimeException exception) {
399
400        EventService service = Framework.getService(EventService.class);
401        EventContext eventContext = new EventContextImpl(null, session != null ? session.getPrincipal() : null);
402        eventContext.setProperties(buildWorkFailureEventProps(exception));
403        Event event = new EventImpl(WORK_FAILED_EVENT, eventContext);
404        event.setIsCommitEvent(true);
405        service.fireEvent(event);
406
407        if (exception != null) {
408            String msg = "Work failed after " + getRetryCount() + " " + (getRetryCount() == 1 ? "retry" : "retries") + ", class="
409                    + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle();
410            SequenceTracer.destroy("Work failure " + (completionTime - startTime) + " ms");
411            // all retries have been done, throw the exception
412            throw new NuxeoException(msg, exception);
413        }
414    }
415
416    private String getTitleOr(String defaultTitle) {
417        try {
418            return getTitle();
419        } catch (Exception e) {
420            return defaultTitle;
421        }
422    }
423
424    /**
425     * Does work under a transaction.
426     *
427     * @since 5.9.4
428     */
429    protected void runWorkWithTransaction() {
430        TransactionHelper.startTransaction();
431        boolean ok = false;
432        Exception exc = null;
433        try {
434            WorkSchedulePath.handleEnter(this);
435            // --- do work
436            setStartTime();
437            work(); // may throw ConcurrentUpdateException
438            ok = true;
439            // --- end work
440        } catch (Exception e) {
441            exc = e;
442            if (e instanceof RuntimeException) {
443                throw (RuntimeException) e;
444            } else if (e instanceof InterruptedException) {
445                // restore interrupted status for the thread pool worker
446                Thread.currentThread().interrupt();
447            }
448            throw new RuntimeException(e);
449        } finally {
450            WorkSchedulePath.handleReturn();
451            try {
452                setCompletionTime();
453                cleanUp(ok, exc);
454            } finally {
455                if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
456                    if (!ok || isSuspending()) {
457                        log.trace(this + " is suspending, rollbacking");
458                        TransactionHelper.setTransactionRollbackOnly();
459                    }
460                    TransactionHelper.commitOrRollbackTransaction();
461                }
462            }
463        }
464    }
465
    /**
     * Does the actual work; this is the method that concrete implementations must provide.
     * <p>
     * It is called by {@link #runWorkWithTransaction()} inside a transaction. To deal with suspension, it should
     * periodically check {@link #isSuspending()} and, if true, save its state and call {@link #suspended()}.
     */
    @Override
    public abstract void work();
468
469    /**
470     * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions.
471     *
472     * @return 0 for no retry, or more if some retries are possible
473     * @see #work
474     * @since 5.8
475     */
476    public int getRetryCount() {
477        return 0;
478    }
479
480    /**
481     * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in
482     * error or was interrupted.
483     *
484     * @param ok {@code true} if the work completed normally
485     * @param e the exception, if available
486     */
487    @Override
488    public void cleanUp(boolean ok, Exception e) {
489        if (!ok) {
490            if (ExceptionUtils.hasInterruptedCause(e)) {
491                log.debug("Interrupted work: " + this);
492            } else {
493                if (!(e instanceof ConcurrentUpdateException)) {
494                    if (!isSuspending()) {
495                        log.error("Exception during work: " + this, e);
496                        if (WorkSchedulePath.isCaptureStackEnabled()) {
497                            WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack());
498                        }
499                    }
500                }
501            }
502        }
503        closeSession();
504
505        try {
506            // loginContext may be null in tests
507            if (loginContext != null) {
508                loginContext.logout();
509            }
510        } catch (LoginException le) {
511            throw new NuxeoException(le);
512        }
513    }
514
515    @Override
516    public String getOriginatingUsername() {
517        return originatingUsername;
518    }
519
520    @Override
521    public long getSchedulingTime() {
522        return schedulingTime;
523    }
524
525    @Override
526    public long getStartTime() {
527        return startTime;
528    }
529
530    @Override
531    public long getCompletionTime() {
532        return completionTime;
533    }
534
535    @Override
536    public void setStartTime() {
537        startTime = System.currentTimeMillis();
538    }
539
540    protected void setCompletionTime() {
541        completionTime = System.currentTimeMillis();
542    }
543
544    @Override
545    public String getCategory() {
546        return getClass().getSimpleName();
547    }
548
549    @Override
550    public String toString() {
551        StringBuilder buf = new StringBuilder();
552        buf.append(getClass().getSimpleName());
553        buf.append('(');
554        if (docId != null) {
555            buf.append(docId);
556            buf.append(", ");
557        } else if (docIds != null && docIds.size() > 0) {
558            buf.append(docIds.get(0));
559            buf.append("..., ");
560        }
561        buf.append(getSchedulePath().getParentPath());
562        buf.append(", ");
563        buf.append(getProgress());
564        buf.append(", ");
565        buf.append(getStatus());
566        buf.append(')');
567        return buf.toString();
568    }
569
570    @Override
571    public DocumentLocation getDocument() {
572        if (docId != null) {
573            return newDocumentLocation(docId);
574        }
575        return null;
576    }
577
578    @Override
579    public List<DocumentLocation> getDocuments() {
580        if (docIds != null) {
581            List<DocumentLocation> res = new ArrayList<>(docIds.size());
582            for (String docId : docIds) {
583                res.add(newDocumentLocation(docId));
584            }
585            return res;
586        }
587        if (docId != null) {
588            return Collections.singletonList(newDocumentLocation(docId));
589        }
590        return Collections.emptyList();
591    }
592
593    protected DocumentLocation newDocumentLocation(String docId) {
594        return new DocumentLocationImpl(repositoryName, new IdRef(docId));
595    }
596
597    @Override
598    public boolean isDocumentTree() {
599        return isTree;
600    }
601
602    /**
603     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
604     * running a long process.
605     */
606    public void commitOrRollbackTransaction() {
607        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
608            TransactionHelper.commitOrRollbackTransaction();
609        }
610    }
611
612    /**
613     * Starts a new transaction.
614     * <p>
615     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
616     * process.
617     *
618     * @return true if a new transaction was started
619     */
620    public boolean startTransaction() {
621        return TransactionHelper.startTransaction();
622    }
623
624    @Override
625    public boolean equals(Object other) {
626        if (!(other instanceof Work)) {
627            return false;
628        }
629        return ((Work) other).getId().equals(getId());
630    }
631
632    @Override
633    public int hashCode() {
634        return getId().hashCode();
635    }
636
637    @Override
638    public String getPartitionKey() {
639        if (docId != null) {
640            return docId;
641        } else if (docIds != null && !docIds.isEmpty()) {
642            return docIds.get(0);
643        }
644        return getId();
645    }
646}