001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.commands;
021
022import java.io.IOException;
023import java.io.PrintWriter;
024import java.io.Serializable;
025import java.io.StringWriter;
026import java.util.ArrayList;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.concurrent.atomic.AtomicLong;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.api.CoreSession;
035import org.nuxeo.ecm.core.api.CoreSessionService;
036import org.nuxeo.ecm.core.api.DocumentModel;
037import org.nuxeo.ecm.core.api.DocumentRef;
038import org.nuxeo.ecm.core.api.IdRef;
039import org.nuxeo.runtime.api.Framework;
040
041import com.fasterxml.jackson.core.JsonFactory;
042import com.fasterxml.jackson.core.JsonGenerator;
043import com.fasterxml.jackson.databind.JsonNode;
044import com.fasterxml.jackson.databind.ObjectMapper;
045
046/**
047 * Holds information about what type of indexing operation must be processed. IndexingCommands are create "on the fly"
048 * via a Synchronous event listener and at post commit time the system will merge the commands and execute worker to
049 * process them.
050 */
051public class IndexingCommand implements Serializable {
052
053    private static final Log log = LogFactory.getLog(IndexingCommand.class);
054
055    private static final long serialVersionUID = 1L;
056
057    public enum Type {
058        INSERT, UPDATE, UPDATE_SECURITY, DELETE, UPDATE_DIRECT_CHILDREN,
059    }
060
061    public static final String PREFIX = "IxCd-";
062
063    protected String id;
064
065    protected Type type;
066
067    protected boolean sync;
068
069    protected boolean recurse;
070
071    protected String targetDocumentId;
072
073    protected String path;
074
075    protected String repositoryName;
076
077    protected List<String> schemas;
078
079    protected long order;
080
081    protected transient String sessionId;
082
083    protected transient static AtomicLong seq = new AtomicLong(0);
084
085    protected IndexingCommand() {
086    }
087
088    /**
089     * Create an indexing command
090     *
091     * @param document the target document
092     * @param commandType the type of command
093     * @param sync if true the command will be processed on the same thread after transaction completion and the
094     *            Elasticsearch index will be refresh
095     * @param recurse the command affect the document and all its descendants
096     */
097    public IndexingCommand(DocumentModel document, Type commandType, boolean sync, boolean recurse) {
098        id = PREFIX + seq.incrementAndGet();
099        type = commandType;
100        this.sync = sync;
101        this.recurse = recurse;
102        if ((sync && recurse) && commandType != Type.DELETE) {
103            // we don't want sync and recursive command
104            throw new IllegalArgumentException(
105                    "Recurse and synchronous command is not allowed: cmd: " + this + ", doc: " + document);
106        }
107        if (document == null) {
108            throw new IllegalArgumentException("Target document is null for: " + this);
109        }
110        DocumentModel targetDocument = getValidTargetDocument(document);
111        repositoryName = targetDocument.getRepositoryName();
112        targetDocumentId = targetDocument.getId();
113        sessionId = targetDocument.getSessionId();
114        path = targetDocument.getPathAsString();
115        if (targetDocumentId == null) {
116            throw new IllegalArgumentException("Target document has a null uid: " + this);
117        }
118    }
119
120    protected DocumentModel getValidTargetDocument(DocumentModel target) {
121        if (target.getId() != null) {
122            return target;
123        }
124        if (target.getRef() == null || target.getCoreSession() == null) {
125            throw new IllegalArgumentException("Invalid target document: " + target);
126        }
127        // transient document try to get it from its path
128        DocumentRef documentRef = target.getRef();
129        log.warn("Creating indexing command on a document with a null id, ref: " + documentRef
130                + ", trying to get the docId from its path, activate trace level for more info " + this);
131        if (log.isTraceEnabled()) {
132            Throwable throwable = new Throwable();
133            StringWriter stack = new StringWriter();
134            throwable.printStackTrace(new PrintWriter(stack));
135            log.trace("You should use a document returned by session.createDocument, stack " + stack.toString());
136        }
137        return target.getCoreSession().getDocument(documentRef);
138    }
139
140    public void attach(CoreSession session) {
141        if (!session.getRepositoryName().equals(repositoryName)) {
142            throw new IllegalArgumentException(
143                    "Invalid session, expected repo: " + repositoryName + " actual: " + session.getRepositoryName());
144        }
145        sessionId = session.getSessionId();
146        assert sessionId != null : "Attach to session with a null sessionId";
147    }
148
149    /**
150     * Return the document or null if it does not exists anymore.
151     *
152     * @throws java.lang.IllegalStateException if there is no session attached
153     */
154    public DocumentModel getTargetDocument() {
155        CoreSession session = null;
156        if (sessionId != null) {
157            session = Framework.getService(CoreSessionService.class).getCoreSession(sessionId);
158        }
159        if (session == null) {
160            throw new IllegalStateException("Command is not attached to a valid session: " + this);
161        }
162        IdRef idref = new IdRef(targetDocumentId);
163        if (!session.exists(idref)) {
164            // Doc was deleted : no way we can fetch it
165            return null;
166        }
167        return session.getDocument(idref);
168    }
169
170    public String getRepositoryName() {
171        return repositoryName;
172    }
173
174    /**
175     * @return true if merged
176     */
177    public boolean merge(IndexingCommand other) {
178        if (canBeMerged(other)) {
179            merge(other.sync, other.recurse);
180            return true;
181        }
182        return false;
183    }
184
185    protected void merge(boolean sync, boolean recurse) {
186        this.sync = this.sync || sync;
187        this.recurse = this.recurse || recurse;
188    }
189
190    protected boolean canBeMerged(IndexingCommand other) {
191        if (type != other.type) {
192            return false;
193        }
194        if (type == Type.DELETE) {
195            // we support recursive sync deletion
196            return true;
197        }
198        // only if the result is not a sync and recurse command
199        return !((other.sync || sync) && (other.recurse || recurse));
200    }
201
202    public boolean isSync() {
203        return sync;
204    }
205
206    public boolean isRecurse() {
207        return recurse;
208    }
209
210    public Type getType() {
211        return type;
212    }
213
214    public String toJSON() throws IOException {
215        StringWriter out = new StringWriter();
216        JsonFactory factory = new JsonFactory();
217        JsonGenerator jsonGen = factory.createGenerator(out);
218        toJSON(jsonGen);
219        out.flush();
220        jsonGen.close();
221        return out.toString();
222    }
223
224    public void toJSON(JsonGenerator jsonGen) throws IOException {
225        jsonGen.writeStartObject();
226        jsonGen.writeStringField("id", id);
227        jsonGen.writeStringField("type", String.format("%s", type));
228        jsonGen.writeStringField("docId", getTargetDocumentId());
229        jsonGen.writeStringField("path", path);
230        jsonGen.writeStringField("repo", getRepositoryName());
231        jsonGen.writeBooleanField("recurse", recurse);
232        jsonGen.writeBooleanField("sync", sync);
233        jsonGen.writeNumberField("order", getOrder());
234        jsonGen.writeEndObject();
235    }
236
237    /**
238     * Create a command from a JSON string.
239     *
240     * @throws IllegalArgumentException if json is invalid or command is invalid
241     */
242    public static IndexingCommand fromJSON(String json) {
243        JsonFactory jsonFactory = new JsonFactory();
244        ObjectMapper mapper = new ObjectMapper(jsonFactory);
245        try {
246            return fromJSON(mapper.readTree(json));
247        } catch (IOException e) {
248            throw new IllegalArgumentException("Invalid JSON: " + json, e);
249        }
250    }
251
252    public static IndexingCommand fromJSON(JsonNode jsonNode) {
253        IndexingCommand cmd = new IndexingCommand();
254        Iterator<Map.Entry<String, JsonNode>> fieldsIterator = jsonNode.fields();
255        while (fieldsIterator.hasNext()) {
256            Map.Entry<String, JsonNode> field = fieldsIterator.next();
257            String key = field.getKey();
258            JsonNode value = field.getValue();
259            if (value.isNull()) {
260                continue;
261            }
262            if ("type".equals(key)) {
263                cmd.type = Type.valueOf(value.textValue());
264            } else if ("docId".equals(key)) {
265                cmd.targetDocumentId = value.textValue();
266            } else if ("path".equals(key)) {
267                cmd.path = value.textValue();
268            } else if ("repo".equals(key)) {
269                cmd.repositoryName = value.textValue();
270            } else if ("id".equals(key)) {
271                cmd.id = value.textValue();
272            } else if ("recurse".equals(key)) {
273                cmd.recurse = value.booleanValue();
274            } else if ("sync".equals(key)) {
275                cmd.sync = value.booleanValue();
276            }
277        }
278        if (cmd.targetDocumentId == null) {
279            throw new IllegalArgumentException("Document uid is null: " + cmd);
280        }
281        if (cmd.type == null) {
282            throw new IllegalArgumentException("Invalid type: " + cmd);
283        }
284        return cmd;
285    }
286
287    public String getId() {
288        return id;
289    }
290
291    public String getTargetDocumentId() {
292        return targetDocumentId;
293    }
294
295    public IndexingCommand clone(DocumentModel newDoc) {
296        return new IndexingCommand(newDoc, type, sync, recurse);
297    }
298
299    public String[] getSchemas() {
300        String[] ret = null;
301        if (schemas != null && schemas.size() > 0) {
302            ret = schemas.toArray(new String[schemas.size()]);
303        }
304        return ret;
305    }
306
307    public void addSchemas(String schema) {
308        if (schemas == null) {
309            schemas = new ArrayList<>();
310        }
311        if (!schemas.contains(schema)) {
312            schemas.add(schema);
313        }
314    }
315
316    @Override
317    public String toString() {
318        try {
319            return toJSON();
320        } catch (IOException e) {
321            return super.toString();
322        }
323    }
324
325    /**
326     * Try to make the command synchronous. Recurse command will stay in async for update.
327     */
328    public void makeSync() {
329        if (!sync) {
330            if (!recurse || type == Type.DELETE) {
331                sync = true;
332                if (log.isDebugEnabled()) {
333                    log.debug("Turn command into sync: " + toString());
334                }
335            }
336        }
337    }
338
339    // @since 8.3
340    public long getOrder() {
341        return order;
342    }
343
344    // @since 8.3
345    public void setOrder(long order) {
346        this.order = order;
347    }
348}