001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.commands;
021
022import java.io.IOException;
023import java.io.PrintWriter;
024import java.io.Serializable;
025import java.io.StringWriter;
026import java.util.ArrayList;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.concurrent.atomic.AtomicLong;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.codehaus.jackson.JsonFactory;
035import org.codehaus.jackson.JsonGenerator;
036import org.codehaus.jackson.JsonNode;
037import org.codehaus.jackson.map.ObjectMapper;
038import org.nuxeo.ecm.core.api.CoreSession;
039import org.nuxeo.ecm.core.api.CoreSessionService;
040import org.nuxeo.ecm.core.api.DocumentModel;
041import org.nuxeo.ecm.core.api.DocumentRef;
042import org.nuxeo.ecm.core.api.IdRef;
043import org.nuxeo.runtime.api.Framework;
044
045/**
046 * Holds information about what type of indexing operation must be processed. IndexingCommands are create "on the fly"
047 * via a Synchronous event listener and at post commit time the system will merge the commands and execute worker to
048 * process them.
049 */
050public class IndexingCommand implements Serializable {
051
052    private static final Log log = LogFactory.getLog(IndexingCommand.class);
053
054    private static final long serialVersionUID = 1L;
055
056    public enum Type {
057        INSERT, UPDATE, UPDATE_SECURITY, DELETE, UPDATE_DIRECT_CHILDREN,
058    }
059
060    public static final String PREFIX = "IxCd-";
061
062    protected String id;
063
064    protected Type type;
065
066    protected boolean sync;
067
068    protected boolean recurse;
069
070    protected String targetDocumentId;
071
072    protected String path;
073
074    protected String repositoryName;
075
076    protected List<String> schemas;
077
078    protected long order;
079
080    protected transient String sessionId;
081
082    protected transient static AtomicLong seq = new AtomicLong(0);
083
084    protected IndexingCommand() {
085    }
086
087    /**
088     * Create an indexing command
089     *
090     * @param document the target document
091     * @param commandType the type of command
092     * @param sync if true the command will be processed on the same thread after transaction completion and the
093     *            Elasticsearch index will be refresh
094     * @param recurse the command affect the document and all its descendants
095     */
096    public IndexingCommand(DocumentModel document, Type commandType, boolean sync, boolean recurse) {
097        id = PREFIX + seq.incrementAndGet();
098        type = commandType;
099        this.sync = sync;
100        this.recurse = recurse;
101        if ((sync && recurse) && commandType != Type.DELETE) {
102            // we don't want sync and recursive command
103            throw new IllegalArgumentException(
104                    "Recurse and synchronous command is not allowed: cmd: " + this + ", doc: " + document);
105        }
106        if (document == null) {
107            throw new IllegalArgumentException("Target document is null for: " + this);
108        }
109        DocumentModel targetDocument = getValidTargetDocument(document);
110        repositoryName = targetDocument.getRepositoryName();
111        targetDocumentId = targetDocument.getId();
112        sessionId = targetDocument.getSessionId();
113        path = targetDocument.getPathAsString();
114        if (targetDocumentId == null) {
115            throw new IllegalArgumentException("Target document has a null uid: " + this);
116        }
117    }
118
119    protected DocumentModel getValidTargetDocument(DocumentModel target) {
120        if (target.getId() != null) {
121            return target;
122        }
123        if (target.getRef() == null || target.getCoreSession() == null) {
124            throw new IllegalArgumentException("Invalid target document: " + target);
125        }
126        // transient document try to get it from its path
127        DocumentRef documentRef = target.getRef();
128        log.warn("Creating indexing command on a document with a null id, ref: " + documentRef
129                + ", trying to get the docId from its path, activate trace level for more info " + this);
130        if (log.isTraceEnabled()) {
131            Throwable throwable = new Throwable();
132            StringWriter stack = new StringWriter();
133            throwable.printStackTrace(new PrintWriter(stack));
134            log.trace("You should use a document returned by session.createDocument, stack " + stack.toString());
135        }
136        return target.getCoreSession().getDocument(documentRef);
137    }
138
139    public void attach(CoreSession session) {
140        if (!session.getRepositoryName().equals(repositoryName)) {
141            throw new IllegalArgumentException(
142                    "Invalid session, expected repo: " + repositoryName + " actual: " + session.getRepositoryName());
143        }
144        sessionId = session.getSessionId();
145        assert sessionId != null : "Attach to session with a null sessionId";
146    }
147
148    /**
149     * Return the document or null if it does not exists anymore.
150     *
151     * @throws java.lang.IllegalStateException if there is no session attached
152     */
153    public DocumentModel getTargetDocument() {
154        CoreSession session = null;
155        if (sessionId != null) {
156            session = Framework.getService(CoreSessionService.class).getCoreSession(sessionId);
157        }
158        if (session == null) {
159            throw new IllegalStateException("Command is not attached to a valid session: " + this);
160        }
161        IdRef idref = new IdRef(targetDocumentId);
162        if (!session.exists(idref)) {
163            // Doc was deleted : no way we can fetch it
164            return null;
165        }
166        return session.getDocument(idref);
167    }
168
169    public String getRepositoryName() {
170        return repositoryName;
171    }
172
173    /**
174     * @return true if merged
175     */
176    public boolean merge(IndexingCommand other) {
177        if (canBeMerged(other)) {
178            merge(other.sync, other.recurse);
179            return true;
180        }
181        return false;
182    }
183
184    protected void merge(boolean sync, boolean recurse) {
185        this.sync = this.sync || sync;
186        this.recurse = this.recurse || recurse;
187    }
188
189    protected boolean canBeMerged(IndexingCommand other) {
190        if (type != other.type) {
191            return false;
192        }
193        if (type == Type.DELETE) {
194            // we support recursive sync deletion
195            return true;
196        }
197        // only if the result is not a sync and recurse command
198        return !((other.sync || sync) && (other.recurse || recurse));
199    }
200
201    public boolean isSync() {
202        return sync;
203    }
204
205    public boolean isRecurse() {
206        return recurse;
207    }
208
209    public Type getType() {
210        return type;
211    }
212
213    public String toJSON() throws IOException {
214        StringWriter out = new StringWriter();
215        JsonFactory factory = new JsonFactory();
216        JsonGenerator jsonGen = factory.createJsonGenerator(out);
217        toJSON(jsonGen);
218        out.flush();
219        jsonGen.close();
220        return out.toString();
221    }
222
223    public void toJSON(JsonGenerator jsonGen) throws IOException {
224        jsonGen.writeStartObject();
225        jsonGen.writeStringField("id", id);
226        jsonGen.writeStringField("type", String.format("%s", type));
227        jsonGen.writeStringField("docId", getTargetDocumentId());
228        jsonGen.writeStringField("path", path);
229        jsonGen.writeStringField("repo", getRepositoryName());
230        jsonGen.writeBooleanField("recurse", recurse);
231        jsonGen.writeBooleanField("sync", sync);
232        jsonGen.writeNumberField("order", getOrder());
233        jsonGen.writeEndObject();
234    }
235
236    /**
237     * Create a command from a JSON string.
238     *
239     * @throws IllegalArgumentException if json is invalid or command is invalid
240     */
241    public static IndexingCommand fromJSON(String json) {
242        JsonFactory jsonFactory = new JsonFactory();
243        ObjectMapper mapper = new ObjectMapper(jsonFactory);
244        try {
245            return fromJSON(mapper.readTree(json));
246        } catch (IOException e) {
247            throw new IllegalArgumentException("Invalid JSON: " + json, e);
248        }
249    }
250
251    public static IndexingCommand fromJSON(JsonNode jsonNode) {
252        IndexingCommand cmd = new IndexingCommand();
253        Iterator<Map.Entry<String, JsonNode>> fieldsIterator = jsonNode.getFields();
254        while (fieldsIterator.hasNext()) {
255            Map.Entry<String, JsonNode> field = fieldsIterator.next();
256            String key = field.getKey();
257            JsonNode value = field.getValue();
258            if (value.isNull()) {
259                continue;
260            }
261            if ("type".equals(key)) {
262                cmd.type = Type.valueOf(value.getTextValue());
263            } else if ("docId".equals(key)) {
264                cmd.targetDocumentId = value.getTextValue();
265            } else if ("path".equals(key)) {
266                cmd.path = value.getTextValue();
267            } else if ("repo".equals(key)) {
268                cmd.repositoryName = value.getTextValue();
269            } else if ("id".equals(key)) {
270                cmd.id = value.getTextValue();
271            } else if ("recurse".equals(key)) {
272                cmd.recurse = value.getBooleanValue();
273            } else if ("sync".equals(key)) {
274                cmd.sync = value.getBooleanValue();
275            }
276        }
277        if (cmd.targetDocumentId == null) {
278            throw new IllegalArgumentException("Document uid is null: " + cmd);
279        }
280        if (cmd.type == null) {
281            throw new IllegalArgumentException("Invalid type: " + cmd);
282        }
283        return cmd;
284    }
285
286    public String getId() {
287        return id;
288    }
289
290    public String getTargetDocumentId() {
291        return targetDocumentId;
292    }
293
294    public IndexingCommand clone(DocumentModel newDoc) {
295        return new IndexingCommand(newDoc, type, sync, recurse);
296    }
297
298    public String[] getSchemas() {
299        String[] ret = null;
300        if (schemas != null && schemas.size() > 0) {
301            ret = schemas.toArray(new String[schemas.size()]);
302        }
303        return ret;
304    }
305
306    public void addSchemas(String schema) {
307        if (schemas == null) {
308            schemas = new ArrayList<>();
309        }
310        if (!schemas.contains(schema)) {
311            schemas.add(schema);
312        }
313    }
314
315    @Override
316    public String toString() {
317        try {
318            return toJSON();
319        } catch (IOException e) {
320            return super.toString();
321        }
322    }
323
324    /**
325     * Try to make the command synchronous. Recurse command will stay in async for update.
326     */
327    public void makeSync() {
328        if (!sync) {
329            if (!recurse || type == Type.DELETE) {
330                sync = true;
331                if (log.isDebugEnabled()) {
332                    log.debug("Turn command into sync: " + toString());
333                }
334            }
335        }
336    }
337
338    // @since 8.3
339    public long getOrder() {
340        return order;
341    }
342
343    // @since 8.3
344    public void setOrder(long order) {
345        this.order = order;
346    }
347}