001/*
002 * Copyright (c) 2012 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Florent Guillaume
011 */
012package org.nuxeo.ecm.core.storage.sql;
013
014import java.util.concurrent.CyclicBarrier;
015import java.util.concurrent.TimeUnit;
016
017/**
018 * Helper class to run jobs in lock step in several threads.
019 * <p>
020 * You should override the {@link #job} method and make it execute code where blocks are wrapped in:
021 *
022 * <pre>
023 * if (thread(1)) {
024 *     // code to execute only in thread 1
025 * }
026 * </pre>
027 *
028 * The parameter to {@link #thread} should be 1, 2, 3... depending on the thread you want this block to be executed in.
029 * <p>
030 * After you created the job instances, run the whole process by calling:
031 *
032 * <pre>
033 * LockStepJob.run(job1, job2, job3...);
034 * </pre>
035 *
036 * @since 5.7
037 */
038public abstract class LockStepJob implements Runnable {
039
040    protected int n;
041
042    protected CyclicBarrier barrier;
043
044    protected Throwable throwable;
045
046    /** Run the thread n (1, 2...). */
047    public void initialize(int n, CyclicBarrier barrier) {
048        this.n = n;
049        this.barrier = barrier;
050    }
051
052    @Override
053    public void run() {
054        try {
055            job();
056        } catch (Throwable t) {
057            // System.err.println("Exception in thread " +
058            // Thread.currentThread());
059            // t.printStackTrace();
060            throwable = t;
061        }
062    }
063
064    /**
065     * Method to call around each part to be executed by a single thread.
066     *
067     * @param which which thread is concerned
068     * @return {@code true} if the code should be executed
069     */
070    public boolean thread(int which) throws Exception {
071        // sync all threads
072        barrier.await(30, TimeUnit.SECONDS); // throws on timeout
073        // execute this step if in the right thread
074        return which == n;
075    }
076
077    /**
078     * Override this to define the actual job to execute in multiple threads.
079     */
080    public abstract void job() throws Exception;
081
082    public static void run(LockStepJob... jobs) throws Exception {
083        int n = jobs.length;
084        CyclicBarrier barrier = new CyclicBarrier(n);
085        for (int i = 0; i < n; i++) {
086            jobs[i].initialize(i + 1, barrier);
087        }
088        Thread[] threads = new Thread[n];
089        try {
090            for (int i = 0; i < n; i++) {
091                threads[i] = new Thread(jobs[i], "test-" + (i + 1));
092                threads[i].start();
093            }
094            for (int i = 0; i < n; i++) {
095                threads[i].join();
096                threads[i] = null;
097            }
098            Exception exception = new RuntimeException("failed");
099            for (int i = 0; i < n; i++) {
100                Throwable t = jobs[i].throwable;
101                if (t != null) {
102                    exception.addSuppressed(t);
103                }
104            }
105            if (exception.getSuppressed().length > 0) {
106                throw exception;
107            }
108        } finally {
109            // error condition recovery
110            for (int i = 0; i < n; i++) {
111                if (threads[i] != null) {
112                    threads[i].interrupt();
113                }
114            }
115        }
116    }
117
118}