001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat 018 * Benoit Delbosc 019 */ 020package org.nuxeo.elasticsearch.commands; 021 022import org.apache.commons.logging.Log; 023import org.apache.commons.logging.LogFactory; 024import org.codehaus.jackson.JsonFactory; 025import org.codehaus.jackson.JsonGenerator; 026import org.codehaus.jackson.JsonNode; 027import org.codehaus.jackson.map.ObjectMapper; 028import org.nuxeo.ecm.core.api.CoreInstance; 029import org.nuxeo.ecm.core.api.CoreSession; 030import org.nuxeo.ecm.core.api.DocumentModel; 031import org.nuxeo.ecm.core.api.DocumentRef; 032import org.nuxeo.ecm.core.api.IdRef; 033 034import java.io.IOException; 035import java.io.PrintWriter; 036import java.io.Serializable; 037import java.io.StringWriter; 038import java.util.ArrayList; 039import java.util.Iterator; 040import java.util.List; 041import java.util.Map; 042import java.util.UUID; 043 044/** 045 * Holds information about what type of indexing operation must be processed. IndexingCommands are create "on the fly" 046 * via a Synchronous event listener and at post commit time the system will merge the commands and execute worker to 047 * process them. 048 */ 049public class IndexingCommand implements Serializable { 050 051 private static final Log log = LogFactory.getLog(IndexingCommand.class); 052 053 private static final long serialVersionUID = 1L; 054 055 public enum Type { 056 INSERT, UPDATE, UPDATE_SECURITY, DELETE, UPDATE_DIRECT_CHILDREN, 057 } 058 059 public static final String PREFIX = "IndexingCommand-"; 060 061 protected String id; 062 063 protected Type type; 064 065 protected boolean sync; 066 067 protected boolean recurse; 068 069 protected String targetDocumentId; 070 071 protected String path; 072 073 protected String repositoryName; 074 075 protected List<String> schemas; 076 077 protected transient String sessionId; 078 079 protected IndexingCommand() { 080 } 081 082 /** 083 * Create an indexing command 084 * 085 * @param document the target document 086 * @param commandType the type of command 087 * @param sync if true the command will be processed on the same thread after transaction completion and the 088 * Elasticsearch index will be refresh 089 * @param recurse the command affect the document and all its descendants 090 */ 091 public IndexingCommand(DocumentModel document, Type commandType, boolean sync, boolean recurse) { 092 id = PREFIX + UUID.randomUUID().toString(); 093 type = commandType; 094 this.sync = sync; 095 this.recurse = recurse; 096 if ((sync && recurse) && commandType != Type.DELETE) { 097 // we don't want sync and recursive command 098 throw new IllegalArgumentException("Recurse and synchronous command is not allowed: cmd: " + this 099 + ", doc: " + document); 100 } 101 if (document == null) { 102 throw new IllegalArgumentException("Target document is null for: " + this); 103 } 104 DocumentModel targetDocument = getValidTargetDocument(document); 105 repositoryName = targetDocument.getRepositoryName(); 106 targetDocumentId = targetDocument.getId(); 107 sessionId = targetDocument.getSessionId(); 108 path = targetDocument.getPathAsString(); 109 if (targetDocumentId == null) { 110 throw new IllegalArgumentException("Target document has a null uid: " + this); 111 } 112 } 113 114 protected DocumentModel getValidTargetDocument(DocumentModel target) { 115 if (target.getId() != null) { 116 return target; 117 } 118 if (target.getRef() == null || target.getCoreSession() == null) { 119 throw new IllegalArgumentException("Invalid target document: " + target); 120 } 121 // transient document try to get it from its path 122 DocumentRef documentRef = target.getRef(); 123 log.warn("Creating indexing command on a document with a null id, ref: " + documentRef 124 + ", trying to get the docId from its path, activate trace level for more info " + this); 125 if (log.isTraceEnabled()) { 126 Throwable throwable = new Throwable(); 127 StringWriter stack = new StringWriter(); 128 throwable.printStackTrace(new PrintWriter(stack)); 129 log.trace("You should use a document returned by session.createDocument, stack " + stack.toString()); 130 } 131 return target.getCoreSession().getDocument(documentRef); 132 } 133 134 public void attach(CoreSession session) { 135 if (!session.getRepositoryName().equals(repositoryName)) { 136 throw new IllegalArgumentException("Invalid session, expected repo: " + repositoryName + " actual: " 137 + session.getRepositoryName()); 138 } 139 sessionId = session.getSessionId(); 140 assert sessionId != null : "Attach to session with a null sessionId"; 141 } 142 143 /** 144 * Return the document or null if it does not exists anymore. 145 * 146 * @throws java.lang.IllegalStateException if there is no session attached 147 */ 148 public DocumentModel getTargetDocument() { 149 CoreSession session = null; 150 if (sessionId != null) { 151 session = CoreInstance.getInstance().getSession(sessionId); 152 } 153 if (session == null) { 154 throw new IllegalStateException("Command is not attached to a valid session: " + this); 155 } 156 IdRef idref = new IdRef(targetDocumentId); 157 if (!session.exists(idref)) { 158 // Doc was deleted : no way we can fetch it 159 return null; 160 } 161 return session.getDocument(idref); 162 } 163 164 public String getRepositoryName() { 165 return repositoryName; 166 } 167 168 /** 169 * @return true if merged 170 */ 171 public boolean merge(IndexingCommand other) { 172 if (canBeMerged(other)) { 173 merge(other.sync, other.recurse); 174 return true; 175 } 176 return false; 177 } 178 179 protected void merge(boolean sync, boolean recurse) { 180 this.sync = this.sync || sync; 181 this.recurse = this.recurse || recurse; 182 } 183 184 protected boolean canBeMerged(IndexingCommand other) { 185 if (type != other.type) { 186 return false; 187 } 188 if (type == Type.DELETE) { 189 // we support recursive sync deletion 190 return true; 191 } 192 // only if the result is not a sync and recurse command 193 return !((other.sync || sync) && (other.recurse || recurse)); 194 } 195 196 public boolean isSync() { 197 return sync; 198 } 199 200 public boolean isRecurse() { 201 return recurse; 202 } 203 204 public Type getType() { 205 return type; 206 } 207 208 public String toJSON() throws IOException { 209 StringWriter out = new StringWriter(); 210 JsonFactory factory = new JsonFactory(); 211 JsonGenerator jsonGen = factory.createJsonGenerator(out); 212 toJSON(jsonGen); 213 out.flush(); 214 jsonGen.close(); 215 return out.toString(); 216 } 217 218 public void toJSON(JsonGenerator jsonGen) throws IOException { 219 jsonGen.writeStartObject(); 220 jsonGen.writeStringField("id", id); 221 jsonGen.writeStringField("type", String.format("%s", type)); 222 jsonGen.writeStringField("docId", getTargetDocumentId()); 223 jsonGen.writeStringField("path", path); 224 jsonGen.writeStringField("repo", getRepositoryName()); 225 jsonGen.writeBooleanField("recurse", recurse); 226 jsonGen.writeBooleanField("sync", sync); 227 jsonGen.writeEndObject(); 228 } 229 230 /** 231 * Create a command from a JSON string. 232 * 233 * @throws IllegalArgumentException if json is invalid or command is invalid 234 */ 235 public static IndexingCommand fromJSON(String json) { 236 JsonFactory jsonFactory = new JsonFactory(); 237 ObjectMapper mapper = new ObjectMapper(jsonFactory); 238 try { 239 return fromJSON(mapper.readTree(json)); 240 } catch (IOException e) { 241 throw new IllegalArgumentException("Invalid JSON: " + json, e); 242 } 243 } 244 245 public static IndexingCommand fromJSON(JsonNode jsonNode) { 246 IndexingCommand cmd = new IndexingCommand(); 247 Iterator<Map.Entry<String, JsonNode>> fieldsIterator = jsonNode.getFields(); 248 while (fieldsIterator.hasNext()) { 249 Map.Entry<String, JsonNode> field = fieldsIterator.next(); 250 String key = field.getKey(); 251 JsonNode value = field.getValue(); 252 if (value.isNull()) { 253 continue; 254 } 255 if ("type".equals(key)) { 256 cmd.type = Type.valueOf(value.getTextValue()); 257 } else if ("docId".equals(key)) { 258 cmd.targetDocumentId = value.getTextValue(); 259 } else if ("path".equals(key)) { 260 cmd.path = value.getTextValue(); 261 } else if ("repo".equals(key)) { 262 cmd.repositoryName = value.getTextValue(); 263 } else if ("id".equals(key)) { 264 cmd.id = value.getTextValue(); 265 } else if ("recurse".equals(key)) { 266 cmd.recurse = value.getBooleanValue(); 267 } else if ("sync".equals(key)) { 268 cmd.sync = value.getBooleanValue(); 269 } 270 } 271 if (cmd.targetDocumentId == null) { 272 throw new IllegalArgumentException("Document uid is null: " + cmd); 273 } 274 if (cmd.type == null) { 275 throw new IllegalArgumentException("Invalid type: " + cmd); 276 } 277 return cmd; 278 } 279 280 public String getId() { 281 return id; 282 } 283 284 public String getTargetDocumentId() { 285 return targetDocumentId; 286 } 287 288 public IndexingCommand clone(DocumentModel newDoc) { 289 return new IndexingCommand(newDoc, type, sync, recurse); 290 } 291 292 public String[] getSchemas() { 293 String[] ret = null; 294 if (schemas != null && schemas.size() > 0) { 295 ret = schemas.toArray(new String[schemas.size()]); 296 } 297 return ret; 298 } 299 300 public void addSchemas(String schema) { 301 if (schemas == null) { 302 schemas = new ArrayList<>(); 303 } 304 if (!schemas.contains(schema)) { 305 schemas.add(schema); 306 } 307 } 308 309 @Override 310 public String toString() { 311 try { 312 return toJSON(); 313 } catch (IOException e) { 314 return super.toString(); 315 } 316 } 317 318 /** 319 * Try to make the command synchronous. Recurse command will stay in async for update. 320 */ 321 public void makeSync() { 322 if (!sync) { 323 if (!recurse || type == Type.DELETE) { 324 sync = true; 325 if (log.isDebugEnabled()) { 326 log.debug("Turn command into sync: " + toString()); 327 } 328 } 329 } 330 } 331 332}