001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 *     Benoit Delbosc
019 */
020
021package org.nuxeo.elasticsearch.listener;
022
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027
028import javax.naming.NamingException;
029import javax.transaction.RollbackException;
030import javax.transaction.Status;
031import javax.transaction.Synchronization;
032import javax.transaction.SystemException;
033import javax.transaction.TransactionManager;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.ecm.core.event.Event;
038import org.nuxeo.ecm.core.event.EventListener;
039import org.nuxeo.ecm.core.event.impl.DocumentEventContext;
040import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
041import org.nuxeo.elasticsearch.commands.IndexingCommand;
042import org.nuxeo.elasticsearch.commands.IndexingCommands;
043import org.nuxeo.elasticsearch.commands.IndexingCommandsStacker;
044import org.nuxeo.runtime.api.Framework;
045import org.nuxeo.runtime.transaction.TransactionHelper;
046
047/**
048 * Synchronous Event listener used to record indexing command, submitted after commit completion.
049 */
050public class ElasticSearchInlineListener extends IndexingCommandsStacker implements EventListener, Synchronization {
051
052    private static final Log log = LogFactory.getLog(ElasticSearchInlineListener.class);
053
054    protected static ThreadLocal<Map<String, IndexingCommands>> transactionCommands = new ThreadLocal<Map<String, IndexingCommands>>() {
055        @Override
056        protected HashMap<String, IndexingCommands> initialValue() {
057            return new HashMap<>();
058        }
059    };
060
061    protected static ThreadLocal<Boolean> isEnlisted = new ThreadLocal<Boolean>() {
062        @Override
063        protected Boolean initialValue() {
064            return Boolean.FALSE;
065        }
066    };
067
068    @Override
069    protected Map<String, IndexingCommands> getAllCommands() {
070        return transactionCommands.get();
071    }
072
073    @Override
074    protected boolean isSyncIndexingByDefault() {
075        Boolean ret = useSyncIndexing.get();
076        if (ret == null) {
077            ret = false;
078        }
079        return ret;
080    }
081
082    @Override
083    public void handleEvent(Event event) {
084        String eventId = event.getName();
085        if (!isEnlisted.get()) {
086            if (event.isCommitEvent()) {
087                // manual flush on save if TxManager is not hooked
088                afterCompletion(Status.STATUS_COMMITTED);
089                return;
090            }
091            // try to enlist our listener
092            isEnlisted.set(registerSynchronization(this));
093        }
094        if (!(event.getContext() instanceof DocumentEventContext)) {
095            // don't process Events that are not tied to Documents
096            return;
097        }
098        DocumentEventContext docCtx = (DocumentEventContext) event.getContext();
099        stackCommand(docCtx, eventId);
100    }
101
102    @Override
103    public void beforeCompletion() {
104
105    }
106
107    @Override
108    public void afterCompletion(int status) {
109        if (getAllCommands().isEmpty()) {
110            return;
111        }
112        try {
113            if (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status) {
114                return;
115            }
116            List<IndexingCommand> commandList = new ArrayList<>();
117            for (IndexingCommands cmds : getAllCommands().values()) {
118                for (IndexingCommand cmd : cmds.getCommands()) {
119                    commandList.add(cmd);
120                }
121            }
122            ElasticSearchIndexing esi = Framework.getLocalService(ElasticSearchIndexing.class);
123            esi.runIndexingWorker(commandList);
124        } finally {
125            isEnlisted.set(false);
126            getAllCommands().clear();
127            useSyncIndexing.set(null);
128        }
129    }
130
131    public static ThreadLocal<Boolean> useSyncIndexing = new ThreadLocal<Boolean>() {
132        @Override
133        protected Boolean initialValue() {
134            return Boolean.FALSE;
135        }
136
137        @Override
138        public void set(Boolean value) {
139            super.set(value);
140            if (Boolean.TRUE.equals(value)) {
141                // switch existing stack to sync
142                for (IndexingCommands cmds : transactionCommands.get().values()) {
143                    for (IndexingCommand cmd : cmds.getCommands()) {
144                        cmd.makeSync();
145                    }
146                }
147            }
148        }
149    };
150
151    protected boolean registerSynchronization(Synchronization sync) {
152        try {
153            TransactionManager tm = TransactionHelper.lookupTransactionManager();
154            if (tm != null) {
155                if (tm.getTransaction() != null) {
156                    tm.getTransaction().registerSynchronization(sync);
157                    return true;
158                }
159                if (!Framework.isTestModeSet()) {
160                    log.error("Unable to register synchronization : no active transaction");
161                }
162                return false;
163            } else {
164                log.error("Unable to register synchronization : no TransactionManager");
165                return false;
166            }
167        } catch (NamingException | IllegalStateException | SystemException | RollbackException e) {
168            log.error("Unable to register synchronization", e);
169            return false;
170        }
171    }
172
173}