001/* 002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.sql; 020 021import java.util.concurrent.CyclicBarrier; 022import java.util.concurrent.TimeUnit; 023 024/** 025 * Helper class to run jobs in lock step in several threads. 026 * <p> 027 * You should override the {@link #job} method and make it execute code where blocks are wrapped in: 028 * 029 * <pre> 030 * if (thread(1)) { 031 * // code to execute only in thread 1 032 * } 033 * </pre> 034 * 035 * The parameter to {@link #thread} should be 1, 2, 3... depending on the thread you want this block to be executed in. 036 * <p> 037 * After you created the job instances, run the whole process by calling: 038 * 039 * <pre> 040 * LockStepJob.run(job1, job2, job3...); 041 * </pre> 042 * 043 * @since 5.7 044 */ 045public abstract class LockStepJob implements Runnable { 046 047 protected int n; 048 049 protected CyclicBarrier barrier; 050 051 protected Throwable throwable; 052 053 /** Run the thread n (1, 2...). */ 054 public void initialize(int n, CyclicBarrier barrier) { 055 this.n = n; 056 this.barrier = barrier; 057 } 058 059 @Override 060 public void run() { 061 try { 062 job(); 063 } catch (Throwable t) { 064 // System.err.println("Exception in thread " + 065 // Thread.currentThread()); 066 // t.printStackTrace(); 067 throwable = t; 068 } 069 } 070 071 /** 072 * Method to call around each part to be executed by a single thread. 073 * 074 * @param which which thread is concerned 075 * @return {@code true} if the code should be executed 076 */ 077 public boolean thread(int which) throws Exception { 078 // sync all threads 079 barrier.await(30, TimeUnit.SECONDS); // throws on timeout 080 // execute this step if in the right thread 081 return which == n; 082 } 083 084 /** 085 * Override this to define the actual job to execute in multiple threads. 086 */ 087 public abstract void job() throws Exception; 088 089 public static void run(LockStepJob... jobs) throws Exception { 090 int n = jobs.length; 091 CyclicBarrier barrier = new CyclicBarrier(n); 092 for (int i = 0; i < n; i++) { 093 jobs[i].initialize(i + 1, barrier); 094 } 095 Thread[] threads = new Thread[n]; 096 try { 097 for (int i = 0; i < n; i++) { 098 threads[i] = new Thread(jobs[i], "test-" + (i + 1)); 099 threads[i].start(); 100 } 101 for (int i = 0; i < n; i++) { 102 threads[i].join(); 103 threads[i] = null; 104 } 105 Exception exception = new RuntimeException("failed"); 106 for (int i = 0; i < n; i++) { 107 Throwable t = jobs[i].throwable; 108 if (t != null) { 109 exception.addSuppressed(t); 110 } 111 } 112 if (exception.getSuppressed().length > 0) { 113 throw exception; 114 } 115 } finally { 116 // error condition recovery 117 for (int i = 0; i < n; i++) { 118 if (threads[i] != null) { 119 threads[i].interrupt(); 120 } 121 } 122 } 123 } 124 125}