001/*
002 * (C) Copyright 2012-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
021import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE;
022
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.List;
026import java.util.Random;
027
028import javax.security.auth.login.LoginContext;
029import javax.security.auth.login.LoginException;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.common.logging.SequenceTracer;
034import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
035import org.nuxeo.ecm.core.api.CoreInstance;
036import org.nuxeo.ecm.core.api.CoreSession;
037import org.nuxeo.ecm.core.api.DocumentLocation;
038import org.nuxeo.ecm.core.api.IdRef;
039import org.nuxeo.ecm.core.api.NuxeoException;
040import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl;
041import org.nuxeo.ecm.core.work.api.Work;
042import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
043import org.nuxeo.runtime.api.Framework;
044import org.nuxeo.runtime.transaction.TransactionHelper;
045import org.nuxeo.runtime.transaction.TransactionRuntimeException;
046
047/**
048 * A base implementation for a {@link Work} instance, dealing with most of the details around state change.
049 * <p>
050 * It also deals with transaction management, and prevents running work instances that are suspending.
051 * <p>
052 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is
053 * available.
054 * <p>
055 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its
056 * state and call {@link #suspended()}.
057 * <p>
058 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}.
059 *
060 * @since 5.6
061 */
062public abstract class AbstractWork implements Work {
063
064    private static final long serialVersionUID = 1L;
065
066    private static final Log log = LogFactory.getLog(AbstractWork.class);
067
068    protected static final Random RANDOM = new Random();
069
070    protected String id;
071
072    /** Suspend requested by the work manager. */
073    protected transient volatile boolean suspending;
074
075    /** Suspend acknowledged by the work instance. */
076    protected transient volatile boolean suspended;
077
078    protected State state;
079
080    protected Progress progress;
081
082    /** Repository name for the Work instance, if relevant. */
083    protected String repositoryName;
084
085    /**
086     * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work
087     * instance will act.
088     * <p>
089     * Either docId or docIds is set. Not both.
090     */
091    protected String docId;
092
093    /**
094     * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work
095     * instance will act.
096     * <p>
097     * Either docId or docIds is set. Not both.
098     */
099    protected List<String> docIds;
100
101    /**
102     * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act.
103     */
104    protected boolean isTree;
105
106    /**
107     * The originating username to use when opening the {@link CoreSession}.
108     *
109     * @since 8.1
110     */
111    protected String originatingUsername;
112
113    protected String status;
114
115    protected long schedulingTime;
116
117    protected long startTime;
118
119    protected long completionTime;
120
121    protected transient CoreSession session;
122
123    protected transient LoginContext loginContext;
124
125    protected WorkSchedulePath schedulePath;
126
127    protected String callerThread;
128
129    /**
130     * Constructs a {@link Work} instance with a unique id.
131     */
132    public AbstractWork() {
133        // we user RANDOM to deal with these cases:
134        // - several calls in the time granularity of nanoTime()
135        // - several concurrent calls on different servers
136        this(System.nanoTime() + "." + Math.abs(RANDOM.nextInt()));
137        callerThread = SequenceTracer.getThreadName();
138    }
139
140    public AbstractWork(String id) {
141        this.id = id;
142        progress = PROGRESS_INDETERMINATE;
143        schedulingTime = System.currentTimeMillis();
144        callerThread = SequenceTracer.getThreadName();
145    }
146
147    @Override
148    public String getId() {
149        return id;
150    }
151
152    @Override
153    public WorkSchedulePath getSchedulePath() {
154        // schedulePath is transient so will become null after deserialization
155        return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath;
156    }
157
158    @Override
159    public void setSchedulePath(WorkSchedulePath path) {
160        schedulePath = path;
161    }
162
163    public void setDocument(String repositoryName, String docId, boolean isTree) {
164        this.repositoryName = repositoryName;
165        this.docId = docId;
166        docIds = null;
167        this.isTree = isTree;
168    }
169
170    public void setDocument(String repositoryName, String docId) {
171        setDocument(repositoryName, docId, false);
172    }
173
174    public void setDocuments(String repositoryName, List<String> docIds) {
175        this.repositoryName = repositoryName;
176        docId = null;
177        this.docIds = new ArrayList<>(docIds);
178    }
179
180    /**
181     * @since 8.1
182     */
183    public void setOriginatingUsername(String originatingUsername) {
184        this.originatingUsername = originatingUsername;
185    }
186
187    @Override
188    public void setWorkInstanceSuspending() {
189        suspending = true;
190    }
191
192    @Override
193    public boolean isSuspending() {
194        return suspending;
195    }
196
197    @Override
198    public void suspended() {
199        suspended = true;
200    }
201
202    @Override
203    public boolean isWorkInstanceSuspended() {
204        return suspended;
205    }
206
207    @Override
208    public void setWorkInstanceState(State state) {
209        this.state = state;
210        if (log.isTraceEnabled()) {
211            log.trace(this + " state=" + state);
212        }
213    }
214
215    @Override
216    public State getWorkInstanceState() {
217        return state;
218    }
219
220    @Override
221    public void setProgress(Progress progress) {
222        this.progress = progress;
223        if (log.isTraceEnabled()) {
224            log.trace(String.valueOf(this));
225        }
226    }
227
228    @Override
229    public Progress getProgress() {
230        return progress;
231    }
232
233    /**
234     * Sets a human-readable status for this work instance.
235     *
236     * @param status the status
237     */
238    public void setStatus(String status) {
239        this.status = status;
240    }
241
242    @Override
243    public String getStatus() {
244        return status;
245    }
246
247    /**
248     * May be called by implementing classes to open a session on the repository.
249     *
250     * @return the session (also available in {@code session} field)
251     * @deprecated since 8.1. Use {@link #openSystemSession()}.
252     */
253    @Deprecated
254    public CoreSession initSession() {
255        return initSession(repositoryName);
256    }
257
258    /**
259     * May be called by implementing classes to open a System session on the repository.
260     *
261     * @since 8.1
262     */
263    public void openSystemSession() {
264        session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername);
265    }
266
267    /**
268     * May be called by implementing classes to open a Use session on the repository.
269     * <p>
270     * It uses the set {@link #originatingUsername} to open the session.
271     *
272     * @since 8.1
273     */
274    public void openUserSession() {
275        if (originatingUsername == null) {
276            throw new IllegalStateException("Cannot open an user session without an originatingUsername");
277        }
278
279        try {
280            loginContext = Framework.loginAsUser(originatingUsername);
281        } catch (LoginException e) {
282            throw new NuxeoException(e);
283        }
284
285        session = CoreInstance.openCoreSession(repositoryName);
286    }
287
288    /**
289     * May be called by implementing classes to open a session on the given repository.
290     *
291     * @param repositoryName the repository name
292     * @return the session (also available in {@code session} field)
293     * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name,
294     *             otherwise use {@link CoreInstance#openCoreSessionSystem(String)}.
295     */
296    @Deprecated
297    public CoreSession initSession(String repositoryName) {
298        session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername);
299        return session;
300    }
301
302    /**
303     * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}.
304     *
305     * @since 5.8
306     */
307    public void closeSession() {
308        if (session != null) {
309            session.close();
310            session = null;
311        }
312    }
313
314    @Override
315    public void run() {
316        if (isSuspending()) {
317            // don't run anything if we're being started while a suspend
318            // has been requested
319            suspended();
320            return;
321        }
322        if (SequenceTracer.isEnabled()) {
323            SequenceTracer.startFrom(callerThread, "Work " + getTitleOr("unknown"), " #7acde9");
324        }
325        Exception suppressed = null;
326        int retryCount = getRetryCount(); // may be 0
327        for (int i = 0; i <= retryCount; i++) {
328            if (i > 0) {
329                log.debug("Retrying work due to concurrent update (" + i + "): " + this);
330                log.trace("Concurrent update", suppressed);
331            }
332            Exception e = runWorkWithTransactionAndCheckExceptions();
333            if (e == null) {
334                // no exception, work is done
335                SequenceTracer.stop("Work done " + (completionTime - startTime) + " ms");
336                return;
337            }
338            if (suppressed == null) {
339                suppressed = e;
340            } else {
341                suppressed.addSuppressed(e);
342            }
343        }
344        // all retries have been done, throw the exception
345        if (suppressed != null) {
346            String msg = "Work failed after " + retryCount + " " + (retryCount == 1 ? "retry" : "retries") + ", class="
347                    + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle();
348            SequenceTracer.destroy("Work failure " + (completionTime - startTime) + " ms");
349            throw new RuntimeException(msg, suppressed);
350        }
351    }
352
353    private String getTitleOr(String defaultTitle) {
354        try {
355            return getTitle();
356        } catch (Exception e) {
357            return defaultTitle;
358        }
359    }
360
361    /**
362     * Does work under a transaction, and collects exception and suppressed exceptions that may lead to a retry.
363     *
364     * @since 5.9.4
365     */
366    protected Exception runWorkWithTransactionAndCheckExceptions() {
367        List<Exception> suppressed = Collections.emptyList();
368        try {
369            TransactionHelper.noteSuppressedExceptions();
370            try {
371                runWorkWithTransaction();
372            } finally {
373                suppressed = TransactionHelper.getSuppressedExceptions();
374            }
375        } catch (ConcurrentUpdateException e) {
376            // happens typically during save()
377            return e;
378        } catch (TransactionRuntimeException e) {
379            // error at commit time
380            if (suppressed.isEmpty()) {
381                return e;
382            }
383        }
384        // reached if no catch, or if TransactionRuntimeException caught
385        if (suppressed.isEmpty()) {
386            return null;
387        }
388        // exceptions during commit caused a rollback in SessionImpl#end
389        Exception e = suppressed.get(0);
390        for (int i = 1; i < suppressed.size(); i++) {
391            e.addSuppressed(suppressed.get(i));
392        }
393        return e;
394    }
395
396    /**
397     * Does work under a transaction.
398     *
399     * @since 5.9.4
400     * @throws ConcurrentUpdateException, TransactionRuntimeException
401     */
402    protected void runWorkWithTransaction() throws ConcurrentUpdateException {
403        TransactionHelper.startTransaction();
404        boolean ok = false;
405        Exception exc = null;
406        try {
407            WorkSchedulePath.handleEnter(this);
408            // --- do work
409            setStartTime();
410            work(); // may throw ConcurrentUpdateException
411            ok = true;
412            // --- end work
413        } catch (Exception e) {
414            exc = e;
415            if (e instanceof ConcurrentUpdateException) {
416                throw (ConcurrentUpdateException) e;
417            } else if (e instanceof RuntimeException) {
418                throw (RuntimeException) e;
419            } else if (e instanceof InterruptedException) {
420                // restore interrupted status for the thread pool worker
421                Thread.currentThread().interrupt();
422            }
423            throw new RuntimeException(e);
424        } finally {
425            WorkSchedulePath.handleReturn();
426            try {
427                setCompletionTime();
428                cleanUp(ok, exc);
429            } finally {
430                if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
431                    if (!ok || isSuspending()) {
432                        log.trace(this + " is suspending, rollbacking");
433                        TransactionHelper.setTransactionRollbackOnly();
434                    }
435                    TransactionHelper.commitOrRollbackTransaction();
436                }
437            }
438        }
439    }
440
441    @Override
442    public abstract void work();
443
444    /**
445     * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions.
446     *
447     * @return 0 for no retry, or more if some retries are possible
448     * @see #work
449     * @since 5.8
450     */
451    public int getRetryCount() {
452        return 0;
453    }
454
455    /**
456     * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in
457     * error or was interrupted.
458     *
459     * @param ok {@code true} if the work completed normally
460     * @param e the exception, if available
461     */
462    @Override
463    public void cleanUp(boolean ok, Exception e) {
464        if (!ok) {
465            if (e instanceof InterruptedException) {
466                log.debug("Suspended work: " + this);
467            } else {
468                if (!(e instanceof ConcurrentUpdateException)) {
469                    if (!isSuspending()) {
470                        log.error("Exception during work: " + this, e);
471                        if (WorkSchedulePath.captureStack) {
472                            WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack());
473                        }
474                    }
475                }
476            }
477        }
478        closeSession();
479
480        try {
481            // loginContext may be null in tests
482            if (loginContext != null) {
483                loginContext.logout();
484            }
485        } catch (LoginException le) {
486            throw new NuxeoException(le);
487        }
488    }
489
490    @Override
491    public String getOriginatingUsername() {
492        return originatingUsername;
493    }
494
495    @Override
496    public long getSchedulingTime() {
497        return schedulingTime;
498    }
499
500    @Override
501    public long getStartTime() {
502        return startTime;
503    }
504
505    @Override
506    public long getCompletionTime() {
507        return completionTime;
508    }
509
510    @Override
511    public void setStartTime() {
512        startTime = System.currentTimeMillis();
513    }
514
515    protected void setCompletionTime() {
516        completionTime = System.currentTimeMillis();
517    }
518
519    @Override
520    public String getCategory() {
521        return getClass().getSimpleName();
522    }
523
524    @Override
525    public String toString() {
526        StringBuilder buf = new StringBuilder();
527        buf.append(getClass().getSimpleName());
528        buf.append('(');
529        if (docId != null) {
530            buf.append(docId);
531            buf.append(", ");
532        } else if (docIds != null && docIds.size() > 0) {
533            buf.append(docIds.get(0));
534            buf.append("..., ");
535        }
536        buf.append(getSchedulePath().getParentPath());
537        buf.append(", ");
538        buf.append(getProgress());
539        buf.append(", ");
540        buf.append(getStatus());
541        buf.append(')');
542        return buf.toString();
543    }
544
545    @Override
546    public DocumentLocation getDocument() {
547        if (docId != null) {
548            return newDocumentLocation(docId);
549        }
550        return null;
551    }
552
553    @Override
554    public List<DocumentLocation> getDocuments() {
555        if (docIds != null) {
556            List<DocumentLocation> res = new ArrayList<>(docIds.size());
557            for (String docId : docIds) {
558                res.add(newDocumentLocation(docId));
559            }
560            return res;
561        }
562        if (docId != null) {
563            return Collections.singletonList(newDocumentLocation(docId));
564        }
565        return Collections.emptyList();
566    }
567
568    protected DocumentLocation newDocumentLocation(String docId) {
569        return new DocumentLocationImpl(repositoryName, new IdRef(docId));
570    }
571
572    @Override
573    public boolean isDocumentTree() {
574        return isTree;
575    }
576
577    /**
578     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
579     * running a long process.
580     */
581    public void commitOrRollbackTransaction() {
582        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
583            TransactionHelper.commitOrRollbackTransaction();
584        }
585    }
586
587    /**
588     * Starts a new transaction.
589     * <p>
590     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
591     * process.
592     *
593     * @return true if a new transaction was started
594     */
595    public boolean startTransaction() {
596        return TransactionHelper.startTransaction();
597    }
598
599    @Override
600    public boolean equals(Object other) {
601        if (!(other instanceof Work)) {
602            return false;
603        }
604        return ((Work) other).getId().equals(getId());
605    }
606
607    @Override
608    public int hashCode() {
609        return getId().hashCode();
610    }
611
612}