001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 *     Benoit Delbosc
019 */
020
021package org.nuxeo.elasticsearch.listener;
022
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027
028import javax.naming.NamingException;
029import javax.transaction.RollbackException;
030import javax.transaction.Status;
031import javax.transaction.Synchronization;
032import javax.transaction.SystemException;
033import javax.transaction.TransactionManager;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.ecm.core.event.Event;
038import org.nuxeo.ecm.core.event.EventListener;
039import org.nuxeo.ecm.core.event.impl.DocumentEventContext;
040import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
041import org.nuxeo.elasticsearch.commands.IndexingCommand;
042import org.nuxeo.elasticsearch.commands.IndexingCommands;
043import org.nuxeo.elasticsearch.commands.IndexingCommandsStacker;
044import org.nuxeo.runtime.api.Framework;
045import org.nuxeo.runtime.transaction.TransactionHelper;
046
047/**
048 * Synchronous Event listener used to record indexing command, submitted after commit completion.
049 */
050public class ElasticSearchInlineListener extends IndexingCommandsStacker implements EventListener, Synchronization {
051
052    private static final Log log = LogFactory.getLog(ElasticSearchInlineListener.class);
053
054    protected static ThreadLocal<Map<String, IndexingCommands>> transactionCommands = new ThreadLocal<Map<String, IndexingCommands>>() {
055        @Override
056        protected HashMap<String, IndexingCommands> initialValue() {
057            return new HashMap<>();
058        }
059    };
060
061    protected static ThreadLocal<Boolean> isEnlisted = new ThreadLocal<Boolean>() {
062        @Override
063        protected Boolean initialValue() {
064            return Boolean.FALSE;
065        }
066    };
067
068    @Override
069    protected Map<String, IndexingCommands> getAllCommands() {
070        return transactionCommands.get();
071    }
072
073    @Override
074    protected boolean isSyncIndexingByDefault() {
075        Boolean ret = useSyncIndexing.get();
076        if (ret == null) {
077            ret = false;
078        }
079        return ret;
080    }
081
082    @Override
083    public void handleEvent(Event event) {
084        String eventId = event.getName();
085        if (!isEnlisted.get()) {
086            if (event.isCommitEvent()) {
087                // manual flush on save if TxManager is not hooked
088                afterCompletion(Status.STATUS_COMMITTED);
089                return;
090            }
091            // try to enlist our listener
092            isEnlisted.set(registerSynchronization(this));
093        }
094        if (!(event.getContext() instanceof DocumentEventContext)) {
095            // don't process Events that are not tied to Documents
096            return;
097        }
098        DocumentEventContext docCtx = (DocumentEventContext) event.getContext();
099        stackCommand(docCtx, eventId);
100    }
101
102    @Override
103    public void beforeCompletion() {
104
105    }
106
107    @Override
108    public void afterCompletion(int status) {
109        try {
110            if (getAllCommands().isEmpty()) {
111                // return and un hook the current listener even if there's no commands to index
112                // unless, during next transaction this listener won't be hooked to it
113                return;
114            }
115            if (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status) {
116                return;
117            }
118            List<IndexingCommand> commandList = new ArrayList<>();
119            for (IndexingCommands cmds : getAllCommands().values()) {
120                for (IndexingCommand cmd : cmds.getCommands()) {
121                    commandList.add(cmd);
122                }
123            }
124            ElasticSearchIndexing esi = Framework.getLocalService(ElasticSearchIndexing.class);
125            esi.runIndexingWorker(commandList);
126        } finally {
127            isEnlisted.set(false);
128            getAllCommands().clear();
129            useSyncIndexing.set(null);
130        }
131    }
132
133    public static ThreadLocal<Boolean> useSyncIndexing = new ThreadLocal<Boolean>() {
134        @Override
135        protected Boolean initialValue() {
136            return Boolean.FALSE;
137        }
138
139        @Override
140        public void set(Boolean value) {
141            super.set(value);
142            if (Boolean.TRUE.equals(value)) {
143                // switch existing stack to sync
144                for (IndexingCommands cmds : transactionCommands.get().values()) {
145                    for (IndexingCommand cmd : cmds.getCommands()) {
146                        cmd.makeSync();
147                    }
148                }
149            }
150        }
151    };
152
153    protected boolean registerSynchronization(Synchronization sync) {
154        try {
155            TransactionManager tm = TransactionHelper.lookupTransactionManager();
156            if (tm != null) {
157                if (tm.getTransaction() != null) {
158                    tm.getTransaction().registerSynchronization(sync);
159                    return true;
160                }
161                if (!Framework.isTestModeSet()) {
162                    log.error("Unable to register synchronization : no active transaction");
163                }
164                return false;
165            } else {
166                log.error("Unable to register synchronization : no TransactionManager");
167                return false;
168            }
169        } catch (NamingException | IllegalStateException | SystemException | RollbackException e) {
170            log.error("Unable to register synchronization", e);
171            return false;
172        }
173    }
174
175}