001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.commands;
021
022import java.io.IOException;
023import java.io.PrintWriter;
024import java.io.Serializable;
025import java.io.StringWriter;
026import java.util.ArrayList;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.concurrent.atomic.AtomicLong;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.api.CoreInstance;
035import org.nuxeo.ecm.core.api.CoreSession;
036import org.nuxeo.ecm.core.api.DocumentModel;
037import org.nuxeo.ecm.core.api.DocumentRef;
038import org.nuxeo.ecm.core.api.IdRef;
039
040import com.fasterxml.jackson.core.JsonFactory;
041import com.fasterxml.jackson.core.JsonGenerator;
042import com.fasterxml.jackson.databind.JsonNode;
043import com.fasterxml.jackson.databind.ObjectMapper;
044
045/**
046 * Holds information about what type of indexing operation must be processed. IndexingCommands are create "on the fly"
047 * via a Synchronous event listener and at post commit time the system will merge the commands and execute worker to
048 * process them.
049 */
050public class IndexingCommand implements Serializable {
051
052    private static final Log log = LogFactory.getLog(IndexingCommand.class);
053
054    private static final long serialVersionUID = 1L;
055
056    public enum Type {
057        INSERT, UPDATE, UPDATE_SECURITY, DELETE, UPDATE_DIRECT_CHILDREN,
058    }
059
060    public static final String PREFIX = "IxCd-";
061
062    protected String id;
063
064    protected Type type;
065
066    protected boolean sync;
067
068    protected boolean recurse;
069
070    protected String targetDocumentId;
071
072    protected String path;
073
074    protected String repositoryName;
075
076    protected List<String> schemas;
077
078    protected long order;
079
080    protected transient static AtomicLong seq = new AtomicLong(0);
081
082    protected IndexingCommand() {
083    }
084
085    /**
086     * Create an indexing command
087     *
088     * @param document the target document
089     * @param commandType the type of command
090     * @param sync if true the command will be processed on the same thread after transaction completion and the
091     *            Elasticsearch index will be refresh
092     * @param recurse the command affect the document and all its descendants
093     */
094    public IndexingCommand(DocumentModel document, Type commandType, boolean sync, boolean recurse) {
095        id = PREFIX + seq.incrementAndGet();
096        type = commandType;
097        this.sync = sync;
098        this.recurse = recurse;
099        if ((sync && recurse) && commandType != Type.DELETE) {
100            // we don't want sync and recursive command
101            throw new IllegalArgumentException(
102                    "Recurse and synchronous command is not allowed: cmd: " + this + ", doc: " + document);
103        }
104        if (document == null) {
105            throw new IllegalArgumentException("Target document is null for: " + this);
106        }
107        DocumentModel targetDocument = getValidTargetDocument(document);
108        repositoryName = targetDocument.getRepositoryName();
109        targetDocumentId = targetDocument.getId();
110        path = targetDocument.getPathAsString();
111        if (targetDocumentId == null) {
112            throw new IllegalArgumentException("Target document has a null uid: " + this);
113        }
114    }
115
116    protected DocumentModel getValidTargetDocument(DocumentModel target) {
117        if (target.getId() != null) {
118            return target;
119        }
120        if (target.getRef() == null || target.getCoreSession() == null) {
121            throw new IllegalArgumentException("Invalid target document: " + target);
122        }
123        // transient document try to get it from its path
124        DocumentRef documentRef = target.getRef();
125        log.warn("Creating indexing command on a document with a null id, ref: " + documentRef
126                + ", trying to get the docId from its path, activate trace level for more info " + this);
127        if (log.isTraceEnabled()) {
128            Throwable throwable = new Throwable();
129            StringWriter stack = new StringWriter();
130            throwable.printStackTrace(new PrintWriter(stack));
131            log.trace("You should use a document returned by session.createDocument, stack " + stack.toString());
132        }
133        return target.getCoreSession().getDocument(documentRef);
134    }
135
136    public void attach(CoreSession session) {
137        if (!session.getRepositoryName().equals(repositoryName)) {
138            throw new IllegalArgumentException(
139                    "Invalid session, expected repo: " + repositoryName + " actual: " + session.getRepositoryName());
140        }
141    }
142
143    /**
144     * Return the document or null if it does not exists anymore.
145     *
146     * @throws java.lang.IllegalStateException if there is no session attached
147     */
148    public DocumentModel getTargetDocument() {
149        CoreSession session = CoreInstance.getCoreSessionSystem(repositoryName);
150        IdRef idref = new IdRef(targetDocumentId);
151        if (!session.exists(idref)) {
152            // Doc was deleted : no way we can fetch it
153            return null;
154        }
155        return session.getDocument(idref);
156    }
157
158    public String getRepositoryName() {
159        return repositoryName;
160    }
161
162    /**
163     * @return true if merged
164     */
165    public boolean merge(IndexingCommand other) {
166        if (canBeMerged(other)) {
167            merge(other.sync, other.recurse);
168            return true;
169        }
170        return false;
171    }
172
173    protected void merge(boolean sync, boolean recurse) {
174        this.sync = this.sync || sync;
175        this.recurse = this.recurse || recurse;
176    }
177
178    protected boolean canBeMerged(IndexingCommand other) {
179        if (type != other.type) {
180            return false;
181        }
182        if (type == Type.DELETE) {
183            // we support recursive sync deletion
184            return true;
185        }
186        // only if the result is not a sync and recurse command
187        return !((other.sync || sync) && (other.recurse || recurse));
188    }
189
190    public boolean isSync() {
191        return sync;
192    }
193
194    public boolean isRecurse() {
195        return recurse;
196    }
197
198    public Type getType() {
199        return type;
200    }
201
202    public String toJSON() throws IOException {
203        StringWriter out = new StringWriter();
204        JsonFactory factory = new JsonFactory();
205        JsonGenerator jsonGen = factory.createGenerator(out);
206        toJSON(jsonGen);
207        out.flush();
208        jsonGen.close();
209        return out.toString();
210    }
211
212    public void toJSON(JsonGenerator jsonGen) throws IOException {
213        jsonGen.writeStartObject();
214        jsonGen.writeStringField("id", id);
215        jsonGen.writeStringField("type", String.format("%s", type));
216        jsonGen.writeStringField("docId", getTargetDocumentId());
217        jsonGen.writeStringField("path", path);
218        jsonGen.writeStringField("repo", getRepositoryName());
219        jsonGen.writeBooleanField("recurse", recurse);
220        jsonGen.writeBooleanField("sync", sync);
221        jsonGen.writeNumberField("order", getOrder());
222        jsonGen.writeEndObject();
223    }
224
225    /**
226     * Create a command from a JSON string.
227     *
228     * @throws IllegalArgumentException if json is invalid or command is invalid
229     */
230    public static IndexingCommand fromJSON(String json) {
231        JsonFactory jsonFactory = new JsonFactory();
232        ObjectMapper mapper = new ObjectMapper(jsonFactory);
233        try {
234            return fromJSON(mapper.readTree(json));
235        } catch (IOException e) {
236            throw new IllegalArgumentException("Invalid JSON: " + json, e);
237        }
238    }
239
240    public static IndexingCommand fromJSON(JsonNode jsonNode) {
241        IndexingCommand cmd = new IndexingCommand();
242        Iterator<Map.Entry<String, JsonNode>> fieldsIterator = jsonNode.fields();
243        while (fieldsIterator.hasNext()) {
244            Map.Entry<String, JsonNode> field = fieldsIterator.next();
245            String key = field.getKey();
246            JsonNode value = field.getValue();
247            if (value.isNull()) {
248                continue;
249            }
250            if ("type".equals(key)) {
251                cmd.type = Type.valueOf(value.textValue());
252            } else if ("docId".equals(key)) {
253                cmd.targetDocumentId = value.textValue();
254            } else if ("path".equals(key)) {
255                cmd.path = value.textValue();
256            } else if ("repo".equals(key)) {
257                cmd.repositoryName = value.textValue();
258            } else if ("id".equals(key)) {
259                cmd.id = value.textValue();
260            } else if ("recurse".equals(key)) {
261                cmd.recurse = value.booleanValue();
262            } else if ("sync".equals(key)) {
263                cmd.sync = value.booleanValue();
264            }
265        }
266        if (cmd.targetDocumentId == null) {
267            throw new IllegalArgumentException("Document uid is null: " + cmd);
268        }
269        if (cmd.type == null) {
270            throw new IllegalArgumentException("Invalid type: " + cmd);
271        }
272        return cmd;
273    }
274
275    public String getId() {
276        return id;
277    }
278
279    public String getTargetDocumentId() {
280        return targetDocumentId;
281    }
282
283    public IndexingCommand clone(DocumentModel newDoc) {
284        return new IndexingCommand(newDoc, type, sync, recurse);
285    }
286
287    public String[] getSchemas() {
288        String[] ret = null;
289        if (schemas != null && schemas.size() > 0) {
290            ret = schemas.toArray(new String[schemas.size()]);
291        }
292        return ret;
293    }
294
295    public void addSchemas(String schema) {
296        if (schemas == null) {
297            schemas = new ArrayList<>();
298        }
299        if (!schemas.contains(schema)) {
300            schemas.add(schema);
301        }
302    }
303
304    @Override
305    public String toString() {
306        try {
307            return toJSON();
308        } catch (IOException e) {
309            return super.toString();
310        }
311    }
312
313    /**
314     * Try to make the command synchronous. Recurse command will stay in async for update.
315     */
316    public void makeSync() {
317        if (!sync) {
318            if (!recurse || type == Type.DELETE) {
319                sync = true;
320                if (log.isDebugEnabled()) {
321                    log.debug("Turn command into sync: " + toString());
322                }
323            }
324        }
325    }
326
327    // @since 8.3
328    public long getOrder() {
329        return order;
330    }
331
332    // @since 8.3
333    public void setOrder(long order) {
334        this.order = order;
335    }
336}