001/*
002 * (C) Copyright 2012-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
021import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE;
022
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.List;
026import java.util.Random;
027
028import javax.security.auth.login.LoginContext;
029import javax.security.auth.login.LoginException;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.common.logging.SequenceTracer;
034import org.nuxeo.common.utils.ExceptionUtils;
035import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
036import org.nuxeo.ecm.core.api.CoreInstance;
037import org.nuxeo.ecm.core.api.CoreSession;
038import org.nuxeo.ecm.core.api.DocumentLocation;
039import org.nuxeo.ecm.core.api.IdRef;
040import org.nuxeo.ecm.core.api.NuxeoException;
041import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl;
042import org.nuxeo.ecm.core.work.api.Work;
043import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
044import org.nuxeo.runtime.api.Framework;
045import org.nuxeo.runtime.transaction.TransactionHelper;
046
047/**
048 * A base implementation for a {@link Work} instance, dealing with most of the details around state change.
049 * <p>
050 * It also deals with transaction management, and prevents running work instances that are suspending.
051 * <p>
052 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is
053 * available.
054 * <p>
055 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its
056 * state and call {@link #suspended()}.
057 * <p>
058 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}.
059 *
060 * @since 5.6
061 */
062public abstract class AbstractWork implements Work {
063
064    private static final long serialVersionUID = 1L;
065
066    private static final Log log = LogFactory.getLog(AbstractWork.class);
067
068    protected static final Random RANDOM = new Random();
069
070    protected String id;
071
072    /** Suspend requested by the work manager. */
073    protected transient volatile boolean suspending;
074
075    /** Suspend acknowledged by the work instance. */
076    protected transient volatile boolean suspended;
077
078    protected State state;
079
080    protected Progress progress;
081
082    /** Repository name for the Work instance, if relevant. */
083    protected String repositoryName;
084
085    /**
086     * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work
087     * instance will act.
088     * <p>
089     * Either docId or docIds is set. Not both.
090     */
091    protected String docId;
092
093    /**
094     * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work
095     * instance will act.
096     * <p>
097     * Either docId or docIds is set. Not both.
098     */
099    protected List<String> docIds;
100
101    /**
102     * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act.
103     */
104    protected boolean isTree;
105
106    /**
107     * The originating username to use when opening the {@link CoreSession}.
108     *
109     * @since 8.1
110     */
111    protected String originatingUsername;
112
113    protected String status;
114
115    protected long schedulingTime;
116
117    protected long startTime;
118
119    protected long completionTime;
120
121    protected transient CoreSession session;
122
123    protected transient LoginContext loginContext;
124
125    protected WorkSchedulePath schedulePath;
126
127    protected String callerThread;
128
129    /**
130     * Constructs a {@link Work} instance with a unique id.
131     */
132    public AbstractWork() {
133        // we user RANDOM to deal with these cases:
134        // - several calls in the time granularity of nanoTime()
135        // - several concurrent calls on different servers
136        this(System.nanoTime() + "." + Math.abs(RANDOM.nextInt()));
137        callerThread = SequenceTracer.getThreadName();
138    }
139
140    public AbstractWork(String id) {
141        this.id = id;
142        progress = PROGRESS_INDETERMINATE;
143        schedulingTime = System.currentTimeMillis();
144        callerThread = SequenceTracer.getThreadName();
145    }
146
147    @Override
148    public String getId() {
149        return id;
150    }
151
152    @Override
153    public WorkSchedulePath getSchedulePath() {
154        // schedulePath is transient so will become null after deserialization
155        return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath;
156    }
157
158    @Override
159    public void setSchedulePath(WorkSchedulePath path) {
160        schedulePath = path;
161    }
162
163    public void setDocument(String repositoryName, String docId, boolean isTree) {
164        this.repositoryName = repositoryName;
165        this.docId = docId;
166        docIds = null;
167        this.isTree = isTree;
168    }
169
170    public void setDocument(String repositoryName, String docId) {
171        setDocument(repositoryName, docId, false);
172    }
173
174    public void setDocuments(String repositoryName, List<String> docIds) {
175        this.repositoryName = repositoryName;
176        docId = null;
177        this.docIds = new ArrayList<>(docIds);
178    }
179
180    /**
181     * @since 8.1
182     */
183    public void setOriginatingUsername(String originatingUsername) {
184        this.originatingUsername = originatingUsername;
185    }
186
187    @Override
188    public void setWorkInstanceSuspending() {
189        suspending = true;
190    }
191
192    @Override
193    public boolean isSuspending() {
194        return suspending;
195    }
196
197    @Override
198    public void suspended() {
199        suspended = true;
200    }
201
202    @Override
203    public boolean isWorkInstanceSuspended() {
204        return suspended;
205    }
206
207    @Override
208    public void setWorkInstanceState(State state) {
209        this.state = state;
210        if (log.isTraceEnabled()) {
211            log.trace(this + " state=" + state);
212        }
213    }
214
215    @Override
216    public State getWorkInstanceState() {
217        return state;
218    }
219
220    @Override
221    public void setProgress(Progress progress) {
222        this.progress = progress;
223        if (log.isTraceEnabled()) {
224            log.trace(String.valueOf(this));
225        }
226    }
227
228    @Override
229    public Progress getProgress() {
230        return progress;
231    }
232
233    /**
234     * Sets a human-readable status for this work instance.
235     *
236     * @param status the status
237     */
238    public void setStatus(String status) {
239        this.status = status;
240    }
241
242    @Override
243    public String getStatus() {
244        return status;
245    }
246
247    /**
248     * May be called by implementing classes to open a session on the repository.
249     *
250     * @return the session (also available in {@code session} field)
251     * @deprecated since 8.1. Use {@link #openSystemSession()}.
252     */
253    @Deprecated
254    public CoreSession initSession() {
255        return initSession(repositoryName);
256    }
257
258    /**
259     * May be called by implementing classes to open a System session on the repository.
260     *
261     * @since 8.1
262     */
263    public void openSystemSession() {
264        session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername);
265    }
266
267    /**
268     * May be called by implementing classes to open a Use session on the repository.
269     * <p>
270     * It uses the set {@link #originatingUsername} to open the session.
271     *
272     * @since 8.1
273     */
274    public void openUserSession() {
275        if (originatingUsername == null) {
276            throw new IllegalStateException("Cannot open an user session without an originatingUsername");
277        }
278
279        try {
280            loginContext = Framework.loginAsUser(originatingUsername);
281        } catch (LoginException e) {
282            throw new NuxeoException(e);
283        }
284
285        session = CoreInstance.openCoreSession(repositoryName);
286    }
287
288    /**
289     * May be called by implementing classes to open a session on the given repository.
290     *
291     * @param repositoryName the repository name
292     * @return the session (also available in {@code session} field)
293     * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name,
294     *             otherwise use {@link CoreInstance#openCoreSessionSystem(String)}.
295     */
296    @Deprecated
297    public CoreSession initSession(String repositoryName) {
298        session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername);
299        return session;
300    }
301
302    /**
303     * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}.
304     *
305     * @since 5.8
306     */
307    public void closeSession() {
308        if (session != null) {
309            session.close();
310            session = null;
311        }
312    }
313
314    @Override
315    public void run() {
316        if (isSuspending()) {
317            // don't run anything if we're being started while a suspend
318            // has been requested
319            suspended();
320            return;
321        }
322        if (SequenceTracer.isEnabled()) {
323            SequenceTracer.startFrom(callerThread, "Work " + getTitleOr("unknown"), " #7acde9");
324        }
325        RuntimeException suppressed = null;
326        int retryCount = getRetryCount(); // may be 0
327        for (int i = 0; i <= retryCount; i++) {
328            if (i > 0) {
329                log.debug("Retrying work due to concurrent update (" + i + "): " + this);
330                log.trace("Concurrent update", suppressed);
331            }
332            if (ExceptionUtils.hasInterruptedCause(suppressed)) {
333                // if we're here suppressed != null so we destroy SequenceTracer
334                log.debug("No need to retry the work with id=" + getId() + ", work manager is shutting down");
335                break;
336            }
337            try {
338                runWorkWithTransaction();
339                SequenceTracer.stop("Work done " + (completionTime - startTime) + " ms");
340                return;
341            } catch (RuntimeException e) {
342                if (suppressed == null) {
343                    suppressed = e;
344                } else {
345                    suppressed.addSuppressed(e);
346                }
347            }
348        }
349        // all retries have been done, throw the exception
350        if (suppressed != null) {
351            String msg = "Work failed after " + retryCount + " " + (retryCount == 1 ? "retry" : "retries") + ", class="
352                    + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle();
353            SequenceTracer.destroy("Work failure " + (completionTime - startTime) + " ms");
354            throw new RuntimeException(msg, suppressed);
355        }
356    }
357
358    private String getTitleOr(String defaultTitle) {
359        try {
360            return getTitle();
361        } catch (Exception e) {
362            return defaultTitle;
363        }
364    }
365
366    /**
367     * Does work under a transaction.
368     *
369     * @since 5.9.4
370     */
371    protected void runWorkWithTransaction() {
372        TransactionHelper.startTransaction();
373        boolean ok = false;
374        Exception exc = null;
375        try {
376            WorkSchedulePath.handleEnter(this);
377            // --- do work
378            setStartTime();
379            work(); // may throw ConcurrentUpdateException
380            ok = true;
381            // --- end work
382        } catch (Exception e) {
383            exc = e;
384            if (e instanceof RuntimeException) {
385                throw (RuntimeException) e;
386            } else if (e instanceof InterruptedException) {
387                // restore interrupted status for the thread pool worker
388                Thread.currentThread().interrupt();
389            }
390            throw new RuntimeException(e);
391        } finally {
392            WorkSchedulePath.handleReturn();
393            try {
394                setCompletionTime();
395                cleanUp(ok, exc);
396            } finally {
397                if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
398                    if (!ok || isSuspending()) {
399                        log.trace(this + " is suspending, rollbacking");
400                        TransactionHelper.setTransactionRollbackOnly();
401                    }
402                    TransactionHelper.commitOrRollbackTransaction();
403                }
404            }
405        }
406    }
407
408    @Override
409    public abstract void work();
410
411    /**
412     * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions.
413     *
414     * @return 0 for no retry, or more if some retries are possible
415     * @see #work
416     * @since 5.8
417     */
418    public int getRetryCount() {
419        return 0;
420    }
421
422    /**
423     * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in
424     * error or was interrupted.
425     *
426     * @param ok {@code true} if the work completed normally
427     * @param e the exception, if available
428     */
429    @Override
430    public void cleanUp(boolean ok, Exception e) {
431        if (!ok) {
432            if (ExceptionUtils.hasInterruptedCause(e)) {
433                log.debug("Interrupted work: " + this);
434            } else {
435                if (!(e instanceof ConcurrentUpdateException)) {
436                    if (!isSuspending()) {
437                        log.error("Exception during work: " + this, e);
438                        if (WorkSchedulePath.captureStack) {
439                            WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack());
440                        }
441                    }
442                }
443            }
444        }
445        closeSession();
446
447        try {
448            // loginContext may be null in tests
449            if (loginContext != null) {
450                loginContext.logout();
451            }
452        } catch (LoginException le) {
453            throw new NuxeoException(le);
454        }
455    }
456
457    @Override
458    public String getOriginatingUsername() {
459        return originatingUsername;
460    }
461
462    @Override
463    public long getSchedulingTime() {
464        return schedulingTime;
465    }
466
467    @Override
468    public long getStartTime() {
469        return startTime;
470    }
471
472    @Override
473    public long getCompletionTime() {
474        return completionTime;
475    }
476
477    @Override
478    public void setStartTime() {
479        startTime = System.currentTimeMillis();
480    }
481
482    protected void setCompletionTime() {
483        completionTime = System.currentTimeMillis();
484    }
485
486    @Override
487    public String getCategory() {
488        return getClass().getSimpleName();
489    }
490
491    @Override
492    public String toString() {
493        StringBuilder buf = new StringBuilder();
494        buf.append(getClass().getSimpleName());
495        buf.append('(');
496        if (docId != null) {
497            buf.append(docId);
498            buf.append(", ");
499        } else if (docIds != null && docIds.size() > 0) {
500            buf.append(docIds.get(0));
501            buf.append("..., ");
502        }
503        buf.append(getSchedulePath().getParentPath());
504        buf.append(", ");
505        buf.append(getProgress());
506        buf.append(", ");
507        buf.append(getStatus());
508        buf.append(')');
509        return buf.toString();
510    }
511
512    @Override
513    public DocumentLocation getDocument() {
514        if (docId != null) {
515            return newDocumentLocation(docId);
516        }
517        return null;
518    }
519
520    @Override
521    public List<DocumentLocation> getDocuments() {
522        if (docIds != null) {
523            List<DocumentLocation> res = new ArrayList<>(docIds.size());
524            for (String docId : docIds) {
525                res.add(newDocumentLocation(docId));
526            }
527            return res;
528        }
529        if (docId != null) {
530            return Collections.singletonList(newDocumentLocation(docId));
531        }
532        return Collections.emptyList();
533    }
534
535    protected DocumentLocation newDocumentLocation(String docId) {
536        return new DocumentLocationImpl(repositoryName, new IdRef(docId));
537    }
538
539    @Override
540    public boolean isDocumentTree() {
541        return isTree;
542    }
543
544    /**
545     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
546     * running a long process.
547     */
548    public void commitOrRollbackTransaction() {
549        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
550            TransactionHelper.commitOrRollbackTransaction();
551        }
552    }
553
554    /**
555     * Starts a new transaction.
556     * <p>
557     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
558     * process.
559     *
560     * @return true if a new transaction was started
561     */
562    public boolean startTransaction() {
563        return TransactionHelper.startTransaction();
564    }
565
566    @Override
567    public boolean equals(Object other) {
568        if (!(other instanceof Work)) {
569            return false;
570        }
571        return ((Work) other).getId().equals(getId());
572    }
573
574    @Override
575    public int hashCode() {
576        return getId().hashCode();
577    }
578
579    @Override
580    public String getPartitionKey() {
581        if (docId != null) {
582            return docId;
583        } else if (docIds != null && !docIds.isEmpty()) {
584            return docIds.get(0);
585        }
586        return getId();
587    }
588}