001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.core.storage.marklogic; 020 021import static java.lang.Boolean.TRUE; 022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 032 033import java.io.Serializable; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Calendar; 037import java.util.Collections; 038import java.util.List; 039import java.util.Map; 040import java.util.Map.Entry; 041import java.util.Set; 042import java.util.UUID; 043import java.util.function.Function; 044import java.util.stream.Collectors; 045 046import javax.resource.spi.ConnectionManager; 047 048import org.apache.commons.lang.StringUtils; 049import org.apache.commons.logging.Log; 050import org.apache.commons.logging.LogFactory; 051import org.nuxeo.ecm.core.api.DocumentNotFoundException; 052import org.nuxeo.ecm.core.api.Lock; 053import org.nuxeo.ecm.core.api.NuxeoException; 054import org.nuxeo.ecm.core.api.PartialList; 055import org.nuxeo.ecm.core.api.ScrollResult; 056import org.nuxeo.ecm.core.api.ScrollResultImpl; 057import org.nuxeo.ecm.core.blob.BlobManager; 058import org.nuxeo.ecm.core.model.Repository; 059import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 060import org.nuxeo.ecm.core.schema.TypeConstants; 061import org.nuxeo.ecm.core.schema.types.ComplexType; 062import org.nuxeo.ecm.core.schema.types.ListType; 063import org.nuxeo.ecm.core.schema.types.Type; 064import org.nuxeo.ecm.core.storage.State; 065import org.nuxeo.ecm.core.storage.State.StateDiff; 066import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 067import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 068import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener; 069import org.nuxeo.ecm.core.storage.marklogic.MarkLogicQueryBuilder.MarkLogicQuery; 070import org.nuxeo.runtime.api.Framework; 071 072import com.google.common.base.Strings; 073import com.marklogic.xcc.AdhocQuery; 074import com.marklogic.xcc.Content; 075import com.marklogic.xcc.ContentFactory; 076import com.marklogic.xcc.ContentSource; 077import com.marklogic.xcc.ContentSourceFactory; 078import com.marklogic.xcc.ModuleInvoke; 079import com.marklogic.xcc.ResultSequence; 080import com.marklogic.xcc.Session; 081import com.marklogic.xcc.exceptions.RequestException; 082 083/** 084 * MarkLogic implementation of a {@link Repository}. 085 * 086 * @since 8.3 087 */ 088public class MarkLogicRepository extends DBSRepositoryBase { 089 090 private static final Log log = LogFactory.getLog(MarkLogicRepository.class); 091 092 private static final Function<String, String> ID_FORMATTER = id -> String.format("/%s.xml", id); 093 094 public static final String DB_DEFAULT = "nuxeo"; 095 096 protected static final String NOSCROLL_ID = "noscroll"; 097 098 protected ContentSource xccContentSource; 099 100 /** Last value used from the in-memory sequence. Used by unit tests. */ 101 protected long sequenceLastValue; 102 103 protected final List<MarkLogicRangeElementIndexDescriptor> rangeElementIndexes; 104 105 public MarkLogicRepository(ConnectionManager cm, MarkLogicRepositoryDescriptor descriptor) { 106 super(cm, descriptor.name, descriptor); 107 xccContentSource = newMarkLogicContentSource(descriptor); 108 rangeElementIndexes = descriptor.rangeElementIndexes.stream() 109 .map(MarkLogicRangeElementIndexDescriptor::new) 110 .collect(Collectors.toList()); 111 initRepository(); 112 } 113 114 @Override 115 public List<IdType> getAllowedIdTypes() { 116 return Collections.singletonList(IdType.varchar); 117 } 118 119 // used also by unit tests 120 public static ContentSource newMarkLogicContentSource(MarkLogicRepositoryDescriptor descriptor) { 121 String host = descriptor.host; 122 Integer port = descriptor.port; 123 if (StringUtils.isBlank(host) || port == null) { 124 throw new NuxeoException("Missing <host> or <port> in MarkLogic repository descriptor"); 125 } 126 String dbname = StringUtils.defaultIfBlank(descriptor.dbname, DB_DEFAULT); 127 String user = descriptor.user; 128 String password = descriptor.password; 129 return ContentSourceFactory.newContentSource(host, port.intValue(), user, password, dbname); 130 } 131 132 protected void initRepository() { 133 if (readState(getRootId()) == null) { 134 initRoot(); 135 } 136 } 137 138 @Override 139 public String generateNewId() { 140 if (DEBUG_UUIDS) { 141 Long id = getNextSequenceId(); 142 return "UUID_" + id; 143 } 144 return UUID.randomUUID().toString(); 145 } 146 147 // Used by unit tests 148 protected synchronized Long getNextSequenceId() { 149 sequenceLastValue++; 150 return Long.valueOf(sequenceLastValue); 151 } 152 153 @Override 154 public State readState(String id) { 155 if (log.isTraceEnabled()) { 156 log.trace("MarkLogic: READ " + id); 157 } 158 try (Session session = xccContentSource.newSession()) { 159 String query = "fn:doc('" + ID_FORMATTER.apply(id) + "')"; 160 AdhocQuery request = session.newAdhocQuery(query); 161 // ResultSequence will be closed by Session close 162 ResultSequence rs = session.submitRequest(request); 163 if (rs.hasNext()) { 164 return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]); 165 } 166 return null; 167 } catch (RequestException e) { 168 throw new NuxeoException("An exception happened during xcc call", e); 169 } 170 } 171 172 @Override 173 public List<State> readStates(List<String> ids) { 174 if (log.isTraceEnabled()) { 175 log.trace("MarkLogic: READ " + ids); 176 } 177 try (Session session = xccContentSource.newSession()) { 178 String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect( 179 Collectors.joining(",", "fn:doc((", "))")); 180 AdhocQuery request = session.newAdhocQuery(query); 181 // ResultSequence will be closed by Session close 182 ResultSequence rs = session.submitRequest(request); 183 return Arrays.stream(rs.asStrings()) 184 .map(MarkLogicStateDeserializer::deserialize) 185 .collect(Collectors.toList()); 186 } catch (RequestException e) { 187 throw new NuxeoException("An exception happened during xcc call", e); 188 } 189 } 190 191 @Override 192 public void createState(State state) { 193 String id = state.get(KEY_ID).toString(); 194 if (log.isTraceEnabled()) { 195 log.trace("MarkLogic: CREATE " + id + ": " + state); 196 } 197 try (Session session = xccContentSource.newSession()) { 198 session.insertContent(convert(state)); 199 } catch (RequestException e) { 200 throw new NuxeoException("An exception happened during xcc call", e); 201 } 202 } 203 204 @Override 205 public void createStates(List<State> states) { 206 if (log.isTraceEnabled()) { 207 log.trace("MarkLogic: CREATE [" 208 + states.stream().map(state -> state.get(KEY_ID).toString()).collect(Collectors.joining(", ")) 209 + "]: " + states); 210 } 211 try (Session session = xccContentSource.newSession()) { 212 Content[] contents = states.stream().map(this::convert).toArray(Content[]::new); 213 session.insertContent(contents); 214 } catch (RequestException e) { 215 throw new NuxeoException("An exception happened during xcc call", e); 216 } 217 } 218 219 private Content convert(State state) { 220 String id = state.get(KEY_ID).toString(); 221 return ContentFactory.newContent(ID_FORMATTER.apply(id), MarkLogicStateSerializer.serialize(state), null); 222 } 223 224 @Override 225 public void updateState(String id, StateDiff diff) { 226 String patch = MarkLogicStateSerializer.serialize(diff); 227 if (log.isTraceEnabled()) { 228 log.trace("MarkLogic: UPDATE " + id + ": " + patch); 229 } 230 try (Session session = xccContentSource.newSession()) { 231 ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/patch.xqy"); 232 request.setNewStringVariable("uri", ID_FORMATTER.apply(id)); 233 request.setNewStringVariable("patch-string", patch); 234 // ResultSequence will be closed by Session close 235 session.submitRequest(request); 236 } catch (RequestException e) { 237 throw new NuxeoException("An exception happened during xcc call", e); 238 } 239 } 240 241 @Override 242 public void deleteStates(Set<String> ids) { 243 if (log.isTraceEnabled()) { 244 log.trace("MarkLogic: DELETE " + ids); 245 } 246 try (Session session = xccContentSource.newSession()) { 247 String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect( 248 Collectors.joining(",", "xdmp:document-delete((", "))")); 249 AdhocQuery request = session.newAdhocQuery(query); 250 // ResultSequence will be closed by Session close 251 session.submitRequest(request); 252 } catch (RequestException e) { 253 throw new NuxeoException("An exception happened during xcc call", e); 254 } 255 } 256 257 @Override 258 public State readChildState(String parentId, String name, Set<String> ignored) { 259 String query = getChildQuery(parentId, name, ignored); 260 return findOne(query); 261 } 262 263 @Override 264 public boolean hasChild(String parentId, String name, Set<String> ignored) { 265 String query = getChildQuery(parentId, name, ignored); 266 return exist(query); 267 } 268 269 private String getChildQuery(String parentId, String name, Set<String> ignored) { 270 return new MarkLogicQuerySimpleBuilder(rangeElementIndexes).eq(KEY_PARENT_ID, parentId) 271 .eq(KEY_NAME, name) 272 .notIn(KEY_ID, ignored) 273 .build(); 274 } 275 276 @Override 277 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 278 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 279 builder.eq(key, value); 280 builder.notIn(KEY_ID, ignored); 281 return findAll(builder.build()); 282 } 283 284 @Override 285 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 286 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 287 builder.eq(key1, value1); 288 builder.eq(key2, value2); 289 builder.notIn(KEY_ID, ignored); 290 return findAll(builder.build()); 291 } 292 293 @Override 294 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 295 Map<String, Object[]> targetProxies) { 296 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 297 builder.eq(key, value); 298 for (State state : findAll(builder.build(), KEY_ID, KEY_IS_PROXY, KEY_PROXY_TARGET_ID, KEY_PROXY_IDS)) { 299 String id = (String) state.get(KEY_ID); 300 ids.add(id); 301 if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) { 302 String targetId = (String) state.get(KEY_PROXY_TARGET_ID); 303 proxyTargets.put(id, targetId); 304 } 305 if (targetProxies != null) { 306 Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS); 307 if (proxyIds != null) { 308 targetProxies.put(id, proxyIds); 309 } 310 } 311 } 312 } 313 314 @Override 315 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 316 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 317 builder.eq(key, value); 318 builder.notIn(KEY_ID, ignored); 319 return exist(builder.build()); 320 } 321 322 @Override 323 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 324 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 325 MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, orderByClause, distinctDocuments, 326 rangeElementIndexes); 327 MarkLogicQuery query = builder.buildQuery(); 328 // Don't do manual projection if there are no projection wildcards, as this brings no new 329 // information and is costly. The only difference is several identical rows instead of one. 330 boolean manualProjection = builder.doManualProjection(); 331 if (manualProjection) { 332 // we'll do post-treatment to re-evaluate the query to get proper wildcard projections 333 // so we need the full state from the database 334 evaluator.parse(); 335 } 336 String searchQuery = query.getSearchQuery(limit, offset); 337 if (log.isTraceEnabled()) { 338 logQuery(searchQuery); 339 } 340 // Run query 341 try (Session session = xccContentSource.newSession()) { 342 AdhocQuery request = session.newAdhocQuery(searchQuery); 343 // ResultSequence will be closed by Session close 344 ResultSequence rs = session.submitRequest(request); 345 346 List<Map<String, Serializable>> projections = new ArrayList<>(limit); 347 for (String rsItem : rs.asStrings()) { 348 State state = MarkLogicStateDeserializer.deserialize(rsItem); 349 if (manualProjection) { 350 projections.addAll(evaluator.matches(state)); 351 } else { 352 projections.add(DBSStateFlattener.flatten(state)); 353 } 354 } 355 long totalSize; 356 if (countUpTo == -1) { 357 // count full size 358 if (limit == 0) { 359 totalSize = projections.size(); 360 } else { 361 AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery()); 362 // ResultSequence will be closed by Session close 363 ResultSequence countRs = session.submitRequest(countRequest); 364 totalSize = Long.parseLong(countRs.asStrings()[0]); 365 } 366 } else if (countUpTo == 0) { 367 // no count 368 totalSize = -1; // not counted 369 } else { 370 // count only if less than countUpTo 371 if (limit == 0) { 372 totalSize = projections.size(); 373 } else { 374 AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery(countUpTo + 1)); 375 // ResultSequence will be closed by Session close 376 ResultSequence countRs = session.submitRequest(countRequest); 377 totalSize = Long.parseLong(countRs.asStrings()[0]); 378 } 379 if (totalSize > countUpTo) { 380 totalSize = -2; // truncated 381 } 382 } 383 384 if (log.isTraceEnabled() && projections.size() != 0) { 385 log.trace("MarkLogic: -> " + projections.size()); 386 } 387 return new PartialList<>(projections, totalSize); 388 } catch (RequestException e) { 389 throw new NuxeoException("An exception happened during xcc call", e); 390 } 391 } 392 393 @Override 394 public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveInSecond) { 395 // Not yet implemented, return all result in one shot for now 396 MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, null, false, rangeElementIndexes); 397 String query = builder.buildQuery().getSearchQuery(); 398 // Run query 399 try (Session session = xccContentSource.newSession()) { 400 AdhocQuery request = session.newAdhocQuery(query); 401 // ResultSequence will be closed by Session close 402 ResultSequence rs = session.submitRequest(request); 403 return Arrays.stream(rs.asStrings()) 404 .map(MarkLogicStateDeserializer::deserialize) 405 .map(state -> state.get(KEY_ID).toString()) 406 .collect(Collectors.collectingAndThen(Collectors.toList(), 407 ids -> new ScrollResultImpl(NOSCROLL_ID, ids))); 408 } catch (RequestException e) { 409 throw new NuxeoException("An exception happened during xcc call", e); 410 } 411 } 412 413 @Override 414 public ScrollResult scroll(String scrollId) { 415 if (NOSCROLL_ID.equals(scrollId)) { 416 // there is only one batch 417 return emptyResult(); 418 } 419 throw new NuxeoException("Unknown or timed out scrollId"); 420 } 421 422 @Override 423 public Lock getLock(String id) { 424 // TODO test performance : retrieve document with read or search document with extract 425 // TODO retrieve only some field 426 // https://docs.marklogic.com/guide/search-dev/qbe#id_54044 427 State state = readState(id); 428 if (state == null) { 429 throw new DocumentNotFoundException(id); 430 } 431 String owner = (String) state.get(KEY_LOCK_OWNER); 432 if (owner == null) { 433 // not locked 434 return null; 435 } 436 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 437 return new Lock(owner, created); 438 } 439 440 @Override 441 public Lock setLock(String id, Lock lock) { 442 State state = new State(); 443 state.put(KEY_LOCK_OWNER, lock.getOwner()); 444 state.put(KEY_LOCK_CREATED, lock.getCreated()); 445 String lockString = MarkLogicStateSerializer.serialize(state); 446 if (log.isTraceEnabled()) { 447 log.trace("MarkLogic: SETLOCK " + id + ": " + lockString); 448 } 449 try (Session session = xccContentSource.newSession()) { 450 ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/set-lock.xqy"); 451 request.setNewStringVariable("uri", ID_FORMATTER.apply(id)); 452 request.setNewStringVariable("lock-string", lockString); 453 // ResultSequence will be closed by Session close 454 ResultSequence result = session.submitRequest(request); 455 State resultState = MarkLogicStateDeserializer.deserialize(result.asString()); 456 return extractLock(resultState); 457 } catch (RequestException e) { 458 if ("Document not found".equals(e.getMessage())) { 459 throw new DocumentNotFoundException(id, e); 460 } 461 throw new NuxeoException("An exception happened during xcc call", e); 462 } 463 // TODO check how the concurrent exception is raised 464 } 465 466 @Override 467 public Lock removeLock(String id, String owner) { 468 if (log.isTraceEnabled()) { 469 log.trace("MarkLogic: REMOVELOCK " + id + ": " + owner); 470 } 471 try (Session session = xccContentSource.newSession()) { 472 ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/remove-lock.xqy"); 473 request.setNewStringVariable("uri", ID_FORMATTER.apply(id)); 474 request.setNewStringVariable("owner", Strings.nullToEmpty(owner)); 475 // ResultSequence will be closed by Session close 476 ResultSequence result = session.submitRequest(request); 477 State resultState = MarkLogicStateDeserializer.deserialize(result.asString()); 478 return extractLock(resultState); 479 } catch (RequestException e) { 480 if ("Document not found".equals(e.getMessage())) { 481 throw new DocumentNotFoundException(id, e); 482 } 483 throw new NuxeoException("An exception happened during xcc call", e); 484 } 485 } 486 487 private Lock extractLock(State state) { 488 if (state.isEmpty()) { 489 return null; 490 } 491 String owner = (String) state.get(KEY_LOCK_OWNER); 492 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 493 Boolean failed = (Boolean) state.get("failed"); 494 return new Lock(owner, created, Boolean.TRUE.equals(failed)); 495 } 496 497 @Override 498 public void closeLockManager() { 499 } 500 501 @Override 502 public void clearLockManagerCaches() { 503 } 504 505 protected List<String> binaryPaths; 506 507 @Override 508 protected void initBlobsPaths() { 509 MarkLogicBlobFinder finder = new MarkLogicBlobFinder(); 510 finder.visit(); 511 binaryPaths = finder.binaryPaths; 512 } 513 514 protected static class MarkLogicBlobFinder extends BlobFinder { 515 protected List<String> binaryPaths = new ArrayList<>(); 516 517 @Override 518 protected void recordBlobPath() { 519 path.addLast(KEY_BLOB_DATA); 520 StringBuilder binaryPath = new StringBuilder(); 521 MarkLogicSchemaManager schemaManager = new MarkLogicSchemaManager(); 522 Type previousType = null; 523 for (String element : path) { 524 if (binaryPath.length() > 0) { 525 binaryPath.append('/'); 526 } 527 // Append current element to path 528 binaryPath.append(element); 529 if (previousType == null) { 530 // No previous type - it's the first element 531 previousType = schemaManager.computeField(String.join(".", path), element).getType(); 532 } else if (previousType.isComplexType()) { 533 // Previous type is a complex type - retrieve the type of current element 534 if (TypeConstants.isContentType(previousType)) { 535 // The complex type hold the binary data, it's the last element, so break the loop 536 break; 537 } 538 previousType = ((ComplexType) previousType).getField(element).getType(); 539 } 540 // Add the item array element if type is a list (here previousType is the type of current element) 541 // Here we re allocate previousType to item array type as there's no element in this.path to select item 542 if (previousType.isListType()) { 543 binaryPath.append('/').append(element).append(MarkLogicHelper.ARRAY_ITEM_KEY_SUFFIX); 544 previousType = ((ListType) previousType).getFieldType(); 545 } 546 } 547 binaryPaths.add(binaryPath.toString()); 548 path.removeLast(); 549 } 550 } 551 552 @Override 553 public void markReferencedBinaries() { 554 BlobManager blobManager = Framework.getService(BlobManager.class); 555 // TODO add a query to not scan all documents 556 String query = new MarkLogicQuerySimpleBuilder(rangeElementIndexes).build(); 557 for (State state : findAll(query, binaryPaths.toArray(new String[0]))) { 558 markReferencedBinaries(state, blobManager); 559 } 560 } 561 562 protected void markReferencedBinaries(State state, BlobManager blobManager) { 563 for (Entry<String, Serializable> entry: state.entrySet()) { 564 Serializable value = entry.getValue(); 565 if (value instanceof List) { 566 List<?> list = (List<?>) value; 567 for (Object v : list) { 568 if (v instanceof State) { 569 markReferencedBinaries((State) v, blobManager); 570 } else { 571 markReferencedBinary(v, blobManager); 572 } 573 } 574 } else if (value instanceof Object[]) { 575 for (Object v : (Object[]) value) { 576 markReferencedBinary(v, blobManager); 577 } 578 } else if (value instanceof State) { 579 markReferencedBinaries((State) value, blobManager); 580 } else { 581 markReferencedBinary(value, blobManager); 582 } 583 } 584 } 585 586 protected void markReferencedBinary(Object value, BlobManager blobManager) { 587 if (!(value instanceof String)) { 588 return; 589 } 590 String key = (String) value; 591 blobManager.markReferencedBinary(key, repositoryName); 592 } 593 594 private void logQuery(String query) { 595 log.trace("MarkLogic: QUERY " + query); 596 } 597 598 private boolean exist(String ctsQuery) { 599 // first build exist query from cts query 600 String query = "xdmp:exists(" + ctsQuery + ")"; 601 if (log.isTraceEnabled()) { 602 logQuery(query); 603 } 604 // Run query 605 try (Session session = xccContentSource.newSession()) { 606 AdhocQuery request = session.newAdhocQuery(query); 607 // ResultSequence will be closed by Session close 608 ResultSequence rs = session.submitRequest(request); 609 return Boolean.parseBoolean(rs.asString()); 610 } catch (RequestException e) { 611 throw new NuxeoException("An exception happened during xcc call", e); 612 } 613 } 614 615 private State findOne(String ctsQuery) { 616 // first add limit to ctsQuery 617 String query = ctsQuery + "[1 to 1]"; 618 if (log.isTraceEnabled()) { 619 logQuery(query); 620 } 621 // Run query 622 try (Session session = xccContentSource.newSession()) { 623 AdhocQuery request = session.newAdhocQuery(query); 624 // ResultSequence will be closed by Session close 625 ResultSequence rs = session.submitRequest(request); 626 if (rs.hasNext()) { 627 return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]); 628 } 629 return null; 630 } catch (RequestException e) { 631 throw new NuxeoException("An exception happened during xcc call", e); 632 } 633 } 634 635 private List<State> findAll(String ctsQuery, String... selects) { 636 String query = ctsQuery; 637 if (selects.length > 0) { 638 query = "import module namespace extract = 'http://nuxeo.com/extract' at '/ext/nuxeo/extract.xqy';\n" 639 + "let $paths := (" + Arrays.stream(selects) 640 .map(MarkLogicHelper::serializeKey) 641 .map(select -> "\"" + MarkLogicHelper.DOCUMENT_ROOT_PATH + "/" + select 642 + "\"") 643 .collect(Collectors.joining(",\n")) 644 + ")let $namespaces := ()\n" + "for $i in " + query 645 + " return extract:extract-nodes($i, $paths, $namespaces)"; 646 } 647 if (log.isTraceEnabled()) { 648 logQuery(query); 649 } 650 // Run query 651 try (Session session = xccContentSource.newSession()) { 652 AdhocQuery request = session.newAdhocQuery(query); 653 // ResultSequence will be closed by Session close 654 ResultSequence rs = session.submitRequest(request); 655 return Arrays.stream(rs.asStrings()) 656 .map(MarkLogicStateDeserializer::deserialize) 657 .collect(Collectors.toList()); 658 } catch (RequestException e) { 659 throw new NuxeoException("An exception happened during xcc call", e); 660 } 661 } 662 663}