001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.core.storage.marklogic; 020 021import static java.lang.Boolean.TRUE; 022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 032 033import java.io.Serializable; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Calendar; 037import java.util.Collections; 038import java.util.List; 039import java.util.Map; 040import java.util.Map.Entry; 041import java.util.Set; 042import java.util.UUID; 043import java.util.function.Function; 044import java.util.stream.Collectors; 045 046import javax.resource.spi.ConnectionManager; 047 048import org.apache.commons.lang.StringUtils; 049import org.apache.commons.logging.Log; 050import org.apache.commons.logging.LogFactory; 051import org.nuxeo.ecm.core.api.DocumentNotFoundException; 052import org.nuxeo.ecm.core.api.Lock; 053import org.nuxeo.ecm.core.api.NuxeoException; 054import org.nuxeo.ecm.core.api.PartialList; 055import org.nuxeo.ecm.core.api.ScrollResult; 056import org.nuxeo.ecm.core.api.ScrollResultImpl; 057import org.nuxeo.ecm.core.blob.BlobManager; 058import org.nuxeo.ecm.core.model.Repository; 059import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 060import org.nuxeo.ecm.core.schema.TypeConstants; 061import org.nuxeo.ecm.core.schema.types.ComplexType; 062import org.nuxeo.ecm.core.schema.types.ListType; 063import org.nuxeo.ecm.core.schema.types.Type; 064import org.nuxeo.ecm.core.storage.State; 065import org.nuxeo.ecm.core.storage.State.StateDiff; 066import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 067import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 068import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener; 069import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 070import org.nuxeo.ecm.core.storage.marklogic.MarkLogicQueryBuilder.MarkLogicQuery; 071import org.nuxeo.runtime.api.Framework; 072 073import com.google.common.base.Strings; 074import com.marklogic.xcc.AdhocQuery; 075import com.marklogic.xcc.Content; 076import com.marklogic.xcc.ContentFactory; 077import com.marklogic.xcc.ContentSource; 078import com.marklogic.xcc.ContentSourceFactory; 079import com.marklogic.xcc.ModuleInvoke; 080import com.marklogic.xcc.ResultSequence; 081import com.marklogic.xcc.Session; 082import com.marklogic.xcc.exceptions.RequestException; 083 084/** 085 * MarkLogic implementation of a {@link Repository}. 086 * 087 * @since 8.3 088 */ 089public class MarkLogicRepository extends DBSRepositoryBase { 090 091 private static final Log log = LogFactory.getLog(MarkLogicRepository.class); 092 093 private static final Function<String, String> ID_FORMATTER = id -> String.format("/%s.xml", id); 094 095 public static final String DB_DEFAULT = "nuxeo"; 096 097 protected static final String NOSCROLL_ID = "noscroll"; 098 099 protected ContentSource xccContentSource; 100 101 /** Last value used from the in-memory sequence. Used by unit tests. */ 102 protected long sequenceLastValue; 103 104 protected final List<MarkLogicRangeElementIndexDescriptor> rangeElementIndexes; 105 106 public MarkLogicRepository(ConnectionManager cm, MarkLogicRepositoryDescriptor descriptor) { 107 super(cm, descriptor.name, descriptor); 108 xccContentSource = newMarkLogicContentSource(descriptor); 109 rangeElementIndexes = descriptor.rangeElementIndexes.stream() 110 .map(MarkLogicRangeElementIndexDescriptor::new) 111 .collect(Collectors.toList()); 112 initRepository(); 113 } 114 115 @Override 116 public List<IdType> getAllowedIdTypes() { 117 return Collections.singletonList(IdType.varchar); 118 } 119 120 // used also by unit tests 121 public static ContentSource newMarkLogicContentSource(MarkLogicRepositoryDescriptor descriptor) { 122 String host = descriptor.host; 123 Integer port = descriptor.port; 124 if (StringUtils.isBlank(host) || port == null) { 125 throw new NuxeoException("Missing <host> or <port> in MarkLogic repository descriptor"); 126 } 127 String dbname = StringUtils.defaultIfBlank(descriptor.dbname, DB_DEFAULT); 128 String user = descriptor.user; 129 String password = descriptor.password; 130 return ContentSourceFactory.newContentSource(host, port.intValue(), user, password, dbname); 131 } 132 133 protected void initRepository() { 134 if (readState(getRootId()) == null) { 135 initRoot(); 136 } 137 } 138 139 @Override 140 public String generateNewId() { 141 if (DEBUG_UUIDS) { 142 Long id = getNextSequenceId(); 143 return "UUID_" + id; 144 } 145 return UUID.randomUUID().toString(); 146 } 147 148 // Used by unit tests 149 protected synchronized Long getNextSequenceId() { 150 sequenceLastValue++; 151 return Long.valueOf(sequenceLastValue); 152 } 153 154 @Override 155 public State readState(String id) { 156 if (log.isTraceEnabled()) { 157 log.trace("MarkLogic: READ " + id); 158 } 159 try (Session session = xccContentSource.newSession()) { 160 String query = "fn:doc('" + ID_FORMATTER.apply(id) + "')"; 161 AdhocQuery request = session.newAdhocQuery(query); 162 // ResultSequence will be closed by Session close 163 ResultSequence rs = session.submitRequest(request); 164 if (rs.hasNext()) { 165 return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]); 166 } 167 return null; 168 } catch (RequestException e) { 169 throw new NuxeoException("An exception happened during xcc call", e); 170 } 171 } 172 173 @Override 174 public List<State> readStates(List<String> ids) { 175 if (log.isTraceEnabled()) { 176 log.trace("MarkLogic: READ " + ids); 177 } 178 try (Session session = xccContentSource.newSession()) { 179 String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect( 180 Collectors.joining(",", "fn:doc((", "))")); 181 AdhocQuery request = session.newAdhocQuery(query); 182 // ResultSequence will be closed by Session close 183 ResultSequence rs = session.submitRequest(request); 184 return Arrays.stream(rs.asStrings()) 185 .map(MarkLogicStateDeserializer::deserialize) 186 .collect(Collectors.toList()); 187 } catch (RequestException e) { 188 throw new NuxeoException("An exception happened during xcc call", e); 189 } 190 } 191 192 @Override 193 public void createState(State state) { 194 String id = state.get(KEY_ID).toString(); 195 if (log.isTraceEnabled()) { 196 log.trace("MarkLogic: CREATE " + id + ": " + state); 197 } 198 try (Session session = xccContentSource.newSession()) { 199 session.insertContent(convert(state)); 200 } catch (RequestException e) { 201 throw new NuxeoException("An exception happened during xcc call", e); 202 } 203 } 204 205 @Override 206 public void createStates(List<State> states) { 207 if (log.isTraceEnabled()) { 208 log.trace("MarkLogic: CREATE [" 209 + states.stream().map(state -> state.get(KEY_ID).toString()).collect(Collectors.joining(", ")) 210 + "]: " + states); 211 } 212 try (Session session = xccContentSource.newSession()) { 213 Content[] contents = states.stream().map(this::convert).toArray(Content[]::new); 214 session.insertContent(contents); 215 } catch (RequestException e) { 216 throw new NuxeoException("An exception happened during xcc call", e); 217 } 218 } 219 220 private Content convert(State state) { 221 String id = state.get(KEY_ID).toString(); 222 return ContentFactory.newContent(ID_FORMATTER.apply(id), MarkLogicStateSerializer.serialize(state), null); 223 } 224 225 @Override 226 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 227 // TODO changeTokenUpdater 228 String patch = MarkLogicStateSerializer.serialize(diff); 229 if (log.isTraceEnabled()) { 230 log.trace("MarkLogic: UPDATE " + id + ": " + patch); 231 } 232 try (Session session = xccContentSource.newSession()) { 233 ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/patch.xqy"); 234 request.setNewStringVariable("uri", ID_FORMATTER.apply(id)); 235 request.setNewStringVariable("patch-string", patch); 236 // ResultSequence will be closed by Session close 237 session.submitRequest(request); 238 } catch (RequestException e) { 239 throw new NuxeoException("An exception happened during xcc call", e); 240 } 241 } 242 243 @Override 244 public void deleteStates(Set<String> ids) { 245 if (log.isTraceEnabled()) { 246 log.trace("MarkLogic: DELETE " + ids); 247 } 248 try (Session session = xccContentSource.newSession()) { 249 String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect( 250 Collectors.joining(",", "xdmp:document-delete((", "))")); 251 AdhocQuery request = session.newAdhocQuery(query); 252 // ResultSequence will be closed by Session close 253 session.submitRequest(request); 254 } catch (RequestException e) { 255 throw new NuxeoException("An exception happened during xcc call", e); 256 } 257 } 258 259 @Override 260 public State readChildState(String parentId, String name, Set<String> ignored) { 261 String query = getChildQuery(parentId, name, ignored); 262 return findOne(query); 263 } 264 265 @Override 266 public boolean hasChild(String parentId, String name, Set<String> ignored) { 267 String query = getChildQuery(parentId, name, ignored); 268 return exist(query); 269 } 270 271 private String getChildQuery(String parentId, String name, Set<String> ignored) { 272 return new MarkLogicQuerySimpleBuilder(rangeElementIndexes).eq(KEY_PARENT_ID, parentId) 273 .eq(KEY_NAME, name) 274 .notIn(KEY_ID, ignored) 275 .build(); 276 } 277 278 @Override 279 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 280 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 281 builder.eq(key, value); 282 builder.notIn(KEY_ID, ignored); 283 return findAll(builder.build()); 284 } 285 286 @Override 287 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 288 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 289 builder.eq(key1, value1); 290 builder.eq(key2, value2); 291 builder.notIn(KEY_ID, ignored); 292 return findAll(builder.build()); 293 } 294 295 @Override 296 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 297 Map<String, Object[]> targetProxies) { 298 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 299 builder.eq(key, value); 300 for (State state : findAll(builder.build(), KEY_ID, KEY_IS_PROXY, KEY_PROXY_TARGET_ID, KEY_PROXY_IDS)) { 301 String id = (String) state.get(KEY_ID); 302 ids.add(id); 303 if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) { 304 String targetId = (String) state.get(KEY_PROXY_TARGET_ID); 305 proxyTargets.put(id, targetId); 306 } 307 if (targetProxies != null) { 308 Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS); 309 if (proxyIds != null) { 310 targetProxies.put(id, proxyIds); 311 } 312 } 313 } 314 } 315 316 @Override 317 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 318 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 319 builder.eq(key, value); 320 builder.notIn(KEY_ID, ignored); 321 return exist(builder.build()); 322 } 323 324 @Override 325 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 326 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 327 MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, orderByClause, distinctDocuments, 328 rangeElementIndexes); 329 MarkLogicQuery query = builder.buildQuery(); 330 // Don't do manual projection if there are no projection wildcards, as this brings no new 331 // information and is costly. The only difference is several identical rows instead of one. 332 boolean manualProjection = builder.doManualProjection(); 333 if (manualProjection) { 334 // we'll do post-treatment to re-evaluate the query to get proper wildcard projections 335 // so we need the full state from the database 336 evaluator.parse(); 337 } 338 String searchQuery = query.getSearchQuery(limit, offset); 339 if (log.isTraceEnabled()) { 340 logQuery(searchQuery); 341 } 342 // Run query 343 try (Session session = xccContentSource.newSession()) { 344 AdhocQuery request = session.newAdhocQuery(searchQuery); 345 // ResultSequence will be closed by Session close 346 ResultSequence rs = session.submitRequest(request); 347 348 List<Map<String, Serializable>> projections = new ArrayList<>(limit); 349 for (String rsItem : rs.asStrings()) { 350 State state = MarkLogicStateDeserializer.deserialize(rsItem); 351 if (manualProjection) { 352 projections.addAll(evaluator.matches(state)); 353 } else { 354 projections.add(DBSStateFlattener.flatten(state)); 355 } 356 } 357 long totalSize; 358 if (countUpTo == -1) { 359 // count full size 360 if (limit == 0) { 361 totalSize = projections.size(); 362 } else { 363 AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery()); 364 // ResultSequence will be closed by Session close 365 ResultSequence countRs = session.submitRequest(countRequest); 366 totalSize = Long.parseLong(countRs.asStrings()[0]); 367 } 368 } else if (countUpTo == 0) { 369 // no count 370 totalSize = -1; // not counted 371 } else { 372 // count only if less than countUpTo 373 if (limit == 0) { 374 totalSize = projections.size(); 375 } else { 376 AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery(countUpTo + 1)); 377 // ResultSequence will be closed by Session close 378 ResultSequence countRs = session.submitRequest(countRequest); 379 totalSize = Long.parseLong(countRs.asStrings()[0]); 380 } 381 if (totalSize > countUpTo) { 382 totalSize = -2; // truncated 383 } 384 } 385 386 if (log.isTraceEnabled() && projections.size() != 0) { 387 log.trace("MarkLogic: -> " + projections.size()); 388 } 389 return new PartialList<>(projections, totalSize); 390 } catch (RequestException e) { 391 throw new NuxeoException("An exception happened during xcc call when executing '" + evaluator + "'", e); 392 } 393 } 394 395 @Override 396 public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveInSecond) { 397 // Not yet implemented, return all result in one shot for now 398 MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, null, false, rangeElementIndexes); 399 String query = builder.buildQuery().getSearchQuery(); 400 // Run query 401 try (Session session = xccContentSource.newSession()) { 402 AdhocQuery request = session.newAdhocQuery(query); 403 // ResultSequence will be closed by Session close 404 ResultSequence rs = session.submitRequest(request); 405 return Arrays.stream(rs.asStrings()) 406 .map(MarkLogicStateDeserializer::deserialize) 407 .map(state -> state.get(KEY_ID).toString()) 408 .collect(Collectors.collectingAndThen(Collectors.toList(), 409 ids -> new ScrollResultImpl(NOSCROLL_ID, ids))); 410 } catch (RequestException e) { 411 throw new NuxeoException("An exception happened during xcc call", e); 412 } 413 } 414 415 @Override 416 public ScrollResult scroll(String scrollId) { 417 if (NOSCROLL_ID.equals(scrollId)) { 418 // there is only one batch 419 return emptyResult(); 420 } 421 throw new NuxeoException("Unknown or timed out scrollId"); 422 } 423 424 @Override 425 public Lock getLock(String id) { 426 // TODO test performance : retrieve document with read or search document with extract 427 // TODO retrieve only some field 428 // https://docs.marklogic.com/guide/search-dev/qbe#id_54044 429 State state = readState(id); 430 if (state == null) { 431 throw new DocumentNotFoundException(id); 432 } 433 String owner = (String) state.get(KEY_LOCK_OWNER); 434 if (owner == null) { 435 // not locked 436 return null; 437 } 438 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 439 return new Lock(owner, created); 440 } 441 442 @Override 443 public Lock setLock(String id, Lock lock) { 444 State state = new State(); 445 state.put(KEY_LOCK_OWNER, lock.getOwner()); 446 state.put(KEY_LOCK_CREATED, lock.getCreated()); 447 String lockString = MarkLogicStateSerializer.serialize(state); 448 if (log.isTraceEnabled()) { 449 log.trace("MarkLogic: SETLOCK " + id + ": " + lockString); 450 } 451 try (Session session = xccContentSource.newSession()) { 452 ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/set-lock.xqy"); 453 request.setNewStringVariable("uri", ID_FORMATTER.apply(id)); 454 request.setNewStringVariable("lock-string", lockString); 455 // ResultSequence will be closed by Session close 456 ResultSequence result = session.submitRequest(request); 457 State resultState = MarkLogicStateDeserializer.deserialize(result.asString()); 458 return extractLock(resultState); 459 } catch (RequestException e) { 460 if ("Document not found".equals(e.getMessage())) { 461 throw new DocumentNotFoundException(id, e); 462 } 463 throw new NuxeoException("An exception happened during xcc call", e); 464 } 465 // TODO check how the concurrent exception is raised 466 } 467 468 @Override 469 public Lock removeLock(String id, String owner) { 470 if (log.isTraceEnabled()) { 471 log.trace("MarkLogic: REMOVELOCK " + id + ": " + owner); 472 } 473 try (Session session = xccContentSource.newSession()) { 474 ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/remove-lock.xqy"); 475 request.setNewStringVariable("uri", ID_FORMATTER.apply(id)); 476 request.setNewStringVariable("owner", Strings.nullToEmpty(owner)); 477 // ResultSequence will be closed by Session close 478 ResultSequence result = session.submitRequest(request); 479 State resultState = MarkLogicStateDeserializer.deserialize(result.asString()); 480 return extractLock(resultState); 481 } catch (RequestException e) { 482 if ("Document not found".equals(e.getMessage())) { 483 throw new DocumentNotFoundException(id, e); 484 } 485 throw new NuxeoException("An exception happened during xcc call", e); 486 } 487 } 488 489 private Lock extractLock(State state) { 490 if (state.isEmpty()) { 491 return null; 492 } 493 String owner = (String) state.get(KEY_LOCK_OWNER); 494 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 495 Boolean failed = (Boolean) state.get("failed"); 496 return new Lock(owner, created, Boolean.TRUE.equals(failed)); 497 } 498 499 @Override 500 public void closeLockManager() { 501 } 502 503 @Override 504 public void clearLockManagerCaches() { 505 } 506 507 protected List<String> binaryPaths; 508 509 @Override 510 protected void initBlobsPaths() { 511 MarkLogicBlobFinder finder = new MarkLogicBlobFinder(); 512 finder.visit(); 513 binaryPaths = finder.binaryPaths; 514 } 515 516 protected static class MarkLogicBlobFinder extends BlobFinder { 517 protected List<String> binaryPaths = new ArrayList<>(); 518 519 @Override 520 protected void recordBlobPath() { 521 path.addLast(KEY_BLOB_DATA); 522 StringBuilder binaryPath = new StringBuilder(); 523 MarkLogicSchemaManager schemaManager = new MarkLogicSchemaManager(); 524 Type previousType = null; 525 for (String element : path) { 526 if (binaryPath.length() > 0) { 527 binaryPath.append('/'); 528 } 529 // Append current element to path 530 binaryPath.append(element); 531 if (previousType == null) { 532 // No previous type - it's the first element 533 previousType = schemaManager.computeField(String.join(".", path), element).getType(); 534 } else if (previousType.isComplexType()) { 535 // Previous type is a complex type - retrieve the type of current element 536 if (TypeConstants.isContentType(previousType)) { 537 // The complex type hold the binary data, it's the last element, so break the loop 538 break; 539 } 540 previousType = ((ComplexType) previousType).getField(element).getType(); 541 } 542 // Add the item array element if type is a list (here previousType is the type of current element) 543 // Here we re allocate previousType to item array type as there's no element in this.path to select item 544 if (previousType.isListType()) { 545 binaryPath.append('/').append(element).append(MarkLogicHelper.ARRAY_ITEM_KEY_SUFFIX); 546 previousType = ((ListType) previousType).getFieldType(); 547 } 548 } 549 binaryPaths.add(binaryPath.toString()); 550 path.removeLast(); 551 } 552 } 553 554 @Override 555 public void markReferencedBinaries() { 556 BlobManager blobManager = Framework.getService(BlobManager.class); 557 // TODO add a query to not scan all documents 558 String query = new MarkLogicQuerySimpleBuilder(rangeElementIndexes).build(); 559 for (State state : findAll(query, binaryPaths.toArray(new String[0]))) { 560 markReferencedBinaries(state, blobManager); 561 } 562 } 563 564 protected void markReferencedBinaries(State state, BlobManager blobManager) { 565 for (Entry<String, Serializable> entry: state.entrySet()) { 566 Serializable value = entry.getValue(); 567 if (value instanceof List) { 568 List<?> list = (List<?>) value; 569 for (Object v : list) { 570 if (v instanceof State) { 571 markReferencedBinaries((State) v, blobManager); 572 } else { 573 markReferencedBinary(v, blobManager); 574 } 575 } 576 } else if (value instanceof Object[]) { 577 for (Object v : (Object[]) value) { 578 markReferencedBinary(v, blobManager); 579 } 580 } else if (value instanceof State) { 581 markReferencedBinaries((State) value, blobManager); 582 } else { 583 markReferencedBinary(value, blobManager); 584 } 585 } 586 } 587 588 protected void markReferencedBinary(Object value, BlobManager blobManager) { 589 if (!(value instanceof String)) { 590 return; 591 } 592 String key = (String) value; 593 blobManager.markReferencedBinary(key, repositoryName); 594 } 595 596 private void logQuery(String query) { 597 log.trace("MarkLogic: QUERY " + query); 598 } 599 600 private boolean exist(String ctsQuery) { 601 // first build exist query from cts query 602 String query = "xdmp:exists(" + ctsQuery + ")"; 603 if (log.isTraceEnabled()) { 604 logQuery(query); 605 } 606 // Run query 607 try (Session session = xccContentSource.newSession()) { 608 AdhocQuery request = session.newAdhocQuery(query); 609 // ResultSequence will be closed by Session close 610 ResultSequence rs = session.submitRequest(request); 611 return Boolean.parseBoolean(rs.asString()); 612 } catch (RequestException e) { 613 throw new NuxeoException("An exception happened during xcc call", e); 614 } 615 } 616 617 private State findOne(String ctsQuery) { 618 // first add limit to ctsQuery 619 String query = ctsQuery + "[1 to 1]"; 620 if (log.isTraceEnabled()) { 621 logQuery(query); 622 } 623 // Run query 624 try (Session session = xccContentSource.newSession()) { 625 AdhocQuery request = session.newAdhocQuery(query); 626 // ResultSequence will be closed by Session close 627 ResultSequence rs = session.submitRequest(request); 628 if (rs.hasNext()) { 629 return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]); 630 } 631 return null; 632 } catch (RequestException e) { 633 throw new NuxeoException("An exception happened during xcc call", e); 634 } 635 } 636 637 private List<State> findAll(String ctsQuery, String... selects) { 638 String query = ctsQuery; 639 if (selects.length > 0) { 640 query = "import module namespace extract = 'http://nuxeo.com/extract' at '/ext/nuxeo/extract.xqy';\n" 641 + "let $paths := (" + Arrays.stream(selects) 642 .map(MarkLogicHelper::serializeKey) 643 .map(select -> "\"" + MarkLogicHelper.DOCUMENT_ROOT_PATH + "/" + select 644 + "\"") 645 .collect(Collectors.joining(",\n")) 646 + ")let $namespaces := ()\n" + "for $i in " + query 647 + " return extract:extract-nodes($i, $paths, $namespaces)"; 648 } 649 if (log.isTraceEnabled()) { 650 logQuery(query); 651 } 652 // Run query 653 try (Session session = xccContentSource.newSession()) { 654 AdhocQuery request = session.newAdhocQuery(query); 655 // ResultSequence will be closed by Session close 656 ResultSequence rs = session.submitRequest(request); 657 return Arrays.stream(rs.asStrings()) 658 .map(MarkLogicStateDeserializer::deserialize) 659 .collect(Collectors.toList()); 660 } catch (RequestException e) { 661 throw new NuxeoException("An exception happened during xcc call", e); 662 } 663 } 664 665}