001/*
002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     tiry
018 */
019package org.nuxeo.ecm.core.event.pipe;
020
021import org.nuxeo.ecm.core.event.EventBundle;
022
023import java.util.Map;
024
025/**
026 * @since 8.4
027 */
028public abstract class AbstractEventBundlePipe<T> implements EventBundlePipe {
029
030    protected String name;
031
032    protected Map<String, String> params;
033
034    @Override
035    public void initPipe(String name, Map<String, String> params) {
036        this.name = name;
037        this.params = params;
038
039        // XXX get contributions
040    }
041
042    protected String getName() {
043        return name;
044    }
045
046    protected Map<String, String> getParameters() {
047        return params;
048    }
049
050    @Override
051    public void sendEventBundle(EventBundle events) {
052        events = filterBundle(events);
053        if (events.isEmpty()) {
054            return;
055        }
056        preProcessBundle(events);
057        send(marshall(events));
058    }
059
060    protected EventBundle filterBundle(EventBundle events) {
061        // XXX handle contributions
062
063        // remove events from bundles
064        // remove bundles that are not interesting
065        // typical use case : filter events before forwarding to an external bus
066
067        return events;
068    }
069
070    protected void preProcessBundle(EventBundle events) {
071        // XXX handle contributions
072
073        // enrich events bundle
074        // typical use case : resolve events extended attributes for Audit pipe
075
076        return;
077    }
078
079    /**
080     * de-hydrate the EventBundle to make it suitable for transmission on a Bus
081     *
082     * @param events
083     * @return
084     */
085    protected abstract T marshall(EventBundle events);
086
087    /**
088     * Do the actual push on the Bus
089     *
090     * @param message
091     */
092    protected abstract void send(T message);
093
094
095    @Override
096    public void shutdown() throws InterruptedException {
097
098    }
099
100}