001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat 018 * Benoit Delbosc 019 */ 020 021package org.nuxeo.elasticsearch.listener; 022 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027 028import javax.naming.NamingException; 029import javax.transaction.RollbackException; 030import javax.transaction.Status; 031import javax.transaction.Synchronization; 032import javax.transaction.SystemException; 033import javax.transaction.TransactionManager; 034 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.ecm.core.event.Event; 038import org.nuxeo.ecm.core.event.EventListener; 039import org.nuxeo.ecm.core.event.impl.DocumentEventContext; 040import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 041import org.nuxeo.elasticsearch.commands.IndexingCommand; 042import org.nuxeo.elasticsearch.commands.IndexingCommands; 043import org.nuxeo.elasticsearch.commands.IndexingCommandsStacker; 044import org.nuxeo.runtime.api.Framework; 045import org.nuxeo.runtime.transaction.TransactionHelper; 046 047/** 048 * Synchronous Event listener used to record indexing command, submitted after commit completion. 049 */ 050public class ElasticSearchInlineListener extends IndexingCommandsStacker implements EventListener, Synchronization { 051 052 private static final Log log = LogFactory.getLog(ElasticSearchInlineListener.class); 053 054 protected static ThreadLocal<Map<String, IndexingCommands>> transactionCommands = new ThreadLocal<Map<String, IndexingCommands>>() { 055 @Override 056 protected HashMap<String, IndexingCommands> initialValue() { 057 return new HashMap<>(); 058 } 059 }; 060 061 protected static ThreadLocal<Boolean> isEnlisted = new ThreadLocal<Boolean>() { 062 @Override 063 protected Boolean initialValue() { 064 return Boolean.FALSE; 065 } 066 }; 067 068 @Override 069 protected Map<String, IndexingCommands> getAllCommands() { 070 return transactionCommands.get(); 071 } 072 073 @Override 074 protected boolean isSyncIndexingByDefault() { 075 Boolean ret = useSyncIndexing.get(); 076 if (ret == null) { 077 ret = false; 078 } 079 return ret; 080 } 081 082 @Override 083 public void handleEvent(Event event) { 084 String eventId = event.getName(); 085 if (!isEnlisted.get()) { 086 if (event.isCommitEvent()) { 087 // manual flush on save if TxManager is not hooked 088 afterCompletion(Status.STATUS_COMMITTED); 089 return; 090 } 091 // try to enlist our listener 092 isEnlisted.set(registerSynchronization(this)); 093 } 094 if (!(event.getContext() instanceof DocumentEventContext)) { 095 // don't process Events that are not tied to Documents 096 return; 097 } 098 DocumentEventContext docCtx = (DocumentEventContext) event.getContext(); 099 stackCommand(docCtx, eventId); 100 } 101 102 @Override 103 public void beforeCompletion() { 104 105 } 106 107 @Override 108 public void afterCompletion(int status) { 109 try { 110 if (getAllCommands().isEmpty()) { 111 // return and un hook the current listener even if there's no commands to index 112 // unless, during next transaction this listener won't be hooked to it 113 return; 114 } 115 if (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status) { 116 return; 117 } 118 List<IndexingCommand> commandList = new ArrayList<>(); 119 for (IndexingCommands cmds : getAllCommands().values()) { 120 for (IndexingCommand cmd : cmds.getCommands()) { 121 commandList.add(cmd); 122 } 123 } 124 ElasticSearchIndexing esi = Framework.getLocalService(ElasticSearchIndexing.class); 125 esi.runIndexingWorker(commandList); 126 } finally { 127 isEnlisted.set(false); 128 getAllCommands().clear(); 129 useSyncIndexing.set(null); 130 } 131 } 132 133 public static ThreadLocal<Boolean> useSyncIndexing = new ThreadLocal<Boolean>() { 134 @Override 135 protected Boolean initialValue() { 136 return Boolean.FALSE; 137 } 138 139 @Override 140 public void set(Boolean value) { 141 super.set(value); 142 if (Boolean.TRUE.equals(value)) { 143 // switch existing stack to sync 144 for (IndexingCommands cmds : transactionCommands.get().values()) { 145 for (IndexingCommand cmd : cmds.getCommands()) { 146 cmd.makeSync(); 147 } 148 } 149 } 150 } 151 }; 152 153 protected boolean registerSynchronization(Synchronization sync) { 154 try { 155 TransactionManager tm = TransactionHelper.lookupTransactionManager(); 156 if (tm != null) { 157 if (tm.getTransaction() != null) { 158 tm.getTransaction().registerSynchronization(sync); 159 return true; 160 } 161 if (!Framework.isTestModeSet()) { 162 log.error("Unable to register synchronization : no active transaction"); 163 } 164 return false; 165 } else { 166 log.error("Unable to register synchronization : no TransactionManager"); 167 return false; 168 } 169 } catch (NamingException | IllegalStateException | SystemException | RollbackException e) { 170 log.error("Unable to register synchronization", e); 171 return false; 172 } 173 } 174 175}