001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.core.storage.marklogic;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
032
033import java.io.Serializable;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Calendar;
037import java.util.Collections;
038import java.util.List;
039import java.util.Map;
040import java.util.Map.Entry;
041import java.util.Set;
042import java.util.UUID;
043import java.util.function.Function;
044import java.util.stream.Collectors;
045
046import javax.resource.spi.ConnectionManager;
047
048import org.apache.commons.lang.StringUtils;
049import org.apache.commons.logging.Log;
050import org.apache.commons.logging.LogFactory;
051import org.nuxeo.ecm.core.api.DocumentNotFoundException;
052import org.nuxeo.ecm.core.api.Lock;
053import org.nuxeo.ecm.core.api.NuxeoException;
054import org.nuxeo.ecm.core.api.PartialList;
055import org.nuxeo.ecm.core.api.ScrollResult;
056import org.nuxeo.ecm.core.api.ScrollResultImpl;
057import org.nuxeo.ecm.core.blob.BlobManager;
058import org.nuxeo.ecm.core.model.Repository;
059import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
060import org.nuxeo.ecm.core.schema.TypeConstants;
061import org.nuxeo.ecm.core.schema.types.ComplexType;
062import org.nuxeo.ecm.core.schema.types.ListType;
063import org.nuxeo.ecm.core.schema.types.Type;
064import org.nuxeo.ecm.core.storage.State;
065import org.nuxeo.ecm.core.storage.State.StateDiff;
066import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
067import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
068import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
069import org.nuxeo.ecm.core.storage.marklogic.MarkLogicQueryBuilder.MarkLogicQuery;
070import org.nuxeo.runtime.api.Framework;
071
072import com.google.common.base.Strings;
073import com.marklogic.xcc.AdhocQuery;
074import com.marklogic.xcc.Content;
075import com.marklogic.xcc.ContentFactory;
076import com.marklogic.xcc.ContentSource;
077import com.marklogic.xcc.ContentSourceFactory;
078import com.marklogic.xcc.ModuleInvoke;
079import com.marklogic.xcc.ResultSequence;
080import com.marklogic.xcc.Session;
081import com.marklogic.xcc.exceptions.RequestException;
082
083/**
084 * MarkLogic implementation of a {@link Repository}.
085 *
086 * @since 8.3
087 */
088public class MarkLogicRepository extends DBSRepositoryBase {
089
090    private static final Log log = LogFactory.getLog(MarkLogicRepository.class);
091
092    private static final Function<String, String> ID_FORMATTER = id -> String.format("/%s.xml", id);
093
094    public static final String DB_DEFAULT = "nuxeo";
095
096    protected static final String NOSCROLL_ID = "noscroll";
097
098    protected ContentSource xccContentSource;
099
100    /** Last value used from the in-memory sequence. Used by unit tests. */
101    protected long sequenceLastValue;
102
103    protected final List<MarkLogicRangeElementIndexDescriptor> rangeElementIndexes;
104
105    public MarkLogicRepository(ConnectionManager cm, MarkLogicRepositoryDescriptor descriptor) {
106        super(cm, descriptor.name, descriptor);
107        xccContentSource = newMarkLogicContentSource(descriptor);
108        rangeElementIndexes = descriptor.rangeElementIndexes.stream()
109                                                            .map(MarkLogicRangeElementIndexDescriptor::new)
110                                                            .collect(Collectors.toList());
111        initRepository();
112    }
113
114    @Override
115    public List<IdType> getAllowedIdTypes() {
116        return Collections.singletonList(IdType.varchar);
117    }
118
119    // used also by unit tests
120    public static ContentSource newMarkLogicContentSource(MarkLogicRepositoryDescriptor descriptor) {
121        String host = descriptor.host;
122        Integer port = descriptor.port;
123        if (StringUtils.isBlank(host) || port == null) {
124            throw new NuxeoException("Missing <host> or <port> in MarkLogic repository descriptor");
125        }
126        String dbname = StringUtils.defaultIfBlank(descriptor.dbname, DB_DEFAULT);
127        String user = descriptor.user;
128        String password = descriptor.password;
129        return ContentSourceFactory.newContentSource(host, port.intValue(), user, password, dbname);
130    }
131
132    protected void initRepository() {
133        if (readState(getRootId()) == null) {
134            initRoot();
135        }
136    }
137
138    @Override
139    public String generateNewId() {
140        if (DEBUG_UUIDS) {
141            Long id = getNextSequenceId();
142            return "UUID_" + id;
143        }
144        return UUID.randomUUID().toString();
145    }
146
147    // Used by unit tests
148    protected synchronized Long getNextSequenceId() {
149        sequenceLastValue++;
150        return Long.valueOf(sequenceLastValue);
151    }
152
153    @Override
154    public State readState(String id) {
155        if (log.isTraceEnabled()) {
156            log.trace("MarkLogic: READ " + id);
157        }
158        try (Session session = xccContentSource.newSession()) {
159            String query = "fn:doc('" + ID_FORMATTER.apply(id) + "')";
160            AdhocQuery request = session.newAdhocQuery(query);
161            // ResultSequence will be closed by Session close
162            ResultSequence rs = session.submitRequest(request);
163            if (rs.hasNext()) {
164                return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]);
165            }
166            return null;
167        } catch (RequestException e) {
168            throw new NuxeoException("An exception happened during xcc call", e);
169        }
170    }
171
172    @Override
173    public List<State> readStates(List<String> ids) {
174        if (log.isTraceEnabled()) {
175            log.trace("MarkLogic: READ " + ids);
176        }
177        try (Session session = xccContentSource.newSession()) {
178            String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect(
179                    Collectors.joining(",", "fn:doc((", "))"));
180            AdhocQuery request = session.newAdhocQuery(query);
181            // ResultSequence will be closed by Session close
182            ResultSequence rs = session.submitRequest(request);
183            return Arrays.stream(rs.asStrings())
184                         .map(MarkLogicStateDeserializer::deserialize)
185                         .collect(Collectors.toList());
186        } catch (RequestException e) {
187            throw new NuxeoException("An exception happened during xcc call", e);
188        }
189    }
190
191    @Override
192    public void createState(State state) {
193        String id = state.get(KEY_ID).toString();
194        if (log.isTraceEnabled()) {
195            log.trace("MarkLogic: CREATE " + id + ": " + state);
196        }
197        try (Session session = xccContentSource.newSession()) {
198            session.insertContent(convert(state));
199        } catch (RequestException e) {
200            throw new NuxeoException("An exception happened during xcc call", e);
201        }
202    }
203
204    @Override
205    public void createStates(List<State> states) {
206        if (log.isTraceEnabled()) {
207            log.trace("MarkLogic: CREATE ["
208                    + states.stream().map(state -> state.get(KEY_ID).toString()).collect(Collectors.joining(", "))
209                    + "]: " + states);
210        }
211        try (Session session = xccContentSource.newSession()) {
212            Content[] contents = states.stream().map(this::convert).toArray(Content[]::new);
213            session.insertContent(contents);
214        } catch (RequestException e) {
215            throw new NuxeoException("An exception happened during xcc call", e);
216        }
217    }
218
219    private Content convert(State state) {
220        String id = state.get(KEY_ID).toString();
221        return ContentFactory.newContent(ID_FORMATTER.apply(id), MarkLogicStateSerializer.serialize(state), null);
222    }
223
224    @Override
225    public void updateState(String id, StateDiff diff) {
226        String patch = MarkLogicStateSerializer.serialize(diff);
227        if (log.isTraceEnabled()) {
228            log.trace("MarkLogic: UPDATE " + id + ": " + patch);
229        }
230        try (Session session = xccContentSource.newSession()) {
231            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/patch.xqy");
232            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
233            request.setNewStringVariable("patch-string", patch);
234            // ResultSequence will be closed by Session close
235            session.submitRequest(request);
236        } catch (RequestException e) {
237            throw new NuxeoException("An exception happened during xcc call", e);
238        }
239    }
240
241    @Override
242    public void deleteStates(Set<String> ids) {
243        if (log.isTraceEnabled()) {
244            log.trace("MarkLogic: DELETE " + ids);
245        }
246        try (Session session = xccContentSource.newSession()) {
247            String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect(
248                    Collectors.joining(",", "xdmp:document-delete((", "))"));
249            AdhocQuery request = session.newAdhocQuery(query);
250            // ResultSequence will be closed by Session close
251            session.submitRequest(request);
252        } catch (RequestException e) {
253            throw new NuxeoException("An exception happened during xcc call", e);
254        }
255    }
256
257    @Override
258    public State readChildState(String parentId, String name, Set<String> ignored) {
259        String query = getChildQuery(parentId, name, ignored);
260        return findOne(query);
261    }
262
263    @Override
264    public boolean hasChild(String parentId, String name, Set<String> ignored) {
265        String query = getChildQuery(parentId, name, ignored);
266        return exist(query);
267    }
268
269    private String getChildQuery(String parentId, String name, Set<String> ignored) {
270        return new MarkLogicQuerySimpleBuilder(rangeElementIndexes).eq(KEY_PARENT_ID, parentId)
271                                                                   .eq(KEY_NAME, name)
272                                                                   .notIn(KEY_ID, ignored)
273                                                                   .build();
274    }
275
276    @Override
277    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
278        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
279        builder.eq(key, value);
280        builder.notIn(KEY_ID, ignored);
281        return findAll(builder.build());
282    }
283
284    @Override
285    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
286        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
287        builder.eq(key1, value1);
288        builder.eq(key2, value2);
289        builder.notIn(KEY_ID, ignored);
290        return findAll(builder.build());
291    }
292
293    @Override
294    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
295            Map<String, Object[]> targetProxies) {
296        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
297        builder.eq(key, value);
298        for (State state : findAll(builder.build(), KEY_ID, KEY_IS_PROXY, KEY_PROXY_TARGET_ID, KEY_PROXY_IDS)) {
299            String id = (String) state.get(KEY_ID);
300            ids.add(id);
301            if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
302                String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
303                proxyTargets.put(id, targetId);
304            }
305            if (targetProxies != null) {
306                Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
307                if (proxyIds != null) {
308                    targetProxies.put(id, proxyIds);
309                }
310            }
311        }
312    }
313
314    @Override
315    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
316        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
317        builder.eq(key, value);
318        builder.notIn(KEY_ID, ignored);
319        return exist(builder.build());
320    }
321
322    @Override
323    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
324            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
325        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, orderByClause, distinctDocuments,
326                rangeElementIndexes);
327        MarkLogicQuery query = builder.buildQuery();
328        // Don't do manual projection if there are no projection wildcards, as this brings no new
329        // information and is costly. The only difference is several identical rows instead of one.
330        boolean manualProjection = builder.doManualProjection();
331        if (manualProjection) {
332            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
333            // so we need the full state from the database
334            evaluator.parse();
335        }
336        String searchQuery = query.getSearchQuery(limit, offset);
337        if (log.isTraceEnabled()) {
338            logQuery(searchQuery);
339        }
340        // Run query
341        try (Session session = xccContentSource.newSession()) {
342            AdhocQuery request = session.newAdhocQuery(searchQuery);
343            // ResultSequence will be closed by Session close
344            ResultSequence rs = session.submitRequest(request);
345
346            List<Map<String, Serializable>> projections = new ArrayList<>(limit);
347            for (String rsItem : rs.asStrings()) {
348                State state = MarkLogicStateDeserializer.deserialize(rsItem);
349                if (manualProjection) {
350                    projections.addAll(evaluator.matches(state));
351                } else {
352                    projections.add(DBSStateFlattener.flatten(state));
353                }
354            }
355            long totalSize;
356            if (countUpTo == -1) {
357                // count full size
358                if (limit == 0) {
359                    totalSize = projections.size();
360                } else {
361                    AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery());
362                    // ResultSequence will be closed by Session close
363                    ResultSequence countRs = session.submitRequest(countRequest);
364                    totalSize = Long.parseLong(countRs.asStrings()[0]);
365                }
366            } else if (countUpTo == 0) {
367                // no count
368                totalSize = -1; // not counted
369            } else {
370                // count only if less than countUpTo
371                if (limit == 0) {
372                    totalSize = projections.size();
373                } else {
374                    AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery(countUpTo + 1));
375                    // ResultSequence will be closed by Session close
376                    ResultSequence countRs = session.submitRequest(countRequest);
377                    totalSize = Long.parseLong(countRs.asStrings()[0]);
378                }
379                if (totalSize > countUpTo) {
380                    totalSize = -2; // truncated
381                }
382            }
383
384            if (log.isTraceEnabled() && projections.size() != 0) {
385                log.trace("MarkLogic:    -> " + projections.size());
386            }
387            return new PartialList<>(projections, totalSize);
388        } catch (RequestException e) {
389            throw new NuxeoException("An exception happened during xcc call", e);
390        }
391    }
392
393    @Override
394    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveInSecond) {
395        // Not yet implemented, return all result in one shot for now
396        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, null, false, rangeElementIndexes);
397        String query = builder.buildQuery().getSearchQuery();
398        // Run query
399        try (Session session = xccContentSource.newSession()) {
400            AdhocQuery request = session.newAdhocQuery(query);
401            // ResultSequence will be closed by Session close
402            ResultSequence rs = session.submitRequest(request);
403            return Arrays.stream(rs.asStrings())
404                         .map(MarkLogicStateDeserializer::deserialize)
405                         .map(state -> state.get(KEY_ID).toString())
406                         .collect(Collectors.collectingAndThen(Collectors.toList(),
407                                 ids -> new ScrollResultImpl(NOSCROLL_ID, ids)));
408        } catch (RequestException e) {
409            throw new NuxeoException("An exception happened during xcc call", e);
410        }
411    }
412
413    @Override
414    public ScrollResult scroll(String scrollId) {
415        if (NOSCROLL_ID.equals(scrollId)) {
416            // there is only one batch
417            return emptyResult();
418        }
419        throw new NuxeoException("Unknown or timed out scrollId");
420    }
421
422    @Override
423    public Lock getLock(String id) {
424        // TODO test performance : retrieve document with read or search document with extract
425        // TODO retrieve only some field
426        // https://docs.marklogic.com/guide/search-dev/qbe#id_54044
427        State state = readState(id);
428        if (state == null) {
429            throw new DocumentNotFoundException(id);
430        }
431        String owner = (String) state.get(KEY_LOCK_OWNER);
432        if (owner == null) {
433            // not locked
434            return null;
435        }
436        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
437        return new Lock(owner, created);
438    }
439
440    @Override
441    public Lock setLock(String id, Lock lock) {
442        State state = new State();
443        state.put(KEY_LOCK_OWNER, lock.getOwner());
444        state.put(KEY_LOCK_CREATED, lock.getCreated());
445        String lockString = MarkLogicStateSerializer.serialize(state);
446        if (log.isTraceEnabled()) {
447            log.trace("MarkLogic: SETLOCK " + id + ": " + lockString);
448        }
449        try (Session session = xccContentSource.newSession()) {
450            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/set-lock.xqy");
451            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
452            request.setNewStringVariable("lock-string", lockString);
453            // ResultSequence will be closed by Session close
454            ResultSequence result = session.submitRequest(request);
455            State resultState = MarkLogicStateDeserializer.deserialize(result.asString());
456            return extractLock(resultState);
457        } catch (RequestException e) {
458            if ("Document not found".equals(e.getMessage())) {
459                throw new DocumentNotFoundException(id, e);
460            }
461            throw new NuxeoException("An exception happened during xcc call", e);
462        }
463        // TODO check how the concurrent exception is raised
464    }
465
466    @Override
467    public Lock removeLock(String id, String owner) {
468        if (log.isTraceEnabled()) {
469            log.trace("MarkLogic: REMOVELOCK " + id + ": " + owner);
470        }
471        try (Session session = xccContentSource.newSession()) {
472            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/remove-lock.xqy");
473            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
474            request.setNewStringVariable("owner", Strings.nullToEmpty(owner));
475            // ResultSequence will be closed by Session close
476            ResultSequence result = session.submitRequest(request);
477            State resultState = MarkLogicStateDeserializer.deserialize(result.asString());
478            return extractLock(resultState);
479        } catch (RequestException e) {
480            if ("Document not found".equals(e.getMessage())) {
481                throw new DocumentNotFoundException(id, e);
482            }
483            throw new NuxeoException("An exception happened during xcc call", e);
484        }
485    }
486
487    private Lock extractLock(State state) {
488        if (state.isEmpty()) {
489            return null;
490        }
491        String owner = (String) state.get(KEY_LOCK_OWNER);
492        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
493        Boolean failed = (Boolean) state.get("failed");
494        return new Lock(owner, created, Boolean.TRUE.equals(failed));
495    }
496
497    @Override
498    public void closeLockManager() {
499    }
500
501    @Override
502    public void clearLockManagerCaches() {
503    }
504
505    protected List<String> binaryPaths;
506
507    @Override
508    protected void initBlobsPaths() {
509        MarkLogicBlobFinder finder = new MarkLogicBlobFinder();
510        finder.visit();
511        binaryPaths = finder.binaryPaths;
512    }
513
514    protected static class MarkLogicBlobFinder extends BlobFinder {
515        protected List<String> binaryPaths = new ArrayList<>();
516
517        @Override
518        protected void recordBlobPath() {
519            path.addLast(KEY_BLOB_DATA);
520            StringBuilder binaryPath = new StringBuilder();
521            MarkLogicSchemaManager schemaManager = new MarkLogicSchemaManager();
522            Type previousType = null;
523            for (String element : path) {
524                if (binaryPath.length() > 0) {
525                    binaryPath.append('/');
526                }
527                // Append current element to path
528                binaryPath.append(element);
529                if (previousType == null) {
530                    // No previous type - it's the first element
531                    previousType = schemaManager.computeField(String.join(".", path), element).getType();
532                } else if (previousType.isComplexType()) {
533                    // Previous type is a complex type - retrieve the type of current element
534                    if (TypeConstants.isContentType(previousType)) {
535                        // The complex type hold the binary data, it's the last element, so break the loop
536                        break;
537                    }
538                    previousType = ((ComplexType) previousType).getField(element).getType();
539                }
540                // Add the item array element if type is a list (here previousType is the type of current element)
541                // Here we re allocate previousType to item array type as there's no element in this.path to select item
542                if (previousType.isListType()) {
543                    binaryPath.append('/').append(element).append(MarkLogicHelper.ARRAY_ITEM_KEY_SUFFIX);
544                    previousType = ((ListType) previousType).getFieldType();
545                }
546            }
547            binaryPaths.add(binaryPath.toString());
548            path.removeLast();
549        }
550    }
551
552    @Override
553    public void markReferencedBinaries() {
554        BlobManager blobManager = Framework.getService(BlobManager.class);
555        // TODO add a query to not scan all documents
556        String query = new MarkLogicQuerySimpleBuilder(rangeElementIndexes).build();
557        for (State state : findAll(query, binaryPaths.toArray(new String[0]))) {
558            markReferencedBinaries(state, blobManager);
559        }
560    }
561
562    protected void markReferencedBinaries(State state, BlobManager blobManager) {
563        for (Entry<String, Serializable> entry: state.entrySet()) {
564            Serializable value = entry.getValue();
565            if (value instanceof List) {
566                List<?> list = (List<?>) value;
567                for (Object v : list) {
568                    if (v instanceof State) {
569                        markReferencedBinaries((State) v, blobManager);
570                    } else {
571                        markReferencedBinary(v, blobManager);
572                    }
573                }
574            } else if (value instanceof Object[]) {
575                for (Object v : (Object[]) value) {
576                    markReferencedBinary(v, blobManager);
577                }
578            } else if (value instanceof State) {
579                markReferencedBinaries((State) value, blobManager);
580            } else {
581                markReferencedBinary(value, blobManager);
582            }
583        }
584    }
585
586    protected void markReferencedBinary(Object value, BlobManager blobManager) {
587        if (!(value instanceof String)) {
588            return;
589        }
590        String key = (String) value;
591        blobManager.markReferencedBinary(key, repositoryName);
592    }
593
594    private void logQuery(String query) {
595        log.trace("MarkLogic: QUERY " + query);
596    }
597
598    private boolean exist(String ctsQuery) {
599        // first build exist query from cts query
600        String query = "xdmp:exists(" + ctsQuery + ")";
601        if (log.isTraceEnabled()) {
602            logQuery(query);
603        }
604        // Run query
605        try (Session session = xccContentSource.newSession()) {
606            AdhocQuery request = session.newAdhocQuery(query);
607            // ResultSequence will be closed by Session close
608            ResultSequence rs = session.submitRequest(request);
609            return Boolean.parseBoolean(rs.asString());
610        } catch (RequestException e) {
611            throw new NuxeoException("An exception happened during xcc call", e);
612        }
613    }
614
615    private State findOne(String ctsQuery) {
616        // first add limit to ctsQuery
617        String query = ctsQuery + "[1 to 1]";
618        if (log.isTraceEnabled()) {
619            logQuery(query);
620        }
621        // Run query
622        try (Session session = xccContentSource.newSession()) {
623            AdhocQuery request = session.newAdhocQuery(query);
624            // ResultSequence will be closed by Session close
625            ResultSequence rs = session.submitRequest(request);
626            if (rs.hasNext()) {
627                return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]);
628            }
629            return null;
630        } catch (RequestException e) {
631            throw new NuxeoException("An exception happened during xcc call", e);
632        }
633    }
634
635    private List<State> findAll(String ctsQuery, String... selects) {
636        String query = ctsQuery;
637        if (selects.length > 0) {
638            query = "import module namespace extract = 'http://nuxeo.com/extract' at '/ext/nuxeo/extract.xqy';\n"
639                    + "let $paths := (" + Arrays.stream(selects)
640                                                .map(MarkLogicHelper::serializeKey)
641                                                .map(select -> "\"" + MarkLogicHelper.DOCUMENT_ROOT_PATH + "/" + select
642                                                        + "\"")
643                                                .collect(Collectors.joining(",\n"))
644                    + ")let $namespaces := ()\n" + "for $i in " + query
645                    + " return extract:extract-nodes($i, $paths, $namespaces)";
646        }
647        if (log.isTraceEnabled()) {
648            logQuery(query);
649        }
650        // Run query
651        try (Session session = xccContentSource.newSession()) {
652            AdhocQuery request = session.newAdhocQuery(query);
653            // ResultSequence will be closed by Session close
654            ResultSequence rs = session.submitRequest(request);
655            return Arrays.stream(rs.asStrings())
656                         .map(MarkLogicStateDeserializer::deserialize)
657                         .collect(Collectors.toList());
658        } catch (RequestException e) {
659            throw new NuxeoException("An exception happened during xcc call", e);
660        }
661    }
662
663}