001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Thierry Delprat
016 *     Benoit Delbosc
017 */
018package org.nuxeo.elasticsearch.commands;
019
020import org.apache.commons.logging.Log;
021import org.apache.commons.logging.LogFactory;
022import org.codehaus.jackson.JsonFactory;
023import org.codehaus.jackson.JsonGenerator;
024import org.codehaus.jackson.JsonNode;
025import org.codehaus.jackson.map.ObjectMapper;
026import org.nuxeo.ecm.core.api.CoreInstance;
027import org.nuxeo.ecm.core.api.CoreSession;
028import org.nuxeo.ecm.core.api.DocumentModel;
029import org.nuxeo.ecm.core.api.DocumentRef;
030import org.nuxeo.ecm.core.api.IdRef;
031
032import java.io.IOException;
033import java.io.PrintWriter;
034import java.io.Serializable;
035import java.io.StringWriter;
036import java.util.ArrayList;
037import java.util.Iterator;
038import java.util.List;
039import java.util.Map;
040import java.util.UUID;
041
042/**
043 * Holds information about what type of indexing operation must be processed. IndexingCommands are create "on the fly"
044 * via a Synchronous event listener and at post commit time the system will merge the commands and execute worker to
045 * process them.
046 */
047public class IndexingCommand implements Serializable {
048
049    private static final Log log = LogFactory.getLog(IndexingCommand.class);
050
051    private static final long serialVersionUID = 1L;
052
053    public enum Type {
054        INSERT, UPDATE, UPDATE_SECURITY, DELETE
055    }
056
057    public static final String PREFIX = "IndexingCommand-";
058
059    protected String id;
060
061    protected Type type;
062
063    protected boolean sync;
064
065    protected boolean recurse;
066
067    protected String targetDocumentId;
068
069    protected String path;
070
071    protected String repositoryName;
072
073    protected List<String> schemas;
074
075    protected transient String sessionId;
076
077    protected IndexingCommand() {
078    }
079
080    /**
081     * Create an indexing command
082     *
083     * @param document the target document
084     * @param commandType the type of command
085     * @param sync if true the command will be processed on the same thread after transaction completion and the
086     *            Elasticsearch index will be refresh
087     * @param recurse the command affect the document and all its descendants
088     */
089    public IndexingCommand(DocumentModel document, Type commandType, boolean sync, boolean recurse) {
090        id = PREFIX + UUID.randomUUID().toString();
091        type = commandType;
092        this.sync = sync;
093        this.recurse = recurse;
094        if ((sync && recurse) && commandType != Type.DELETE) {
095            // we don't want sync and recursive command
096            throw new IllegalArgumentException("Recurse and synchronous command is not allowed: cmd: " + this
097                    + ", doc: " + document);
098        }
099        if (document == null) {
100            throw new IllegalArgumentException("Target document is null for: " + this);
101        }
102        DocumentModel targetDocument = getValidTargetDocument(document);
103        repositoryName = targetDocument.getRepositoryName();
104        targetDocumentId = targetDocument.getId();
105        sessionId = targetDocument.getSessionId();
106        path = targetDocument.getPathAsString();
107        if (targetDocumentId == null) {
108            throw new IllegalArgumentException("Target document has a null uid: " + this);
109        }
110    }
111
112    protected DocumentModel getValidTargetDocument(DocumentModel target) {
113        if (target.getId() != null) {
114            return target;
115        }
116        if (target.getRef() == null || target.getCoreSession() == null) {
117            throw new IllegalArgumentException("Invalid target document: " + target);
118        }
119        // transient document try to get it from its path
120        DocumentRef documentRef = target.getRef();
121        log.warn("Creating indexing command on a document with a null id, ref: " + documentRef
122                + ", trying to get the docId from its path, activate trace level for more info " + this);
123        if (log.isTraceEnabled()) {
124            Throwable throwable = new Throwable();
125            StringWriter stack = new StringWriter();
126            throwable.printStackTrace(new PrintWriter(stack));
127            log.trace("You should use a document returned by session.createDocument, stack " + stack.toString());
128        }
129        return target.getCoreSession().getDocument(documentRef);
130    }
131
132    public void attach(CoreSession session) {
133        if (!session.getRepositoryName().equals(repositoryName)) {
134            throw new IllegalArgumentException("Invalid session, expected repo: " + repositoryName + " actual: "
135                    + session.getRepositoryName());
136        }
137        sessionId = session.getSessionId();
138        assert sessionId != null : "Attach to session with a null sessionId";
139    }
140
141    /**
142     * Return the document or null if it does not exists anymore.
143     *
144     * @throws java.lang.IllegalStateException if there is no session attached
145     */
146    public DocumentModel getTargetDocument() {
147        CoreSession session = null;
148        if (sessionId != null) {
149            session = CoreInstance.getInstance().getSession(sessionId);
150        }
151        if (session == null) {
152            throw new IllegalStateException("Command is not attached to a valid session: " + this);
153        }
154        IdRef idref = new IdRef(targetDocumentId);
155        if (!session.exists(idref)) {
156            // Doc was deleted : no way we can fetch it
157            return null;
158        }
159        return session.getDocument(idref);
160    }
161
162    public String getRepositoryName() {
163        return repositoryName;
164    }
165
166    /**
167     * @return true if merged
168     */
169    public boolean merge(IndexingCommand other) {
170        if (canBeMerged(other)) {
171            merge(other.sync, other.recurse);
172            return true;
173        }
174        return false;
175    }
176
177    protected void merge(boolean sync, boolean recurse) {
178        this.sync = this.sync || sync;
179        this.recurse = this.recurse || recurse;
180    }
181
182    protected boolean canBeMerged(IndexingCommand other) {
183        if (type != other.type) {
184            return false;
185        }
186        if (type == Type.DELETE) {
187            // we support recursive sync deletion
188            return true;
189        }
190        // only if the result is not a sync and recurse command
191        return !((other.sync || sync) && (other.recurse || recurse));
192    }
193
194    public boolean isSync() {
195        return sync;
196    }
197
198    public boolean isRecurse() {
199        return recurse;
200    }
201
202    public Type getType() {
203        return type;
204    }
205
206    public String toJSON() throws IOException {
207        StringWriter out = new StringWriter();
208        JsonFactory factory = new JsonFactory();
209        JsonGenerator jsonGen = factory.createJsonGenerator(out);
210        toJSON(jsonGen);
211        out.flush();
212        jsonGen.close();
213        return out.toString();
214    }
215
216    public void toJSON(JsonGenerator jsonGen) throws IOException {
217        jsonGen.writeStartObject();
218        jsonGen.writeStringField("id", id);
219        jsonGen.writeStringField("type", String.format("%s", type));
220        jsonGen.writeStringField("docId", getTargetDocumentId());
221        jsonGen.writeStringField("path", path);
222        jsonGen.writeStringField("repo", getRepositoryName());
223        jsonGen.writeBooleanField("recurse", recurse);
224        jsonGen.writeBooleanField("sync", sync);
225        jsonGen.writeEndObject();
226    }
227
228    /**
229     * Create a command from a JSON string.
230     *
231     * @throws IllegalArgumentException if json is invalid or command is invalid
232     */
233    public static IndexingCommand fromJSON(String json) {
234        JsonFactory jsonFactory = new JsonFactory();
235        ObjectMapper mapper = new ObjectMapper(jsonFactory);
236        try {
237            return fromJSON(mapper.readTree(json));
238        } catch (IOException e) {
239            throw new IllegalArgumentException("Invalid JSON: " + json, e);
240        }
241    }
242
243    public static IndexingCommand fromJSON(JsonNode jsonNode) {
244        IndexingCommand cmd = new IndexingCommand();
245        Iterator<Map.Entry<String, JsonNode>> fieldsIterator = jsonNode.getFields();
246        while (fieldsIterator.hasNext()) {
247            Map.Entry<String, JsonNode> field = fieldsIterator.next();
248            String key = field.getKey();
249            JsonNode value = field.getValue();
250            if (value.isNull()) {
251                continue;
252            }
253            if ("type".equals(key)) {
254                cmd.type = Type.valueOf(value.getTextValue());
255            } else if ("docId".equals(key)) {
256                cmd.targetDocumentId = value.getTextValue();
257            } else if ("path".equals(key)) {
258                cmd.path = value.getTextValue();
259            } else if ("repo".equals(key)) {
260                cmd.repositoryName = value.getTextValue();
261            } else if ("id".equals(key)) {
262                cmd.id = value.getTextValue();
263            } else if ("recurse".equals(key)) {
264                cmd.recurse = value.getBooleanValue();
265            } else if ("sync".equals(key)) {
266                cmd.sync = value.getBooleanValue();
267            }
268        }
269        if (cmd.targetDocumentId == null) {
270            throw new IllegalArgumentException("Document uid is null: " + cmd);
271        }
272        if (cmd.type == null) {
273            throw new IllegalArgumentException("Invalid type: " + cmd);
274        }
275        return cmd;
276    }
277
278    public String getId() {
279        return id;
280    }
281
282    public String getTargetDocumentId() {
283        return targetDocumentId;
284    }
285
286    public IndexingCommand clone(DocumentModel newDoc) {
287        return new IndexingCommand(newDoc, type, sync, recurse);
288    }
289
290    public String[] getSchemas() {
291        String[] ret = null;
292        if (schemas != null && schemas.size() > 0) {
293            ret = schemas.toArray(new String[schemas.size()]);
294        }
295        return ret;
296    }
297
298    public void addSchemas(String schema) {
299        if (schemas == null) {
300            schemas = new ArrayList<>();
301        }
302        if (!schemas.contains(schema)) {
303            schemas.add(schema);
304        }
305    }
306
307    @Override
308    public String toString() {
309        try {
310            return toJSON();
311        } catch (IOException e) {
312            return super.toString();
313        }
314    }
315
316    /**
317     * Try to make the command synchronous. Recurse command will stay in async for update.
318     */
319    public void makeSync() {
320        if (!sync) {
321            if (!recurse || type == Type.DELETE) {
322                sync = true;
323                if (log.isDebugEnabled()) {
324                    log.debug("Turn command into sync: " + toString());
325                }
326            }
327        }
328    }
329
330}