001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat 018 * Benoit Delbosc 019 */ 020package org.nuxeo.elasticsearch.commands; 021 022import org.apache.commons.logging.Log; 023import org.apache.commons.logging.LogFactory; 024import org.codehaus.jackson.JsonFactory; 025import org.codehaus.jackson.JsonGenerator; 026import org.codehaus.jackson.JsonNode; 027import org.codehaus.jackson.map.ObjectMapper; 028import org.nuxeo.ecm.core.api.CoreSession; 029import org.nuxeo.ecm.core.api.CoreSessionService; 030import org.nuxeo.ecm.core.api.DocumentModel; 031import org.nuxeo.ecm.core.api.DocumentRef; 032import org.nuxeo.ecm.core.api.IdRef; 033import org.nuxeo.runtime.api.Framework; 034 035import java.io.IOException; 036import java.io.PrintWriter; 037import java.io.Serializable; 038import java.io.StringWriter; 039import java.util.ArrayList; 040import java.util.Iterator; 041import java.util.List; 042import java.util.Map; 043import java.util.concurrent.atomic.AtomicLong; 044 045/** 046 * Holds information about what type of indexing operation must be processed. IndexingCommands are create "on the fly" 047 * via a Synchronous event listener and at post commit time the system will merge the commands and execute worker to 048 * process them. 049 */ 050public class IndexingCommand implements Serializable { 051 052 private static final Log log = LogFactory.getLog(IndexingCommand.class); 053 054 private static final long serialVersionUID = 1L; 055 056 public enum Type { 057 INSERT, UPDATE, UPDATE_SECURITY, DELETE, UPDATE_DIRECT_CHILDREN, 058 } 059 060 public static final String PREFIX = "IxCd-"; 061 062 protected String id; 063 064 protected Type type; 065 066 protected boolean sync; 067 068 protected boolean recurse; 069 070 protected String targetDocumentId; 071 072 protected String path; 073 074 protected String repositoryName; 075 076 protected List<String> schemas; 077 078 protected long order; 079 080 protected transient String sessionId; 081 082 protected transient static AtomicLong seq = new AtomicLong(0); 083 084 protected IndexingCommand() { 085 } 086 087 /** 088 * Create an indexing command 089 * 090 * @param document the target document 091 * @param commandType the type of command 092 * @param sync if true the command will be processed on the same thread after transaction completion and the 093 * Elasticsearch index will be refresh 094 * @param recurse the command affect the document and all its descendants 095 */ 096 public IndexingCommand(DocumentModel document, Type commandType, boolean sync, boolean recurse) { 097 id = PREFIX + seq.incrementAndGet(); 098 type = commandType; 099 this.sync = sync; 100 this.recurse = recurse; 101 if ((sync && recurse) && commandType != Type.DELETE) { 102 // we don't want sync and recursive command 103 throw new IllegalArgumentException("Recurse and synchronous command is not allowed: cmd: " + this 104 + ", doc: " + document); 105 } 106 if (document == null) { 107 throw new IllegalArgumentException("Target document is null for: " + this); 108 } 109 DocumentModel targetDocument = getValidTargetDocument(document); 110 repositoryName = targetDocument.getRepositoryName(); 111 targetDocumentId = targetDocument.getId(); 112 sessionId = targetDocument.getSessionId(); 113 path = targetDocument.getPathAsString(); 114 if (targetDocumentId == null) { 115 throw new IllegalArgumentException("Target document has a null uid: " + this); 116 } 117 } 118 119 protected DocumentModel getValidTargetDocument(DocumentModel target) { 120 if (target.getId() != null) { 121 return target; 122 } 123 if (target.getRef() == null || target.getCoreSession() == null) { 124 throw new IllegalArgumentException("Invalid target document: " + target); 125 } 126 // transient document try to get it from its path 127 DocumentRef documentRef = target.getRef(); 128 log.warn("Creating indexing command on a document with a null id, ref: " + documentRef 129 + ", trying to get the docId from its path, activate trace level for more info " + this); 130 if (log.isTraceEnabled()) { 131 Throwable throwable = new Throwable(); 132 StringWriter stack = new StringWriter(); 133 throwable.printStackTrace(new PrintWriter(stack)); 134 log.trace("You should use a document returned by session.createDocument, stack " + stack.toString()); 135 } 136 return target.getCoreSession().getDocument(documentRef); 137 } 138 139 public void attach(CoreSession session) { 140 if (!session.getRepositoryName().equals(repositoryName)) { 141 throw new IllegalArgumentException("Invalid session, expected repo: " + repositoryName + " actual: " 142 + session.getRepositoryName()); 143 } 144 sessionId = session.getSessionId(); 145 assert sessionId != null : "Attach to session with a null sessionId"; 146 } 147 148 /** 149 * Return the document or null if it does not exists anymore. 150 * 151 * @throws java.lang.IllegalStateException if there is no session attached 152 */ 153 public DocumentModel getTargetDocument() { 154 CoreSession session = null; 155 if (sessionId != null) { 156 session = Framework.getService(CoreSessionService.class).getCoreSession(sessionId); 157 } 158 if (session == null) { 159 throw new IllegalStateException("Command is not attached to a valid session: " + this); 160 } 161 IdRef idref = new IdRef(targetDocumentId); 162 if (!session.exists(idref)) { 163 // Doc was deleted : no way we can fetch it 164 return null; 165 } 166 return session.getDocument(idref); 167 } 168 169 public String getRepositoryName() { 170 return repositoryName; 171 } 172 173 /** 174 * @return true if merged 175 */ 176 public boolean merge(IndexingCommand other) { 177 if (canBeMerged(other)) { 178 merge(other.sync, other.recurse); 179 return true; 180 } 181 return false; 182 } 183 184 protected void merge(boolean sync, boolean recurse) { 185 this.sync = this.sync || sync; 186 this.recurse = this.recurse || recurse; 187 } 188 189 protected boolean canBeMerged(IndexingCommand other) { 190 if (type != other.type) { 191 return false; 192 } 193 if (type == Type.DELETE) { 194 // we support recursive sync deletion 195 return true; 196 } 197 // only if the result is not a sync and recurse command 198 return !((other.sync || sync) && (other.recurse || recurse)); 199 } 200 201 public boolean isSync() { 202 return sync; 203 } 204 205 public boolean isRecurse() { 206 return recurse; 207 } 208 209 public Type getType() { 210 return type; 211 } 212 213 public String toJSON() throws IOException { 214 StringWriter out = new StringWriter(); 215 JsonFactory factory = new JsonFactory(); 216 JsonGenerator jsonGen = factory.createJsonGenerator(out); 217 toJSON(jsonGen); 218 out.flush(); 219 jsonGen.close(); 220 return out.toString(); 221 } 222 223 public void toJSON(JsonGenerator jsonGen) throws IOException { 224 jsonGen.writeStartObject(); 225 jsonGen.writeStringField("id", id); 226 jsonGen.writeStringField("type", String.format("%s", type)); 227 jsonGen.writeStringField("docId", getTargetDocumentId()); 228 jsonGen.writeStringField("path", path); 229 jsonGen.writeStringField("repo", getRepositoryName()); 230 jsonGen.writeBooleanField("recurse", recurse); 231 jsonGen.writeBooleanField("sync", sync); 232 jsonGen.writeNumberField("order", getOrder()); 233 jsonGen.writeEndObject(); 234 } 235 236 /** 237 * Create a command from a JSON string. 238 * 239 * @throws IllegalArgumentException if json is invalid or command is invalid 240 */ 241 public static IndexingCommand fromJSON(String json) { 242 JsonFactory jsonFactory = new JsonFactory(); 243 ObjectMapper mapper = new ObjectMapper(jsonFactory); 244 try { 245 return fromJSON(mapper.readTree(json)); 246 } catch (IOException e) { 247 throw new IllegalArgumentException("Invalid JSON: " + json, e); 248 } 249 } 250 251 public static IndexingCommand fromJSON(JsonNode jsonNode) { 252 IndexingCommand cmd = new IndexingCommand(); 253 Iterator<Map.Entry<String, JsonNode>> fieldsIterator = jsonNode.getFields(); 254 while (fieldsIterator.hasNext()) { 255 Map.Entry<String, JsonNode> field = fieldsIterator.next(); 256 String key = field.getKey(); 257 JsonNode value = field.getValue(); 258 if (value.isNull()) { 259 continue; 260 } 261 if ("type".equals(key)) { 262 cmd.type = Type.valueOf(value.getTextValue()); 263 } else if ("docId".equals(key)) { 264 cmd.targetDocumentId = value.getTextValue(); 265 } else if ("path".equals(key)) { 266 cmd.path = value.getTextValue(); 267 } else if ("repo".equals(key)) { 268 cmd.repositoryName = value.getTextValue(); 269 } else if ("id".equals(key)) { 270 cmd.id = value.getTextValue(); 271 } else if ("recurse".equals(key)) { 272 cmd.recurse = value.getBooleanValue(); 273 } else if ("sync".equals(key)) { 274 cmd.sync = value.getBooleanValue(); 275 } 276 } 277 if (cmd.targetDocumentId == null) { 278 throw new IllegalArgumentException("Document uid is null: " + cmd); 279 } 280 if (cmd.type == null) { 281 throw new IllegalArgumentException("Invalid type: " + cmd); 282 } 283 return cmd; 284 } 285 286 public String getId() { 287 return id; 288 } 289 290 public String getTargetDocumentId() { 291 return targetDocumentId; 292 } 293 294 public IndexingCommand clone(DocumentModel newDoc) { 295 return new IndexingCommand(newDoc, type, sync, recurse); 296 } 297 298 public String[] getSchemas() { 299 String[] ret = null; 300 if (schemas != null && schemas.size() > 0) { 301 ret = schemas.toArray(new String[schemas.size()]); 302 } 303 return ret; 304 } 305 306 public void addSchemas(String schema) { 307 if (schemas == null) { 308 schemas = new ArrayList<>(); 309 } 310 if (!schemas.contains(schema)) { 311 schemas.add(schema); 312 } 313 } 314 315 @Override 316 public String toString() { 317 try { 318 return toJSON(); 319 } catch (IOException e) { 320 return super.toString(); 321 } 322 } 323 324 /** 325 * Try to make the command synchronous. Recurse command will stay in async for update. 326 */ 327 public void makeSync() { 328 if (!sync) { 329 if (!recurse || type == Type.DELETE) { 330 sync = true; 331 if (log.isDebugEnabled()) { 332 log.debug("Turn command into sync: " + toString()); 333 } 334 } 335 } 336 } 337 338 // @since 8.3 339 public long getOrder() { 340 return order; 341 } 342 343 // @since 8.3 344 public void setOrder(long order) { 345 this.order = order; 346 } 347}