001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.commands;
021
022import org.apache.commons.logging.Log;
023import org.apache.commons.logging.LogFactory;
024import org.codehaus.jackson.JsonFactory;
025import org.codehaus.jackson.JsonGenerator;
026import org.codehaus.jackson.JsonNode;
027import org.codehaus.jackson.map.ObjectMapper;
028import org.nuxeo.ecm.core.api.CoreInstance;
029import org.nuxeo.ecm.core.api.CoreSession;
030import org.nuxeo.ecm.core.api.DocumentModel;
031import org.nuxeo.ecm.core.api.DocumentRef;
032import org.nuxeo.ecm.core.api.IdRef;
033
034import java.io.IOException;
035import java.io.PrintWriter;
036import java.io.Serializable;
037import java.io.StringWriter;
038import java.util.ArrayList;
039import java.util.Iterator;
040import java.util.List;
041import java.util.Map;
042import java.util.UUID;
043
044/**
045 * Holds information about what type of indexing operation must be processed. IndexingCommands are create "on the fly"
046 * via a Synchronous event listener and at post commit time the system will merge the commands and execute worker to
047 * process them.
048 */
049public class IndexingCommand implements Serializable {
050
051    private static final Log log = LogFactory.getLog(IndexingCommand.class);
052
053    private static final long serialVersionUID = 1L;
054
055    public enum Type {
056        INSERT, UPDATE, UPDATE_SECURITY, DELETE, UPDATE_DIRECT_CHILDREN,
057    }
058
059    public static final String PREFIX = "IndexingCommand-";
060
061    protected String id;
062
063    protected Type type;
064
065    protected boolean sync;
066
067    protected boolean recurse;
068
069    protected String targetDocumentId;
070
071    protected String path;
072
073    protected String repositoryName;
074
075    protected List<String> schemas;
076
077    protected transient String sessionId;
078
079    protected IndexingCommand() {
080    }
081
082    /**
083     * Create an indexing command
084     *
085     * @param document the target document
086     * @param commandType the type of command
087     * @param sync if true the command will be processed on the same thread after transaction completion and the
088     *            Elasticsearch index will be refresh
089     * @param recurse the command affect the document and all its descendants
090     */
091    public IndexingCommand(DocumentModel document, Type commandType, boolean sync, boolean recurse) {
092        id = PREFIX + UUID.randomUUID().toString();
093        type = commandType;
094        this.sync = sync;
095        this.recurse = recurse;
096        if ((sync && recurse) && commandType != Type.DELETE) {
097            // we don't want sync and recursive command
098            throw new IllegalArgumentException("Recurse and synchronous command is not allowed: cmd: " + this
099                    + ", doc: " + document);
100        }
101        if (document == null) {
102            throw new IllegalArgumentException("Target document is null for: " + this);
103        }
104        DocumentModel targetDocument = getValidTargetDocument(document);
105        repositoryName = targetDocument.getRepositoryName();
106        targetDocumentId = targetDocument.getId();
107        sessionId = targetDocument.getSessionId();
108        path = targetDocument.getPathAsString();
109        if (targetDocumentId == null) {
110            throw new IllegalArgumentException("Target document has a null uid: " + this);
111        }
112    }
113
114    protected DocumentModel getValidTargetDocument(DocumentModel target) {
115        if (target.getId() != null) {
116            return target;
117        }
118        if (target.getRef() == null || target.getCoreSession() == null) {
119            throw new IllegalArgumentException("Invalid target document: " + target);
120        }
121        // transient document try to get it from its path
122        DocumentRef documentRef = target.getRef();
123        log.warn("Creating indexing command on a document with a null id, ref: " + documentRef
124                + ", trying to get the docId from its path, activate trace level for more info " + this);
125        if (log.isTraceEnabled()) {
126            Throwable throwable = new Throwable();
127            StringWriter stack = new StringWriter();
128            throwable.printStackTrace(new PrintWriter(stack));
129            log.trace("You should use a document returned by session.createDocument, stack " + stack.toString());
130        }
131        return target.getCoreSession().getDocument(documentRef);
132    }
133
134    public void attach(CoreSession session) {
135        if (!session.getRepositoryName().equals(repositoryName)) {
136            throw new IllegalArgumentException("Invalid session, expected repo: " + repositoryName + " actual: "
137                    + session.getRepositoryName());
138        }
139        sessionId = session.getSessionId();
140        assert sessionId != null : "Attach to session with a null sessionId";
141    }
142
143    /**
144     * Return the document or null if it does not exists anymore.
145     *
146     * @throws java.lang.IllegalStateException if there is no session attached
147     */
148    public DocumentModel getTargetDocument() {
149        CoreSession session = null;
150        if (sessionId != null) {
151            session = CoreInstance.getInstance().getSession(sessionId);
152        }
153        if (session == null) {
154            throw new IllegalStateException("Command is not attached to a valid session: " + this);
155        }
156        IdRef idref = new IdRef(targetDocumentId);
157        if (!session.exists(idref)) {
158            // Doc was deleted : no way we can fetch it
159            return null;
160        }
161        return session.getDocument(idref);
162    }
163
164    public String getRepositoryName() {
165        return repositoryName;
166    }
167
168    /**
169     * @return true if merged
170     */
171    public boolean merge(IndexingCommand other) {
172        if (canBeMerged(other)) {
173            merge(other.sync, other.recurse);
174            return true;
175        }
176        return false;
177    }
178
179    protected void merge(boolean sync, boolean recurse) {
180        this.sync = this.sync || sync;
181        this.recurse = this.recurse || recurse;
182    }
183
184    protected boolean canBeMerged(IndexingCommand other) {
185        if (type != other.type) {
186            return false;
187        }
188        if (type == Type.DELETE) {
189            // we support recursive sync deletion
190            return true;
191        }
192        // only if the result is not a sync and recurse command
193        return !((other.sync || sync) && (other.recurse || recurse));
194    }
195
196    public boolean isSync() {
197        return sync;
198    }
199
200    public boolean isRecurse() {
201        return recurse;
202    }
203
204    public Type getType() {
205        return type;
206    }
207
208    public String toJSON() throws IOException {
209        StringWriter out = new StringWriter();
210        JsonFactory factory = new JsonFactory();
211        JsonGenerator jsonGen = factory.createJsonGenerator(out);
212        toJSON(jsonGen);
213        out.flush();
214        jsonGen.close();
215        return out.toString();
216    }
217
218    public void toJSON(JsonGenerator jsonGen) throws IOException {
219        jsonGen.writeStartObject();
220        jsonGen.writeStringField("id", id);
221        jsonGen.writeStringField("type", String.format("%s", type));
222        jsonGen.writeStringField("docId", getTargetDocumentId());
223        jsonGen.writeStringField("path", path);
224        jsonGen.writeStringField("repo", getRepositoryName());
225        jsonGen.writeBooleanField("recurse", recurse);
226        jsonGen.writeBooleanField("sync", sync);
227        jsonGen.writeEndObject();
228    }
229
230    /**
231     * Create a command from a JSON string.
232     *
233     * @throws IllegalArgumentException if json is invalid or command is invalid
234     */
235    public static IndexingCommand fromJSON(String json) {
236        JsonFactory jsonFactory = new JsonFactory();
237        ObjectMapper mapper = new ObjectMapper(jsonFactory);
238        try {
239            return fromJSON(mapper.readTree(json));
240        } catch (IOException e) {
241            throw new IllegalArgumentException("Invalid JSON: " + json, e);
242        }
243    }
244
245    public static IndexingCommand fromJSON(JsonNode jsonNode) {
246        IndexingCommand cmd = new IndexingCommand();
247        Iterator<Map.Entry<String, JsonNode>> fieldsIterator = jsonNode.getFields();
248        while (fieldsIterator.hasNext()) {
249            Map.Entry<String, JsonNode> field = fieldsIterator.next();
250            String key = field.getKey();
251            JsonNode value = field.getValue();
252            if (value.isNull()) {
253                continue;
254            }
255            if ("type".equals(key)) {
256                cmd.type = Type.valueOf(value.getTextValue());
257            } else if ("docId".equals(key)) {
258                cmd.targetDocumentId = value.getTextValue();
259            } else if ("path".equals(key)) {
260                cmd.path = value.getTextValue();
261            } else if ("repo".equals(key)) {
262                cmd.repositoryName = value.getTextValue();
263            } else if ("id".equals(key)) {
264                cmd.id = value.getTextValue();
265            } else if ("recurse".equals(key)) {
266                cmd.recurse = value.getBooleanValue();
267            } else if ("sync".equals(key)) {
268                cmd.sync = value.getBooleanValue();
269            }
270        }
271        if (cmd.targetDocumentId == null) {
272            throw new IllegalArgumentException("Document uid is null: " + cmd);
273        }
274        if (cmd.type == null) {
275            throw new IllegalArgumentException("Invalid type: " + cmd);
276        }
277        return cmd;
278    }
279
280    public String getId() {
281        return id;
282    }
283
284    public String getTargetDocumentId() {
285        return targetDocumentId;
286    }
287
288    public IndexingCommand clone(DocumentModel newDoc) {
289        return new IndexingCommand(newDoc, type, sync, recurse);
290    }
291
292    public String[] getSchemas() {
293        String[] ret = null;
294        if (schemas != null && schemas.size() > 0) {
295            ret = schemas.toArray(new String[schemas.size()]);
296        }
297        return ret;
298    }
299
300    public void addSchemas(String schema) {
301        if (schemas == null) {
302            schemas = new ArrayList<>();
303        }
304        if (!schemas.contains(schema)) {
305            schemas.add(schema);
306        }
307    }
308
309    @Override
310    public String toString() {
311        try {
312            return toJSON();
313        } catch (IOException e) {
314            return super.toString();
315        }
316    }
317
318    /**
319     * Try to make the command synchronous. Recurse command will stay in async for update.
320     */
321    public void makeSync() {
322        if (!sync) {
323            if (!recurse || type == Type.DELETE) {
324                sync = true;
325                if (log.isDebugEnabled()) {
326                    log.debug("Turn command into sync: " + toString());
327                }
328            }
329        }
330    }
331
332}