001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.storage.marklogic;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
030
031import java.io.Serializable;
032import java.util.ArrayList;
033import java.util.Calendar;
034import java.util.Collections;
035import java.util.List;
036import java.util.Map;
037import java.util.Optional;
038import java.util.Set;
039import java.util.UUID;
040import java.util.function.Function;
041import java.util.stream.Collectors;
042import java.util.stream.StreamSupport;
043
044import javax.resource.spi.ConnectionManager;
045
046import org.apache.commons.lang.StringUtils;
047import org.apache.commons.logging.Log;
048import org.apache.commons.logging.LogFactory;
049import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
050import org.nuxeo.ecm.core.api.DocumentNotFoundException;
051import org.nuxeo.ecm.core.api.Lock;
052import org.nuxeo.ecm.core.api.NuxeoException;
053import org.nuxeo.ecm.core.api.PartialList;
054import org.nuxeo.ecm.core.model.LockManager;
055import org.nuxeo.ecm.core.model.Repository;
056import org.nuxeo.ecm.core.query.QueryParseException;
057import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
058import org.nuxeo.ecm.core.storage.State;
059import org.nuxeo.ecm.core.storage.State.StateDiff;
060import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
061import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
062import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
063
064import com.marklogic.client.DatabaseClient;
065import com.marklogic.client.DatabaseClientFactory;
066import com.marklogic.client.DatabaseClientFactory.Authentication;
067import com.marklogic.client.FailedRequestException;
068import com.marklogic.client.ResourceNotFoundException;
069import com.marklogic.client.admin.ServerConfigurationManager;
070import com.marklogic.client.admin.ServerConfigurationManager.UpdatePolicy;
071import com.marklogic.client.document.DocumentDescriptor;
072import com.marklogic.client.document.DocumentMetadataPatchBuilder.PatchHandle;
073import com.marklogic.client.document.DocumentPage;
074import com.marklogic.client.document.DocumentRecord;
075import com.marklogic.client.document.DocumentWriteSet;
076import com.marklogic.client.document.XMLDocumentManager;
077import com.marklogic.client.query.RawQueryDefinition;
078
079/**
080 * MarkLogic implementation of a {@link Repository}.
081 *
082 * @since 8.3
083 */
084public class MarkLogicRepository extends DBSRepositoryBase {
085
086    private static final Log log = LogFactory.getLog(MarkLogicRepository.class);
087
088    private static final Function<String, String> ID_FORMATTER = id -> String.format("/%s.xml", id);
089
090    public static final String DB_DEFAULT = "nuxeo";
091
092    protected DatabaseClient markLogicClient;
093
094    public MarkLogicRepository(ConnectionManager cm, MarkLogicRepositoryDescriptor descriptor) {
095        super(cm, descriptor.name, descriptor);
096        markLogicClient = newMarkLogicClient(descriptor);
097        initRepository();
098    }
099
100    @Override
101    public List<IdType> getAllowedIdTypes() {
102        return Collections.singletonList(IdType.varchar);
103    }
104
105    @Override
106    public void shutdown() {
107        super.shutdown();
108        markLogicClient.release();
109    }
110
111    // used also by unit tests
112    public static DatabaseClient newMarkLogicClient(MarkLogicRepositoryDescriptor descriptor) {
113        String host = descriptor.host;
114        Integer port = descriptor.port;
115        if (StringUtils.isBlank(host) || port == null) {
116            throw new NuxeoException("Missing <host> or <port> in MarkLogic repository descriptor");
117        }
118        String dbname = StringUtils.defaultIfBlank(descriptor.dbname, DB_DEFAULT);
119        String user = descriptor.user;
120        String password = descriptor.password;
121        if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)) {
122            return DatabaseClientFactory.newClient(host, port, dbname, user, password, Authentication.DIGEST);
123        }
124        return DatabaseClientFactory.newClient(host, port, dbname);
125    }
126
127    protected void initRepository() {
128        // Activate Optimistic Locking
129        // https://docs.marklogic.com/guide/java/transactions#id_81051
130        ServerConfigurationManager configMgr = markLogicClient.newServerConfigManager();
131        configMgr.readConfiguration();
132        configMgr.setUpdatePolicy(UpdatePolicy.VERSION_OPTIONAL);
133        // write the server configuration to the database
134        configMgr.writeConfiguration();
135        if (readState(getRootId()) == null) {
136            initRoot();
137        }
138    }
139
140    @Override
141    protected void initBlobsPaths() {
142        // throw new IllegalStateException("Not implemented yet");
143    }
144
145    @Override
146    public String generateNewId() {
147        return UUID.randomUUID().toString();
148    }
149
150    @Override
151    public State readState(String id) {
152        if (log.isTraceEnabled()) {
153            log.trace("MarkLogic: READ " + id);
154        }
155        try {
156            return markLogicClient.newXMLDocumentManager().read(ID_FORMATTER.apply(id), new StateHandle()).get();
157        } catch (ResourceNotFoundException e) {
158            return null;
159        }
160    }
161
162    @Override
163    public List<State> readStates(List<String> ids) {
164        if (log.isTraceEnabled()) {
165            log.trace("MarkLogic: READ " + ids);
166        }
167        String[] markLogicIds = ids.stream().map(ID_FORMATTER).toArray(String[]::new);
168        DocumentPage page = markLogicClient.newXMLDocumentManager().read(markLogicIds);
169        return StreamSupport.stream(page.spliterator(), false)
170                            .map(document -> document.getContent(new StateHandle()).get())
171                            .collect(Collectors.toList());
172    }
173
174    @Override
175    public void createState(State state) {
176        String id = state.get(KEY_ID).toString();
177        if (log.isTraceEnabled()) {
178            log.trace("MarkLogic: CREATE " + id + ": " + state);
179        }
180        markLogicClient.newXMLDocumentManager().write(ID_FORMATTER.apply(id), new StateHandle(state));
181    }
182
183    @Override
184    public void createStates(List<State> states) {
185        XMLDocumentManager docManager = markLogicClient.newXMLDocumentManager();
186        DocumentWriteSet writeSet = docManager.newWriteSet();
187        for (State state : states) {
188            String id = state.get(KEY_ID).toString();
189            writeSet.add(ID_FORMATTER.apply(id), new StateHandle(state));
190        }
191        if (log.isTraceEnabled()) {
192            log.trace("MarkLogic: CREATE ["
193                    + states.stream().map(state -> state.get(KEY_ID).toString()).collect(Collectors.joining(", "))
194                    + "]: " + states);
195        }
196        docManager.write(writeSet);
197    }
198
199    @Override
200    public void updateState(String id, StateDiff diff) {
201        XMLDocumentManager docManager = markLogicClient.newXMLDocumentManager();
202        PatchHandle patch = new MarkLogicStateUpdateBuilder(docManager::newPatchBuilder).apply(diff);
203        if (log.isTraceEnabled()) {
204            log.trace("MarkLogic: UPDATE " + id + ": " + patch.toString());
205        }
206        docManager.patch(ID_FORMATTER.apply(id), patch);
207    }
208
209    @Override
210    public void deleteStates(Set<String> ids) {
211        if (log.isTraceEnabled()) {
212            log.trace("MarkLogic: DELETE " + ids);
213        }
214        String[] markLogicIds = ids.stream().map(ID_FORMATTER).toArray(String[]::new);
215        markLogicClient.newXMLDocumentManager().delete(markLogicIds);
216    }
217
218    @Override
219    public State readChildState(String parentId, String name, Set<String> ignored) {
220        RawQueryDefinition query = getChildQuery(parentId, name, ignored);
221        return findOne(query);
222    }
223
224    @Override
225    public boolean hasChild(String parentId, String name, Set<String> ignored) {
226        RawQueryDefinition query = getChildQuery(parentId, name, ignored);
227        return exist(query);
228    }
229
230    private RawQueryDefinition getChildQuery(String parentId, String name, Set<String> ignored) {
231        return new MarkLogicQuerySimpleBuilder(markLogicClient.newQueryManager()).eq(KEY_PARENT_ID, parentId)
232                                                                                 .eq(KEY_NAME, name)
233                                                                                 .notIn(KEY_ID, ignored)
234                                                                                 .build();
235    }
236
237    @Override
238    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
239        return queryKeyValue(key, value, ignored, this::findAll);
240    }
241
242    @Override
243    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
244        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(markLogicClient.newQueryManager());
245        builder.eq(key1, value1);
246        builder.eq(key2, value2);
247        builder.notIn(KEY_ID, ignored);
248        return findAll(builder.build());
249    }
250
251    @Override
252    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
253            Map<String, Object[]> targetProxies) {
254        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(markLogicClient.newQueryManager());
255        builder.eq(key, value);
256        builder.select(KEY_ID);
257        builder.select(KEY_IS_PROXY);
258        builder.select(KEY_PROXY_TARGET_ID);
259        builder.select(KEY_PROXY_IDS);
260        RawQueryDefinition query = builder.build();
261        if (log.isTraceEnabled()) {
262            logQuery(query);
263        }
264
265        try (DocumentPage page = markLogicClient.newXMLDocumentManager().search(query, 0)) {
266            for (DocumentRecord record : page) {
267                State state = record.getContent(new StateHandle()).get();
268                String id = (String) state.get(KEY_ID);
269                ids.add(id);
270                if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
271                    String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
272                    proxyTargets.put(id, targetId);
273                }
274                if (targetProxies != null) {
275                    Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
276                    if (proxyIds != null) {
277                        targetProxies.put(id, proxyIds);
278                    }
279                }
280            }
281        }
282    }
283
284    @Override
285    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
286        return queryKeyValue(key, value, ignored, this::exist);
287    }
288
289    @Override
290    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
291            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
292        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(markLogicClient.newQueryManager(), evaluator,
293                orderByClause, distinctDocuments);
294        RawQueryDefinition query = builder.buildQuery();
295        // Don't do manual projection if there are no projection wildcards, as this brings no new
296        // information and is costly. The only difference is several identical rows instead of one.
297        boolean manualProjection = builder.doManualProjection();
298        if (manualProjection) {
299            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
300            // so we need the full state from the database
301            evaluator.parse();
302        }
303        if (log.isTraceEnabled()) {
304            logQuery(query, limit, offset);
305        }
306        XMLDocumentManager docManager = markLogicClient.newXMLDocumentManager();
307        docManager.setPageLength(limit == 0 ? 50 : limit);
308        try (DocumentPage page = docManager.search(query, offset)) {
309            List<Map<String, Serializable>> projections = new ArrayList<>((int) page.size());
310            for (DocumentRecord record : page) {
311                State state = record.getContent(new StateHandle()).get();
312                if (manualProjection) {
313                    projections.addAll(evaluator.matches(state));
314                } else {
315                    projections.add(DBSStateFlattener.flatten(state));
316                }
317            }
318            long totalSize;
319            if (countUpTo == -1) {
320                // count full size
321                if (limit == 0) {
322                    totalSize = projections.size();
323                } else {
324                    totalSize = page.getTotalSize();
325                }
326            } else if (countUpTo == 0) {
327                // no count
328                totalSize = -1; // not counted
329            } else {
330                // count only if less than countUpTo
331                if (limit == 0) {
332                    totalSize = projections.size();
333                } else {
334                    totalSize = page.getTotalSize();
335                }
336                if (totalSize > countUpTo) {
337                    totalSize = -2; // truncated
338                }
339            }
340
341            if (log.isTraceEnabled() && projections.size() != 0) {
342                log.trace("MarkLogic:    -> " + projections.size());
343            }
344            return new PartialList<>(projections, totalSize);
345        } catch (FailedRequestException fre) {
346            throw new QueryParseException("Request was rejected by server", fre);
347        }
348    }
349
350    @Override
351    public Lock getLock(String id) {
352        // TODO test performance : retrieve document with read or search document with extract
353        // TODO retrieve only some field
354        // https://docs.marklogic.com/guide/search-dev/qbe#id_54044
355        State state = readState(id);
356        if (state == null) {
357            throw new DocumentNotFoundException(id);
358        }
359        String owner = (String) state.get(KEY_LOCK_OWNER);
360        if (owner == null) {
361            // not locked
362            return null;
363        }
364        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
365        return new Lock(owner, created);
366    }
367
368    @Override
369    public Lock setLock(String id, Lock lock) {
370        // Here we use Optimistic Locking to set the lock
371        // https://docs.marklogic.com/guide/java/transactions#id_81051
372        XMLDocumentManager docManager = markLogicClient.newXMLDocumentManager();
373        DocumentDescriptor descriptor = docManager.newDescriptor(ID_FORMATTER.apply(id));
374        // TODO test performance : retrieve document with read or search document with extract
375        // TODO retrieve only some field
376        // https://docs.marklogic.com/guide/search-dev/qbe#id_54044
377        try {
378            if (log.isTraceEnabled()) {
379                log.trace("MarkLogic: READ " + id);
380            }
381            State state = docManager.read(descriptor, new StateHandle()).get();
382            Optional<Lock> oldLock = extractLock(state);
383            if (oldLock.isPresent()) {
384                // Lock owner already set
385                return oldLock.get();
386            }
387            // Set the lock
388            PatchHandle patch = new MarkLogicLockUpdateBuilder(docManager::newPatchBuilder).set(lock);
389            if (log.isTraceEnabled()) {
390                log.trace("MarkLogic: UPDATE " + id + ": " + patch.toString());
391            }
392            docManager.patch(descriptor, patch);
393            // doc is now locked
394            return null;
395        } catch (ResourceNotFoundException e) {
396            // Document not found
397            throw new DocumentNotFoundException(id, e);
398        } catch (FailedRequestException e) {
399            // There was a race condition - another lock was set
400            return extractLock(readState(id)).orElseThrow(() -> new ConcurrentUpdateException("Lock " + id));
401        }
402    }
403
404    @Override
405    public Lock removeLock(String id, String owner) {
406        // Here we use Optimistic Locking to set the lock
407        // https://docs.marklogic.com/guide/java/transactions#id_81051
408        XMLDocumentManager docManager = markLogicClient.newXMLDocumentManager();
409        DocumentDescriptor descriptor = docManager.newDescriptor(ID_FORMATTER.apply(id));
410        // TODO test performance : retrieve document with read or search document with extract
411        // TODO retrieve only some field
412        // https://docs.marklogic.com/guide/search-dev/qbe#id_54044
413        try {
414            if (log.isTraceEnabled()) {
415                log.trace("MarkLogic: READ " + id);
416            }
417            // Retrieve state of document
418            State state = docManager.read(descriptor, new StateHandle()).get();
419            Optional<Lock> oldLockOpt = extractLock(state);
420            if (oldLockOpt.isPresent()) {
421                // A Lock exist on document
422                Lock oldLock = oldLockOpt.get();
423                if (LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) {
424                    // Delete the lock
425                    PatchHandle patch = new MarkLogicLockUpdateBuilder(docManager::newPatchBuilder).delete();
426                    if (log.isTraceEnabled()) {
427                        log.trace("MarkLogic: UPDATE " + id + ": " + patch.toString());
428                    }
429                    docManager.patch(descriptor, patch);
430                    // Return previous lock
431                    return oldLock;
432                } else {
433                    // existing mismatched lock, flag failure
434                    return new Lock(oldLock.getOwner(), oldLock.getCreated(), true);
435                }
436            } else {
437                // document was not locked
438                return null;
439            }
440        } catch (ResourceNotFoundException e) {
441            // Document not found
442            throw new DocumentNotFoundException(id, e);
443        }
444    }
445
446    private Optional<Lock> extractLock(State state) {
447        String owner = (String) state.get(KEY_LOCK_OWNER);
448        if (owner == null) {
449            return Optional.empty();
450        }
451        Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED);
452        return Optional.of(new Lock(owner, oldCreated));
453    }
454
455    @Override
456    public void closeLockManager() {
457    }
458
459    @Override
460    public void clearLockManagerCaches() {
461    }
462
463    @Override
464    public void markReferencedBinaries() {
465        throw new IllegalStateException("Not implemented yet");
466    }
467
468    private <T> T queryKeyValue(String key, Object value, Set<String> ignored, Function<RawQueryDefinition, T> executor) {
469        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(markLogicClient.newQueryManager());
470        builder.eq(key, value);
471        builder.notIn(KEY_ID, ignored);
472        return executor.apply(builder.build());
473    }
474
475    private boolean exist(RawQueryDefinition query) {
476        if (log.isTraceEnabled()) {
477            logQuery(query);
478        }
479        return markLogicClient.newQueryManager().findOne(query) != null;
480    }
481
482    private State findOne(RawQueryDefinition query) {
483        if (log.isTraceEnabled()) {
484            logQuery(query);
485        }
486        XMLDocumentManager docManager = markLogicClient.newXMLDocumentManager();
487        docManager.setPageLength(1);
488        try (DocumentPage page = docManager.search(query, 0)) {
489            if (page.hasNext()) {
490                return page.nextContent(new StateHandle()).get();
491            }
492            return null;
493        }
494    }
495
496    private List<State> findAll(RawQueryDefinition query) {
497        if (log.isTraceEnabled()) {
498            logQuery(query);
499        }
500        return findAll(query, 1);
501    }
502
503    private List<State> findAll(RawQueryDefinition query, long start) {
504        try (DocumentPage page = markLogicClient.newXMLDocumentManager().search(query, start)) {
505            List<State> states = new ArrayList<>((int) (page.getTotalSize() - start + 1));
506            for (DocumentRecord record : page) {
507                states.add(record.getContent(new StateHandle()).get());
508            }
509            if (page.hasNextPage()) {
510                states.addAll(findAll(query, start + page.getPageSize()));
511            }
512            return states;
513        }
514    }
515
516    private void logQuery(RawQueryDefinition query) {
517        log.trace("MarkLogic: QUERY " + query.getHandle());
518    }
519
520    private void logQuery(RawQueryDefinition query, int limit, int offset) {
521        log.trace("MarkLogic: QUERY " + query.getHandle() + " OFFSET " + offset + " LIMIT " + limit);
522    }
523
524}