001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Thierry Delprat 016 * Benoit Delbosc 017 */ 018 019package org.nuxeo.elasticsearch.listener; 020 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025 026import javax.naming.NamingException; 027import javax.transaction.RollbackException; 028import javax.transaction.Status; 029import javax.transaction.Synchronization; 030import javax.transaction.SystemException; 031import javax.transaction.TransactionManager; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.nuxeo.ecm.core.event.Event; 036import org.nuxeo.ecm.core.event.EventListener; 037import org.nuxeo.ecm.core.event.impl.DocumentEventContext; 038import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 039import org.nuxeo.elasticsearch.commands.IndexingCommand; 040import org.nuxeo.elasticsearch.commands.IndexingCommands; 041import org.nuxeo.elasticsearch.commands.IndexingCommandsStacker; 042import org.nuxeo.runtime.api.Framework; 043import org.nuxeo.runtime.transaction.TransactionHelper; 044 045/** 046 * Synchronous Event listener used to record indexing command, submitted after commit completion. 047 */ 048public class ElasticSearchInlineListener extends IndexingCommandsStacker implements EventListener, Synchronization { 049 050 private static final Log log = LogFactory.getLog(ElasticSearchInlineListener.class); 051 052 protected static ThreadLocal<Map<String, IndexingCommands>> transactionCommands = new ThreadLocal<Map<String, IndexingCommands>>() { 053 @Override 054 protected HashMap<String, IndexingCommands> initialValue() { 055 return new HashMap<>(); 056 } 057 }; 058 059 protected static ThreadLocal<Boolean> isEnlisted = new ThreadLocal<Boolean>() { 060 @Override 061 protected Boolean initialValue() { 062 return Boolean.FALSE; 063 } 064 }; 065 066 @Override 067 protected Map<String, IndexingCommands> getAllCommands() { 068 return transactionCommands.get(); 069 } 070 071 @Override 072 protected boolean isSyncIndexingByDefault() { 073 Boolean ret = useSyncIndexing.get(); 074 if (ret == null) { 075 ret = false; 076 } 077 return ret; 078 } 079 080 @Override 081 public void handleEvent(Event event) { 082 String eventId = event.getName(); 083 if (!isEnlisted.get()) { 084 if (event.isCommitEvent()) { 085 // manual flush on save if TxManager is not hooked 086 afterCompletion(Status.STATUS_COMMITTED); 087 return; 088 } 089 // try to enlist our listener 090 isEnlisted.set(registerSynchronization(this)); 091 } 092 if (!(event.getContext() instanceof DocumentEventContext)) { 093 // don't process Events that are not tied to Documents 094 return; 095 } 096 DocumentEventContext docCtx = (DocumentEventContext) event.getContext(); 097 stackCommand(docCtx, eventId); 098 } 099 100 @Override 101 public void beforeCompletion() { 102 103 } 104 105 @Override 106 public void afterCompletion(int status) { 107 if (getAllCommands().isEmpty()) { 108 return; 109 } 110 try { 111 if (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status) { 112 return; 113 } 114 List<IndexingCommand> commandList = new ArrayList<>(); 115 for (IndexingCommands cmds : getAllCommands().values()) { 116 for (IndexingCommand cmd : cmds.getCommands()) { 117 commandList.add(cmd); 118 } 119 } 120 ElasticSearchIndexing esi = Framework.getLocalService(ElasticSearchIndexing.class); 121 esi.runIndexingWorker(commandList); 122 } finally { 123 isEnlisted.set(false); 124 getAllCommands().clear(); 125 useSyncIndexing.set(null); 126 } 127 } 128 129 public static ThreadLocal<Boolean> useSyncIndexing = new ThreadLocal<Boolean>() { 130 @Override 131 protected Boolean initialValue() { 132 return Boolean.FALSE; 133 } 134 135 @Override 136 public void set(Boolean value) { 137 super.set(value); 138 if (Boolean.TRUE.equals(value)) { 139 // switch existing stack to sync 140 for (IndexingCommands cmds : transactionCommands.get().values()) { 141 for (IndexingCommand cmd : cmds.getCommands()) { 142 cmd.makeSync(); 143 } 144 } 145 } 146 } 147 }; 148 149 protected boolean registerSynchronization(Synchronization sync) { 150 try { 151 TransactionManager tm = TransactionHelper.lookupTransactionManager(); 152 if (tm != null) { 153 if (tm.getTransaction() != null) { 154 tm.getTransaction().registerSynchronization(sync); 155 return true; 156 } 157 if (!Framework.isTestModeSet()) { 158 log.error("Unable to register synchronization : no active transaction"); 159 } 160 return false; 161 } else { 162 log.error("Unable to register synchronization : no TransactionManager"); 163 return false; 164 } 165 } catch (NamingException | IllegalStateException | SystemException | RollbackException e) { 166 log.error("Unable to register synchronization", e); 167 return false; 168 } 169 } 170 171}