001/*
002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.sql;
020
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
023
024/**
025 * Helper class to run jobs in lock step in several threads.
026 * <p>
027 * You should override the {@link #job} method and make it execute code where blocks are wrapped in:
028 *
029 * <pre>
030 * if (thread(1)) {
031 *     // code to execute only in thread 1
032 * }
033 * </pre>
034 *
035 * The parameter to {@link #thread} should be 1, 2, 3... depending on the thread you want this block to be executed in.
036 * <p>
037 * After you created the job instances, run the whole process by calling:
038 *
039 * <pre>
040 * LockStepJob.run(job1, job2, job3...);
041 * </pre>
042 *
043 * @since 5.7
044 */
045public abstract class LockStepJob implements Runnable {
046
047    protected int n;
048
049    protected CyclicBarrier barrier;
050
051    protected Throwable throwable;
052
053    /** Run the thread n (1, 2...). */
054    public void initialize(int n, CyclicBarrier barrier) {
055        this.n = n;
056        this.barrier = barrier;
057    }
058
059    @Override
060    public void run() {
061        try {
062            job();
063        } catch (Throwable t) {
064            // System.err.println("Exception in thread " +
065            // Thread.currentThread());
066            // t.printStackTrace();
067            throwable = t;
068        }
069    }
070
071    /**
072     * Method to call around each part to be executed by a single thread.
073     *
074     * @param which which thread is concerned
075     * @return {@code true} if the code should be executed
076     */
077    public boolean thread(int which) throws Exception {
078        // sync all threads
079        barrier.await(30, TimeUnit.SECONDS); // throws on timeout
080        // execute this step if in the right thread
081        return which == n;
082    }
083
084    /**
085     * Override this to define the actual job to execute in multiple threads.
086     */
087    public abstract void job() throws Exception;
088
089    public static void run(LockStepJob... jobs) throws Exception {
090        int n = jobs.length;
091        CyclicBarrier barrier = new CyclicBarrier(n);
092        for (int i = 0; i < n; i++) {
093            jobs[i].initialize(i + 1, barrier);
094        }
095        Thread[] threads = new Thread[n];
096        try {
097            for (int i = 0; i < n; i++) {
098                threads[i] = new Thread(jobs[i], "test-" + (i + 1));
099                threads[i].start();
100            }
101            for (int i = 0; i < n; i++) {
102                threads[i].join();
103                threads[i] = null;
104            }
105            Exception exception = new RuntimeException("failed");
106            for (int i = 0; i < n; i++) {
107                Throwable t = jobs[i].throwable;
108                if (t != null) {
109                    exception.addSuppressed(t);
110                }
111            }
112            if (exception.getSuppressed().length > 0) {
113                throw exception;
114            }
115        } finally {
116            // error condition recovery
117            for (int i = 0; i < n; i++) {
118                if (threads[i] != null) {
119                    threads[i].interrupt();
120                }
121            }
122        }
123    }
124
125}