001/*
002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Florent Guillaume
016 */
017package org.nuxeo.ecm.core.work;
018
019import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE;
020
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.List;
024import java.util.Random;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
029import org.nuxeo.ecm.core.api.CoreInstance;
030import org.nuxeo.ecm.core.api.CoreSession;
031import org.nuxeo.ecm.core.api.DocumentLocation;
032import org.nuxeo.ecm.core.api.IdRef;
033import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl;
034import org.nuxeo.ecm.core.work.api.Work;
035import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
036import org.nuxeo.runtime.transaction.TransactionHelper;
037import org.nuxeo.runtime.transaction.TransactionRuntimeException;
038
039/**
040 * A base implementation for a {@link Work} instance, dealing with most of the details around state change.
041 * <p>
042 * It also deals with transaction management, and prevents running work instances that are suspending.
043 * <p>
044 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is
045 * available.
046 * <p>
047 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its
048 * state and call {@link #suspended()}.
049 * <p>
050 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}.
051 *
052 * @since 5.6
053 */
054public abstract class AbstractWork implements Work {
055
056    private static final long serialVersionUID = 1L;
057
058    private static final Log log = LogFactory.getLog(AbstractWork.class);
059
060    protected static final Random RANDOM = new Random();
061
062    protected String id;
063
064    /** Suspend requested by the work manager. */
065    protected transient volatile boolean suspending;
066
067    /** Suspend acknowledged by the work instance. */
068    protected transient volatile boolean suspended;
069
070    protected transient State state;
071
072    protected transient Progress progress;
073
074    /** Repository name for the Work instance, if relevant. */
075    protected String repositoryName;
076
077    /**
078     * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work
079     * instance will act.
080     * <p>
081     * Either docId or docIds is set. Not both.
082     */
083    protected String docId;
084
085    /**
086     * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work
087     * instance will act.
088     * <p>
089     * Either docId or docIds is set. Not both.
090     */
091    protected List<String> docIds;
092
093    /**
094     * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act.
095     */
096    protected boolean isTree;
097
098    protected String status;
099
100    protected long schedulingTime;
101
102    protected long startTime;
103
104    protected long completionTime;
105
106    protected transient CoreSession session;
107
108    protected WorkSchedulePath schedulePath;
109
110    /**
111     * Constructs a {@link Work} instance with a unique id.
112     */
113    public AbstractWork() {
114        // we user RANDOM to deal with these cases:
115        // - several calls in the time granularity of nanoTime()
116        // - several concurrent calls on different servers
117        this(System.nanoTime() + "." + Math.abs(RANDOM.nextInt()));
118    }
119
120    public AbstractWork(String id) {
121        this.id = id;
122        progress = PROGRESS_INDETERMINATE;
123        schedulingTime = System.currentTimeMillis();
124    }
125
126    @Override
127    public String getId() {
128        return id;
129    }
130
131    @Override
132    public WorkSchedulePath getSchedulePath() {
133        // schedulePath is transient so will become null after deserialization
134        return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath;
135    }
136
137    @Override
138    public void setSchedulePath(WorkSchedulePath path) {
139        schedulePath = path;
140    }
141
142    public void setDocument(String repositoryName, String docId, boolean isTree) {
143        this.repositoryName = repositoryName;
144        this.docId = docId;
145        docIds = null;
146        this.isTree = isTree;
147    }
148
149    public void setDocument(String repositoryName, String docId) {
150        setDocument(repositoryName, docId, false);
151    }
152
153    public void setDocuments(String repositoryName, List<String> docIds) {
154        this.repositoryName = repositoryName;
155        docId = null;
156        this.docIds = new ArrayList<String>(docIds);
157    }
158
159    @Override
160    public void setWorkInstanceSuspending() {
161        suspending = true;
162    }
163
164    @Override
165    public boolean isSuspending() {
166        return suspending;
167    }
168
169    @Override
170    public void suspended() {
171        suspended = true;
172    }
173
174    @Override
175    public boolean isWorkInstanceSuspended() {
176        return suspended;
177    }
178
179    @Override
180    public void setWorkInstanceState(State state) {
181        this.state = state;
182        if (log.isTraceEnabled()) {
183            log.trace(this + " state=" + state);
184        }
185    }
186
187    @Override
188    public State getWorkInstanceState() {
189        return state;
190    }
191
192    @Override
193    @Deprecated
194    public State getState() {
195        return state;
196    }
197
198    @Override
199    public void setProgress(Progress progress) {
200        this.progress = progress;
201        if (log.isTraceEnabled()) {
202            log.trace(String.valueOf(this));
203        }
204    }
205
206    @Override
207    public Progress getProgress() {
208        return progress;
209    }
210
211    /**
212     * Sets a human-readable status for this work instance.
213     *
214     * @param status the status
215     */
216    public void setStatus(String status) {
217        this.status = status;
218    }
219
220    @Override
221    public String getStatus() {
222        return status;
223    }
224
225    /**
226     * May be called by implementing classes to open a session on the repository.
227     *
228     * @return the session (also available in {@code session} field)
229     */
230    public CoreSession initSession() {
231        return initSession(repositoryName);
232    }
233
234    /**
235     * May be called by implementing classes to open a session on the given repository.
236     *
237     * @param repositoryName the repository name
238     * @return the session (also available in {@code session} field)
239     */
240    public CoreSession initSession(String repositoryName) {
241        session = CoreInstance.openCoreSessionSystem(repositoryName);
242        return session;
243    }
244
245    /**
246     * Closes the session that was opened by {@link #initSession}.
247     *
248     * @since 5.8
249     */
250    public void closeSession() {
251        if (session != null) {
252            session.close();
253            session = null;
254        }
255    }
256
257    @Override
258    public void run() {
259        if (isSuspending()) {
260            // don't run anything if we're being started while a suspend
261            // has been requested
262            suspended();
263            return;
264        }
265        Exception suppressed = null;
266        int retryCount = getRetryCount(); // may be 0
267        for (int i = 0; i <= retryCount; i++) {
268            if (i > 0) {
269                log.debug("Retrying work due to concurrent update (" + i + "): " + this);
270                log.trace("Concurrent update", suppressed);
271            }
272            Exception e = runWorkWithTransactionAndCheckExceptions();
273            if (e == null) {
274                // no exception, work is done
275                return;
276            }
277            if (suppressed == null) {
278                suppressed = e;
279            } else {
280                suppressed.addSuppressed(e);
281            }
282        }
283        // all retries have been done, throw the exception
284        if (suppressed != null) {
285            String msg = "Work failed after " + retryCount + " " + (retryCount == 1 ? "retry" : "retries") + ", class="
286                    + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle();
287            throw new RuntimeException(msg, suppressed);
288        }
289    }
290
291    /**
292     * Does work under a transaction, and collects exception and suppressed exceptions that may lead to a retry.
293     *
294     * @since 5.9.4
295     */
296    protected Exception runWorkWithTransactionAndCheckExceptions() {
297        List<Exception> suppressed = Collections.emptyList();
298        try {
299            TransactionHelper.noteSuppressedExceptions();
300            try {
301                runWorkWithTransaction();
302            } finally {
303                suppressed = TransactionHelper.getSuppressedExceptions();
304            }
305        } catch (ConcurrentUpdateException e) {
306            // happens typically during save()
307            return e;
308        } catch (TransactionRuntimeException e) {
309            // error at commit time
310            if (suppressed.isEmpty()) {
311                return e;
312            }
313        }
314        // reached if no catch, or if TransactionRuntimeException caught
315        if (suppressed.isEmpty()) {
316            return null;
317        }
318        // exceptions during commit caused a rollback in SessionImpl#end
319        Exception e = suppressed.get(0);
320        for (int i = 1; i < suppressed.size(); i++) {
321            e.addSuppressed(suppressed.get(i));
322        }
323        return e;
324    }
325
326    /**
327     * Does work under a transaction.
328     *
329     * @since 5.9.4
330     * @throws ConcurrentUpdateException, TransactionRuntimeException
331     */
332    protected void runWorkWithTransaction() throws ConcurrentUpdateException {
333        TransactionHelper.startTransaction();
334        boolean ok = false;
335        Exception exc = null;
336        try {
337            WorkSchedulePath.handleEnter(this);
338            // --- do work
339            setStartTime();
340            work(); // may throw ConcurrentUpdateException
341            ok = true;
342            // --- end work
343        } catch (Exception e) {
344            exc = e;
345            if (e instanceof ConcurrentUpdateException) {
346                throw (ConcurrentUpdateException) e;
347            } else if (e instanceof RuntimeException) {
348                throw (RuntimeException) e;
349            } else if (e instanceof InterruptedException) {
350                // restore interrupted status for the thread pool worker
351                Thread.currentThread().interrupt();
352            }
353            throw new RuntimeException(e);
354        } finally {
355            WorkSchedulePath.handleReturn();
356            try {
357                setCompletionTime();
358                cleanUp(ok, exc);
359            } finally {
360                if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
361                    if (!ok) {
362                        TransactionHelper.setTransactionRollbackOnly();
363                    }
364                    TransactionHelper.commitOrRollbackTransaction();
365                }
366            }
367        }
368    }
369
370    @Override
371    public abstract void work();
372
373    /**
374     * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions.
375     *
376     * @return 0 for no retry, or more if some retries are possible
377     * @see #work
378     * @since 5.8
379     */
380    public int getRetryCount() {
381        return 0;
382    }
383
384    /**
385     * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in
386     * error or was interrupted.
387     *
388     * @param ok {@code true} if the work completed normally
389     * @param e the exception, if available
390     */
391    @Override
392    public void cleanUp(boolean ok, Exception e) {
393        if (!ok) {
394            if (e instanceof InterruptedException) {
395                log.debug("Suspended work: " + this);
396            } else {
397                if (!(e instanceof ConcurrentUpdateException)) {
398                    log.error("Exception during work: " + this, e);
399                    if (WorkSchedulePath.captureStack) {
400                        WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack());
401                    }
402                }
403            }
404        }
405        closeSession();
406    }
407
408    @Override
409    public String getUserId() {
410        // TODO
411        return null;
412    }
413
414    @Override
415    public long getSchedulingTime() {
416        return schedulingTime;
417    }
418
419    @Override
420    public long getStartTime() {
421        return startTime;
422    }
423
424    @Override
425    public long getCompletionTime() {
426        return completionTime;
427    }
428
429    @Override
430    public void setStartTime() {
431        startTime = System.currentTimeMillis();
432    }
433
434    protected void setCompletionTime() {
435        completionTime = System.currentTimeMillis();
436    }
437
438    @Override
439    public String getCategory() {
440        return getClass().getSimpleName();
441    }
442
443    @Override
444    public String toString() {
445        StringBuilder buf = new StringBuilder();
446        buf.append(getClass().getSimpleName());
447        buf.append('(');
448        if (docId != null) {
449            buf.append(docId);
450            buf.append(", ");
451        } else if (docIds != null && docIds.size() > 0) {
452            buf.append(docIds.get(0));
453            buf.append("..., ");
454        }
455        buf.append(getSchedulePath().getParentPath());
456        buf.append(", ");
457        buf.append(getProgress());
458        buf.append(", ");
459        buf.append(getStatus());
460        buf.append(')');
461        return buf.toString();
462    }
463
464    @Override
465    public DocumentLocation getDocument() {
466        if (docId != null) {
467            return newDocumentLocation(docId);
468        }
469        return null;
470    }
471
472    @Override
473    public List<DocumentLocation> getDocuments() {
474        if (docIds != null) {
475            List<DocumentLocation> res = new ArrayList<DocumentLocation>(docIds.size());
476            for (String docId : docIds) {
477                res.add(newDocumentLocation(docId));
478            }
479            return res;
480        }
481        if (docId != null) {
482            return Collections.singletonList(newDocumentLocation(docId));
483        }
484        return Collections.emptyList();
485    }
486
487    protected DocumentLocation newDocumentLocation(String docId) {
488        return new DocumentLocationImpl(repositoryName, new IdRef(docId));
489    }
490
491    @Override
492    public boolean isDocumentTree() {
493        return isTree;
494    }
495
496    @Override
497    public String getWorkInstanceResult() {
498        return null;
499    }
500
501    /**
502     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
503     * running a long process.
504     */
505    public void commitOrRollbackTransaction() {
506        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
507            TransactionHelper.commitOrRollbackTransaction();
508        }
509    }
510
511    /**
512     * Starts a new transaction.
513     * <p>
514     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
515     * process.
516     *
517     * @return true if a new transaction was started
518     */
519    public boolean startTransaction() {
520        return TransactionHelper.startTransaction();
521    }
522
523    @Override
524    public boolean equals(Object other) {
525        if (!(other instanceof Work)) {
526            return false;
527        }
528        return ((Work) other).getId().equals(getId());
529    }
530
531    @Override
532    public int hashCode() {
533        return getId().hashCode();
534    }
535
536}