001/* 002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * tiry 018 */ 019package org.nuxeo.ecm.core.event.pipe.dispatch; 020 021import org.nuxeo.ecm.core.event.EventBundle; 022import org.nuxeo.ecm.core.event.pipe.EventBundlePipe; 023import org.nuxeo.ecm.core.event.pipe.EventPipeDescriptor; 024 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Map; 028 029/** 030 * Basic implementation that simply forwards {@link EventBundle} to all underlying {@link EventBundlePipe} 031 * 032 * @since 8.4 033 */ 034public class SimpleEventBundlePipeDispatcher implements EventBundleDispatcher { 035 036 protected List<EventBundlePipe> pipes = new ArrayList<>(); 037 038 protected Map<String, String> parameters; 039 040 @Override 041 public void init(List<EventPipeDescriptor> pipeDescriptors, Map<String, String> parameters) { 042 043 this.parameters = parameters; 044 045 pipeDescriptors.sort((o1, o2) -> o1.getPriority().compareTo(o2.getPriority())); 046 047 for (EventPipeDescriptor descriptor : pipeDescriptors) { 048 EventBundlePipe pipe = descriptor.getInstance(); 049 pipe.initPipe(descriptor.getName(), descriptor.getParameters()); 050 pipes.add(pipe); 051 } 052 } 053 054 @Override 055 public void sendEventBundle(EventBundle events) { 056 if (events.isEmpty()) { 057 return; 058 } 059 for (EventBundlePipe pipe : pipes) { 060 pipe.sendEventBundle(events); 061 } 062 } 063 064 @Override 065 public boolean waitForCompletion(long timeoutMillis) throws InterruptedException { 066 boolean res = true; 067 for (EventBundlePipe pipe : pipes) { 068 res = pipe.waitForCompletion(timeoutMillis) && res; 069 } 070 return res; 071 } 072 073 @Override 074 public void shutdown() throws InterruptedException { 075 for (EventBundlePipe pipe : pipes) { 076 pipe.shutdown(); 077 } 078 } 079}