001/*
002 * (C) Copyright 2016-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.core.storage.marklogic;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
031
032import java.io.Serializable;
033import java.security.GeneralSecurityException;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Calendar;
037import java.util.Collections;
038import java.util.List;
039import java.util.Map;
040import java.util.Map.Entry;
041import java.util.Set;
042import java.util.Spliterator;
043import java.util.UUID;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.stream.Collectors;
047import java.util.stream.Stream;
048import java.util.stream.StreamSupport;
049
050import javax.net.ssl.SSLContext;
051import javax.resource.spi.ConnectionManager;
052
053import org.apache.commons.lang.StringUtils;
054import org.apache.commons.logging.Log;
055import org.apache.commons.logging.LogFactory;
056import org.nuxeo.ecm.core.api.CursorService;
057import org.nuxeo.ecm.core.api.DocumentNotFoundException;
058import org.nuxeo.ecm.core.api.Lock;
059import org.nuxeo.ecm.core.api.NuxeoException;
060import org.nuxeo.ecm.core.api.PartialList;
061import org.nuxeo.ecm.core.api.ScrollResult;
062import org.nuxeo.ecm.core.blob.DocumentBlobManager;
063import org.nuxeo.ecm.core.model.Repository;
064import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
065import org.nuxeo.ecm.core.schema.TypeConstants;
066import org.nuxeo.ecm.core.schema.types.ComplexType;
067import org.nuxeo.ecm.core.schema.types.ListType;
068import org.nuxeo.ecm.core.schema.types.Type;
069import org.nuxeo.ecm.core.storage.State;
070import org.nuxeo.ecm.core.storage.State.StateDiff;
071import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
072import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
073import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
074import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
075import org.nuxeo.ecm.core.storage.marklogic.MarkLogicQueryBuilder.MarkLogicQuery;
076import org.nuxeo.runtime.api.Framework;
077
078import com.google.common.base.Strings;
079import com.marklogic.xcc.AdhocQuery;
080import com.marklogic.xcc.Content;
081import com.marklogic.xcc.ContentFactory;
082import com.marklogic.xcc.ContentSource;
083import com.marklogic.xcc.ContentSourceFactory;
084import com.marklogic.xcc.ModuleInvoke;
085import com.marklogic.xcc.RequestOptions;
086import com.marklogic.xcc.ResultItem;
087import com.marklogic.xcc.ResultSequence;
088import com.marklogic.xcc.SecurityOptions;
089import com.marklogic.xcc.Session;
090import com.marklogic.xcc.exceptions.RequestException;
091import com.marklogic.xcc.types.XdmItem;
092
093/**
094 * MarkLogic implementation of a {@link Repository}.
095 *
096 * @since 8.3
097 */
098public class MarkLogicRepository extends DBSRepositoryBase {
099
100    private static final Log log = LogFactory.getLog(MarkLogicRepository.class);
101
102    private static final Function<String, String> ID_FORMATTER = id -> String.format("/%s.xml", id);
103
104    public static final String DB_DEFAULT = "nuxeo";
105
106    protected ContentSource xccContentSource;
107
108    /** Last value used from the in-memory sequence. Used by unit tests. */
109    protected long sequenceLastValue;
110
111    protected final List<MarkLogicRangeElementIndexDescriptor> rangeElementIndexes;
112
113    protected final CursorService<ResultSequence, ResultItem> cursorService = new CursorService<>();
114
115    public MarkLogicRepository(ConnectionManager cm, MarkLogicRepositoryDescriptor descriptor) {
116        super(cm, descriptor.name, descriptor);
117        xccContentSource = newMarkLogicContentSource(descriptor);
118        rangeElementIndexes = descriptor.rangeElementIndexes.stream()
119                                                            .map(MarkLogicRangeElementIndexDescriptor::new)
120                                                            .collect(Collectors.toList());
121        initRepository();
122    }
123
124    @Override
125    public List<IdType> getAllowedIdTypes() {
126        return Collections.singletonList(IdType.varchar);
127    }
128
129    // used also by unit tests
130    public static ContentSource newMarkLogicContentSource(MarkLogicRepositoryDescriptor descriptor) {
131        String host = descriptor.host;
132        Integer port = descriptor.port;
133        if (StringUtils.isBlank(host) || port == null) {
134            throw new NuxeoException("Missing <host> or <port> in MarkLogic repository descriptor");
135        }
136        String dbname = StringUtils.defaultIfBlank(descriptor.dbname, DB_DEFAULT);
137        String user = descriptor.user;
138        String password = descriptor.password;
139        // handle SSL
140        SecurityOptions securityOptions = descriptor.sslEnabled ? newTrustOptions() : null;
141        return ContentSourceFactory.newContentSource(host, port.intValue(), user, password, dbname, securityOptions);
142    }
143
144    protected static SecurityOptions newTrustOptions() {
145        try {
146            SSLContext sslContext = SSLContext.getDefault();
147            return new SecurityOptions(sslContext);
148        } catch (GeneralSecurityException e) {
149            throw new NuxeoException("Unable to initialize Security options for MarkLogic connection.", e);
150        }
151    }
152
153    protected void initRepository() {
154        if (readState(getRootId()) == null) {
155            initRoot();
156        }
157    }
158
159    @Override
160    public String generateNewId() {
161        if (DEBUG_UUIDS) {
162            Long id = getNextSequenceId();
163            return "UUID_" + id;
164        }
165        return UUID.randomUUID().toString();
166    }
167
168    // Used by unit tests
169    protected synchronized Long getNextSequenceId() {
170        sequenceLastValue++;
171        return Long.valueOf(sequenceLastValue);
172    }
173
174    @Override
175    public void shutdown() {
176        super.shutdown();
177        cursorService.clear();
178    }
179
180    @Override
181    public State readState(String id) {
182        if (log.isTraceEnabled()) {
183            log.trace("MarkLogic: READ " + id);
184        }
185        try (Session session = xccContentSource.newSession()) {
186            String query = "fn:doc('" + ID_FORMATTER.apply(id) + "')";
187            AdhocQuery request = session.newAdhocQuery(query);
188            // ResultSequence will be closed by Session close
189            ResultSequence rs = session.submitRequest(request);
190            if (rs.hasNext()) {
191                return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]);
192            }
193            return null;
194        } catch (RequestException e) {
195            throw new NuxeoException("An exception happened during xcc call", e);
196        }
197    }
198
199    @Override
200    public List<State> readStates(List<String> ids) {
201        if (log.isTraceEnabled()) {
202            log.trace("MarkLogic: READ " + ids);
203        }
204        try (Session session = xccContentSource.newSession()) {
205            String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect(
206                    Collectors.joining(",", "fn:doc((", "))"));
207            AdhocQuery request = session.newAdhocQuery(query);
208            // ResultSequence will be closed by Session close
209            ResultSequence rs = session.submitRequest(request);
210            return Arrays.stream(rs.asStrings())
211                         .map(MarkLogicStateDeserializer::deserialize)
212                         .collect(Collectors.toList());
213        } catch (RequestException e) {
214            throw new NuxeoException("An exception happened during xcc call", e);
215        }
216    }
217
218    @Override
219    public void createState(State state) {
220        String id = state.get(KEY_ID).toString();
221        if (log.isTraceEnabled()) {
222            log.trace("MarkLogic: CREATE " + id + ": " + state);
223        }
224        try (Session session = xccContentSource.newSession()) {
225            session.insertContent(convert(state));
226        } catch (RequestException e) {
227            throw new NuxeoException("An exception happened during xcc call", e);
228        }
229    }
230
231    @Override
232    public void createStates(List<State> states) {
233        if (log.isTraceEnabled()) {
234            log.trace("MarkLogic: CREATE ["
235                    + states.stream().map(state -> state.get(KEY_ID).toString()).collect(Collectors.joining(", "))
236                    + "]: " + states);
237        }
238        try (Session session = xccContentSource.newSession()) {
239            Content[] contents = states.stream().map(this::convert).toArray(Content[]::new);
240            session.insertContent(contents);
241        } catch (RequestException e) {
242            throw new NuxeoException("An exception happened during xcc call", e);
243        }
244    }
245
246    private Content convert(State state) {
247        String id = state.get(KEY_ID).toString();
248        return ContentFactory.newContent(ID_FORMATTER.apply(id), MarkLogicStateSerializer.serialize(state), null);
249    }
250
251    @Override
252    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
253        // TODO changeTokenUpdater
254        String patch = MarkLogicStateSerializer.serialize(diff);
255        if (log.isTraceEnabled()) {
256            log.trace("MarkLogic: UPDATE " + id + ": " + patch);
257        }
258        try (Session session = xccContentSource.newSession()) {
259            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/patch.xqy");
260            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
261            request.setNewStringVariable("patch-string", patch);
262            // ResultSequence will be closed by Session close
263            session.submitRequest(request);
264        } catch (RequestException e) {
265            throw new NuxeoException("An exception happened during xcc call", e);
266        }
267    }
268
269    @Override
270    public void deleteStates(Set<String> ids) {
271        if (log.isTraceEnabled()) {
272            log.trace("MarkLogic: DELETE " + ids);
273        }
274        try (Session session = xccContentSource.newSession()) {
275            String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect(
276                    Collectors.joining(",", "xdmp:document-delete((", "))"));
277            AdhocQuery request = session.newAdhocQuery(query);
278            // ResultSequence will be closed by Session close
279            session.submitRequest(request);
280        } catch (RequestException e) {
281            throw new NuxeoException("An exception happened during xcc call", e);
282        }
283    }
284
285    @Override
286    public State readChildState(String parentId, String name, Set<String> ignored) {
287        String query = getChildQuery(parentId, name, ignored);
288        return findOne(query);
289    }
290
291    @Override
292    public boolean hasChild(String parentId, String name, Set<String> ignored) {
293        String query = getChildQuery(parentId, name, ignored);
294        return exist(query);
295    }
296
297    private String getChildQuery(String parentId, String name, Set<String> ignored) {
298        return new MarkLogicQuerySimpleBuilder(rangeElementIndexes).eq(KEY_PARENT_ID, parentId)
299                                                                   .eq(KEY_NAME, name)
300                                                                   .notIn(KEY_ID, ignored)
301                                                                   .build();
302    }
303
304    @Override
305    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
306        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
307        builder.eq(key, value);
308        builder.notIn(KEY_ID, ignored);
309        try (Stream<State> states = findAll(builder.build())) {
310            return states.collect(Collectors.toList());
311        }
312    }
313
314    @Override
315    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
316        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
317        builder.eq(key1, value1);
318        builder.eq(key2, value2);
319        builder.notIn(KEY_ID, ignored);
320        try (Stream<State> states = findAll(builder.build())) {
321            return states.collect(Collectors.toList());
322        }
323    }
324
325    @Override
326    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
327            Map<String, Object[]> targetProxies) {
328        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
329        builder.eq(key, value);
330        try (Stream<State> states = findAll(builder.build(), KEY_ID, KEY_IS_PROXY, KEY_PROXY_TARGET_ID,
331                KEY_PROXY_IDS)) {
332            states.forEach(state -> {
333                String id = (String) state.get(KEY_ID);
334                ids.add(id);
335                if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
336                    String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
337                    proxyTargets.put(id, targetId);
338                }
339                if (targetProxies != null) {
340                    Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
341                    if (proxyIds != null) {
342                        targetProxies.put(id, proxyIds);
343                    }
344                }
345            });
346        }
347    }
348
349    @Override
350    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
351        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
352        builder.eq(key, value);
353        builder.notIn(KEY_ID, ignored);
354        return exist(builder.build());
355    }
356
357    @Override
358    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
359            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
360        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, orderByClause, distinctDocuments,
361                rangeElementIndexes);
362        MarkLogicQuery query = builder.buildQuery();
363        // Don't do manual projection if there are no projection wildcards, as this brings no new
364        // information and is costly. The only difference is several identical rows instead of one.
365        boolean manualProjection = builder.doManualProjection();
366        if (manualProjection) {
367            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
368            // so we need the full state from the database
369            evaluator.parse();
370        }
371        String searchQuery = query.getSearchQuery(limit, offset);
372        if (log.isTraceEnabled()) {
373            logQuery(searchQuery);
374        }
375        // Run query
376        try (Session session = xccContentSource.newSession()) {
377            AdhocQuery request = session.newAdhocQuery(searchQuery);
378            // ResultSequence will be closed by Session close
379            ResultSequence rs = session.submitRequest(request);
380
381            List<Map<String, Serializable>> projections = new ArrayList<>(limit);
382            for (String rsItem : rs.asStrings()) {
383                State state = MarkLogicStateDeserializer.deserialize(rsItem);
384                if (manualProjection) {
385                    projections.addAll(evaluator.matches(state));
386                } else {
387                    projections.add(DBSStateFlattener.flatten(state));
388                }
389            }
390            long totalSize;
391            if (countUpTo == -1) {
392                // count full size
393                if (limit == 0) {
394                    totalSize = projections.size();
395                } else {
396                    AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery());
397                    // ResultSequence will be closed by Session close
398                    ResultSequence countRs = session.submitRequest(countRequest);
399                    totalSize = Long.parseLong(countRs.asStrings()[0]);
400                }
401            } else if (countUpTo == 0) {
402                // no count
403                totalSize = -1; // not counted
404            } else {
405                // count only if less than countUpTo
406                if (limit == 0) {
407                    totalSize = projections.size();
408                } else {
409                    AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery(countUpTo + 1));
410                    // ResultSequence will be closed by Session close
411                    ResultSequence countRs = session.submitRequest(countRequest);
412                    totalSize = Long.parseLong(countRs.asStrings()[0]);
413                }
414                if (totalSize > countUpTo) {
415                    totalSize = -2; // truncated
416                }
417            }
418
419            if (log.isTraceEnabled() && projections.size() != 0) {
420                log.trace("MarkLogic:    -> " + projections.size());
421            }
422            return new PartialList<>(projections, totalSize);
423        } catch (RequestException e) {
424            throw new NuxeoException("An exception happened during xcc call when executing '" + evaluator + "'", e);
425        }
426    }
427
428    @Override
429    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveInSecond) {
430        cursorService.checkForTimedOutScroll();
431        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, null, false, rangeElementIndexes);
432        String query = builder.buildQuery().getSearchQuery();
433        // Don't auto-close the session as we need to keep it open for next scroll
434        try {
435            Session session = xccContentSource.newSession();
436            // Build option to scroll results and not buffer them
437            RequestOptions options = new RequestOptions();
438            options.setCacheResult(false);
439            AdhocQuery request = session.newAdhocQuery(query, options);
440            // ResultSequence will be closed by Session close
441            ResultSequence rs = session.submitRequest(request);
442            String scrollId = cursorService.registerCursorResult(
443                    new MarkLogicCursorResult(session, rs, batchSize, keepAliveInSecond));
444            return scroll(scrollId);
445        } catch (RequestException e) {
446            throw new NuxeoException("An exception happened during xcc call", e);
447        }
448    }
449
450    @Override
451    public ScrollResult scroll(String scrollId) {
452        return cursorService.scroll(scrollId, item -> {
453            State state = MarkLogicStateDeserializer.deserialize(item.asInputStream());
454            return state.get(KEY_ID).toString();
455        });
456    }
457
458    @Override
459    public Lock getLock(String id) {
460        // TODO test performance : retrieve document with read or search document with extract
461        // TODO retrieve only some field
462        // https://docs.marklogic.com/guide/search-dev/qbe#id_54044
463        State state = readState(id);
464        if (state == null) {
465            throw new DocumentNotFoundException(id);
466        }
467        String owner = (String) state.get(KEY_LOCK_OWNER);
468        if (owner == null) {
469            // not locked
470            return null;
471        }
472        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
473        return new Lock(owner, created);
474    }
475
476    @Override
477    public Lock setLock(String id, Lock lock) {
478        State state = new State();
479        state.put(KEY_LOCK_OWNER, lock.getOwner());
480        state.put(KEY_LOCK_CREATED, lock.getCreated());
481        String lockString = MarkLogicStateSerializer.serialize(state);
482        if (log.isTraceEnabled()) {
483            log.trace("MarkLogic: SETLOCK " + id + ": " + lockString);
484        }
485        try (Session session = xccContentSource.newSession()) {
486            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/set-lock.xqy");
487            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
488            request.setNewStringVariable("lock-string", lockString);
489            // ResultSequence will be closed by Session close
490            ResultSequence result = session.submitRequest(request);
491            State resultState = MarkLogicStateDeserializer.deserialize(result.asString());
492            return extractLock(resultState);
493        } catch (RequestException e) {
494            if ("Document not found".equals(e.getMessage())) {
495                throw new DocumentNotFoundException(id, e);
496            }
497            throw new NuxeoException("An exception happened during xcc call", e);
498        }
499        // TODO check how the concurrent exception is raised
500    }
501
502    @Override
503    public Lock removeLock(String id, String owner) {
504        if (log.isTraceEnabled()) {
505            log.trace("MarkLogic: REMOVELOCK " + id + ": " + owner);
506        }
507        try (Session session = xccContentSource.newSession()) {
508            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/remove-lock.xqy");
509            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
510            request.setNewStringVariable("owner", Strings.nullToEmpty(owner));
511            // ResultSequence will be closed by Session close
512            ResultSequence result = session.submitRequest(request);
513            State resultState = MarkLogicStateDeserializer.deserialize(result.asString());
514            return extractLock(resultState);
515        } catch (RequestException e) {
516            if ("Document not found".equals(e.getMessage())) {
517                throw new DocumentNotFoundException(id, e);
518            }
519            throw new NuxeoException("An exception happened during xcc call", e);
520        }
521    }
522
523    private Lock extractLock(State state) {
524        if (state.isEmpty()) {
525            return null;
526        }
527        String owner = (String) state.get(KEY_LOCK_OWNER);
528        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
529        Boolean failed = (Boolean) state.get("failed");
530        return new Lock(owner, created, Boolean.TRUE.equals(failed));
531    }
532
533    @Override
534    public void closeLockManager() {
535    }
536
537    @Override
538    public void clearLockManagerCaches() {
539    }
540
541    protected List<String> binaryPaths;
542
543    @Override
544    protected void initBlobsPaths() {
545        MarkLogicBlobFinder finder = new MarkLogicBlobFinder();
546        finder.visit();
547        binaryPaths = finder.binaryPaths;
548    }
549
550    protected static class MarkLogicBlobFinder extends BlobFinder {
551        protected List<String> binaryPaths = new ArrayList<>();
552
553        @Override
554        protected void recordBlobPath() {
555            path.addLast(KEY_BLOB_DATA);
556            StringBuilder binaryPath = new StringBuilder();
557            MarkLogicSchemaManager schemaManager = new MarkLogicSchemaManager();
558            Type previousType = null;
559            for (String element : path) {
560                if (binaryPath.length() > 0) {
561                    binaryPath.append('/');
562                }
563                // Append current element to path
564                binaryPath.append(element);
565                if (previousType == null) {
566                    // No previous type - it's the first element
567                    previousType = schemaManager.computeField(String.join(".", path), element).getType();
568                } else if (previousType.isComplexType()) {
569                    // Previous type is a complex type - retrieve the type of current element
570                    if (TypeConstants.isContentType(previousType)) {
571                        // The complex type hold the binary data, it's the last element, so break the loop
572                        break;
573                    }
574                    previousType = ((ComplexType) previousType).getField(element).getType();
575                }
576                // Add the item array element if type is a list (here previousType is the type of current element)
577                // Here we re allocate previousType to item array type as there's no element in this.path to select item
578                if (previousType.isListType()) {
579                    binaryPath.append('/').append(element).append(MarkLogicHelper.ARRAY_ITEM_KEY_SUFFIX);
580                    previousType = ((ListType) previousType).getFieldType();
581                }
582            }
583            binaryPaths.add(binaryPath.toString());
584            path.removeLast();
585        }
586    }
587
588    @Override
589    public void markReferencedBinaries() {
590        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
591        // TODO add a query to not scan all documents
592        String query = new MarkLogicQuerySimpleBuilder(rangeElementIndexes).build();
593        try (Stream<State> states = findAll(query, binaryPaths.toArray(new String[0]))) {
594            states.forEach(state -> markReferencedBinaries(state, blobManager));
595        }
596    }
597
598    protected void markReferencedBinaries(State state, DocumentBlobManager blobManager) {
599        for (Entry<String, Serializable> entry : state.entrySet()) {
600            Serializable value = entry.getValue();
601            if (value instanceof List) {
602                List<?> list = (List<?>) value;
603                for (Object v : list) {
604                    if (v instanceof State) {
605                        markReferencedBinaries((State) v, blobManager);
606                    } else {
607                        markReferencedBinary(v, blobManager);
608                    }
609                }
610            } else if (value instanceof Object[]) {
611                for (Object v : (Object[]) value) {
612                    markReferencedBinary(v, blobManager);
613                }
614            } else if (value instanceof State) {
615                markReferencedBinaries((State) value, blobManager);
616            } else {
617                markReferencedBinary(value, blobManager);
618            }
619        }
620    }
621
622    protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) {
623        if (!(value instanceof String)) {
624            return;
625        }
626        String key = (String) value;
627        blobManager.markReferencedBinary(key, repositoryName);
628    }
629
630    private void logQuery(String query) {
631        log.trace("MarkLogic: QUERY " + query);
632    }
633
634    private boolean exist(String ctsQuery) {
635        // first build exist query from cts query
636        String query = "xdmp:exists(" + ctsQuery + ")";
637        if (log.isTraceEnabled()) {
638            logQuery(query);
639        }
640        // Run query
641        try (Session session = xccContentSource.newSession()) {
642            AdhocQuery request = session.newAdhocQuery(query);
643            // ResultSequence will be closed by Session close
644            ResultSequence rs = session.submitRequest(request);
645            return Boolean.parseBoolean(rs.asString());
646        } catch (RequestException e) {
647            throw new NuxeoException("An exception happened during xcc call", e);
648        }
649    }
650
651    private State findOne(String ctsQuery) {
652        // first add limit to ctsQuery
653        String query = ctsQuery + "[1 to 1]";
654        if (log.isTraceEnabled()) {
655            logQuery(query);
656        }
657        // Run query
658        try (Session session = xccContentSource.newSession()) {
659            AdhocQuery request = session.newAdhocQuery(query);
660            // ResultSequence will be closed by Session close
661            ResultSequence rs = session.submitRequest(request);
662            if (rs.hasNext()) {
663                return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]);
664            }
665            return null;
666        } catch (RequestException e) {
667            throw new NuxeoException("An exception happened during xcc call", e);
668        }
669    }
670
671    protected Stream<State> findAll(String ctsQuery, String... selects) {
672        String query = ctsQuery;
673        if (selects.length > 0) {
674            query = "import module namespace extract = 'http://nuxeo.com/extract' at '/ext/nuxeo/extract.xqy';\n"
675                    + "let $paths := (" + Arrays.stream(selects)
676                                                .map(MarkLogicHelper::serializeKey)
677                                                .map(select -> "\"" + MarkLogicHelper.DOCUMENT_ROOT_PATH + "/" + select
678                                                        + "\"")
679                                                .collect(Collectors.joining(",\n"))
680                    + ")let $namespaces := ()\n" + "for $i in " + query
681                    + " return extract:extract-nodes($i, $paths, $namespaces)";
682        }
683        if (log.isTraceEnabled()) {
684            logQuery(query);
685        }
686        // Run query
687        boolean completedAbruptly = true;
688        Session session = xccContentSource.newSession();
689        try {
690            // As we can get a lot of results don't buffer the result
691            RequestOptions options = new RequestOptions();
692            options.setCacheResult(false);
693            AdhocQuery request = session.newAdhocQuery(query, options);
694
695            // We give 0 as characteristics because it's the value used by Iterable#spliterator() and according to
696            // StreamSupport.stream(Supplier, int, boolean) documentation, characteristics must be equal to
697            // supplier.get().characteristics()
698            Supplier<Spliterator<ResultItem>> spliteratorSupplier = () -> {
699                try {
700                    // ResultSequence will be closed by Session close (closed by stream closed)
701                    ResultSequence rs = session.submitRequest(request);
702                    Iterable<ResultItem> items = rs::iterator;
703                    return items.spliterator();
704                } catch (RequestException e) {
705                    throw new NuxeoException("An exception happened during xcc call", e);
706                }
707            };
708            Stream<State> stream = StreamSupport.stream(spliteratorSupplier, 0, false)
709                                                .onClose(session::close)
710                                                .map(ResultItem::getItem)
711                                                .map(XdmItem::asInputStream)
712                                                .map(MarkLogicStateDeserializer::deserialize);
713            // the stream takes responsibility for closing the session
714            completedAbruptly = false;
715            return stream;
716        } finally {
717            if (completedAbruptly) {
718                session.close();
719            }
720        }
721    }
722
723}