001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat 018 * Benoit Delbosc 019 */ 020 021package org.nuxeo.elasticsearch.listener; 022 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027 028import javax.naming.NamingException; 029import javax.transaction.RollbackException; 030import javax.transaction.Status; 031import javax.transaction.Synchronization; 032import javax.transaction.SystemException; 033import javax.transaction.TransactionManager; 034 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.ecm.core.event.Event; 038import org.nuxeo.ecm.core.event.EventListener; 039import org.nuxeo.ecm.core.event.impl.DocumentEventContext; 040import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 041import org.nuxeo.elasticsearch.commands.IndexingCommand; 042import org.nuxeo.elasticsearch.commands.IndexingCommands; 043import org.nuxeo.elasticsearch.commands.IndexingCommandsStacker; 044import org.nuxeo.runtime.api.Framework; 045import org.nuxeo.runtime.transaction.TransactionHelper; 046 047/** 048 * Synchronous Event listener used to record indexing command, submitted after commit completion. 049 */ 050public class ElasticSearchInlineListener extends IndexingCommandsStacker implements EventListener, Synchronization { 051 052 private static final Log log = LogFactory.getLog(ElasticSearchInlineListener.class); 053 054 protected static ThreadLocal<Map<String, IndexingCommands>> transactionCommands = new ThreadLocal<Map<String, IndexingCommands>>() { 055 @Override 056 protected HashMap<String, IndexingCommands> initialValue() { 057 return new HashMap<>(); 058 } 059 }; 060 061 protected static ThreadLocal<Boolean> isEnlisted = new ThreadLocal<Boolean>() { 062 @Override 063 protected Boolean initialValue() { 064 return Boolean.FALSE; 065 } 066 }; 067 068 @Override 069 protected Map<String, IndexingCommands> getAllCommands() { 070 return transactionCommands.get(); 071 } 072 073 @Override 074 protected boolean isSyncIndexingByDefault() { 075 Boolean ret = useSyncIndexing.get(); 076 if (ret == null) { 077 ret = false; 078 } 079 return ret; 080 } 081 082 @Override 083 public void handleEvent(Event event) { 084 String eventId = event.getName(); 085 if (!isEnlisted.get()) { 086 if (event.isCommitEvent()) { 087 // manual flush on save if TxManager is not hooked 088 afterCompletion(Status.STATUS_COMMITTED); 089 return; 090 } 091 // try to enlist our listener 092 isEnlisted.set(registerSynchronization(this)); 093 } 094 if (!(event.getContext() instanceof DocumentEventContext)) { 095 // don't process Events that are not tied to Documents 096 return; 097 } 098 DocumentEventContext docCtx = (DocumentEventContext) event.getContext(); 099 stackCommand(docCtx, eventId); 100 } 101 102 @Override 103 public void beforeCompletion() { 104 105 } 106 107 @Override 108 public void afterCompletion(int status) { 109 if (getAllCommands().isEmpty()) { 110 return; 111 } 112 try { 113 if (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status) { 114 return; 115 } 116 List<IndexingCommand> commandList = new ArrayList<>(); 117 for (IndexingCommands cmds : getAllCommands().values()) { 118 for (IndexingCommand cmd : cmds.getCommands()) { 119 commandList.add(cmd); 120 } 121 } 122 ElasticSearchIndexing esi = Framework.getLocalService(ElasticSearchIndexing.class); 123 esi.runIndexingWorker(commandList); 124 } finally { 125 isEnlisted.set(false); 126 getAllCommands().clear(); 127 useSyncIndexing.set(null); 128 } 129 } 130 131 public static ThreadLocal<Boolean> useSyncIndexing = new ThreadLocal<Boolean>() { 132 @Override 133 protected Boolean initialValue() { 134 return Boolean.FALSE; 135 } 136 137 @Override 138 public void set(Boolean value) { 139 super.set(value); 140 if (Boolean.TRUE.equals(value)) { 141 // switch existing stack to sync 142 for (IndexingCommands cmds : transactionCommands.get().values()) { 143 for (IndexingCommand cmd : cmds.getCommands()) { 144 cmd.makeSync(); 145 } 146 } 147 } 148 } 149 }; 150 151 protected boolean registerSynchronization(Synchronization sync) { 152 try { 153 TransactionManager tm = TransactionHelper.lookupTransactionManager(); 154 if (tm != null) { 155 if (tm.getTransaction() != null) { 156 tm.getTransaction().registerSynchronization(sync); 157 return true; 158 } 159 if (!Framework.isTestModeSet()) { 160 log.error("Unable to register synchronization : no active transaction"); 161 } 162 return false; 163 } else { 164 log.error("Unable to register synchronization : no TransactionManager"); 165 return false; 166 } 167 } catch (NamingException | IllegalStateException | SystemException | RollbackException e) { 168 log.error("Unable to register synchronization", e); 169 return false; 170 } 171 } 172 173}