001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.core.storage.marklogic;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
032
033import java.io.Serializable;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Calendar;
037import java.util.Collections;
038import java.util.List;
039import java.util.Map;
040import java.util.Map.Entry;
041import java.util.Set;
042import java.util.UUID;
043import java.util.function.Function;
044import java.util.stream.Collectors;
045
046import javax.resource.spi.ConnectionManager;
047
048import org.apache.commons.lang.StringUtils;
049import org.apache.commons.logging.Log;
050import org.apache.commons.logging.LogFactory;
051import org.nuxeo.ecm.core.api.DocumentNotFoundException;
052import org.nuxeo.ecm.core.api.Lock;
053import org.nuxeo.ecm.core.api.NuxeoException;
054import org.nuxeo.ecm.core.api.PartialList;
055import org.nuxeo.ecm.core.api.ScrollResult;
056import org.nuxeo.ecm.core.api.ScrollResultImpl;
057import org.nuxeo.ecm.core.blob.BlobManager;
058import org.nuxeo.ecm.core.model.Repository;
059import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
060import org.nuxeo.ecm.core.schema.TypeConstants;
061import org.nuxeo.ecm.core.schema.types.ComplexType;
062import org.nuxeo.ecm.core.schema.types.ListType;
063import org.nuxeo.ecm.core.schema.types.Type;
064import org.nuxeo.ecm.core.storage.State;
065import org.nuxeo.ecm.core.storage.State.StateDiff;
066import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
067import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
068import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
069import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
070import org.nuxeo.ecm.core.storage.marklogic.MarkLogicQueryBuilder.MarkLogicQuery;
071import org.nuxeo.runtime.api.Framework;
072
073import com.google.common.base.Strings;
074import com.marklogic.xcc.AdhocQuery;
075import com.marklogic.xcc.Content;
076import com.marklogic.xcc.ContentFactory;
077import com.marklogic.xcc.ContentSource;
078import com.marklogic.xcc.ContentSourceFactory;
079import com.marklogic.xcc.ModuleInvoke;
080import com.marklogic.xcc.ResultSequence;
081import com.marklogic.xcc.Session;
082import com.marklogic.xcc.exceptions.RequestException;
083
084/**
085 * MarkLogic implementation of a {@link Repository}.
086 *
087 * @since 8.3
088 */
089public class MarkLogicRepository extends DBSRepositoryBase {
090
091    private static final Log log = LogFactory.getLog(MarkLogicRepository.class);
092
093    private static final Function<String, String> ID_FORMATTER = id -> String.format("/%s.xml", id);
094
095    public static final String DB_DEFAULT = "nuxeo";
096
097    protected static final String NOSCROLL_ID = "noscroll";
098
099    protected ContentSource xccContentSource;
100
101    /** Last value used from the in-memory sequence. Used by unit tests. */
102    protected long sequenceLastValue;
103
104    protected final List<MarkLogicRangeElementIndexDescriptor> rangeElementIndexes;
105
106    public MarkLogicRepository(ConnectionManager cm, MarkLogicRepositoryDescriptor descriptor) {
107        super(cm, descriptor.name, descriptor);
108        xccContentSource = newMarkLogicContentSource(descriptor);
109        rangeElementIndexes = descriptor.rangeElementIndexes.stream()
110                                                            .map(MarkLogicRangeElementIndexDescriptor::new)
111                                                            .collect(Collectors.toList());
112        initRepository();
113    }
114
115    @Override
116    public List<IdType> getAllowedIdTypes() {
117        return Collections.singletonList(IdType.varchar);
118    }
119
120    // used also by unit tests
121    public static ContentSource newMarkLogicContentSource(MarkLogicRepositoryDescriptor descriptor) {
122        String host = descriptor.host;
123        Integer port = descriptor.port;
124        if (StringUtils.isBlank(host) || port == null) {
125            throw new NuxeoException("Missing <host> or <port> in MarkLogic repository descriptor");
126        }
127        String dbname = StringUtils.defaultIfBlank(descriptor.dbname, DB_DEFAULT);
128        String user = descriptor.user;
129        String password = descriptor.password;
130        return ContentSourceFactory.newContentSource(host, port.intValue(), user, password, dbname);
131    }
132
133    protected void initRepository() {
134        if (readState(getRootId()) == null) {
135            initRoot();
136        }
137    }
138
139    @Override
140    public String generateNewId() {
141        if (DEBUG_UUIDS) {
142            Long id = getNextSequenceId();
143            return "UUID_" + id;
144        }
145        return UUID.randomUUID().toString();
146    }
147
148    // Used by unit tests
149    protected synchronized Long getNextSequenceId() {
150        sequenceLastValue++;
151        return Long.valueOf(sequenceLastValue);
152    }
153
154    @Override
155    public State readState(String id) {
156        if (log.isTraceEnabled()) {
157            log.trace("MarkLogic: READ " + id);
158        }
159        try (Session session = xccContentSource.newSession()) {
160            String query = "fn:doc('" + ID_FORMATTER.apply(id) + "')";
161            AdhocQuery request = session.newAdhocQuery(query);
162            // ResultSequence will be closed by Session close
163            ResultSequence rs = session.submitRequest(request);
164            if (rs.hasNext()) {
165                return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]);
166            }
167            return null;
168        } catch (RequestException e) {
169            throw new NuxeoException("An exception happened during xcc call", e);
170        }
171    }
172
173    @Override
174    public List<State> readStates(List<String> ids) {
175        if (log.isTraceEnabled()) {
176            log.trace("MarkLogic: READ " + ids);
177        }
178        try (Session session = xccContentSource.newSession()) {
179            String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect(
180                    Collectors.joining(",", "fn:doc((", "))"));
181            AdhocQuery request = session.newAdhocQuery(query);
182            // ResultSequence will be closed by Session close
183            ResultSequence rs = session.submitRequest(request);
184            return Arrays.stream(rs.asStrings())
185                         .map(MarkLogicStateDeserializer::deserialize)
186                         .collect(Collectors.toList());
187        } catch (RequestException e) {
188            throw new NuxeoException("An exception happened during xcc call", e);
189        }
190    }
191
192    @Override
193    public void createState(State state) {
194        String id = state.get(KEY_ID).toString();
195        if (log.isTraceEnabled()) {
196            log.trace("MarkLogic: CREATE " + id + ": " + state);
197        }
198        try (Session session = xccContentSource.newSession()) {
199            session.insertContent(convert(state));
200        } catch (RequestException e) {
201            throw new NuxeoException("An exception happened during xcc call", e);
202        }
203    }
204
205    @Override
206    public void createStates(List<State> states) {
207        if (log.isTraceEnabled()) {
208            log.trace("MarkLogic: CREATE ["
209                    + states.stream().map(state -> state.get(KEY_ID).toString()).collect(Collectors.joining(", "))
210                    + "]: " + states);
211        }
212        try (Session session = xccContentSource.newSession()) {
213            Content[] contents = states.stream().map(this::convert).toArray(Content[]::new);
214            session.insertContent(contents);
215        } catch (RequestException e) {
216            throw new NuxeoException("An exception happened during xcc call", e);
217        }
218    }
219
220    private Content convert(State state) {
221        String id = state.get(KEY_ID).toString();
222        return ContentFactory.newContent(ID_FORMATTER.apply(id), MarkLogicStateSerializer.serialize(state), null);
223    }
224
225    @Override
226    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
227        // TODO changeTokenUpdater
228        String patch = MarkLogicStateSerializer.serialize(diff);
229        if (log.isTraceEnabled()) {
230            log.trace("MarkLogic: UPDATE " + id + ": " + patch);
231        }
232        try (Session session = xccContentSource.newSession()) {
233            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/patch.xqy");
234            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
235            request.setNewStringVariable("patch-string", patch);
236            // ResultSequence will be closed by Session close
237            session.submitRequest(request);
238        } catch (RequestException e) {
239            throw new NuxeoException("An exception happened during xcc call", e);
240        }
241    }
242
243    @Override
244    public void deleteStates(Set<String> ids) {
245        if (log.isTraceEnabled()) {
246            log.trace("MarkLogic: DELETE " + ids);
247        }
248        try (Session session = xccContentSource.newSession()) {
249            String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect(
250                    Collectors.joining(",", "xdmp:document-delete((", "))"));
251            AdhocQuery request = session.newAdhocQuery(query);
252            // ResultSequence will be closed by Session close
253            session.submitRequest(request);
254        } catch (RequestException e) {
255            throw new NuxeoException("An exception happened during xcc call", e);
256        }
257    }
258
259    @Override
260    public State readChildState(String parentId, String name, Set<String> ignored) {
261        String query = getChildQuery(parentId, name, ignored);
262        return findOne(query);
263    }
264
265    @Override
266    public boolean hasChild(String parentId, String name, Set<String> ignored) {
267        String query = getChildQuery(parentId, name, ignored);
268        return exist(query);
269    }
270
271    private String getChildQuery(String parentId, String name, Set<String> ignored) {
272        return new MarkLogicQuerySimpleBuilder(rangeElementIndexes).eq(KEY_PARENT_ID, parentId)
273                                                                   .eq(KEY_NAME, name)
274                                                                   .notIn(KEY_ID, ignored)
275                                                                   .build();
276    }
277
278    @Override
279    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
280        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
281        builder.eq(key, value);
282        builder.notIn(KEY_ID, ignored);
283        return findAll(builder.build());
284    }
285
286    @Override
287    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
288        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
289        builder.eq(key1, value1);
290        builder.eq(key2, value2);
291        builder.notIn(KEY_ID, ignored);
292        return findAll(builder.build());
293    }
294
295    @Override
296    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
297            Map<String, Object[]> targetProxies) {
298        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
299        builder.eq(key, value);
300        for (State state : findAll(builder.build(), KEY_ID, KEY_IS_PROXY, KEY_PROXY_TARGET_ID, KEY_PROXY_IDS)) {
301            String id = (String) state.get(KEY_ID);
302            ids.add(id);
303            if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
304                String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
305                proxyTargets.put(id, targetId);
306            }
307            if (targetProxies != null) {
308                Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
309                if (proxyIds != null) {
310                    targetProxies.put(id, proxyIds);
311                }
312            }
313        }
314    }
315
316    @Override
317    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
318        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
319        builder.eq(key, value);
320        builder.notIn(KEY_ID, ignored);
321        return exist(builder.build());
322    }
323
324    @Override
325    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
326            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
327        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, orderByClause, distinctDocuments,
328                rangeElementIndexes);
329        MarkLogicQuery query = builder.buildQuery();
330        // Don't do manual projection if there are no projection wildcards, as this brings no new
331        // information and is costly. The only difference is several identical rows instead of one.
332        boolean manualProjection = builder.doManualProjection();
333        if (manualProjection) {
334            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
335            // so we need the full state from the database
336            evaluator.parse();
337        }
338        String searchQuery = query.getSearchQuery(limit, offset);
339        if (log.isTraceEnabled()) {
340            logQuery(searchQuery);
341        }
342        // Run query
343        try (Session session = xccContentSource.newSession()) {
344            AdhocQuery request = session.newAdhocQuery(searchQuery);
345            // ResultSequence will be closed by Session close
346            ResultSequence rs = session.submitRequest(request);
347
348            List<Map<String, Serializable>> projections = new ArrayList<>(limit);
349            for (String rsItem : rs.asStrings()) {
350                State state = MarkLogicStateDeserializer.deserialize(rsItem);
351                if (manualProjection) {
352                    projections.addAll(evaluator.matches(state));
353                } else {
354                    projections.add(DBSStateFlattener.flatten(state));
355                }
356            }
357            long totalSize;
358            if (countUpTo == -1) {
359                // count full size
360                if (limit == 0) {
361                    totalSize = projections.size();
362                } else {
363                    AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery());
364                    // ResultSequence will be closed by Session close
365                    ResultSequence countRs = session.submitRequest(countRequest);
366                    totalSize = Long.parseLong(countRs.asStrings()[0]);
367                }
368            } else if (countUpTo == 0) {
369                // no count
370                totalSize = -1; // not counted
371            } else {
372                // count only if less than countUpTo
373                if (limit == 0) {
374                    totalSize = projections.size();
375                } else {
376                    AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery(countUpTo + 1));
377                    // ResultSequence will be closed by Session close
378                    ResultSequence countRs = session.submitRequest(countRequest);
379                    totalSize = Long.parseLong(countRs.asStrings()[0]);
380                }
381                if (totalSize > countUpTo) {
382                    totalSize = -2; // truncated
383                }
384            }
385
386            if (log.isTraceEnabled() && projections.size() != 0) {
387                log.trace("MarkLogic:    -> " + projections.size());
388            }
389            return new PartialList<>(projections, totalSize);
390        } catch (RequestException e) {
391            throw new NuxeoException("An exception happened during xcc call when executing '" + evaluator + "'", e);
392        }
393    }
394
395    @Override
396    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveInSecond) {
397        // Not yet implemented, return all result in one shot for now
398        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, null, false, rangeElementIndexes);
399        String query = builder.buildQuery().getSearchQuery();
400        // Run query
401        try (Session session = xccContentSource.newSession()) {
402            AdhocQuery request = session.newAdhocQuery(query);
403            // ResultSequence will be closed by Session close
404            ResultSequence rs = session.submitRequest(request);
405            return Arrays.stream(rs.asStrings())
406                         .map(MarkLogicStateDeserializer::deserialize)
407                         .map(state -> state.get(KEY_ID).toString())
408                         .collect(Collectors.collectingAndThen(Collectors.toList(),
409                                 ids -> new ScrollResultImpl(NOSCROLL_ID, ids)));
410        } catch (RequestException e) {
411            throw new NuxeoException("An exception happened during xcc call", e);
412        }
413    }
414
415    @Override
416    public ScrollResult scroll(String scrollId) {
417        if (NOSCROLL_ID.equals(scrollId)) {
418            // there is only one batch
419            return emptyResult();
420        }
421        throw new NuxeoException("Unknown or timed out scrollId");
422    }
423
424    @Override
425    public Lock getLock(String id) {
426        // TODO test performance : retrieve document with read or search document with extract
427        // TODO retrieve only some field
428        // https://docs.marklogic.com/guide/search-dev/qbe#id_54044
429        State state = readState(id);
430        if (state == null) {
431            throw new DocumentNotFoundException(id);
432        }
433        String owner = (String) state.get(KEY_LOCK_OWNER);
434        if (owner == null) {
435            // not locked
436            return null;
437        }
438        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
439        return new Lock(owner, created);
440    }
441
442    @Override
443    public Lock setLock(String id, Lock lock) {
444        State state = new State();
445        state.put(KEY_LOCK_OWNER, lock.getOwner());
446        state.put(KEY_LOCK_CREATED, lock.getCreated());
447        String lockString = MarkLogicStateSerializer.serialize(state);
448        if (log.isTraceEnabled()) {
449            log.trace("MarkLogic: SETLOCK " + id + ": " + lockString);
450        }
451        try (Session session = xccContentSource.newSession()) {
452            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/set-lock.xqy");
453            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
454            request.setNewStringVariable("lock-string", lockString);
455            // ResultSequence will be closed by Session close
456            ResultSequence result = session.submitRequest(request);
457            State resultState = MarkLogicStateDeserializer.deserialize(result.asString());
458            return extractLock(resultState);
459        } catch (RequestException e) {
460            if ("Document not found".equals(e.getMessage())) {
461                throw new DocumentNotFoundException(id, e);
462            }
463            throw new NuxeoException("An exception happened during xcc call", e);
464        }
465        // TODO check how the concurrent exception is raised
466    }
467
468    @Override
469    public Lock removeLock(String id, String owner) {
470        if (log.isTraceEnabled()) {
471            log.trace("MarkLogic: REMOVELOCK " + id + ": " + owner);
472        }
473        try (Session session = xccContentSource.newSession()) {
474            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/remove-lock.xqy");
475            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
476            request.setNewStringVariable("owner", Strings.nullToEmpty(owner));
477            // ResultSequence will be closed by Session close
478            ResultSequence result = session.submitRequest(request);
479            State resultState = MarkLogicStateDeserializer.deserialize(result.asString());
480            return extractLock(resultState);
481        } catch (RequestException e) {
482            if ("Document not found".equals(e.getMessage())) {
483                throw new DocumentNotFoundException(id, e);
484            }
485            throw new NuxeoException("An exception happened during xcc call", e);
486        }
487    }
488
489    private Lock extractLock(State state) {
490        if (state.isEmpty()) {
491            return null;
492        }
493        String owner = (String) state.get(KEY_LOCK_OWNER);
494        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
495        Boolean failed = (Boolean) state.get("failed");
496        return new Lock(owner, created, Boolean.TRUE.equals(failed));
497    }
498
    /**
     * No-op in this implementation: the body is intentionally empty.
     * <p>
     * NOTE(review): presumably nothing needs closing because locks are stored in the document states themselves (see
     * {@code getLock}) - confirm.
     */
    @Override
    public void closeLockManager() {
    }
502
    /**
     * No-op in this implementation: the body is intentionally empty.
     * <p>
     * NOTE(review): presumably there is no lock cache to clear since {@code getLock} reads the lock from the stored
     * state each time - confirm.
     */
    @Override
    public void clearLockManagerCaches() {
    }
506
507    protected List<String> binaryPaths;
508
509    @Override
510    protected void initBlobsPaths() {
511        MarkLogicBlobFinder finder = new MarkLogicBlobFinder();
512        finder.visit();
513        binaryPaths = finder.binaryPaths;
514    }
515
516    protected static class MarkLogicBlobFinder extends BlobFinder {
517        protected List<String> binaryPaths = new ArrayList<>();
518
519        @Override
520        protected void recordBlobPath() {
521            path.addLast(KEY_BLOB_DATA);
522            StringBuilder binaryPath = new StringBuilder();
523            MarkLogicSchemaManager schemaManager = new MarkLogicSchemaManager();
524            Type previousType = null;
525            for (String element : path) {
526                if (binaryPath.length() > 0) {
527                    binaryPath.append('/');
528                }
529                // Append current element to path
530                binaryPath.append(element);
531                if (previousType == null) {
532                    // No previous type - it's the first element
533                    previousType = schemaManager.computeField(String.join(".", path), element).getType();
534                } else if (previousType.isComplexType()) {
535                    // Previous type is a complex type - retrieve the type of current element
536                    if (TypeConstants.isContentType(previousType)) {
537                        // The complex type hold the binary data, it's the last element, so break the loop
538                        break;
539                    }
540                    previousType = ((ComplexType) previousType).getField(element).getType();
541                }
542                // Add the item array element if type is a list (here previousType is the type of current element)
543                // Here we re allocate previousType to item array type as there's no element in this.path to select item
544                if (previousType.isListType()) {
545                    binaryPath.append('/').append(element).append(MarkLogicHelper.ARRAY_ITEM_KEY_SUFFIX);
546                    previousType = ((ListType) previousType).getFieldType();
547                }
548            }
549            binaryPaths.add(binaryPath.toString());
550            path.removeLast();
551        }
552    }
553
554    @Override
555    public void markReferencedBinaries() {
556        BlobManager blobManager = Framework.getService(BlobManager.class);
557        // TODO add a query to not scan all documents
558        String query = new MarkLogicQuerySimpleBuilder(rangeElementIndexes).build();
559        for (State state : findAll(query, binaryPaths.toArray(new String[0]))) {
560            markReferencedBinaries(state, blobManager);
561        }
562    }
563
564    protected void markReferencedBinaries(State state, BlobManager blobManager) {
565        for (Entry<String, Serializable> entry: state.entrySet()) {
566            Serializable value = entry.getValue();
567            if (value instanceof List) {
568                List<?> list = (List<?>) value;
569                for (Object v : list) {
570                    if (v instanceof State) {
571                        markReferencedBinaries((State) v, blobManager);
572                    } else {
573                        markReferencedBinary(v, blobManager);
574                    }
575                }
576            } else if (value instanceof Object[]) {
577                for (Object v : (Object[]) value) {
578                    markReferencedBinary(v, blobManager);
579                }
580            } else if (value instanceof State) {
581                markReferencedBinaries((State) value, blobManager);
582            } else {
583                markReferencedBinary(value, blobManager);
584            }
585        }
586    }
587
588    protected void markReferencedBinary(Object value, BlobManager blobManager) {
589        if (!(value instanceof String)) {
590            return;
591        }
592        String key = (String) value;
593        blobManager.markReferencedBinary(key, repositoryName);
594    }
595
596    private void logQuery(String query) {
597        log.trace("MarkLogic: QUERY " + query);
598    }
599
600    private boolean exist(String ctsQuery) {
601        // first build exist query from cts query
602        String query = "xdmp:exists(" + ctsQuery + ")";
603        if (log.isTraceEnabled()) {
604            logQuery(query);
605        }
606        // Run query
607        try (Session session = xccContentSource.newSession()) {
608            AdhocQuery request = session.newAdhocQuery(query);
609            // ResultSequence will be closed by Session close
610            ResultSequence rs = session.submitRequest(request);
611            return Boolean.parseBoolean(rs.asString());
612        } catch (RequestException e) {
613            throw new NuxeoException("An exception happened during xcc call", e);
614        }
615    }
616
617    private State findOne(String ctsQuery) {
618        // first add limit to ctsQuery
619        String query = ctsQuery + "[1 to 1]";
620        if (log.isTraceEnabled()) {
621            logQuery(query);
622        }
623        // Run query
624        try (Session session = xccContentSource.newSession()) {
625            AdhocQuery request = session.newAdhocQuery(query);
626            // ResultSequence will be closed by Session close
627            ResultSequence rs = session.submitRequest(request);
628            if (rs.hasNext()) {
629                return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]);
630            }
631            return null;
632        } catch (RequestException e) {
633            throw new NuxeoException("An exception happened during xcc call", e);
634        }
635    }
636
637    private List<State> findAll(String ctsQuery, String... selects) {
638        String query = ctsQuery;
639        if (selects.length > 0) {
640            query = "import module namespace extract = 'http://nuxeo.com/extract' at '/ext/nuxeo/extract.xqy';\n"
641                    + "let $paths := (" + Arrays.stream(selects)
642                                                .map(MarkLogicHelper::serializeKey)
643                                                .map(select -> "\"" + MarkLogicHelper.DOCUMENT_ROOT_PATH + "/" + select
644                                                        + "\"")
645                                                .collect(Collectors.joining(",\n"))
646                    + ")let $namespaces := ()\n" + "for $i in " + query
647                    + " return extract:extract-nodes($i, $paths, $namespaces)";
648        }
649        if (log.isTraceEnabled()) {
650            logQuery(query);
651        }
652        // Run query
653        try (Session session = xccContentSource.newSession()) {
654            AdhocQuery request = session.newAdhocQuery(query);
655            // ResultSequence will be closed by Session close
656            ResultSequence rs = session.submitRequest(request);
657            return Arrays.stream(rs.asStrings())
658                         .map(MarkLogicStateDeserializer::deserialize)
659                         .collect(Collectors.toList());
660        } catch (RequestException e) {
661            throw new NuxeoException("An exception happened during xcc call", e);
662        }
663    }
664
665}