001/*
002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Nuxeo - initial API and implementation
018 *
019 * $Id$
020 */
021
022package org.nuxeo.ecm.core.event.impl;
023
024import java.io.Serializable;
025import java.rmi.dgc.VMID;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Map.Entry;
032
033import javax.security.auth.login.LoginContext;
034import javax.security.auth.login.LoginException;
035
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.nuxeo.ecm.core.api.CloseableCoreSession;
039import org.nuxeo.ecm.core.api.CoreInstance;
040import org.nuxeo.ecm.core.api.CoreSession;
041import org.nuxeo.ecm.core.api.DocumentModel;
042import org.nuxeo.ecm.core.api.DocumentNotFoundException;
043import org.nuxeo.ecm.core.api.DocumentRef;
044import org.nuxeo.ecm.core.event.DeletedDocumentModel;
045import org.nuxeo.ecm.core.event.Event;
046import org.nuxeo.ecm.core.event.EventBundle;
047import org.nuxeo.ecm.core.event.EventContext;
048import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
049import org.nuxeo.runtime.api.Framework;
050
051/**
052 * Default implementation for an {@link EventBundle} that need to be reconnected to a usable Session.
053 *
054 * @author tiry
055 */
056public class ReconnectedEventBundleImpl implements ReconnectedEventBundle {
057
058    private static final long serialVersionUID = 1L;
059
060    protected EventBundle sourceEventBundle;
061
062    /** Lister name or names. */
063    protected String listenerName;
064
065    protected transient List<Event> reconnectedEvents;
066
067    protected transient LoginContext loginCtx;
068
069    protected transient CloseableCoreSession reconnectedCoreSession;
070
071    private static final Log log = LogFactory.getLog(ReconnectedEventBundleImpl.class);
072
073    protected ReconnectedEventBundleImpl() {
074    }
075
076    public ReconnectedEventBundleImpl(EventBundle sourceEventBundle) {
077        this.sourceEventBundle = sourceEventBundle;
078    }
079
080    /** @since 5.6 */
081    public ReconnectedEventBundleImpl(EventBundle sourceEventBundle, String listenerName) {
082        this.sourceEventBundle = sourceEventBundle;
083        this.listenerName = listenerName;
084    }
085
086    protected CoreSession getReconnectedCoreSession(String repoName) {
087        if (reconnectedCoreSession == null) {
088            try {
089                loginCtx = Framework.login();
090            } catch (LoginException e) {
091                log.error("Cannot log in", e);
092                return null;
093            }
094            reconnectedCoreSession = CoreInstance.openCoreSessionSystem(repoName);
095        } else {
096            // Sanity Check
097            if (!reconnectedCoreSession.getRepositoryName().equals(repoName)) {
098                if (repoName != null) {
099                    throw new IllegalStateException("Can no reconnected a Bundle tied to several Core instances !");
100                }
101            }
102        }
103        return reconnectedCoreSession;
104    }
105
106    protected List<Event> getReconnectedEvents() {
107        if (reconnectedEvents == null) {
108            reconnectedEvents = new ArrayList<Event>();
109            for (Event event : sourceEventBundle) {
110                EventContext ctx = event.getContext();
111                CoreSession session = ctx.getRepositoryName() == null ? null
112                        : getReconnectedCoreSession(ctx.getRepositoryName());
113
114                List<Object> newArgs = new ArrayList<Object>();
115                for (Object arg : ctx.getArguments()) {
116                    Object newArg = arg;
117                    if (refetchDocumentModel(session, arg) && session.getPrincipal() != null) {
118                        DocumentModel oldDoc = (DocumentModel) arg;
119                        DocumentRef ref = oldDoc.getRef();
120                        if (ref != null) {
121                            try {
122                                if (session.exists(oldDoc.getRef())) {
123                                    newArg = session.getDocument(oldDoc.getRef());
124                                } else {
125                                    // probably deleted doc
126                                    newArg = new DeletedDocumentModel(oldDoc);
127                                }
128                            } catch (DocumentNotFoundException e) {
129                                log.error("Can not refetch Doc with ref " + ref.toString(), e);
130                            }
131                        }
132                    }
133                    // XXX treat here other cases !!!!
134                    newArgs.add(newArg);
135                }
136
137                EventContext newCtx = null;
138                if (ctx instanceof DocumentEventContext) {
139                    newCtx = new DocumentEventContext(session, ctx.getPrincipal(), (DocumentModel) newArgs.get(0),
140                            (DocumentRef) newArgs.get(1));
141                } else {
142                    newCtx = new EventContextImpl(session, ctx.getPrincipal());
143                    ((EventContextImpl) newCtx).setArgs(newArgs.toArray());
144                }
145
146                Map<String, Serializable> newProps = new HashMap<String, Serializable>();
147                for (Entry<String, Serializable> prop : ctx.getProperties().entrySet()) {
148                    Serializable propValue = prop.getValue();
149                    if (refetchDocumentModel(session, propValue)) {
150                        DocumentModel oldDoc = (DocumentModel) propValue;
151                        DocumentRef oldRef = oldDoc.getRef();
152                        try {
153                            if (session.exists(oldRef)) {
154                                propValue = session.getDocument(oldRef);
155                            } else {
156                                log.warn("Listener " + (listenerName == null ? "" : "'" + listenerName + "' ")
157                                        + "cannot refetch missing document: " + oldRef + " ("
158                                        + oldDoc.getPathAsString() + ")");
159                            }
160                        } catch (DocumentNotFoundException e) {
161                            log.error("Can not refetch Doc with ref " + oldRef, e);
162                        }
163                    }
164                    // XXX treat here other cases !!!!
165                    newProps.put(prop.getKey(), propValue);
166                }
167                newCtx.setProperties(newProps);
168                Event newEvt = new EventImpl(event.getName(), newCtx, event.getFlags(), event.getTime());
169                reconnectedEvents.add(newEvt);
170            }
171        }
172        return reconnectedEvents;
173    }
174
175    protected boolean refetchDocumentModel(CoreSession session, Object eventProperty) {
176        if (eventProperty instanceof DocumentModel && session != null) {
177            DocumentModel doc = (DocumentModel) eventProperty;
178            if (Boolean.TRUE.equals(doc.getContextData(SKIP_REFETCH_DOCUMENT_CONTEXT_KEY))) {
179                return false;
180            }
181            return true;
182        }
183        return false;
184    }
185
186    @Override
187    public String getName() {
188        return sourceEventBundle.getName();
189    }
190
191    @Override
192    public VMID getSourceVMID() {
193        return sourceEventBundle.getSourceVMID();
194    }
195
196    @Override
197    public boolean hasRemoteSource() {
198        return sourceEventBundle.hasRemoteSource();
199    }
200
201    @Override
202    public boolean isEmpty() {
203        return sourceEventBundle.isEmpty();
204    }
205
206    @Override
207    public Event peek() {
208        return getReconnectedEvents().get(0);
209    }
210
211    @Override
212    public void push(Event event) {
213        throw new UnsupportedOperationException();
214    }
215
216    @Override
217    public int size() {
218        return sourceEventBundle.size();
219    }
220
221    @Override
222    public Iterator<Event> iterator() {
223        return getReconnectedEvents().iterator();
224    }
225
226    @Override
227    public void disconnect() {
228        if (reconnectedCoreSession != null) {
229            reconnectedCoreSession.close();
230        }
231        reconnectedCoreSession = null;
232        reconnectedEvents = null;
233        if (loginCtx != null) {
234            try {
235                loginCtx.logout();
236            } catch (LoginException e) {
237                log.error("Cannot log out", e);
238            } finally {
239                loginCtx = null;
240            }
241        }
242    }
243
244    @Override
245    public boolean comesFromJMS() {
246        return false;
247    }
248
249    @Override
250    public boolean containsEventName(String eventName) {
251        return sourceEventBundle.containsEventName(eventName);
252    }
253
254    public List<String> getEventNames() {
255        List<String> eventNames = new ArrayList<String>();
256        for (Event event : sourceEventBundle) {
257            eventNames.add(event.getName());
258        }
259        return eventNames;
260    }
261}