001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat 018 * Benoit Delbosc 019 */ 020package org.nuxeo.elasticsearch.commands; 021 022import java.io.IOException; 023import java.io.PrintWriter; 024import java.io.Serializable; 025import java.io.StringWriter; 026import java.util.ArrayList; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Map; 030import java.util.concurrent.atomic.AtomicLong; 031 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.nuxeo.ecm.core.api.CoreInstance; 035import org.nuxeo.ecm.core.api.CoreSession; 036import org.nuxeo.ecm.core.api.DocumentModel; 037import org.nuxeo.ecm.core.api.DocumentRef; 038import org.nuxeo.ecm.core.api.IdRef; 039 040import com.fasterxml.jackson.core.JsonFactory; 041import com.fasterxml.jackson.core.JsonGenerator; 042import com.fasterxml.jackson.databind.JsonNode; 043import com.fasterxml.jackson.databind.ObjectMapper; 044 045/** 046 * Holds information about what type of indexing operation must be processed. IndexingCommands are create "on the fly" 047 * via a Synchronous event listener and at post commit time the system will merge the commands and execute worker to 048 * process them. 049 */ 050public class IndexingCommand implements Serializable { 051 052 private static final Log log = LogFactory.getLog(IndexingCommand.class); 053 054 private static final long serialVersionUID = 1L; 055 056 public enum Type { 057 INSERT, UPDATE, UPDATE_SECURITY, DELETE, UPDATE_DIRECT_CHILDREN, 058 } 059 060 public static final String PREFIX = "IxCd-"; 061 062 protected String id; 063 064 protected Type type; 065 066 protected boolean sync; 067 068 protected boolean recurse; 069 070 protected String targetDocumentId; 071 072 protected String path; 073 074 protected String repositoryName; 075 076 protected List<String> schemas; 077 078 protected long order; 079 080 protected transient static AtomicLong seq = new AtomicLong(0); 081 082 protected IndexingCommand() { 083 } 084 085 /** 086 * Create an indexing command 087 * 088 * @param document the target document 089 * @param commandType the type of command 090 * @param sync if true the command will be processed on the same thread after transaction completion and the 091 * Elasticsearch index will be refresh 092 * @param recurse the command affect the document and all its descendants 093 */ 094 public IndexingCommand(DocumentModel document, Type commandType, boolean sync, boolean recurse) { 095 id = PREFIX + seq.incrementAndGet(); 096 type = commandType; 097 this.sync = sync; 098 this.recurse = recurse; 099 if ((sync && recurse) && commandType != Type.DELETE) { 100 // we don't want sync and recursive command 101 throw new IllegalArgumentException( 102 "Recurse and synchronous command is not allowed: cmd: " + this + ", doc: " + document); 103 } 104 if (document == null) { 105 throw new IllegalArgumentException("Target document is null for: " + this); 106 } 107 DocumentModel targetDocument = getValidTargetDocument(document); 108 repositoryName = targetDocument.getRepositoryName(); 109 targetDocumentId = targetDocument.getId(); 110 path = targetDocument.getPathAsString(); 111 if (targetDocumentId == null) { 112 throw new IllegalArgumentException("Target document has a null uid: " + this); 113 } 114 } 115 116 protected DocumentModel getValidTargetDocument(DocumentModel target) { 117 if (target.getId() != null) { 118 return target; 119 } 120 if (target.getRef() == null || target.getCoreSession() == null) { 121 throw new IllegalArgumentException("Invalid target document: " + target); 122 } 123 // transient document try to get it from its path 124 DocumentRef documentRef = target.getRef(); 125 log.warn("Creating indexing command on a document with a null id, ref: " + documentRef 126 + ", trying to get the docId from its path, activate trace level for more info " + this); 127 if (log.isTraceEnabled()) { 128 Throwable throwable = new Throwable(); 129 StringWriter stack = new StringWriter(); 130 throwable.printStackTrace(new PrintWriter(stack)); 131 log.trace("You should use a document returned by session.createDocument, stack " + stack.toString()); 132 } 133 return target.getCoreSession().getDocument(documentRef); 134 } 135 136 public void attach(CoreSession session) { 137 if (!session.getRepositoryName().equals(repositoryName)) { 138 throw new IllegalArgumentException( 139 "Invalid session, expected repo: " + repositoryName + " actual: " + session.getRepositoryName()); 140 } 141 } 142 143 /** 144 * Return the document or null if it does not exists anymore. 145 * 146 * @throws java.lang.IllegalStateException if there is no session attached 147 */ 148 public DocumentModel getTargetDocument() { 149 CoreSession session = CoreInstance.getCoreSessionSystem(repositoryName); 150 IdRef idref = new IdRef(targetDocumentId); 151 if (!session.exists(idref)) { 152 // Doc was deleted : no way we can fetch it 153 return null; 154 } 155 return session.getDocument(idref); 156 } 157 158 public String getRepositoryName() { 159 return repositoryName; 160 } 161 162 /** 163 * @return true if merged 164 */ 165 public boolean merge(IndexingCommand other) { 166 if (canBeMerged(other)) { 167 merge(other.sync, other.recurse); 168 return true; 169 } 170 return false; 171 } 172 173 protected void merge(boolean sync, boolean recurse) { 174 this.sync = this.sync || sync; 175 this.recurse = this.recurse || recurse; 176 } 177 178 protected boolean canBeMerged(IndexingCommand other) { 179 if (type != other.type) { 180 return false; 181 } 182 if (type == Type.DELETE) { 183 // we support recursive sync deletion 184 return true; 185 } 186 // only if the result is not a sync and recurse command 187 return !((other.sync || sync) && (other.recurse || recurse)); 188 } 189 190 public boolean isSync() { 191 return sync; 192 } 193 194 public boolean isRecurse() { 195 return recurse; 196 } 197 198 public Type getType() { 199 return type; 200 } 201 202 public String toJSON() throws IOException { 203 StringWriter out = new StringWriter(); 204 JsonFactory factory = new JsonFactory(); 205 JsonGenerator jsonGen = factory.createGenerator(out); 206 toJSON(jsonGen); 207 out.flush(); 208 jsonGen.close(); 209 return out.toString(); 210 } 211 212 public void toJSON(JsonGenerator jsonGen) throws IOException { 213 jsonGen.writeStartObject(); 214 jsonGen.writeStringField("id", id); 215 jsonGen.writeStringField("type", String.format("%s", type)); 216 jsonGen.writeStringField("docId", getTargetDocumentId()); 217 jsonGen.writeStringField("path", path); 218 jsonGen.writeStringField("repo", getRepositoryName()); 219 jsonGen.writeBooleanField("recurse", recurse); 220 jsonGen.writeBooleanField("sync", sync); 221 jsonGen.writeNumberField("order", getOrder()); 222 jsonGen.writeEndObject(); 223 } 224 225 /** 226 * Create a command from a JSON string. 227 * 228 * @throws IllegalArgumentException if json is invalid or command is invalid 229 */ 230 public static IndexingCommand fromJSON(String json) { 231 JsonFactory jsonFactory = new JsonFactory(); 232 ObjectMapper mapper = new ObjectMapper(jsonFactory); 233 try { 234 return fromJSON(mapper.readTree(json)); 235 } catch (IOException e) { 236 throw new IllegalArgumentException("Invalid JSON: " + json, e); 237 } 238 } 239 240 public static IndexingCommand fromJSON(JsonNode jsonNode) { 241 IndexingCommand cmd = new IndexingCommand(); 242 Iterator<Map.Entry<String, JsonNode>> fieldsIterator = jsonNode.fields(); 243 while (fieldsIterator.hasNext()) { 244 Map.Entry<String, JsonNode> field = fieldsIterator.next(); 245 String key = field.getKey(); 246 JsonNode value = field.getValue(); 247 if (value.isNull()) { 248 continue; 249 } 250 if ("type".equals(key)) { 251 cmd.type = Type.valueOf(value.textValue()); 252 } else if ("docId".equals(key)) { 253 cmd.targetDocumentId = value.textValue(); 254 } else if ("path".equals(key)) { 255 cmd.path = value.textValue(); 256 } else if ("repo".equals(key)) { 257 cmd.repositoryName = value.textValue(); 258 } else if ("id".equals(key)) { 259 cmd.id = value.textValue(); 260 } else if ("recurse".equals(key)) { 261 cmd.recurse = value.booleanValue(); 262 } else if ("sync".equals(key)) { 263 cmd.sync = value.booleanValue(); 264 } 265 } 266 if (cmd.targetDocumentId == null) { 267 throw new IllegalArgumentException("Document uid is null: " + cmd); 268 } 269 if (cmd.type == null) { 270 throw new IllegalArgumentException("Invalid type: " + cmd); 271 } 272 return cmd; 273 } 274 275 public String getId() { 276 return id; 277 } 278 279 public String getTargetDocumentId() { 280 return targetDocumentId; 281 } 282 283 public IndexingCommand clone(DocumentModel newDoc) { 284 return new IndexingCommand(newDoc, type, sync, recurse); 285 } 286 287 public String[] getSchemas() { 288 String[] ret = null; 289 if (schemas != null && schemas.size() > 0) { 290 ret = schemas.toArray(new String[schemas.size()]); 291 } 292 return ret; 293 } 294 295 public void addSchemas(String schema) { 296 if (schemas == null) { 297 schemas = new ArrayList<>(); 298 } 299 if (!schemas.contains(schema)) { 300 schemas.add(schema); 301 } 302 } 303 304 @Override 305 public String toString() { 306 try { 307 return toJSON(); 308 } catch (IOException e) { 309 return super.toString(); 310 } 311 } 312 313 /** 314 * Try to make the command synchronous. Recurse command will stay in async for update. 315 */ 316 public void makeSync() { 317 if (!sync) { 318 if (!recurse || type == Type.DELETE) { 319 sync = true; 320 if (log.isDebugEnabled()) { 321 log.debug("Turn command into sync: " + toString()); 322 } 323 } 324 } 325 } 326 327 // @since 8.3 328 public long getOrder() { 329 return order; 330 } 331 332 // @since 8.3 333 public void setOrder(long order) { 334 this.order = order; 335 } 336}