001/*
002 * (C) Copyright 2012-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
021import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE;
022
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.List;
026import java.util.Random;
027
028import javax.security.auth.login.LoginContext;
029import javax.security.auth.login.LoginException;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.common.logging.SequenceTracer;
034import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
035import org.nuxeo.ecm.core.api.CoreInstance;
036import org.nuxeo.ecm.core.api.CoreSession;
037import org.nuxeo.ecm.core.api.DocumentLocation;
038import org.nuxeo.ecm.core.api.IdRef;
039import org.nuxeo.ecm.core.api.NuxeoException;
040import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl;
041import org.nuxeo.ecm.core.work.api.Work;
042import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
043import org.nuxeo.runtime.api.Framework;
044import org.nuxeo.runtime.transaction.TransactionHelper;
045
046/**
047 * A base implementation for a {@link Work} instance, dealing with most of the details around state change.
048 * <p>
049 * It also deals with transaction management, and prevents running work instances that are suspending.
050 * <p>
051 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is
052 * available.
053 * <p>
054 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its
055 * state and call {@link #suspended()}.
056 * <p>
057 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}.
058 *
059 * @since 5.6
060 */
061public abstract class AbstractWork implements Work {
062
063    private static final long serialVersionUID = 1L;
064
065    private static final Log log = LogFactory.getLog(AbstractWork.class);
066
067    protected static final Random RANDOM = new Random();
068
069    protected String id;
070
071    /** Suspend requested by the work manager. */
072    protected transient volatile boolean suspending;
073
074    /** Suspend acknowledged by the work instance. */
075    protected transient volatile boolean suspended;
076
077    protected State state;
078
079    protected Progress progress;
080
081    /** Repository name for the Work instance, if relevant. */
082    protected String repositoryName;
083
084    /**
085     * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work
086     * instance will act.
087     * <p>
088     * Either docId or docIds is set. Not both.
089     */
090    protected String docId;
091
092    /**
093     * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work
094     * instance will act.
095     * <p>
096     * Either docId or docIds is set. Not both.
097     */
098    protected List<String> docIds;
099
100    /**
101     * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act.
102     */
103    protected boolean isTree;
104
105    /**
106     * The originating username to use when opening the {@link CoreSession}.
107     *
108     * @since 8.1
109     */
110    protected String originatingUsername;
111
112    protected String status;
113
114    protected long schedulingTime;
115
116    protected long startTime;
117
118    protected long completionTime;
119
120    protected transient CoreSession session;
121
122    protected transient LoginContext loginContext;
123
124    protected WorkSchedulePath schedulePath;
125
126    protected String callerThread;
127
128    /**
129     * Constructs a {@link Work} instance with a unique id.
130     */
131    public AbstractWork() {
132        // we user RANDOM to deal with these cases:
133        // - several calls in the time granularity of nanoTime()
134        // - several concurrent calls on different servers
135        this(System.nanoTime() + "." + Math.abs(RANDOM.nextInt()));
136        callerThread = SequenceTracer.getThreadName();
137    }
138
139    public AbstractWork(String id) {
140        this.id = id;
141        progress = PROGRESS_INDETERMINATE;
142        schedulingTime = System.currentTimeMillis();
143        callerThread = SequenceTracer.getThreadName();
144    }
145
146    @Override
147    public String getId() {
148        return id;
149    }
150
151    @Override
152    public WorkSchedulePath getSchedulePath() {
153        // schedulePath is transient so will become null after deserialization
154        return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath;
155    }
156
157    @Override
158    public void setSchedulePath(WorkSchedulePath path) {
159        schedulePath = path;
160    }
161
162    public void setDocument(String repositoryName, String docId, boolean isTree) {
163        this.repositoryName = repositoryName;
164        this.docId = docId;
165        docIds = null;
166        this.isTree = isTree;
167    }
168
169    public void setDocument(String repositoryName, String docId) {
170        setDocument(repositoryName, docId, false);
171    }
172
173    public void setDocuments(String repositoryName, List<String> docIds) {
174        this.repositoryName = repositoryName;
175        docId = null;
176        this.docIds = new ArrayList<>(docIds);
177    }
178
179    /**
180     * @since 8.1
181     */
182    public void setOriginatingUsername(String originatingUsername) {
183        this.originatingUsername = originatingUsername;
184    }
185
186    @Override
187    public void setWorkInstanceSuspending() {
188        suspending = true;
189    }
190
191    @Override
192    public boolean isSuspending() {
193        return suspending;
194    }
195
196    @Override
197    public void suspended() {
198        suspended = true;
199    }
200
201    @Override
202    public boolean isWorkInstanceSuspended() {
203        return suspended;
204    }
205
206    @Override
207    public void setWorkInstanceState(State state) {
208        this.state = state;
209        if (log.isTraceEnabled()) {
210            log.trace(this + " state=" + state);
211        }
212    }
213
214    @Override
215    public State getWorkInstanceState() {
216        return state;
217    }
218
219    @Override
220    public void setProgress(Progress progress) {
221        this.progress = progress;
222        if (log.isTraceEnabled()) {
223            log.trace(String.valueOf(this));
224        }
225    }
226
227    @Override
228    public Progress getProgress() {
229        return progress;
230    }
231
232    /**
233     * Sets a human-readable status for this work instance.
234     *
235     * @param status the status
236     */
237    public void setStatus(String status) {
238        this.status = status;
239    }
240
241    @Override
242    public String getStatus() {
243        return status;
244    }
245
246    /**
247     * May be called by implementing classes to open a session on the repository.
248     *
249     * @return the session (also available in {@code session} field)
250     * @deprecated since 8.1. Use {@link #openSystemSession()}.
251     */
252    @Deprecated
253    public CoreSession initSession() {
254        return initSession(repositoryName);
255    }
256
257    /**
258     * May be called by implementing classes to open a System session on the repository.
259     *
260     * @since 8.1
261     */
262    public void openSystemSession() {
263        session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername);
264    }
265
266    /**
267     * May be called by implementing classes to open a Use session on the repository.
268     * <p>
269     * It uses the set {@link #originatingUsername} to open the session.
270     *
271     * @since 8.1
272     */
273    public void openUserSession() {
274        if (originatingUsername == null) {
275            throw new IllegalStateException("Cannot open an user session without an originatingUsername");
276        }
277
278        try {
279            loginContext = Framework.loginAsUser(originatingUsername);
280        } catch (LoginException e) {
281            throw new NuxeoException(e);
282        }
283
284        session = CoreInstance.openCoreSession(repositoryName);
285    }
286
287    /**
288     * May be called by implementing classes to open a session on the given repository.
289     *
290     * @param repositoryName the repository name
291     * @return the session (also available in {@code session} field)
292     * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name,
293     *             otherwise use {@link CoreInstance#openCoreSessionSystem(String)}.
294     */
295    @Deprecated
296    public CoreSession initSession(String repositoryName) {
297        session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername);
298        return session;
299    }
300
301    /**
302     * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}.
303     *
304     * @since 5.8
305     */
306    public void closeSession() {
307        if (session != null) {
308            session.close();
309            session = null;
310        }
311    }
312
313    @Override
314    public void run() {
315        if (isSuspending()) {
316            // don't run anything if we're being started while a suspend
317            // has been requested
318            suspended();
319            return;
320        }
321        if (SequenceTracer.isEnabled()) {
322            SequenceTracer.startFrom(callerThread, "Work " + getTitleOr("unknown"), " #7acde9");
323        }
324        RuntimeException suppressed = null;
325        int retryCount = getRetryCount(); // may be 0
326        for (int i = 0; i <= retryCount; i++) {
327            if (i > 0) {
328                log.debug("Retrying work due to concurrent update (" + i + "): " + this);
329                log.trace("Concurrent update", suppressed);
330            }
331            try {
332                runWorkWithTransaction();
333                SequenceTracer.stop("Work done " + (completionTime - startTime) + " ms");
334                return;
335            } catch (RuntimeException e) {
336                if (suppressed == null) {
337                    suppressed = e;
338                } else {
339                    suppressed.addSuppressed(e);
340                }
341            }
342        }
343        // all retries have been done, throw the exception
344        if (suppressed != null) {
345            String msg = "Work failed after " + retryCount + " " + (retryCount == 1 ? "retry" : "retries") + ", class="
346                    + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle();
347            SequenceTracer.destroy("Work failure " + (completionTime - startTime) + " ms");
348            throw new RuntimeException(msg, suppressed);
349        }
350    }
351
352    private String getTitleOr(String defaultTitle) {
353        try {
354            return getTitle();
355        } catch (Exception e) {
356            return defaultTitle;
357        }
358    }
359
360    /**
361     * Does work under a transaction.
362     *
363     * @since 5.9.4
364     */
365    protected void runWorkWithTransaction() {
366        TransactionHelper.startTransaction();
367        boolean ok = false;
368        Exception exc = null;
369        try {
370            WorkSchedulePath.handleEnter(this);
371            // --- do work
372            setStartTime();
373            work(); // may throw ConcurrentUpdateException
374            ok = true;
375            // --- end work
376        } catch (Exception e) {
377            exc = e;
378            if (e instanceof RuntimeException) {
379                throw (RuntimeException) e;
380            } else if (e instanceof InterruptedException) {
381                // restore interrupted status for the thread pool worker
382                Thread.currentThread().interrupt();
383            }
384            throw new RuntimeException(e);
385        } finally {
386            WorkSchedulePath.handleReturn();
387            try {
388                setCompletionTime();
389                cleanUp(ok, exc);
390            } finally {
391                if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
392                    if (!ok || isSuspending()) {
393                        log.trace(this + " is suspending, rollbacking");
394                        TransactionHelper.setTransactionRollbackOnly();
395                    }
396                    TransactionHelper.commitOrRollbackTransaction();
397                }
398            }
399        }
400    }
401
402    @Override
403    public abstract void work();
404
405    /**
406     * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions.
407     *
408     * @return 0 for no retry, or more if some retries are possible
409     * @see #work
410     * @since 5.8
411     */
412    public int getRetryCount() {
413        return 0;
414    }
415
416    /**
417     * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in
418     * error or was interrupted.
419     *
420     * @param ok {@code true} if the work completed normally
421     * @param e the exception, if available
422     */
423    @Override
424    public void cleanUp(boolean ok, Exception e) {
425        if (!ok) {
426            if (e instanceof InterruptedException) {
427                log.debug("Suspended work: " + this);
428            } else {
429                if (!(e instanceof ConcurrentUpdateException)) {
430                    if (!isSuspending()) {
431                        log.error("Exception during work: " + this, e);
432                        if (WorkSchedulePath.captureStack) {
433                            WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack());
434                        }
435                    }
436                }
437            }
438        }
439        closeSession();
440
441        try {
442            // loginContext may be null in tests
443            if (loginContext != null) {
444                loginContext.logout();
445            }
446        } catch (LoginException le) {
447            throw new NuxeoException(le);
448        }
449    }
450
451    @Override
452    public String getOriginatingUsername() {
453        return originatingUsername;
454    }
455
456    @Override
457    public long getSchedulingTime() {
458        return schedulingTime;
459    }
460
461    @Override
462    public long getStartTime() {
463        return startTime;
464    }
465
466    @Override
467    public long getCompletionTime() {
468        return completionTime;
469    }
470
471    @Override
472    public void setStartTime() {
473        startTime = System.currentTimeMillis();
474    }
475
476    protected void setCompletionTime() {
477        completionTime = System.currentTimeMillis();
478    }
479
480    @Override
481    public String getCategory() {
482        return getClass().getSimpleName();
483    }
484
485    @Override
486    public String toString() {
487        StringBuilder buf = new StringBuilder();
488        buf.append(getClass().getSimpleName());
489        buf.append('(');
490        if (docId != null) {
491            buf.append(docId);
492            buf.append(", ");
493        } else if (docIds != null && docIds.size() > 0) {
494            buf.append(docIds.get(0));
495            buf.append("..., ");
496        }
497        buf.append(getSchedulePath().getParentPath());
498        buf.append(", ");
499        buf.append(getProgress());
500        buf.append(", ");
501        buf.append(getStatus());
502        buf.append(')');
503        return buf.toString();
504    }
505
506    @Override
507    public DocumentLocation getDocument() {
508        if (docId != null) {
509            return newDocumentLocation(docId);
510        }
511        return null;
512    }
513
514    @Override
515    public List<DocumentLocation> getDocuments() {
516        if (docIds != null) {
517            List<DocumentLocation> res = new ArrayList<>(docIds.size());
518            for (String docId : docIds) {
519                res.add(newDocumentLocation(docId));
520            }
521            return res;
522        }
523        if (docId != null) {
524            return Collections.singletonList(newDocumentLocation(docId));
525        }
526        return Collections.emptyList();
527    }
528
529    protected DocumentLocation newDocumentLocation(String docId) {
530        return new DocumentLocationImpl(repositoryName, new IdRef(docId));
531    }
532
533    @Override
534    public boolean isDocumentTree() {
535        return isTree;
536    }
537
538    /**
539     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
540     * running a long process.
541     */
542    public void commitOrRollbackTransaction() {
543        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
544            TransactionHelper.commitOrRollbackTransaction();
545        }
546    }
547
548    /**
549     * Starts a new transaction.
550     * <p>
551     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
552     * process.
553     *
554     * @return true if a new transaction was started
555     */
556    public boolean startTransaction() {
557        return TransactionHelper.startTransaction();
558    }
559
560    @Override
561    public boolean equals(Object other) {
562        if (!(other instanceof Work)) {
563            return false;
564        }
565        return ((Work) other).getId().equals(getId());
566    }
567
568    @Override
569    public int hashCode() {
570        return getId().hashCode();
571    }
572
573}