001/* 002 * (C) Copyright 2014-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat 018 * Benoit Delbosc 019 */ 020 021package org.nuxeo.elasticsearch.listener; 022 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027 028import javax.naming.NamingException; 029import javax.transaction.RollbackException; 030import javax.transaction.Status; 031import javax.transaction.Synchronization; 032import javax.transaction.SystemException; 033import javax.transaction.TransactionManager; 034 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.ecm.core.event.Event; 038import org.nuxeo.ecm.core.event.EventListener; 039import org.nuxeo.ecm.core.event.impl.DocumentEventContext; 040import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 041import org.nuxeo.elasticsearch.commands.IndexingCommand; 042import org.nuxeo.elasticsearch.commands.IndexingCommands; 043import org.nuxeo.elasticsearch.commands.IndexingCommandsStacker; 044import org.nuxeo.runtime.api.Framework; 045import org.nuxeo.runtime.transaction.TransactionHelper; 046 047/** 048 * Synchronous Event listener used to record indexing command, submitted after commit completion. 049 */ 050public class ElasticSearchInlineListener extends IndexingCommandsStacker implements EventListener, Synchronization { 051 052 private static final Log log = LogFactory.getLog(ElasticSearchInlineListener.class); 053 054 protected static ThreadLocal<Map<String, IndexingCommands>> transactionCommands = ThreadLocal.withInitial( 055 HashMap::new); 056 057 public static final ThreadLocal<Boolean> useSyncIndexing = new ThreadLocal<Boolean>() { 058 @Override 059 protected Boolean initialValue() { 060 return Boolean.FALSE; 061 } 062 063 @Override 064 public void set(Boolean value) { 065 super.set(value); 066 if (Boolean.TRUE.equals(value)) { 067 // switch existing stack to sync 068 for (IndexingCommands cmds : transactionCommands.get().values()) { 069 for (IndexingCommand cmd : cmds.getCommands()) { 070 cmd.makeSync(); 071 } 072 } 073 } 074 } 075 }; 076 077 protected static ThreadLocal<Boolean> isEnlisted = ThreadLocal.withInitial(() -> Boolean.FALSE); 078 079 @Override 080 protected Map<String, IndexingCommands> getAllCommands() { 081 return transactionCommands.get(); 082 } 083 084 @Override 085 protected boolean isSyncIndexingByDefault() { 086 Boolean ret = useSyncIndexing.get(); 087 if (ret == null) { 088 ret = false; 089 } 090 return ret; 091 } 092 093 @Override 094 public void handleEvent(Event event) { 095 String eventId = event.getName(); 096 if (!isEnlisted.get()) { 097 if (event.isCommitEvent()) { 098 // manual flush on save if TxManager is not hooked 099 afterCompletion(Status.STATUS_COMMITTED); 100 return; 101 } 102 // try to enlist our listener 103 isEnlisted.set(registerSynchronization(this)); 104 } 105 if (!(event.getContext() instanceof DocumentEventContext)) { 106 // don't process Events that are not tied to Documents 107 return; 108 } 109 DocumentEventContext docCtx = (DocumentEventContext) event.getContext(); 110 stackCommand(docCtx, eventId); 111 } 112 113 @Override 114 public void beforeCompletion() { 115 116 } 117 118 @Override 119 public void afterCompletion(int status) { 120 try { 121 if (getAllCommands().isEmpty()) { 122 // return and un hook the current listener even if there's no commands to index 123 // unless, during next transaction this listener won't be hooked to it 124 return; 125 } 126 if (Status.STATUS_MARKED_ROLLBACK == status || Status.STATUS_ROLLEDBACK == status) { 127 return; 128 } 129 List<IndexingCommand> commandList = new ArrayList<>(); 130 for (IndexingCommands cmds : getAllCommands().values()) { 131 for (IndexingCommand cmd : cmds.getCommands()) { 132 commandList.add(cmd); 133 } 134 } 135 ElasticSearchIndexing esi = Framework.getService(ElasticSearchIndexing.class); 136 esi.runIndexingWorker(commandList); 137 } finally { 138 isEnlisted.set(false); 139 getAllCommands().clear(); 140 useSyncIndexing.set(null); 141 } 142 } 143 144 protected boolean registerSynchronization(Synchronization sync) { 145 try { 146 TransactionManager tm = TransactionHelper.lookupTransactionManager(); 147 if (tm != null) { 148 if (tm.getTransaction() != null) { 149 tm.getTransaction().registerSynchronization(sync); 150 return true; 151 } 152 if (!Framework.isTestModeSet()) { 153 log.error("Unable to register synchronization : no active transaction"); 154 } 155 return false; 156 } else { 157 log.error("Unable to register synchronization : no TransactionManager"); 158 return false; 159 } 160 } catch (NamingException | IllegalStateException | SystemException | RollbackException e) { 161 log.error("Unable to register synchronization", e); 162 return false; 163 } 164 } 165 166}