001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Thierry Delprat
016 *     Benoit Delbosc
017 */
018
019package org.nuxeo.elasticsearch.listener;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025
026import javax.naming.NamingException;
027import javax.transaction.RollbackException;
028import javax.transaction.Status;
029import javax.transaction.Synchronization;
030import javax.transaction.SystemException;
031import javax.transaction.TransactionManager;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.ecm.core.event.Event;
036import org.nuxeo.ecm.core.event.EventListener;
037import org.nuxeo.ecm.core.event.impl.DocumentEventContext;
038import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
039import org.nuxeo.elasticsearch.commands.IndexingCommand;
040import org.nuxeo.elasticsearch.commands.IndexingCommands;
041import org.nuxeo.elasticsearch.commands.IndexingCommandsStacker;
042import org.nuxeo.runtime.api.Framework;
043import org.nuxeo.runtime.transaction.TransactionHelper;
044
045/**
046 * Synchronous Event listener used to record indexing command, submitted after commit completion.
047 */
048public class ElasticSearchInlineListener extends IndexingCommandsStacker implements EventListener, Synchronization {
049
050    private static final Log log = LogFactory.getLog(ElasticSearchInlineListener.class);
051
052    protected static ThreadLocal<Map<String, IndexingCommands>> transactionCommands = new ThreadLocal<Map<String, IndexingCommands>>() {
053        @Override
054        protected HashMap<String, IndexingCommands> initialValue() {
055            return new HashMap<>();
056        }
057    };
058
059    protected static ThreadLocal<Boolean> isEnlisted = new ThreadLocal<Boolean>() {
060        @Override
061        protected Boolean initialValue() {
062            return Boolean.FALSE;
063        }
064    };
065
066    @Override
067    protected Map<String, IndexingCommands> getAllCommands() {
068        return transactionCommands.get();
069    }
070
071    @Override
072    protected boolean isSyncIndexingByDefault() {
073        Boolean ret = useSyncIndexing.get();
074        if (ret == null) {
075            ret = false;
076        }
077        return ret;
078    }
079
080    @Override
081    public void handleEvent(Event event) {
082        String eventId = event.getName();
083        if (!isEnlisted.get()) {
084            if (event.isCommitEvent()) {
085                // manual flush on save if TxManager is not hooked
086                afterCompletion(Status.STATUS_COMMITTED);
087                return;
088            }
089            // try to enlist our listener
090            isEnlisted.set(registerSynchronization(this));
091        }
092        if (!(event.getContext() instanceof DocumentEventContext)) {
093            // don't process Events that are not tied to Documents
094            return;
095        }
096        DocumentEventContext docCtx = (DocumentEventContext) event.getContext();
097        stackCommand(docCtx, eventId);
098    }
099
100    @Override
101    public void beforeCompletion() {
102
103    }
104
105    @Override
106    public void afterCompletion(int status) {
107        if (getAllCommands().isEmpty()) {
108            return;
109        }
110        try {
111            if (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status) {
112                return;
113            }
114            List<IndexingCommand> commandList = new ArrayList<>();
115            for (IndexingCommands cmds : getAllCommands().values()) {
116                for (IndexingCommand cmd : cmds.getCommands()) {
117                    commandList.add(cmd);
118                }
119            }
120            ElasticSearchIndexing esi = Framework.getLocalService(ElasticSearchIndexing.class);
121            esi.runIndexingWorker(commandList);
122        } finally {
123            isEnlisted.set(false);
124            getAllCommands().clear();
125            useSyncIndexing.set(null);
126        }
127    }
128
129    public static ThreadLocal<Boolean> useSyncIndexing = new ThreadLocal<Boolean>() {
130        @Override
131        protected Boolean initialValue() {
132            return Boolean.FALSE;
133        }
134
135        @Override
136        public void set(Boolean value) {
137            super.set(value);
138            if (Boolean.TRUE.equals(value)) {
139                // switch existing stack to sync
140                for (IndexingCommands cmds : transactionCommands.get().values()) {
141                    for (IndexingCommand cmd : cmds.getCommands()) {
142                        cmd.makeSync();
143                    }
144                }
145            }
146        }
147    };
148
149    protected boolean registerSynchronization(Synchronization sync) {
150        try {
151            TransactionManager tm = TransactionHelper.lookupTransactionManager();
152            if (tm != null) {
153                if (tm.getTransaction() != null) {
154                    tm.getTransaction().registerSynchronization(sync);
155                    return true;
156                }
157                if (!Framework.isTestModeSet()) {
158                    log.error("Unable to register synchronization : no active transaction");
159                }
160                return false;
161            } else {
162                log.error("Unable to register synchronization : no TransactionManager");
163                return false;
164            }
165        } catch (NamingException | IllegalStateException | SystemException | RollbackException e) {
166            log.error("Unable to register synchronization", e);
167            return false;
168        }
169    }
170
171}