001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Thierry Delprat 016 * Benoit Delbosc 017 */ 018package org.nuxeo.elasticsearch.commands; 019 020import org.apache.commons.logging.Log; 021import org.apache.commons.logging.LogFactory; 022import org.codehaus.jackson.JsonFactory; 023import org.codehaus.jackson.JsonGenerator; 024import org.codehaus.jackson.JsonNode; 025import org.codehaus.jackson.map.ObjectMapper; 026import org.nuxeo.ecm.core.api.CoreInstance; 027import org.nuxeo.ecm.core.api.CoreSession; 028import org.nuxeo.ecm.core.api.DocumentModel; 029import org.nuxeo.ecm.core.api.DocumentRef; 030import org.nuxeo.ecm.core.api.IdRef; 031 032import java.io.IOException; 033import java.io.PrintWriter; 034import java.io.Serializable; 035import java.io.StringWriter; 036import java.util.ArrayList; 037import java.util.Iterator; 038import java.util.List; 039import java.util.Map; 040import java.util.UUID; 041 042/** 043 * Holds information about what type of indexing operation must be processed. IndexingCommands are create "on the fly" 044 * via a Synchronous event listener and at post commit time the system will merge the commands and execute worker to 045 * process them. 046 */ 047public class IndexingCommand implements Serializable { 048 049 private static final Log log = LogFactory.getLog(IndexingCommand.class); 050 051 private static final long serialVersionUID = 1L; 052 053 public enum Type { 054 INSERT, UPDATE, UPDATE_SECURITY, DELETE 055 } 056 057 public static final String PREFIX = "IndexingCommand-"; 058 059 protected String id; 060 061 protected Type type; 062 063 protected boolean sync; 064 065 protected boolean recurse; 066 067 protected String targetDocumentId; 068 069 protected String path; 070 071 protected String repositoryName; 072 073 protected List<String> schemas; 074 075 protected transient String sessionId; 076 077 protected IndexingCommand() { 078 } 079 080 /** 081 * Create an indexing command 082 * 083 * @param document the target document 084 * @param commandType the type of command 085 * @param sync if true the command will be processed on the same thread after transaction completion and the 086 * Elasticsearch index will be refresh 087 * @param recurse the command affect the document and all its descendants 088 */ 089 public IndexingCommand(DocumentModel document, Type commandType, boolean sync, boolean recurse) { 090 id = PREFIX + UUID.randomUUID().toString(); 091 type = commandType; 092 this.sync = sync; 093 this.recurse = recurse; 094 if ((sync && recurse) && commandType != Type.DELETE) { 095 // we don't want sync and recursive command 096 throw new IllegalArgumentException("Recurse and synchronous command is not allowed: cmd: " + this 097 + ", doc: " + document); 098 } 099 if (document == null) { 100 throw new IllegalArgumentException("Target document is null for: " + this); 101 } 102 DocumentModel targetDocument = getValidTargetDocument(document); 103 repositoryName = targetDocument.getRepositoryName(); 104 targetDocumentId = targetDocument.getId(); 105 sessionId = targetDocument.getSessionId(); 106 path = targetDocument.getPathAsString(); 107 if (targetDocumentId == null) { 108 throw new IllegalArgumentException("Target document has a null uid: " + this); 109 } 110 } 111 112 protected DocumentModel getValidTargetDocument(DocumentModel target) { 113 if (target.getId() != null) { 114 return target; 115 } 116 if (target.getRef() == null || target.getCoreSession() == null) { 117 throw new IllegalArgumentException("Invalid target document: " + target); 118 } 119 // transient document try to get it from its path 120 DocumentRef documentRef = target.getRef(); 121 log.warn("Creating indexing command on a document with a null id, ref: " + documentRef 122 + ", trying to get the docId from its path, activate trace level for more info " + this); 123 if (log.isTraceEnabled()) { 124 Throwable throwable = new Throwable(); 125 StringWriter stack = new StringWriter(); 126 throwable.printStackTrace(new PrintWriter(stack)); 127 log.trace("You should use a document returned by session.createDocument, stack " + stack.toString()); 128 } 129 return target.getCoreSession().getDocument(documentRef); 130 } 131 132 public void attach(CoreSession session) { 133 if (!session.getRepositoryName().equals(repositoryName)) { 134 throw new IllegalArgumentException("Invalid session, expected repo: " + repositoryName + " actual: " 135 + session.getRepositoryName()); 136 } 137 sessionId = session.getSessionId(); 138 assert sessionId != null : "Attach to session with a null sessionId"; 139 } 140 141 /** 142 * Return the document or null if it does not exists anymore. 143 * 144 * @throws java.lang.IllegalStateException if there is no session attached 145 */ 146 public DocumentModel getTargetDocument() { 147 CoreSession session = null; 148 if (sessionId != null) { 149 session = CoreInstance.getInstance().getSession(sessionId); 150 } 151 if (session == null) { 152 throw new IllegalStateException("Command is not attached to a valid session: " + this); 153 } 154 IdRef idref = new IdRef(targetDocumentId); 155 if (!session.exists(idref)) { 156 // Doc was deleted : no way we can fetch it 157 return null; 158 } 159 return session.getDocument(idref); 160 } 161 162 public String getRepositoryName() { 163 return repositoryName; 164 } 165 166 /** 167 * @return true if merged 168 */ 169 public boolean merge(IndexingCommand other) { 170 if (canBeMerged(other)) { 171 merge(other.sync, other.recurse); 172 return true; 173 } 174 return false; 175 } 176 177 protected void merge(boolean sync, boolean recurse) { 178 this.sync = this.sync || sync; 179 this.recurse = this.recurse || recurse; 180 } 181 182 protected boolean canBeMerged(IndexingCommand other) { 183 if (type != other.type) { 184 return false; 185 } 186 if (type == Type.DELETE) { 187 // we support recursive sync deletion 188 return true; 189 } 190 // only if the result is not a sync and recurse command 191 return !((other.sync || sync) && (other.recurse || recurse)); 192 } 193 194 public boolean isSync() { 195 return sync; 196 } 197 198 public boolean isRecurse() { 199 return recurse; 200 } 201 202 public Type getType() { 203 return type; 204 } 205 206 public String toJSON() throws IOException { 207 StringWriter out = new StringWriter(); 208 JsonFactory factory = new JsonFactory(); 209 JsonGenerator jsonGen = factory.createJsonGenerator(out); 210 toJSON(jsonGen); 211 out.flush(); 212 jsonGen.close(); 213 return out.toString(); 214 } 215 216 public void toJSON(JsonGenerator jsonGen) throws IOException { 217 jsonGen.writeStartObject(); 218 jsonGen.writeStringField("id", id); 219 jsonGen.writeStringField("type", String.format("%s", type)); 220 jsonGen.writeStringField("docId", getTargetDocumentId()); 221 jsonGen.writeStringField("path", path); 222 jsonGen.writeStringField("repo", getRepositoryName()); 223 jsonGen.writeBooleanField("recurse", recurse); 224 jsonGen.writeBooleanField("sync", sync); 225 jsonGen.writeEndObject(); 226 } 227 228 /** 229 * Create a command from a JSON string. 230 * 231 * @throws IllegalArgumentException if json is invalid or command is invalid 232 */ 233 public static IndexingCommand fromJSON(String json) { 234 JsonFactory jsonFactory = new JsonFactory(); 235 ObjectMapper mapper = new ObjectMapper(jsonFactory); 236 try { 237 return fromJSON(mapper.readTree(json)); 238 } catch (IOException e) { 239 throw new IllegalArgumentException("Invalid JSON: " + json, e); 240 } 241 } 242 243 public static IndexingCommand fromJSON(JsonNode jsonNode) { 244 IndexingCommand cmd = new IndexingCommand(); 245 Iterator<Map.Entry<String, JsonNode>> fieldsIterator = jsonNode.getFields(); 246 while (fieldsIterator.hasNext()) { 247 Map.Entry<String, JsonNode> field = fieldsIterator.next(); 248 String key = field.getKey(); 249 JsonNode value = field.getValue(); 250 if (value.isNull()) { 251 continue; 252 } 253 if ("type".equals(key)) { 254 cmd.type = Type.valueOf(value.getTextValue()); 255 } else if ("docId".equals(key)) { 256 cmd.targetDocumentId = value.getTextValue(); 257 } else if ("path".equals(key)) { 258 cmd.path = value.getTextValue(); 259 } else if ("repo".equals(key)) { 260 cmd.repositoryName = value.getTextValue(); 261 } else if ("id".equals(key)) { 262 cmd.id = value.getTextValue(); 263 } else if ("recurse".equals(key)) { 264 cmd.recurse = value.getBooleanValue(); 265 } else if ("sync".equals(key)) { 266 cmd.sync = value.getBooleanValue(); 267 } 268 } 269 if (cmd.targetDocumentId == null) { 270 throw new IllegalArgumentException("Document uid is null: " + cmd); 271 } 272 if (cmd.type == null) { 273 throw new IllegalArgumentException("Invalid type: " + cmd); 274 } 275 return cmd; 276 } 277 278 public String getId() { 279 return id; 280 } 281 282 public String getTargetDocumentId() { 283 return targetDocumentId; 284 } 285 286 public IndexingCommand clone(DocumentModel newDoc) { 287 return new IndexingCommand(newDoc, type, sync, recurse); 288 } 289 290 public String[] getSchemas() { 291 String[] ret = null; 292 if (schemas != null && schemas.size() > 0) { 293 ret = schemas.toArray(new String[schemas.size()]); 294 } 295 return ret; 296 } 297 298 public void addSchemas(String schema) { 299 if (schemas == null) { 300 schemas = new ArrayList<>(); 301 } 302 if (!schemas.contains(schema)) { 303 schemas.add(schema); 304 } 305 } 306 307 @Override 308 public String toString() { 309 try { 310 return toJSON(); 311 } catch (IOException e) { 312 return super.toString(); 313 } 314 } 315 316 /** 317 * Try to make the command synchronous. Recurse command will stay in async for update. 318 */ 319 public void makeSync() { 320 if (!sync) { 321 if (!recurse || type == Type.DELETE) { 322 sync = true; 323 if (log.isDebugEnabled()) { 324 log.debug("Turn command into sync: " + toString()); 325 } 326 } 327 } 328 } 329 330}