001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat 018 * Benoit Delbosc 019 */ 020package org.nuxeo.elasticsearch.commands; 021 022import java.io.IOException; 023import java.io.PrintWriter; 024import java.io.Serializable; 025import java.io.StringWriter; 026import java.util.ArrayList; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Map; 030import java.util.concurrent.atomic.AtomicLong; 031 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.nuxeo.ecm.core.api.CoreSession; 035import org.nuxeo.ecm.core.api.CoreSessionService; 036import org.nuxeo.ecm.core.api.DocumentModel; 037import org.nuxeo.ecm.core.api.DocumentRef; 038import org.nuxeo.ecm.core.api.IdRef; 039import org.nuxeo.runtime.api.Framework; 040 041import com.fasterxml.jackson.core.JsonFactory; 042import com.fasterxml.jackson.core.JsonGenerator; 043import com.fasterxml.jackson.databind.JsonNode; 044import com.fasterxml.jackson.databind.ObjectMapper; 045 046/** 047 * Holds information about what type of indexing operation must be processed. IndexingCommands are create "on the fly" 048 * via a Synchronous event listener and at post commit time the system will merge the commands and execute worker to 049 * process them. 050 */ 051public class IndexingCommand implements Serializable { 052 053 private static final Log log = LogFactory.getLog(IndexingCommand.class); 054 055 private static final long serialVersionUID = 1L; 056 057 public enum Type { 058 INSERT, UPDATE, UPDATE_SECURITY, DELETE, UPDATE_DIRECT_CHILDREN, 059 } 060 061 public static final String PREFIX = "IxCd-"; 062 063 protected String id; 064 065 protected Type type; 066 067 protected boolean sync; 068 069 protected boolean recurse; 070 071 protected String targetDocumentId; 072 073 protected String path; 074 075 protected String repositoryName; 076 077 protected List<String> schemas; 078 079 protected long order; 080 081 protected transient String sessionId; 082 083 protected transient static AtomicLong seq = new AtomicLong(0); 084 085 protected IndexingCommand() { 086 } 087 088 /** 089 * Create an indexing command 090 * 091 * @param document the target document 092 * @param commandType the type of command 093 * @param sync if true the command will be processed on the same thread after transaction completion and the 094 * Elasticsearch index will be refresh 095 * @param recurse the command affect the document and all its descendants 096 */ 097 public IndexingCommand(DocumentModel document, Type commandType, boolean sync, boolean recurse) { 098 id = PREFIX + seq.incrementAndGet(); 099 type = commandType; 100 this.sync = sync; 101 this.recurse = recurse; 102 if ((sync && recurse) && commandType != Type.DELETE) { 103 // we don't want sync and recursive command 104 throw new IllegalArgumentException( 105 "Recurse and synchronous command is not allowed: cmd: " + this + ", doc: " + document); 106 } 107 if (document == null) { 108 throw new IllegalArgumentException("Target document is null for: " + this); 109 } 110 DocumentModel targetDocument = getValidTargetDocument(document); 111 repositoryName = targetDocument.getRepositoryName(); 112 targetDocumentId = targetDocument.getId(); 113 sessionId = targetDocument.getSessionId(); 114 path = targetDocument.getPathAsString(); 115 if (targetDocumentId == null) { 116 throw new IllegalArgumentException("Target document has a null uid: " + this); 117 } 118 } 119 120 protected DocumentModel getValidTargetDocument(DocumentModel target) { 121 if (target.getId() != null) { 122 return target; 123 } 124 if (target.getRef() == null || target.getCoreSession() == null) { 125 throw new IllegalArgumentException("Invalid target document: " + target); 126 } 127 // transient document try to get it from its path 128 DocumentRef documentRef = target.getRef(); 129 log.warn("Creating indexing command on a document with a null id, ref: " + documentRef 130 + ", trying to get the docId from its path, activate trace level for more info " + this); 131 if (log.isTraceEnabled()) { 132 Throwable throwable = new Throwable(); 133 StringWriter stack = new StringWriter(); 134 throwable.printStackTrace(new PrintWriter(stack)); 135 log.trace("You should use a document returned by session.createDocument, stack " + stack.toString()); 136 } 137 return target.getCoreSession().getDocument(documentRef); 138 } 139 140 public void attach(CoreSession session) { 141 if (!session.getRepositoryName().equals(repositoryName)) { 142 throw new IllegalArgumentException( 143 "Invalid session, expected repo: " + repositoryName + " actual: " + session.getRepositoryName()); 144 } 145 sessionId = session.getSessionId(); 146 assert sessionId != null : "Attach to session with a null sessionId"; 147 } 148 149 /** 150 * Return the document or null if it does not exists anymore. 151 * 152 * @throws java.lang.IllegalStateException if there is no session attached 153 */ 154 public DocumentModel getTargetDocument() { 155 CoreSession session = null; 156 if (sessionId != null) { 157 session = Framework.getService(CoreSessionService.class).getCoreSession(sessionId); 158 } 159 if (session == null) { 160 throw new IllegalStateException("Command is not attached to a valid session: " + this); 161 } 162 IdRef idref = new IdRef(targetDocumentId); 163 if (!session.exists(idref)) { 164 // Doc was deleted : no way we can fetch it 165 return null; 166 } 167 return session.getDocument(idref); 168 } 169 170 public String getRepositoryName() { 171 return repositoryName; 172 } 173 174 /** 175 * @return true if merged 176 */ 177 public boolean merge(IndexingCommand other) { 178 if (canBeMerged(other)) { 179 merge(other.sync, other.recurse); 180 return true; 181 } 182 return false; 183 } 184 185 protected void merge(boolean sync, boolean recurse) { 186 this.sync = this.sync || sync; 187 this.recurse = this.recurse || recurse; 188 } 189 190 protected boolean canBeMerged(IndexingCommand other) { 191 if (type != other.type) { 192 return false; 193 } 194 if (type == Type.DELETE) { 195 // we support recursive sync deletion 196 return true; 197 } 198 // only if the result is not a sync and recurse command 199 return !((other.sync || sync) && (other.recurse || recurse)); 200 } 201 202 public boolean isSync() { 203 return sync; 204 } 205 206 public boolean isRecurse() { 207 return recurse; 208 } 209 210 public Type getType() { 211 return type; 212 } 213 214 public String toJSON() throws IOException { 215 StringWriter out = new StringWriter(); 216 JsonFactory factory = new JsonFactory(); 217 JsonGenerator jsonGen = factory.createGenerator(out); 218 toJSON(jsonGen); 219 out.flush(); 220 jsonGen.close(); 221 return out.toString(); 222 } 223 224 public void toJSON(JsonGenerator jsonGen) throws IOException { 225 jsonGen.writeStartObject(); 226 jsonGen.writeStringField("id", id); 227 jsonGen.writeStringField("type", String.format("%s", type)); 228 jsonGen.writeStringField("docId", getTargetDocumentId()); 229 jsonGen.writeStringField("path", path); 230 jsonGen.writeStringField("repo", getRepositoryName()); 231 jsonGen.writeBooleanField("recurse", recurse); 232 jsonGen.writeBooleanField("sync", sync); 233 jsonGen.writeNumberField("order", getOrder()); 234 jsonGen.writeEndObject(); 235 } 236 237 /** 238 * Create a command from a JSON string. 239 * 240 * @throws IllegalArgumentException if json is invalid or command is invalid 241 */ 242 public static IndexingCommand fromJSON(String json) { 243 JsonFactory jsonFactory = new JsonFactory(); 244 ObjectMapper mapper = new ObjectMapper(jsonFactory); 245 try { 246 return fromJSON(mapper.readTree(json)); 247 } catch (IOException e) { 248 throw new IllegalArgumentException("Invalid JSON: " + json, e); 249 } 250 } 251 252 public static IndexingCommand fromJSON(JsonNode jsonNode) { 253 IndexingCommand cmd = new IndexingCommand(); 254 Iterator<Map.Entry<String, JsonNode>> fieldsIterator = jsonNode.fields(); 255 while (fieldsIterator.hasNext()) { 256 Map.Entry<String, JsonNode> field = fieldsIterator.next(); 257 String key = field.getKey(); 258 JsonNode value = field.getValue(); 259 if (value.isNull()) { 260 continue; 261 } 262 if ("type".equals(key)) { 263 cmd.type = Type.valueOf(value.textValue()); 264 } else if ("docId".equals(key)) { 265 cmd.targetDocumentId = value.textValue(); 266 } else if ("path".equals(key)) { 267 cmd.path = value.textValue(); 268 } else if ("repo".equals(key)) { 269 cmd.repositoryName = value.textValue(); 270 } else if ("id".equals(key)) { 271 cmd.id = value.textValue(); 272 } else if ("recurse".equals(key)) { 273 cmd.recurse = value.booleanValue(); 274 } else if ("sync".equals(key)) { 275 cmd.sync = value.booleanValue(); 276 } 277 } 278 if (cmd.targetDocumentId == null) { 279 throw new IllegalArgumentException("Document uid is null: " + cmd); 280 } 281 if (cmd.type == null) { 282 throw new IllegalArgumentException("Invalid type: " + cmd); 283 } 284 return cmd; 285 } 286 287 public String getId() { 288 return id; 289 } 290 291 public String getTargetDocumentId() { 292 return targetDocumentId; 293 } 294 295 public IndexingCommand clone(DocumentModel newDoc) { 296 return new IndexingCommand(newDoc, type, sync, recurse); 297 } 298 299 public String[] getSchemas() { 300 String[] ret = null; 301 if (schemas != null && schemas.size() > 0) { 302 ret = schemas.toArray(new String[schemas.size()]); 303 } 304 return ret; 305 } 306 307 public void addSchemas(String schema) { 308 if (schemas == null) { 309 schemas = new ArrayList<>(); 310 } 311 if (!schemas.contains(schema)) { 312 schemas.add(schema); 313 } 314 } 315 316 @Override 317 public String toString() { 318 try { 319 return toJSON(); 320 } catch (IOException e) { 321 return super.toString(); 322 } 323 } 324 325 /** 326 * Try to make the command synchronous. Recurse command will stay in async for update. 327 */ 328 public void makeSync() { 329 if (!sync) { 330 if (!recurse || type == Type.DELETE) { 331 sync = true; 332 if (log.isDebugEnabled()) { 333 log.debug("Turn command into sync: " + toString()); 334 } 335 } 336 } 337 } 338 339 // @since 8.3 340 public long getOrder() { 341 return order; 342 } 343 344 // @since 8.3 345 public void setOrder(long order) { 346 this.order = order; 347 } 348}