001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 */
019package org.nuxeo.segment.io.listener;
020
021import java.io.Serializable;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.stream.Collectors;
026
027import javax.security.auth.login.LoginContext;
028import javax.security.auth.login.LoginException;
029
030import org.nuxeo.ecm.core.api.NuxeoException;
031import org.nuxeo.ecm.core.api.NuxeoPrincipal;
032import org.nuxeo.ecm.core.event.Event;
033import org.nuxeo.ecm.core.event.EventBundle;
034import org.nuxeo.ecm.core.event.PostCommitFilteringEventListener;
035import org.nuxeo.ecm.core.event.impl.DocumentEventContext;
036import org.nuxeo.runtime.api.Framework;
037import org.nuxeo.segment.io.SegmentIO;
038import org.nuxeo.segment.io.SegmentIOMapper;
039
040public class SegmentIOAsyncListener implements PostCommitFilteringEventListener {
041
042    @Override
043    public boolean acceptEvent(Event event) {
044        SegmentIO service = Framework.getService(SegmentIO.class);
045        return service.getMappedEvents().contains(event.getName());
046    }
047
048    @Override
049    public void handleEvent(EventBundle bundle) {
050
051        SegmentIO service = Framework.getService(SegmentIO.class);
052
053        List<String> eventToProcess = service.getMappedEvents()
054                                             .stream()
055                                             .filter(event -> bundle.containsEventName(event))
056                                             .collect(Collectors.toList());
057
058        Map<String, List<SegmentIOMapper>> event2Mappers = service.getMappers(eventToProcess);
059
060        try {
061            // Force system login in order to have access to user directory
062            LoginContext login = Framework.login();
063            try {
064                processEvents(event2Mappers, bundle);
065            } finally {
066                if (login != null) {
067                    login.logout();
068                }
069            }
070        } catch (LoginException e) {
071            throw new NuxeoException(e);
072        }
073    }
074
075    protected void processEvents(Map<String, List<SegmentIOMapper>> event2Mappers, EventBundle bundle) {
076
077        for (Event event : bundle) {
078            List<SegmentIOMapper> mappers = event2Mappers.get(event.getName());
079            if (mappers == null || mappers.size() == 0) {
080                continue;
081            }
082
083            for (SegmentIOMapper mapper : mappers) {
084                Map<String, Object> ctx = new HashMap<>();
085
086                NuxeoPrincipal principal = event.getContext().getPrincipal();
087                SegmentIO service = Framework.getService(SegmentIO.class);
088                if (!service.mustTrackprincipal(principal.getName())) {
089                    break;
090                }
091
092                ctx.put("event", event);
093                ctx.put("eventContext", event.getContext());
094                ctx.put("principal", principal);
095                if (event.getContext() instanceof DocumentEventContext) {
096                    DocumentEventContext docCtx = (DocumentEventContext) event.getContext();
097                    ctx.put("doc", docCtx.getSourceDocument());
098                    ctx.put("repository", docCtx.getRepositoryName());
099                    ctx.put("session", docCtx.getCoreSession());
100                    ctx.put("dest", docCtx.getDestination());
101                }
102
103                Map<String, Serializable> mapped = mapper.getMappedData(ctx);
104                if (mapper.isIdentify()) {
105                    service.identify(principal, mapped);
106                } else if (mapper.isPage()) {
107                    service.page(principal, getNameWithDefault(event, "page"), mapped);
108                } else if (mapper.isScreen()) {
109                    service.screen(principal, getNameWithDefault(event, "screen"), mapped);
110                } else {
111                    service.track(principal, event.getName(), mapped);
112                }
113            }
114        }
115    }
116
117    protected String getNameWithDefault(Event e, String defaultName) {
118        return (String) e.getContext().getProperties().getOrDefault("name", defaultName);
119    }
120}