001/*
002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
021import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE;
022
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.List;
026import java.util.Random;
027
028import javax.security.auth.login.LoginContext;
029import javax.security.auth.login.LoginException;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.common.logging.SequenceTracer;
034import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
035import org.nuxeo.ecm.core.api.CoreInstance;
036import org.nuxeo.ecm.core.api.CoreSession;
037import org.nuxeo.ecm.core.api.DocumentLocation;
038import org.nuxeo.ecm.core.api.IdRef;
039import org.nuxeo.ecm.core.api.NuxeoException;
040import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl;
041import org.nuxeo.ecm.core.work.api.Work;
042import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
043import org.nuxeo.runtime.api.Framework;
044import org.nuxeo.runtime.transaction.TransactionHelper;
045import org.nuxeo.runtime.transaction.TransactionRuntimeException;
046
047/**
048 * A base implementation for a {@link Work} instance, dealing with most of the details around state change.
049 * <p>
050 * It also deals with transaction management, and prevents running work instances that are suspending.
051 * <p>
052 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is
053 * available.
054 * <p>
055 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its
056 * state and call {@link #suspended()}.
057 * <p>
058 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}.
059 *
060 * @since 5.6
061 */
062public abstract class AbstractWork implements Work {
063
064    private static final long serialVersionUID = 1L;
065
066    private static final Log log = LogFactory.getLog(AbstractWork.class);
067
068    protected static final Random RANDOM = new Random();
069
070    protected String id;
071
072    /** Suspend requested by the work manager. */
073    protected transient volatile boolean suspending;
074
075    /** Suspend acknowledged by the work instance. */
076    protected transient volatile boolean suspended;
077
078    protected State state;
079
080    protected Progress progress;
081
082    /** Repository name for the Work instance, if relevant. */
083    protected String repositoryName;
084
085    /**
086     * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work
087     * instance will act.
088     * <p>
089     * Either docId or docIds is set. Not both.
090     */
091    protected String docId;
092
093    /**
094     * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work
095     * instance will act.
096     * <p>
097     * Either docId or docIds is set. Not both.
098     */
099    protected List<String> docIds;
100
101    /**
102     * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act.
103     */
104    protected boolean isTree;
105
106    /**
107     * The originating username to use when opening the {@link CoreSession}.
108     *
109     * @since 8.1
110     */
111    protected String originatingUsername;
112
113    protected String status;
114
115    protected long schedulingTime;
116
117    protected long startTime;
118
119    protected long completionTime;
120
121    protected transient CoreSession session;
122
123    protected transient LoginContext loginContext;
124
125    protected WorkSchedulePath schedulePath;
126
127    protected String callerThread;
128
129    /**
130     * Constructs a {@link Work} instance with a unique id.
131     */
132    public AbstractWork() {
133        // we user RANDOM to deal with these cases:
134        // - several calls in the time granularity of nanoTime()
135        // - several concurrent calls on different servers
136        this(System.nanoTime() + "." + Math.abs(RANDOM.nextInt()));
137        callerThread = SequenceTracer.getThreadName();
138    }
139
140    public AbstractWork(String id) {
141        this.id = id;
142        progress = PROGRESS_INDETERMINATE;
143        schedulingTime = System.currentTimeMillis();
144        callerThread = SequenceTracer.getThreadName();
145    }
146
147    @Override
148    public String getId() {
149        return id;
150    }
151
152    @Override
153    public WorkSchedulePath getSchedulePath() {
154        // schedulePath is transient so will become null after deserialization
155        return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath;
156    }
157
158    @Override
159    public void setSchedulePath(WorkSchedulePath path) {
160        schedulePath = path;
161    }
162
163    public void setDocument(String repositoryName, String docId, boolean isTree) {
164        this.repositoryName = repositoryName;
165        this.docId = docId;
166        docIds = null;
167        this.isTree = isTree;
168    }
169
170    public void setDocument(String repositoryName, String docId) {
171        setDocument(repositoryName, docId, false);
172    }
173
174    public void setDocuments(String repositoryName, List<String> docIds) {
175        this.repositoryName = repositoryName;
176        docId = null;
177        this.docIds = new ArrayList<String>(docIds);
178    }
179
180    /**
181     * @since 8.1
182     */
183    public void setOriginatingUsername(String originatingUsername) {
184        this.originatingUsername = originatingUsername;
185    }
186
187    @Override
188    public void setWorkInstanceSuspending() {
189        suspending = true;
190    }
191
192    @Override
193    public boolean isSuspending() {
194        return suspending;
195    }
196
197    @Override
198    public void suspended() {
199        suspended = true;
200    }
201
202    @Override
203    public boolean isWorkInstanceSuspended() {
204        return suspended;
205    }
206
207    @Override
208    public void setWorkInstanceState(State state) {
209        this.state = state;
210        if (log.isTraceEnabled()) {
211            log.trace(this + " state=" + state);
212        }
213    }
214
215    @Override
216    public State getWorkInstanceState() {
217        return state;
218    }
219
220    @Override
221    @Deprecated
222    public State getState() {
223        return state;
224    }
225
226    @Override
227    public void setProgress(Progress progress) {
228        this.progress = progress;
229        if (log.isTraceEnabled()) {
230            log.trace(String.valueOf(this));
231        }
232    }
233
234    @Override
235    public Progress getProgress() {
236        return progress;
237    }
238
239    /**
240     * Sets a human-readable status for this work instance.
241     *
242     * @param status the status
243     */
244    public void setStatus(String status) {
245        this.status = status;
246    }
247
248    @Override
249    public String getStatus() {
250        return status;
251    }
252
253    /**
254     * May be called by implementing classes to open a session on the repository.
255     *
256     * @return the session (also available in {@code session} field)
257     * @deprecated since 8.1. Use {@link #openSystemSession()}.
258     */
259    @Deprecated
260    public CoreSession initSession() {
261        return initSession(repositoryName);
262    }
263
264    /**
265     * May be called by implementing classes to open a System session on the repository.
266     *
267     * @since 8.1
268     */
269    public void openSystemSession() {
270        session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername);
271    }
272
273    /**
274     * May be called by implementing classes to open a Use session on the repository.
275     * <p>
276     * It uses the set {@link #originatingUsername} to open the session.
277     *
278     * @since 8.1
279     */
280    public void openUserSession() {
281        if (originatingUsername == null) {
282            throw new IllegalStateException("Cannot open an user session without an originatingUsername");
283        }
284
285        try {
286            loginContext = Framework.loginAsUser(originatingUsername);
287        } catch (LoginException e) {
288            throw new NuxeoException(e);
289        }
290
291        session = CoreInstance.openCoreSession(repositoryName);
292    }
293
294    /**
295     * May be called by implementing classes to open a session on the given repository.
296     *
297     * @param repositoryName the repository name
298     * @return the session (also available in {@code session} field)
299     * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name,
300     *             otherwise use {@link CoreInstance#openCoreSessionSystem(String)}.
301     */
302    @Deprecated
303    public CoreSession initSession(String repositoryName) {
304        session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername);
305        return session;
306    }
307
308    /**
309     * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}.
310     *
311     * @since 5.8
312     */
313    public void closeSession() {
314        if (session != null) {
315            session.close();
316            session = null;
317        }
318    }
319
320    @Override
321    public void run() {
322        if (isSuspending()) {
323            // don't run anything if we're being started while a suspend
324            // has been requested
325            suspended();
326            return;
327        }
328        if (SequenceTracer.isEnabled()) {
329            SequenceTracer.startFrom(callerThread, "Work " + getTitleOr("unknown"), " #7acde9");
330        }
331        Exception suppressed = null;
332        int retryCount = getRetryCount(); // may be 0
333        for (int i = 0; i <= retryCount; i++) {
334            if (i > 0) {
335                log.debug("Retrying work due to concurrent update (" + i + "): " + this);
336                log.trace("Concurrent update", suppressed);
337            }
338            Exception e = runWorkWithTransactionAndCheckExceptions();
339            if (e == null) {
340                // no exception, work is done
341                SequenceTracer.stop("Work done " + (completionTime - startTime) + " ms");
342                return;
343            }
344            if (suppressed == null) {
345                suppressed = e;
346            } else {
347                suppressed.addSuppressed(e);
348            }
349        }
350        // all retries have been done, throw the exception
351        if (suppressed != null) {
352            String msg = "Work failed after " + retryCount + " " + (retryCount == 1 ? "retry" : "retries") + ", class="
353                    + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle();
354            SequenceTracer.destroy("Work failure " + (completionTime - startTime) + " ms");
355            throw new RuntimeException(msg, suppressed);
356        }
357    }
358
359    private String getTitleOr(String defaultTitle) {
360        try {
361            return getTitle();
362        } catch (Exception e) {
363            return defaultTitle;
364        }
365    }
366
367    /**
368     * Does work under a transaction, and collects exception and suppressed exceptions that may lead to a retry.
369     *
370     * @since 5.9.4
371     */
372    protected Exception runWorkWithTransactionAndCheckExceptions() {
373        List<Exception> suppressed = Collections.emptyList();
374        try {
375            TransactionHelper.noteSuppressedExceptions();
376            try {
377                runWorkWithTransaction();
378            } finally {
379                suppressed = TransactionHelper.getSuppressedExceptions();
380            }
381        } catch (ConcurrentUpdateException e) {
382            // happens typically during save()
383            return e;
384        } catch (TransactionRuntimeException e) {
385            // error at commit time
386            if (suppressed.isEmpty()) {
387                return e;
388            }
389        }
390        // reached if no catch, or if TransactionRuntimeException caught
391        if (suppressed.isEmpty()) {
392            return null;
393        }
394        // exceptions during commit caused a rollback in SessionImpl#end
395        Exception e = suppressed.get(0);
396        for (int i = 1; i < suppressed.size(); i++) {
397            e.addSuppressed(suppressed.get(i));
398        }
399        return e;
400    }
401
402    /**
403     * Does work under a transaction.
404     *
405     * @since 5.9.4
406     * @throws ConcurrentUpdateException, TransactionRuntimeException
407     */
408    protected void runWorkWithTransaction() throws ConcurrentUpdateException {
409        TransactionHelper.startTransaction();
410        boolean ok = false;
411        Exception exc = null;
412        try {
413            WorkSchedulePath.handleEnter(this);
414            // --- do work
415            setStartTime();
416            work(); // may throw ConcurrentUpdateException
417            ok = true;
418            // --- end work
419        } catch (Exception e) {
420            exc = e;
421            if (e instanceof ConcurrentUpdateException) {
422                throw (ConcurrentUpdateException) e;
423            } else if (e instanceof RuntimeException) {
424                throw (RuntimeException) e;
425            } else if (e instanceof InterruptedException) {
426                // restore interrupted status for the thread pool worker
427                Thread.currentThread().interrupt();
428            }
429            throw new RuntimeException(e);
430        } finally {
431            WorkSchedulePath.handleReturn();
432            try {
433                setCompletionTime();
434                cleanUp(ok, exc);
435            } finally {
436                if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
437                    if (!ok || isSuspending()) {
438                        log.trace(this + " is suspending, rollbacking");
439                        TransactionHelper.setTransactionRollbackOnly();
440                    }
441                    TransactionHelper.commitOrRollbackTransaction();
442                }
443            }
444        }
445    }
446
447    @Override
448    public abstract void work();
449
450    /**
451     * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions.
452     *
453     * @return 0 for no retry, or more if some retries are possible
454     * @see #work
455     * @since 5.8
456     */
457    public int getRetryCount() {
458        return 0;
459    }
460
461    /**
462     * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in
463     * error or was interrupted.
464     *
465     * @param ok {@code true} if the work completed normally
466     * @param e the exception, if available
467     */
468    @Override
469    public void cleanUp(boolean ok, Exception e) {
470        if (!ok) {
471            if (e instanceof InterruptedException) {
472                log.debug("Suspended work: " + this);
473            } else {
474                if (!(e instanceof ConcurrentUpdateException)) {
475                    if (!isSuspending() || !(e instanceof InterruptedException)) {
476                        log.error("Exception during work: " + this, e);
477                        if (WorkSchedulePath.captureStack) {
478                            WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack());
479                        }
480                    }
481                }
482            }
483        }
484        closeSession();
485
486        try {
487            // loginContext may be null in tests
488            if (loginContext != null) {
489                loginContext.logout();
490            }
491        } catch (LoginException le) {
492            throw new NuxeoException(le);
493        }
494    }
495
496    @Override
497    public String getOriginatingUsername() {
498        return originatingUsername;
499    }
500
501    @Override
502    public long getSchedulingTime() {
503        return schedulingTime;
504    }
505
506    @Override
507    public long getStartTime() {
508        return startTime;
509    }
510
511    @Override
512    public long getCompletionTime() {
513        return completionTime;
514    }
515
516    @Override
517    public void setStartTime() {
518        startTime = System.currentTimeMillis();
519    }
520
521    protected void setCompletionTime() {
522        completionTime = System.currentTimeMillis();
523    }
524
525    @Override
526    public String getCategory() {
527        return getClass().getSimpleName();
528    }
529
530    @Override
531    public String toString() {
532        StringBuilder buf = new StringBuilder();
533        buf.append(getClass().getSimpleName());
534        buf.append('(');
535        if (docId != null) {
536            buf.append(docId);
537            buf.append(", ");
538        } else if (docIds != null && docIds.size() > 0) {
539            buf.append(docIds.get(0));
540            buf.append("..., ");
541        }
542        buf.append(getSchedulePath().getParentPath());
543        buf.append(", ");
544        buf.append(getProgress());
545        buf.append(", ");
546        buf.append(getStatus());
547        buf.append(')');
548        return buf.toString();
549    }
550
551    @Override
552    public DocumentLocation getDocument() {
553        if (docId != null) {
554            return newDocumentLocation(docId);
555        }
556        return null;
557    }
558
559    @Override
560    public List<DocumentLocation> getDocuments() {
561        if (docIds != null) {
562            List<DocumentLocation> res = new ArrayList<DocumentLocation>(docIds.size());
563            for (String docId : docIds) {
564                res.add(newDocumentLocation(docId));
565            }
566            return res;
567        }
568        if (docId != null) {
569            return Collections.singletonList(newDocumentLocation(docId));
570        }
571        return Collections.emptyList();
572    }
573
574    protected DocumentLocation newDocumentLocation(String docId) {
575        return new DocumentLocationImpl(repositoryName, new IdRef(docId));
576    }
577
578    @Override
579    public boolean isDocumentTree() {
580        return isTree;
581    }
582
583    /**
584     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
585     * running a long process.
586     */
587    public void commitOrRollbackTransaction() {
588        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
589            TransactionHelper.commitOrRollbackTransaction();
590        }
591    }
592
593    /**
594     * Starts a new transaction.
595     * <p>
596     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
597     * process.
598     *
599     * @return true if a new transaction was started
600     */
601    public boolean startTransaction() {
602        return TransactionHelper.startTransaction();
603    }
604
605    @Override
606    public boolean equals(Object other) {
607        if (!(other instanceof Work)) {
608            return false;
609        }
610        return ((Work) other).getId().equals(getId());
611    }
612
613    @Override
614    public int hashCode() {
615        return getId().hashCode();
616    }
617
618}