001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.core.storage.marklogic;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
031
032import java.io.Serializable;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Calendar;
036import java.util.Collections;
037import java.util.List;
038import java.util.Map;
039import java.util.Set;
040import java.util.UUID;
041import java.util.function.Function;
042import java.util.stream.Collectors;
043
044import javax.resource.spi.ConnectionManager;
045
046import org.apache.commons.lang.StringUtils;
047import org.apache.commons.logging.Log;
048import org.apache.commons.logging.LogFactory;
049import org.nuxeo.ecm.core.api.DocumentNotFoundException;
050import org.nuxeo.ecm.core.api.Lock;
051import org.nuxeo.ecm.core.api.NuxeoException;
052import org.nuxeo.ecm.core.api.PartialList;
053import org.nuxeo.ecm.core.api.ScrollResult;
054import org.nuxeo.ecm.core.api.ScrollResultImpl;
055import org.nuxeo.ecm.core.model.Repository;
056import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
057import org.nuxeo.ecm.core.storage.State;
058import org.nuxeo.ecm.core.storage.State.StateDiff;
059import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
060import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
061import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
062import org.nuxeo.ecm.core.storage.marklogic.MarkLogicQueryBuilder.MarkLogicQuery;
063
064import com.google.common.base.Strings;
065import com.marklogic.xcc.AdhocQuery;
066import com.marklogic.xcc.Content;
067import com.marklogic.xcc.ContentFactory;
068import com.marklogic.xcc.ContentSource;
069import com.marklogic.xcc.ContentSourceFactory;
070import com.marklogic.xcc.ModuleInvoke;
071import com.marklogic.xcc.ResultSequence;
072import com.marklogic.xcc.Session;
073import com.marklogic.xcc.exceptions.RequestException;
074
075/**
076 * MarkLogic implementation of a {@link Repository}.
077 *
078 * @since 8.3
079 */
080public class MarkLogicRepository extends DBSRepositoryBase {
081
    private static final Log log = LogFactory.getLog(MarkLogicRepository.class);

    /** Maps a document id to the URI of its XML document, i.e. {@code /<id>.xml}. */
    private static final Function<String, String> ID_FORMATTER = id -> String.format("/%s.xml", id);

    /** Database name used when the descriptor does not provide one. */
    public static final String DB_DEFAULT = "nuxeo";

    /** Scroll id handed out by {@code scroll}, which returns every result in a single batch. */
    protected static final String NOSCROLL_ID = "noscroll";

    /** XCC content source; a new session is opened on it for each repository operation. */
    protected ContentSource xccContentSource;

    /** Last value used from the in-memory sequence. Used by unit tests. */
    protected long sequenceLastValue;

    /** Range element indexes built from the repository descriptor and passed to the query builders. */
    protected List<MarkLogicRangeElementIndexDescriptor> rangeElementIndexes;
096
097    public MarkLogicRepository(ConnectionManager cm, MarkLogicRepositoryDescriptor descriptor) {
098        super(cm, descriptor.name, descriptor);
099        xccContentSource = newMarkLogicContentSource(descriptor);
100        rangeElementIndexes = descriptor.rangeElementIndexes.stream()
101                                                            .map(MarkLogicRangeElementIndexDescriptor::new)
102                                                            .collect(Collectors.toList());
103        initRepository();
104    }
105
106    @Override
107    public List<IdType> getAllowedIdTypes() {
108        return Collections.singletonList(IdType.varchar);
109    }
110
111    // used also by unit tests
112    public static ContentSource newMarkLogicContentSource(MarkLogicRepositoryDescriptor descriptor) {
113        String host = descriptor.host;
114        Integer port = descriptor.port;
115        if (StringUtils.isBlank(host) || port == null) {
116            throw new NuxeoException("Missing <host> or <port> in MarkLogic repository descriptor");
117        }
118        String dbname = StringUtils.defaultIfBlank(descriptor.dbname, DB_DEFAULT);
119        String user = descriptor.user;
120        String password = descriptor.password;
121        return ContentSourceFactory.newContentSource(host, port, user, password, dbname);
122    }
123
124    protected void initRepository() {
125        if (readState(getRootId()) == null) {
126            initRoot();
127        }
128    }
129
    /** Does nothing for now: blob paths initialization is not implemented yet in this repository. */
    @Override
    protected void initBlobsPaths() {
        // throw new IllegalStateException("Not implemented yet");
    }
134
135    @Override
136    public String generateNewId() {
137        if (DEBUG_UUIDS) {
138            Long id = getNextSequenceId();
139            return "UUID_" + id;
140        }
141        return UUID.randomUUID().toString();
142    }
143
144    // Used by unit tests
145    protected synchronized Long getNextSequenceId() {
146        sequenceLastValue++;
147        return Long.valueOf(sequenceLastValue);
148    }
149
150    @Override
151    public State readState(String id) {
152        if (log.isTraceEnabled()) {
153            log.trace("MarkLogic: READ " + id);
154        }
155        try (Session session = xccContentSource.newSession()) {
156            String query = "fn:doc('" + ID_FORMATTER.apply(id) + "')";
157            AdhocQuery request = session.newAdhocQuery(query);
158            // ResultSequence will be closed by Session close
159            ResultSequence rs = session.submitRequest(request);
160            if (rs.hasNext()) {
161                return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]);
162            }
163            return null;
164        } catch (RequestException e) {
165            throw new NuxeoException("An exception happened during xcc call", e);
166        }
167    }
168
169    @Override
170    public List<State> readStates(List<String> ids) {
171        if (log.isTraceEnabled()) {
172            log.trace("MarkLogic: READ " + ids);
173        }
174        try (Session session = xccContentSource.newSession()) {
175            String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect(
176                    Collectors.joining(",", "fn:doc((", "))"));
177            AdhocQuery request = session.newAdhocQuery(query);
178            // ResultSequence will be closed by Session close
179            ResultSequence rs = session.submitRequest(request);
180            return Arrays.stream(rs.asStrings())
181                         .map(MarkLogicStateDeserializer::deserialize)
182                         .collect(Collectors.toList());
183        } catch (RequestException e) {
184            throw new NuxeoException("An exception happened during xcc call", e);
185        }
186    }
187
188    @Override
189    public void createState(State state) {
190        String id = state.get(KEY_ID).toString();
191        if (log.isTraceEnabled()) {
192            log.trace("MarkLogic: CREATE " + id + ": " + state);
193        }
194        try (Session session = xccContentSource.newSession()) {
195            session.insertContent(convert(state));
196        } catch (RequestException e) {
197            throw new NuxeoException("An exception happened during xcc call", e);
198        }
199    }
200
201    @Override
202    public void createStates(List<State> states) {
203        if (log.isTraceEnabled()) {
204            log.trace("MarkLogic: CREATE ["
205                    + states.stream().map(state -> state.get(KEY_ID).toString()).collect(Collectors.joining(", "))
206                    + "]: " + states);
207        }
208        try (Session session = xccContentSource.newSession()) {
209            Content[] contents = states.stream().map(this::convert).toArray(Content[]::new);
210            session.insertContent(contents);
211        } catch (RequestException e) {
212            throw new NuxeoException("An exception happened during xcc call", e);
213        }
214    }
215
216    private Content convert(State state) {
217        String id = state.get(KEY_ID).toString();
218        return ContentFactory.newContent(ID_FORMATTER.apply(id), MarkLogicStateSerializer.serialize(state), null);
219    }
220
221    @Override
222    public void updateState(String id, StateDiff diff) {
223        String patch = MarkLogicStateSerializer.serialize(diff);
224        if (log.isTraceEnabled()) {
225            log.trace("MarkLogic: UPDATE " + id + ": " + patch);
226        }
227        try (Session session = xccContentSource.newSession()) {
228            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/patch.xqy");
229            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
230            request.setNewStringVariable("patch-string", patch);
231            // ResultSequence will be closed by Session close
232            session.submitRequest(request);
233        } catch (RequestException e) {
234            throw new NuxeoException("An exception happened during xcc call", e);
235        }
236    }
237
238    @Override
239    public void deleteStates(Set<String> ids) {
240        if (log.isTraceEnabled()) {
241            log.trace("MarkLogic: DELETE " + ids);
242        }
243        try (Session session = xccContentSource.newSession()) {
244            String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect(
245                    Collectors.joining(",", "xdmp:document-delete((", "))"));
246            AdhocQuery request = session.newAdhocQuery(query);
247            // ResultSequence will be closed by Session close
248            session.submitRequest(request);
249        } catch (RequestException e) {
250            throw new NuxeoException("An exception happened during xcc call", e);
251        }
252    }
253
254    @Override
255    public State readChildState(String parentId, String name, Set<String> ignored) {
256        String query = getChildQuery(parentId, name, ignored);
257        return findOne(query);
258    }
259
260    @Override
261    public boolean hasChild(String parentId, String name, Set<String> ignored) {
262        String query = getChildQuery(parentId, name, ignored);
263        return exist(query);
264    }
265
266    private String getChildQuery(String parentId, String name, Set<String> ignored) {
267        return new MarkLogicQuerySimpleBuilder(rangeElementIndexes).eq(KEY_PARENT_ID, parentId)
268                                                                   .eq(KEY_NAME, name)
269                                                                   .notIn(KEY_ID, ignored)
270                                                                   .build();
271    }
272
273    @Override
274    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
275        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
276        builder.eq(key, value);
277        builder.notIn(KEY_ID, ignored);
278        return findAll(builder.build());
279    }
280
281    @Override
282    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
283        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
284        builder.eq(key1, value1);
285        builder.eq(key2, value2);
286        builder.notIn(KEY_ID, ignored);
287        return findAll(builder.build());
288    }
289
290    @Override
291    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
292            Map<String, Object[]> targetProxies) {
293        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
294        builder.eq(key, value);
295        for (State state : findAll(builder.build(), KEY_ID, KEY_IS_PROXY, KEY_PROXY_TARGET_ID, KEY_PROXY_IDS)) {
296            String id = (String) state.get(KEY_ID);
297            ids.add(id);
298            if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
299                String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
300                proxyTargets.put(id, targetId);
301            }
302            if (targetProxies != null) {
303                Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
304                if (proxyIds != null) {
305                    targetProxies.put(id, proxyIds);
306                }
307            }
308        }
309    }
310
311    @Override
312    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
313        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
314        builder.eq(key, value);
315        builder.notIn(KEY_ID, ignored);
316        return exist(builder.build());
317    }
318
319    @Override
320    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
321            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
322        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, orderByClause, distinctDocuments,
323                rangeElementIndexes);
324        MarkLogicQuery query = builder.buildQuery();
325        // Don't do manual projection if there are no projection wildcards, as this brings no new
326        // information and is costly. The only difference is several identical rows instead of one.
327        boolean manualProjection = builder.doManualProjection();
328        if (manualProjection) {
329            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
330            // so we need the full state from the database
331            evaluator.parse();
332        }
333        String searchQuery = query.getSearchQuery(limit, offset);
334        if (log.isTraceEnabled()) {
335            logQuery(searchQuery);
336        }
337        // Run query
338        try (Session session = xccContentSource.newSession()) {
339            AdhocQuery request = session.newAdhocQuery(searchQuery);
340            // ResultSequence will be closed by Session close
341            ResultSequence rs = session.submitRequest(request);
342
343            List<Map<String, Serializable>> projections = new ArrayList<>(limit);
344            for (String rsItem : rs.asStrings()) {
345                State state = MarkLogicStateDeserializer.deserialize(rsItem);
346                if (manualProjection) {
347                    projections.addAll(evaluator.matches(state));
348                } else {
349                    projections.add(DBSStateFlattener.flatten(state));
350                }
351            }
352            long totalSize;
353            if (countUpTo == -1) {
354                // count full size
355                if (limit == 0) {
356                    totalSize = projections.size();
357                } else {
358                    AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery());
359                    // ResultSequence will be closed by Session close
360                    ResultSequence countRs = session.submitRequest(countRequest);
361                    totalSize = Long.parseLong(countRs.asStrings()[0]);
362                }
363            } else if (countUpTo == 0) {
364                // no count
365                totalSize = -1; // not counted
366            } else {
367                // count only if less than countUpTo
368                if (limit == 0) {
369                    totalSize = projections.size();
370                } else {
371                    AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery(countUpTo + 1));
372                    // ResultSequence will be closed by Session close
373                    ResultSequence countRs = session.submitRequest(countRequest);
374                    totalSize = Long.parseLong(countRs.asStrings()[0]);
375                }
376                if (totalSize > countUpTo) {
377                    totalSize = -2; // truncated
378                }
379            }
380
381            if (log.isTraceEnabled() && projections.size() != 0) {
382                log.trace("MarkLogic:    -> " + projections.size());
383            }
384            return new PartialList<>(projections, totalSize);
385        } catch (RequestException e) {
386            throw new NuxeoException("An exception happened during xcc call", e);
387        }
388    }
389
390    @Override
391    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveInSecond) {
392        // Not yet implemented, return all result in one shot for now
393        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, null, false, rangeElementIndexes);
394        String query = builder.buildQuery().getSearchQuery();
395        // Run query
396        try (Session session = xccContentSource.newSession()) {
397            AdhocQuery request = session.newAdhocQuery(query);
398            // ResultSequence will be closed by Session close
399            ResultSequence rs = session.submitRequest(request);
400            return Arrays.stream(rs.asStrings())
401                         .map(MarkLogicStateDeserializer::deserialize)
402                         .map(state -> state.get(KEY_ID).toString())
403                         .collect(Collectors.collectingAndThen(Collectors.toList(),
404                                 ids -> new ScrollResultImpl(NOSCROLL_ID, ids)));
405        } catch (RequestException e) {
406            throw new NuxeoException("An exception happened during xcc call", e);
407        }
408    }
409
410    @Override
411    public ScrollResult scroll(String scrollId) {
412        if (NOSCROLL_ID.equals(scrollId)) {
413            // there is only one batch
414            return emptyResult();
415        }
416        throw new NuxeoException("Unknown or timed out scrollId");
417    }
418
419    @Override
420    public Lock getLock(String id) {
421        // TODO test performance : retrieve document with read or search document with extract
422        // TODO retrieve only some field
423        // https://docs.marklogic.com/guide/search-dev/qbe#id_54044
424        State state = readState(id);
425        if (state == null) {
426            throw new DocumentNotFoundException(id);
427        }
428        String owner = (String) state.get(KEY_LOCK_OWNER);
429        if (owner == null) {
430            // not locked
431            return null;
432        }
433        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
434        return new Lock(owner, created);
435    }
436
437    @Override
438    public Lock setLock(String id, Lock lock) {
439        State state = new State();
440        state.put(KEY_LOCK_OWNER, lock.getOwner());
441        state.put(KEY_LOCK_CREATED, lock.getCreated());
442        String lockString = MarkLogicStateSerializer.serialize(state);
443        if (log.isTraceEnabled()) {
444            log.trace("MarkLogic: SETLOCK " + id + ": " + lockString);
445        }
446        try (Session session = xccContentSource.newSession()) {
447            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/set-lock.xqy");
448            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
449            request.setNewStringVariable("lock-string", lockString);
450            // ResultSequence will be closed by Session close
451            ResultSequence result = session.submitRequest(request);
452            State resultState = MarkLogicStateDeserializer.deserialize(result.asString());
453            return extractLock(resultState);
454        } catch (RequestException e) {
455            if ("Document not found".equals(e.getMessage())) {
456                throw new DocumentNotFoundException(id, e);
457            }
458            throw new NuxeoException("An exception happened during xcc call", e);
459        }
460        // TODO check how the concurrent exception is raised
461    }
462
463    @Override
464    public Lock removeLock(String id, String owner) {
465        if (log.isTraceEnabled()) {
466            log.trace("MarkLogic: REMOVELOCK " + id + ": " + owner);
467        }
468        try (Session session = xccContentSource.newSession()) {
469            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/remove-lock.xqy");
470            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
471            request.setNewStringVariable("owner", Strings.nullToEmpty(owner));
472            // ResultSequence will be closed by Session close
473            ResultSequence result = session.submitRequest(request);
474            State resultState = MarkLogicStateDeserializer.deserialize(result.asString());
475            return extractLock(resultState);
476        } catch (RequestException e) {
477            if ("Document not found".equals(e.getMessage())) {
478                throw new DocumentNotFoundException(id, e);
479            }
480            throw new NuxeoException("An exception happened during xcc call", e);
481        }
482    }
483
484    private Lock extractLock(State state) {
485        if (state.isEmpty()) {
486            return null;
487        }
488        String owner = (String) state.get(KEY_LOCK_OWNER);
489        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
490        Boolean failed = (Boolean) state.get("failed");
491        return new Lock(owner, created, Boolean.TRUE.equals(failed));
492    }
493
    /** Does nothing in this implementation. */
    @Override
    public void closeLockManager() {
    }
497
    /** Does nothing in this implementation. */
    @Override
    public void clearLockManagerCaches() {
    }
501
    /**
     * Not implemented yet for this repository.
     *
     * @throws IllegalStateException always
     */
    @Override
    public void markReferencedBinaries() {
        throw new IllegalStateException("Not implemented yet");
    }
506
507    private void logQuery(String query) {
508        log.trace("MarkLogic: QUERY " + query);
509    }
510
511    private boolean exist(String ctsQuery) {
512        // first build exist query from cts query
513        String query = "xdmp:exists(" + ctsQuery + ")";
514        if (log.isTraceEnabled()) {
515            logQuery(query);
516        }
517        // Run query
518        try (Session session = xccContentSource.newSession()) {
519            AdhocQuery request = session.newAdhocQuery(query);
520            // ResultSequence will be closed by Session close
521            ResultSequence rs = session.submitRequest(request);
522            return Boolean.parseBoolean(rs.asString());
523        } catch (RequestException e) {
524            throw new NuxeoException("An exception happened during xcc call", e);
525        }
526    }
527
528    private State findOne(String ctsQuery) {
529        // first add limit to ctsQuery
530        String query = ctsQuery + "[1 to 1]";
531        if (log.isTraceEnabled()) {
532            logQuery(query);
533        }
534        // Run query
535        try (Session session = xccContentSource.newSession()) {
536            AdhocQuery request = session.newAdhocQuery(query);
537            // ResultSequence will be closed by Session close
538            ResultSequence rs = session.submitRequest(request);
539            if (rs.hasNext()) {
540                return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]);
541            }
542            return null;
543        } catch (RequestException e) {
544            throw new NuxeoException("An exception happened during xcc call", e);
545        }
546    }
547
548    private List<State> findAll(String ctsQuery, String... selects) {
549        String query = ctsQuery;
550        if (selects.length > 0) {
551            query = "for $i in " + query
552                    + " return document {element document{$i/document/@*,$i/document/*[ fn:local-name(.) = ("
553                    + Arrays.stream(selects)
554                            .map(MarkLogicHelper::serializeKey)
555                            .map(select -> "\"" + select + "\"")
556                            .collect(Collectors.joining(","))
557                    + ")]}}";
558        }
559        if (log.isTraceEnabled()) {
560            logQuery(query);
561        }
562        // Run query
563        try (Session session = xccContentSource.newSession()) {
564            AdhocQuery request = session.newAdhocQuery(query);
565            // ResultSequence will be closed by Session close
566            ResultSequence rs = session.submitRequest(request);
567            return Arrays.stream(rs.asStrings())
568                         .map(MarkLogicStateDeserializer::deserialize)
569                         .collect(Collectors.toList());
570        } catch (RequestException e) {
571            throw new NuxeoException("An exception happened during xcc call", e);
572        }
573    }
574
575}