001/* 002 * Copyright (c) 2012 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Florent Guillaume 011 */ 012package org.nuxeo.ecm.core.storage.sql; 013 014import java.util.concurrent.CyclicBarrier; 015import java.util.concurrent.TimeUnit; 016 017/** 018 * Helper class to run jobs in lock step in several threads. 019 * <p> 020 * You should override the {@link #job} method and make it execute code where blocks are wrapped in: 021 * 022 * <pre> 023 * if (thread(1)) { 024 * // code to execute only in thread 1 025 * } 026 * </pre> 027 * 028 * The parameter to {@link #thread} should be 1, 2, 3... depending on the thread you want this block to be executed in. 029 * <p> 030 * After you created the job instances, run the whole process by calling: 031 * 032 * <pre> 033 * LockStepJob.run(job1, job2, job3...); 034 * </pre> 035 * 036 * @since 5.7 037 */ 038public abstract class LockStepJob implements Runnable { 039 040 protected int n; 041 042 protected CyclicBarrier barrier; 043 044 protected Throwable throwable; 045 046 /** Run the thread n (1, 2...). */ 047 public void initialize(int n, CyclicBarrier barrier) { 048 this.n = n; 049 this.barrier = barrier; 050 } 051 052 @Override 053 public void run() { 054 try { 055 job(); 056 } catch (Throwable t) { 057 // System.err.println("Exception in thread " + 058 // Thread.currentThread()); 059 // t.printStackTrace(); 060 throwable = t; 061 } 062 } 063 064 /** 065 * Method to call around each part to be executed by a single thread. 066 * 067 * @param which which thread is concerned 068 * @return {@code true} if the code should be executed 069 */ 070 public boolean thread(int which) throws Exception { 071 // sync all threads 072 barrier.await(30, TimeUnit.SECONDS); // throws on timeout 073 // execute this step if in the right thread 074 return which == n; 075 } 076 077 /** 078 * Override this to define the actual job to execute in multiple threads. 079 */ 080 public abstract void job() throws Exception; 081 082 public static void run(LockStepJob... jobs) throws Exception { 083 int n = jobs.length; 084 CyclicBarrier barrier = new CyclicBarrier(n); 085 for (int i = 0; i < n; i++) { 086 jobs[i].initialize(i + 1, barrier); 087 } 088 Thread[] threads = new Thread[n]; 089 try { 090 for (int i = 0; i < n; i++) { 091 threads[i] = new Thread(jobs[i], "test-" + (i + 1)); 092 threads[i].start(); 093 } 094 for (int i = 0; i < n; i++) { 095 threads[i].join(); 096 threads[i] = null; 097 } 098 Exception exception = new RuntimeException("failed"); 099 for (int i = 0; i < n; i++) { 100 Throwable t = jobs[i].throwable; 101 if (t != null) { 102 exception.addSuppressed(t); 103 } 104 } 105 if (exception.getSuppressed().length > 0) { 106 throw exception; 107 } 108 } finally { 109 // error condition recovery 110 for (int i = 0; i < n; i++) { 111 if (threads[i] != null) { 112 threads[i].interrupt(); 113 } 114 } 115 } 116 } 117 118}