001/*
002 * (C) Copyright 2016-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.core.storage.marklogic;
020
021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
028
029import java.io.Serializable;
030import java.security.GeneralSecurityException;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Calendar;
034import java.util.Collection;
035import java.util.Collections;
036import java.util.List;
037import java.util.Map;
038import java.util.Map.Entry;
039import java.util.Set;
040import java.util.Spliterator;
041import java.util.UUID;
042import java.util.function.Function;
043import java.util.function.Supplier;
044import java.util.stream.Collector;
045import java.util.stream.Collectors;
046import java.util.stream.Stream;
047import java.util.stream.StreamSupport;
048
049import javax.net.ssl.SSLContext;
050import javax.resource.spi.ConnectionManager;
051
052import org.apache.commons.lang3.StringUtils;
053import org.apache.commons.logging.Log;
054import org.apache.commons.logging.LogFactory;
055import org.nuxeo.ecm.core.api.CursorService;
056import org.nuxeo.ecm.core.api.DocumentNotFoundException;
057import org.nuxeo.ecm.core.api.Lock;
058import org.nuxeo.ecm.core.api.NuxeoException;
059import org.nuxeo.ecm.core.api.PartialList;
060import org.nuxeo.ecm.core.api.ScrollResult;
061import org.nuxeo.ecm.core.blob.DocumentBlobManager;
062import org.nuxeo.ecm.core.model.Repository;
063import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
064import org.nuxeo.ecm.core.schema.TypeConstants;
065import org.nuxeo.ecm.core.schema.types.ComplexType;
066import org.nuxeo.ecm.core.schema.types.ListType;
067import org.nuxeo.ecm.core.schema.types.Type;
068import org.nuxeo.ecm.core.storage.State;
069import org.nuxeo.ecm.core.storage.State.StateDiff;
070import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
071import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
072import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
073import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
074import org.nuxeo.ecm.core.storage.marklogic.MarkLogicQueryBuilder.MarkLogicQuery;
075import org.nuxeo.runtime.api.Framework;
076
077import com.google.common.base.Strings;
078import com.marklogic.xcc.AdhocQuery;
079import com.marklogic.xcc.Content;
080import com.marklogic.xcc.ContentFactory;
081import com.marklogic.xcc.ContentSource;
082import com.marklogic.xcc.ContentSourceFactory;
083import com.marklogic.xcc.ModuleInvoke;
084import com.marklogic.xcc.RequestOptions;
085import com.marklogic.xcc.ResultItem;
086import com.marklogic.xcc.ResultSequence;
087import com.marklogic.xcc.SecurityOptions;
088import com.marklogic.xcc.Session;
089import com.marklogic.xcc.exceptions.RequestException;
090import com.marklogic.xcc.types.XdmItem;
091
092/**
093 * MarkLogic implementation of a {@link Repository}.
094 *
095 * @since 8.3
096 */
097public class MarkLogicRepository extends DBSRepositoryBase {
098
099    private static final Log log = LogFactory.getLog(MarkLogicRepository.class);
100
101    private static final Function<String, String> ID_FORMATTER = id -> String.format("/%s.xml", id);
102
103    public static final String DB_DEFAULT = "nuxeo";
104
105    protected ContentSource xccContentSource;
106
107    /** Last value used from the in-memory sequence. Used by unit tests. */
108    protected long sequenceLastValue;
109
110    protected final List<MarkLogicRangeElementIndexDescriptor> rangeElementIndexes;
111
112    protected final CursorService<ResultSequence, ResultItem, String> cursorService;
113
114    public MarkLogicRepository(ConnectionManager cm, MarkLogicRepositoryDescriptor descriptor) {
115        super(cm, descriptor.name, descriptor);
116        xccContentSource = newMarkLogicContentSource(descriptor);
117        rangeElementIndexes = descriptor.rangeElementIndexes.stream()
118                                                            .map(MarkLogicRangeElementIndexDescriptor::new)
119                                                            .collect(Collectors.toList());
120        cursorService = new CursorService<>(item -> {
121            State state = MarkLogicStateDeserializer.deserialize(item.asInputStream());
122            return state.get(KEY_ID).toString();
123        });
124        initRepository();
125    }
126
127    @Override
128    public List<IdType> getAllowedIdTypes() {
129        return Collections.singletonList(IdType.varchar);
130    }
131
132    // used also by unit tests
133    public static ContentSource newMarkLogicContentSource(MarkLogicRepositoryDescriptor descriptor) {
134        String host = descriptor.host;
135        Integer port = descriptor.port;
136        if (StringUtils.isBlank(host) || port == null) {
137            throw new NuxeoException("Missing <host> or <port> in MarkLogic repository descriptor");
138        }
139        String dbname = StringUtils.defaultIfBlank(descriptor.dbname, DB_DEFAULT);
140        String user = descriptor.user;
141        String password = descriptor.password;
142        // handle SSL
143        SecurityOptions securityOptions = descriptor.sslEnabled ? newTrustOptions() : null;
144        return ContentSourceFactory.newContentSource(host, port.intValue(), user, password, dbname, securityOptions);
145    }
146
147    protected static SecurityOptions newTrustOptions() {
148        try {
149            SSLContext sslContext = SSLContext.getDefault();
150            return new SecurityOptions(sslContext);
151        } catch (GeneralSecurityException e) {
152            throw new NuxeoException("Unable to initialize Security options for MarkLogic connection.", e);
153        }
154    }
155
156    protected void initRepository() {
157        if (readState(getRootId()) == null) {
158            initRoot();
159        }
160    }
161
162    @Override
163    public String generateNewId() {
164        if (DEBUG_UUIDS) {
165            Long id = getNextSequenceId();
166            return "UUID_" + id;
167        }
168        return UUID.randomUUID().toString();
169    }
170
171    // Used by unit tests
172    protected synchronized Long getNextSequenceId() {
173        sequenceLastValue++;
174        return Long.valueOf(sequenceLastValue);
175    }
176
177    @Override
178    public void shutdown() {
179        super.shutdown();
180        cursorService.clear();
181    }
182
183    @Override
184    public State readState(String id) {
185        return readPartialState(id, null);
186    }
187
188    @Override
189    public State readPartialState(String id, Collection<String> keys) {
190        if (log.isTraceEnabled()) {
191            log.trace("MarkLogic: READ " + id);
192        }
193        try (Session session = xccContentSource.newSession()) {
194            String query = "fn:doc('" + ID_FORMATTER.apply(id) + "')";
195            if (keys != null && !keys.isEmpty()) {
196                query = getProjectedQuery(query, keys);
197            }
198            AdhocQuery request = session.newAdhocQuery(query);
199            // ResultSequence will be closed by Session close
200            ResultSequence rs = session.submitRequest(request);
201            if (rs.hasNext()) {
202                return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]);
203            }
204            return null;
205        } catch (RequestException e) {
206            throw new NuxeoException("An exception happened during xcc call", e);
207        }
208    }
209
210    @Override
211    public List<State> readStates(List<String> ids) {
212        if (log.isTraceEnabled()) {
213            log.trace("MarkLogic: READ " + ids);
214        }
215        try (Session session = xccContentSource.newSession()) {
216            String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect(
217                    Collectors.joining(",", "fn:doc((", "))"));
218            AdhocQuery request = session.newAdhocQuery(query);
219            // ResultSequence will be closed by Session close
220            ResultSequence rs = session.submitRequest(request);
221            return Arrays.stream(rs.asStrings())
222                         .map(MarkLogicStateDeserializer::deserialize)
223                         .collect(Collectors.toList());
224        } catch (RequestException e) {
225            throw new NuxeoException("An exception happened during xcc call", e);
226        }
227    }
228
229    @Override
230    public void createState(State state) {
231        String id = state.get(KEY_ID).toString();
232        if (log.isTraceEnabled()) {
233            log.trace("MarkLogic: CREATE " + id + ": " + state);
234        }
235        try (Session session = xccContentSource.newSession()) {
236            session.insertContent(convert(state));
237        } catch (RequestException e) {
238            throw new NuxeoException("An exception happened during xcc call", e);
239        }
240    }
241
242    @Override
243    public void createStates(List<State> states) {
244        if (log.isTraceEnabled()) {
245            log.trace("MarkLogic: CREATE ["
246                    + states.stream().map(state -> state.get(KEY_ID).toString()).collect(Collectors.joining(", "))
247                    + "]: " + states);
248        }
249        try (Session session = xccContentSource.newSession()) {
250            Content[] contents = states.stream().map(this::convert).toArray(Content[]::new);
251            session.insertContent(contents);
252        } catch (RequestException e) {
253            throw new NuxeoException("An exception happened during xcc call", e);
254        }
255    }
256
257    private Content convert(State state) {
258        String id = state.get(KEY_ID).toString();
259        return ContentFactory.newContent(ID_FORMATTER.apply(id), MarkLogicStateSerializer.serialize(state), null);
260    }
261
262    @Override
263    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
264        // TODO changeTokenUpdater
265        String patch = MarkLogicStateSerializer.serialize(diff);
266        if (log.isTraceEnabled()) {
267            log.trace("MarkLogic: UPDATE " + id + ": " + patch);
268        }
269        try (Session session = xccContentSource.newSession()) {
270            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/patch.xqy");
271            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
272            request.setNewStringVariable("patch-string", patch);
273            // ResultSequence will be closed by Session close
274            session.submitRequest(request);
275        } catch (RequestException e) {
276            throw new NuxeoException("An exception happened during xcc call", e);
277        }
278    }
279
280    @Override
281    public void deleteStates(Set<String> ids) {
282        if (log.isTraceEnabled()) {
283            log.trace("MarkLogic: DELETE " + ids);
284        }
285        try (Session session = xccContentSource.newSession()) {
286            String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect(
287                    Collectors.joining(",", "xdmp:document-delete((", "))"));
288            AdhocQuery request = session.newAdhocQuery(query);
289            // ResultSequence will be closed by Session close
290            session.submitRequest(request);
291        } catch (RequestException e) {
292            throw new NuxeoException("An exception happened during xcc call", e);
293        }
294    }
295
296    @Override
297    public State readChildState(String parentId, String name, Set<String> ignored) {
298        String query = getChildQuery(parentId, name, ignored);
299        return findOne(query);
300    }
301
302    @Override
303    public boolean hasChild(String parentId, String name, Set<String> ignored) {
304        String query = getChildQuery(parentId, name, ignored);
305        return exist(query);
306    }
307
308    private String getChildQuery(String parentId, String name, Set<String> ignored) {
309        return new MarkLogicQuerySimpleBuilder(rangeElementIndexes).eq(KEY_PARENT_ID, parentId)
310                                                                   .eq(KEY_NAME, name)
311                                                                   .notIn(KEY_ID, ignored)
312                                                                   .build();
313    }
314
315    @Override
316    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
317        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
318        builder.eq(key, value);
319        builder.notIn(KEY_ID, ignored);
320        try (Stream<State> states = findAll(builder.build())) {
321            return states.collect(Collectors.toList());
322        }
323    }
324
325    @Override
326    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
327        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
328        builder.eq(key1, value1);
329        builder.eq(key2, value2);
330        builder.notIn(KEY_ID, ignored);
331        try (Stream<State> states = findAll(builder.build())) {
332            return states.collect(Collectors.toList());
333        }
334    }
335
336    @Override
337    public Stream<State> getDescendants(String rootId, Set<String> keys) {
338        return getDescendants(rootId, keys, 0);
339    }
340
341    @Override
342    public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) {
343        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
344        builder.eq(KEY_ANCESTOR_IDS, rootId).limit(limit);
345        List<String> selects = new ArrayList<>();
346        selects.add(KEY_ID);
347        selects.addAll(keys);
348        return findAll(builder.build(), selects.toArray(new String[0]));
349    }
350
351    @Override
352    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
353        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
354        builder.eq(key, value);
355        builder.notIn(KEY_ID, ignored);
356        return exist(builder.build());
357    }
358
359    @Override
360    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
361            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
362        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, orderByClause, distinctDocuments,
363                rangeElementIndexes);
364        MarkLogicQuery query = builder.buildQuery();
365        // Don't do manual projection if there are no projection wildcards, as this brings no new
366        // information and is costly. The only difference is several identical rows instead of one.
367        boolean manualProjection = builder.doManualProjection();
368        if (manualProjection) {
369            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
370            // so we need the full state from the database
371            evaluator.parse();
372        }
373        String searchQuery = query.getSearchQuery(limit, offset);
374        if (log.isTraceEnabled()) {
375            logQuery(searchQuery);
376        }
377        // Run query
378        try (Session session = xccContentSource.newSession()) {
379            AdhocQuery request = session.newAdhocQuery(searchQuery);
380            // ResultSequence will be closed by Session close
381            ResultSequence rs = session.submitRequest(request);
382
383            List<Map<String, Serializable>> projections = new ArrayList<>(limit);
384            DBSStateFlattener flattener = new DBSStateFlattener();
385            for (String rsItem : rs.asStrings()) {
386                State state = MarkLogicStateDeserializer.deserialize(rsItem);
387                if (manualProjection) {
388                    projections.addAll(evaluator.matches(state));
389                } else {
390                    projections.add(flattener.flatten(state));
391                }
392            }
393            long totalSize;
394            if (countUpTo == -1) {
395                // count full size
396                if (limit == 0) {
397                    totalSize = projections.size();
398                } else if (manualProjection) {
399                    totalSize = -1; // unknown due to manual projection
400                } else {
401                    AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery());
402                    // ResultSequence will be closed by Session close
403                    ResultSequence countRs = session.submitRequest(countRequest);
404                    totalSize = Long.parseLong(countRs.asStrings()[0]);
405                }
406            } else if (countUpTo == 0) {
407                // no count
408                totalSize = -1; // not counted
409            } else {
410                // count only if less than countUpTo
411                if (limit == 0) {
412                    totalSize = projections.size();
413                } else if (manualProjection) {
414                    totalSize = -1; // unknown due to manual projection
415                } else {
416                    AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery(countUpTo + 1));
417                    // ResultSequence will be closed by Session close
418                    ResultSequence countRs = session.submitRequest(countRequest);
419                    totalSize = Long.parseLong(countRs.asStrings()[0]);
420                }
421                if (totalSize > countUpTo) {
422                    totalSize = -2; // truncated
423                }
424            }
425
426            if (log.isTraceEnabled() && projections.size() != 0) {
427                log.trace("MarkLogic:    -> " + projections.size());
428            }
429            return new PartialList<>(projections, totalSize);
430        } catch (RequestException e) {
431            throw new NuxeoException("An exception happened during xcc call when executing '" + evaluator + "'", e);
432        }
433    }
434
435    @Override
436    public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveInSecond) {
437        cursorService.checkForTimedOutScroll();
438        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, null, false, rangeElementIndexes);
439        String query = builder.buildQuery().getSearchQuery();
440        // Don't auto-close the session as we need to keep it open for next scroll
441        try {
442            Session session = xccContentSource.newSession(); // kept open by cursor, do not close
443            // Build option to scroll results and not buffer them
444            RequestOptions options = new RequestOptions();
445            options.setCacheResult(false);
446            AdhocQuery request = session.newAdhocQuery(query, options);
447            // ResultSequence will be closed by Session close
448            ResultSequence rs = session.submitRequest(request);
449            String scrollId = cursorService.registerCursorResult(
450                    new MarkLogicCursorResult(session, rs, batchSize, keepAliveInSecond));
451            return scroll(scrollId);
452        } catch (RequestException e) {
453            throw new NuxeoException("An exception happened during xcc call", e);
454        }
455    }
456
457    @Override
458    public ScrollResult<String> scroll(String scrollId) {
459        return cursorService.scroll(scrollId);
460    }
461
462    @Override
463    public Lock getLock(String id) {
464        // TODO test performance : retrieve document with read or search document with extract
465        // TODO retrieve only some field
466        // https://docs.marklogic.com/guide/search-dev/qbe#id_54044
467        State state = readState(id);
468        if (state == null) {
469            throw new DocumentNotFoundException(id);
470        }
471        String owner = (String) state.get(KEY_LOCK_OWNER);
472        if (owner == null) {
473            // not locked
474            return null;
475        }
476        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
477        return new Lock(owner, created);
478    }
479
480    @Override
481    public Lock setLock(String id, Lock lock) {
482        State state = new State();
483        state.put(KEY_LOCK_OWNER, lock.getOwner());
484        state.put(KEY_LOCK_CREATED, lock.getCreated());
485        String lockString = MarkLogicStateSerializer.serialize(state);
486        if (log.isTraceEnabled()) {
487            log.trace("MarkLogic: SETLOCK " + id + ": " + lockString);
488        }
489        try (Session session = xccContentSource.newSession()) {
490            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/set-lock.xqy");
491            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
492            request.setNewStringVariable("lock-string", lockString);
493            // ResultSequence will be closed by Session close
494            ResultSequence result = session.submitRequest(request);
495            State resultState = MarkLogicStateDeserializer.deserialize(result.asString());
496            return extractLock(resultState);
497        } catch (RequestException e) {
498            if ("Document not found".equals(e.getMessage())) {
499                throw new DocumentNotFoundException(id, e);
500            }
501            throw new NuxeoException("An exception happened during xcc call", e);
502        }
503        // TODO check how the concurrent exception is raised
504    }
505
506    @Override
507    public Lock removeLock(String id, String owner) {
508        if (log.isTraceEnabled()) {
509            log.trace("MarkLogic: REMOVELOCK " + id + ": " + owner);
510        }
511        try (Session session = xccContentSource.newSession()) {
512            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/remove-lock.xqy");
513            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
514            request.setNewStringVariable("owner", Strings.nullToEmpty(owner));
515            // ResultSequence will be closed by Session close
516            ResultSequence result = session.submitRequest(request);
517            State resultState = MarkLogicStateDeserializer.deserialize(result.asString());
518            return extractLock(resultState);
519        } catch (RequestException e) {
520            if ("Document not found".equals(e.getMessage())) {
521                throw new DocumentNotFoundException(id, e);
522            }
523            throw new NuxeoException("An exception happened during xcc call", e);
524        }
525    }
526
527    private Lock extractLock(State state) {
528        if (state.isEmpty()) {
529            return null;
530        }
531        String owner = (String) state.get(KEY_LOCK_OWNER);
532        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
533        Boolean failed = (Boolean) state.get("failed");
534        return new Lock(owner, created, Boolean.TRUE.equals(failed));
535    }
536
537    @Override
538    public void closeLockManager() {
539    }
540
541    @Override
542    public void clearLockManagerCaches() {
543    }
544
545    protected List<String> binaryPaths;
546
547    @Override
548    protected void initBlobsPaths() {
549        MarkLogicBlobFinder finder = new MarkLogicBlobFinder();
550        finder.visit();
551        binaryPaths = finder.binaryPaths;
552    }
553
554    protected static class MarkLogicBlobFinder extends BlobFinder {
555        protected List<String> binaryPaths = new ArrayList<>();
556
557        @Override
558        protected void recordBlobPath() {
559            path.addLast(KEY_BLOB_DATA);
560            StringBuilder binaryPath = new StringBuilder();
561            MarkLogicSchemaManager schemaManager = new MarkLogicSchemaManager();
562            Type previousType = null;
563            for (String element : path) {
564                if (binaryPath.length() > 0) {
565                    binaryPath.append('/');
566                }
567                // Append current element to path
568                binaryPath.append(element);
569                if (previousType == null) {
570                    // No previous type - it's the first element
571                    previousType = schemaManager.computeField(String.join(".", path), element).getType();
572                } else if (previousType.isComplexType()) {
573                    // Previous type is a complex type - retrieve the type of current element
574                    if (TypeConstants.isContentType(previousType)) {
575                        // The complex type hold the binary data, it's the last element, so break the loop
576                        break;
577                    }
578                    previousType = ((ComplexType) previousType).getField(element).getType();
579                }
580                // Add the item array element if type is a list (here previousType is the type of current element)
581                // Here we re allocate previousType to item array type as there's no element in this.path to select item
582                if (previousType.isListType()) {
583                    binaryPath.append('/').append(element).append(MarkLogicHelper.ARRAY_ITEM_KEY_SUFFIX);
584                    previousType = ((ListType) previousType).getFieldType();
585                }
586            }
587            binaryPaths.add(binaryPath.toString());
588            path.removeLast();
589        }
590    }
591
592    @Override
593    public void markReferencedBinaries() {
594        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
595        // TODO add a query to not scan all documents
596        String query = new MarkLogicQuerySimpleBuilder(rangeElementIndexes).build();
597        try (Stream<State> states = findAll(query, binaryPaths.toArray(new String[0]))) {
598            states.forEach(state -> markReferencedBinaries(state, blobManager));
599        }
600    }
601
602    protected void markReferencedBinaries(State state, DocumentBlobManager blobManager) {
603        for (Entry<String, Serializable> entry : state.entrySet()) {
604            Serializable value = entry.getValue();
605            if (value instanceof List) {
606                List<?> list = (List<?>) value;
607                for (Object v : list) {
608                    if (v instanceof State) {
609                        markReferencedBinaries((State) v, blobManager);
610                    } else {
611                        markReferencedBinary(v, blobManager);
612                    }
613                }
614            } else if (value instanceof Object[]) {
615                for (Object v : (Object[]) value) {
616                    markReferencedBinary(v, blobManager);
617                }
618            } else if (value instanceof State) {
619                markReferencedBinaries((State) value, blobManager);
620            } else {
621                markReferencedBinary(value, blobManager);
622            }
623        }
624    }
625
626    protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) {
627        if (!(value instanceof String)) {
628            return;
629        }
630        String key = (String) value;
631        blobManager.markReferencedBinary(key, repositoryName);
632    }
633
634    private void logQuery(String query) {
635        log.trace("MarkLogic: QUERY " + query);
636    }
637
638    private boolean exist(String ctsQuery) {
639        // first build exist query from cts query
640        String query = "xdmp:exists(" + ctsQuery + ")";
641        if (log.isTraceEnabled()) {
642            logQuery(query);
643        }
644        // Run query
645        try (Session session = xccContentSource.newSession()) {
646            AdhocQuery request = session.newAdhocQuery(query);
647            // ResultSequence will be closed by Session close
648            ResultSequence rs = session.submitRequest(request);
649            return Boolean.parseBoolean(rs.asString());
650        } catch (RequestException e) {
651            throw new NuxeoException("An exception happened during xcc call", e);
652        }
653    }
654
655    private State findOne(String ctsQuery) {
656        // first add limit to ctsQuery
657        String query = ctsQuery + "[1 to 1]";
658        if (log.isTraceEnabled()) {
659            logQuery(query);
660        }
661        // Run query
662        try (Session session = xccContentSource.newSession()) {
663            AdhocQuery request = session.newAdhocQuery(query);
664            // ResultSequence will be closed by Session close
665            ResultSequence rs = session.submitRequest(request);
666            if (rs.hasNext()) {
667                return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]);
668            }
669            return null;
670        } catch (RequestException e) {
671            throw new NuxeoException("An exception happened during xcc call", e);
672        }
673    }
674
675    protected Stream<State> findAll(String ctsQuery, String... selects) {
676        String query = ctsQuery;
677        if (selects.length > 0) {
678            query = getProjectedQuery(query, Arrays.asList(selects));
679        }
680        if (log.isTraceEnabled()) {
681            logQuery(query);
682        }
683        // Run query
684        boolean completedAbruptly = true;
685        Session session = xccContentSource.newSession(); // kept open by stream or closed on exception
686        try {
687            // As we can get a lot of results don't buffer the result
688            RequestOptions options = new RequestOptions();
689            options.setCacheResult(false);
690            AdhocQuery request = session.newAdhocQuery(query, options);
691
692            // We give 0 as characteristics because it's the value used by Iterable#spliterator() and according to
693            // StreamSupport.stream(Supplier, int, boolean) documentation, characteristics must be equal to
694            // supplier.get().characteristics()
695            Supplier<Spliterator<ResultItem>> spliteratorSupplier = () -> {
696                try {
697                    // ResultSequence will be closed by Session close (closed by stream closed)
698                    ResultSequence rs = session.submitRequest(request);
699                    Iterable<ResultItem> items = rs::iterator;
700                    return items.spliterator();
701                } catch (RequestException e) {
702                    throw new NuxeoException("An exception happened during xcc call", e);
703                }
704            };
705            Stream<State> stream = StreamSupport.stream(spliteratorSupplier, 0, false)
706                                                .onClose(session::close)
707                                                .map(ResultItem::getItem)
708                                                .map(XdmItem::asInputStream)
709                                                .map(MarkLogicStateDeserializer::deserialize);
710            // the stream takes responsibility for closing the session
711            completedAbruptly = false;
712            return stream;
713        } finally {
714            if (completedAbruptly) {
715                session.close();
716            }
717        }
718    }
719
720    protected static final String PATH_COLLECTOR_BEGIN = "\"" + MarkLogicHelper.DOCUMENT_ROOT_PATH + "/";
721
722    protected static final String PATH_COLLECTOR_END = "\"";
723
724    protected static final String PATH_COLLECTOR_SEP = ",\n";
725
726    protected static final Collector<CharSequence, ?, String> PATH_COLLECTOR = Collectors.joining(
727            PATH_COLLECTOR_END + PATH_COLLECTOR_SEP + PATH_COLLECTOR_BEGIN, PATH_COLLECTOR_BEGIN, PATH_COLLECTOR_END);
728
729    protected String getProjectedQuery(String query, Collection<String> keys) {
730        String paths = keys.stream().map(MarkLogicHelper::serializeKey).collect(PATH_COLLECTOR);
731        return "import module namespace extract = 'http://nuxeo.com/extract' at '/ext/nuxeo/extract.xqy';\n"
732                + "let $paths := (" + paths + ")\n" //
733                + "let $namespaces := ()\n" //
734                + "for $i in " + query + " return extract:extract-nodes($i, $paths, $namespaces)";
735    }
736
737}