001/* 002 * (C) Copyright 2016-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.core.storage.marklogic; 020 021import static java.lang.Boolean.TRUE; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 031 032import java.io.Serializable; 033import java.security.GeneralSecurityException; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Calendar; 037import java.util.Collections; 038import java.util.List; 039import java.util.Map; 040import java.util.Map.Entry; 041import java.util.Set; 042import java.util.Spliterator; 043import java.util.UUID; 044import java.util.function.Function; 045import java.util.function.Supplier; 046import java.util.stream.Collectors; 047import java.util.stream.Stream; 048import java.util.stream.StreamSupport; 049 050import javax.net.ssl.SSLContext; 051import javax.resource.spi.ConnectionManager; 052 053import org.apache.commons.lang.StringUtils; 054import org.apache.commons.logging.Log; 055import org.apache.commons.logging.LogFactory; 056import org.nuxeo.ecm.core.api.CursorService; 057import org.nuxeo.ecm.core.api.DocumentNotFoundException; 058import org.nuxeo.ecm.core.api.Lock; 059import org.nuxeo.ecm.core.api.NuxeoException; 060import org.nuxeo.ecm.core.api.PartialList; 061import org.nuxeo.ecm.core.api.ScrollResult; 062import org.nuxeo.ecm.core.blob.DocumentBlobManager; 063import org.nuxeo.ecm.core.model.Repository; 064import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 065import org.nuxeo.ecm.core.schema.TypeConstants; 066import org.nuxeo.ecm.core.schema.types.ComplexType; 067import org.nuxeo.ecm.core.schema.types.ListType; 068import org.nuxeo.ecm.core.schema.types.Type; 069import org.nuxeo.ecm.core.storage.State; 070import org.nuxeo.ecm.core.storage.State.StateDiff; 071import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 072import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 073import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener; 074import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 075import org.nuxeo.ecm.core.storage.marklogic.MarkLogicQueryBuilder.MarkLogicQuery; 076import org.nuxeo.runtime.api.Framework; 077 078import com.google.common.base.Strings; 079import com.marklogic.xcc.AdhocQuery; 080import com.marklogic.xcc.Content; 081import com.marklogic.xcc.ContentFactory; 082import com.marklogic.xcc.ContentSource; 083import com.marklogic.xcc.ContentSourceFactory; 084import com.marklogic.xcc.ModuleInvoke; 085import com.marklogic.xcc.RequestOptions; 086import com.marklogic.xcc.ResultItem; 087import com.marklogic.xcc.ResultSequence; 088import com.marklogic.xcc.SecurityOptions; 089import com.marklogic.xcc.Session; 090import com.marklogic.xcc.exceptions.RequestException; 091import com.marklogic.xcc.types.XdmItem; 092 093/** 094 * MarkLogic implementation of a {@link Repository}. 095 * 096 * @since 8.3 097 */ 098public class MarkLogicRepository extends DBSRepositoryBase { 099 100 private static final Log log = LogFactory.getLog(MarkLogicRepository.class); 101 102 private static final Function<String, String> ID_FORMATTER = id -> String.format("/%s.xml", id); 103 104 public static final String DB_DEFAULT = "nuxeo"; 105 106 protected ContentSource xccContentSource; 107 108 /** Last value used from the in-memory sequence. Used by unit tests. */ 109 protected long sequenceLastValue; 110 111 protected final List<MarkLogicRangeElementIndexDescriptor> rangeElementIndexes; 112 113 protected final CursorService<ResultSequence, ResultItem> cursorService = new CursorService<>(); 114 115 public MarkLogicRepository(ConnectionManager cm, MarkLogicRepositoryDescriptor descriptor) { 116 super(cm, descriptor.name, descriptor); 117 xccContentSource = newMarkLogicContentSource(descriptor); 118 rangeElementIndexes = descriptor.rangeElementIndexes.stream() 119 .map(MarkLogicRangeElementIndexDescriptor::new) 120 .collect(Collectors.toList()); 121 initRepository(); 122 } 123 124 @Override 125 public List<IdType> getAllowedIdTypes() { 126 return Collections.singletonList(IdType.varchar); 127 } 128 129 // used also by unit tests 130 public static ContentSource newMarkLogicContentSource(MarkLogicRepositoryDescriptor descriptor) { 131 String host = descriptor.host; 132 Integer port = descriptor.port; 133 if (StringUtils.isBlank(host) || port == null) { 134 throw new NuxeoException("Missing <host> or <port> in MarkLogic repository descriptor"); 135 } 136 String dbname = StringUtils.defaultIfBlank(descriptor.dbname, DB_DEFAULT); 137 String user = descriptor.user; 138 String password = descriptor.password; 139 // handle SSL 140 SecurityOptions securityOptions = descriptor.sslEnabled ? newTrustOptions() : null; 141 return ContentSourceFactory.newContentSource(host, port.intValue(), user, password, dbname, securityOptions); 142 } 143 144 protected static SecurityOptions newTrustOptions() { 145 try { 146 SSLContext sslContext = SSLContext.getDefault(); 147 return new SecurityOptions(sslContext); 148 } catch (GeneralSecurityException e) { 149 throw new NuxeoException("Unable to initialize Security options for MarkLogic connection.", e); 150 } 151 } 152 153 protected void initRepository() { 154 if (readState(getRootId()) == null) { 155 initRoot(); 156 } 157 } 158 159 @Override 160 public String generateNewId() { 161 if (DEBUG_UUIDS) { 162 Long id = getNextSequenceId(); 163 return "UUID_" + id; 164 } 165 return UUID.randomUUID().toString(); 166 } 167 168 // Used by unit tests 169 protected synchronized Long getNextSequenceId() { 170 sequenceLastValue++; 171 return Long.valueOf(sequenceLastValue); 172 } 173 174 @Override 175 public void shutdown() { 176 super.shutdown(); 177 cursorService.clear(); 178 } 179 180 @Override 181 public State readState(String id) { 182 if (log.isTraceEnabled()) { 183 log.trace("MarkLogic: READ " + id); 184 } 185 try (Session session = xccContentSource.newSession()) { 186 String query = "fn:doc('" + ID_FORMATTER.apply(id) + "')"; 187 AdhocQuery request = session.newAdhocQuery(query); 188 // ResultSequence will be closed by Session close 189 ResultSequence rs = session.submitRequest(request); 190 if (rs.hasNext()) { 191 return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]); 192 } 193 return null; 194 } catch (RequestException e) { 195 throw new NuxeoException("An exception happened during xcc call", e); 196 } 197 } 198 199 @Override 200 public List<State> readStates(List<String> ids) { 201 if (log.isTraceEnabled()) { 202 log.trace("MarkLogic: READ " + ids); 203 } 204 try (Session session = xccContentSource.newSession()) { 205 String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect( 206 Collectors.joining(",", "fn:doc((", "))")); 207 AdhocQuery request = session.newAdhocQuery(query); 208 // ResultSequence will be closed by Session close 209 ResultSequence rs = session.submitRequest(request); 210 return Arrays.stream(rs.asStrings()) 211 .map(MarkLogicStateDeserializer::deserialize) 212 .collect(Collectors.toList()); 213 } catch (RequestException e) { 214 throw new NuxeoException("An exception happened during xcc call", e); 215 } 216 } 217 218 @Override 219 public void createState(State state) { 220 String id = state.get(KEY_ID).toString(); 221 if (log.isTraceEnabled()) { 222 log.trace("MarkLogic: CREATE " + id + ": " + state); 223 } 224 try (Session session = xccContentSource.newSession()) { 225 session.insertContent(convert(state)); 226 } catch (RequestException e) { 227 throw new NuxeoException("An exception happened during xcc call", e); 228 } 229 } 230 231 @Override 232 public void createStates(List<State> states) { 233 if (log.isTraceEnabled()) { 234 log.trace("MarkLogic: CREATE [" 235 + states.stream().map(state -> state.get(KEY_ID).toString()).collect(Collectors.joining(", ")) 236 + "]: " + states); 237 } 238 try (Session session = xccContentSource.newSession()) { 239 Content[] contents = states.stream().map(this::convert).toArray(Content[]::new); 240 session.insertContent(contents); 241 } catch (RequestException e) { 242 throw new NuxeoException("An exception happened during xcc call", e); 243 } 244 } 245 246 private Content convert(State state) { 247 String id = state.get(KEY_ID).toString(); 248 return ContentFactory.newContent(ID_FORMATTER.apply(id), MarkLogicStateSerializer.serialize(state), null); 249 } 250 251 @Override 252 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 253 // TODO changeTokenUpdater 254 String patch = MarkLogicStateSerializer.serialize(diff); 255 if (log.isTraceEnabled()) { 256 log.trace("MarkLogic: UPDATE " + id + ": " + patch); 257 } 258 try (Session session = xccContentSource.newSession()) { 259 ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/patch.xqy"); 260 request.setNewStringVariable("uri", ID_FORMATTER.apply(id)); 261 request.setNewStringVariable("patch-string", patch); 262 // ResultSequence will be closed by Session close 263 session.submitRequest(request); 264 } catch (RequestException e) { 265 throw new NuxeoException("An exception happened during xcc call", e); 266 } 267 } 268 269 @Override 270 public void deleteStates(Set<String> ids) { 271 if (log.isTraceEnabled()) { 272 log.trace("MarkLogic: DELETE " + ids); 273 } 274 try (Session session = xccContentSource.newSession()) { 275 String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect( 276 Collectors.joining(",", "xdmp:document-delete((", "))")); 277 AdhocQuery request = session.newAdhocQuery(query); 278 // ResultSequence will be closed by Session close 279 session.submitRequest(request); 280 } catch (RequestException e) { 281 throw new NuxeoException("An exception happened during xcc call", e); 282 } 283 } 284 285 @Override 286 public State readChildState(String parentId, String name, Set<String> ignored) { 287 String query = getChildQuery(parentId, name, ignored); 288 return findOne(query); 289 } 290 291 @Override 292 public boolean hasChild(String parentId, String name, Set<String> ignored) { 293 String query = getChildQuery(parentId, name, ignored); 294 return exist(query); 295 } 296 297 private String getChildQuery(String parentId, String name, Set<String> ignored) { 298 return new MarkLogicQuerySimpleBuilder(rangeElementIndexes).eq(KEY_PARENT_ID, parentId) 299 .eq(KEY_NAME, name) 300 .notIn(KEY_ID, ignored) 301 .build(); 302 } 303 304 @Override 305 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 306 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 307 builder.eq(key, value); 308 builder.notIn(KEY_ID, ignored); 309 try (Stream<State> states = findAll(builder.build())) { 310 return states.collect(Collectors.toList()); 311 } 312 } 313 314 @Override 315 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 316 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 317 builder.eq(key1, value1); 318 builder.eq(key2, value2); 319 builder.notIn(KEY_ID, ignored); 320 try (Stream<State> states = findAll(builder.build())) { 321 return states.collect(Collectors.toList()); 322 } 323 } 324 325 @Override 326 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 327 Map<String, Object[]> targetProxies) { 328 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 329 builder.eq(key, value); 330 try (Stream<State> states = findAll(builder.build(), KEY_ID, KEY_IS_PROXY, KEY_PROXY_TARGET_ID, 331 KEY_PROXY_IDS)) { 332 states.forEach(state -> { 333 String id = (String) state.get(KEY_ID); 334 ids.add(id); 335 if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) { 336 String targetId = (String) state.get(KEY_PROXY_TARGET_ID); 337 proxyTargets.put(id, targetId); 338 } 339 if (targetProxies != null) { 340 Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS); 341 if (proxyIds != null) { 342 targetProxies.put(id, proxyIds); 343 } 344 } 345 }); 346 } 347 } 348 349 @Override 350 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 351 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 352 builder.eq(key, value); 353 builder.notIn(KEY_ID, ignored); 354 return exist(builder.build()); 355 } 356 357 @Override 358 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 359 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 360 MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, orderByClause, distinctDocuments, 361 rangeElementIndexes); 362 MarkLogicQuery query = builder.buildQuery(); 363 // Don't do manual projection if there are no projection wildcards, as this brings no new 364 // information and is costly. The only difference is several identical rows instead of one. 365 boolean manualProjection = builder.doManualProjection(); 366 if (manualProjection) { 367 // we'll do post-treatment to re-evaluate the query to get proper wildcard projections 368 // so we need the full state from the database 369 evaluator.parse(); 370 } 371 String searchQuery = query.getSearchQuery(limit, offset); 372 if (log.isTraceEnabled()) { 373 logQuery(searchQuery); 374 } 375 // Run query 376 try (Session session = xccContentSource.newSession()) { 377 AdhocQuery request = session.newAdhocQuery(searchQuery); 378 // ResultSequence will be closed by Session close 379 ResultSequence rs = session.submitRequest(request); 380 381 List<Map<String, Serializable>> projections = new ArrayList<>(limit); 382 for (String rsItem : rs.asStrings()) { 383 State state = MarkLogicStateDeserializer.deserialize(rsItem); 384 if (manualProjection) { 385 projections.addAll(evaluator.matches(state)); 386 } else { 387 projections.add(DBSStateFlattener.flatten(state)); 388 } 389 } 390 long totalSize; 391 if (countUpTo == -1) { 392 // count full size 393 if (limit == 0) { 394 totalSize = projections.size(); 395 } else { 396 AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery()); 397 // ResultSequence will be closed by Session close 398 ResultSequence countRs = session.submitRequest(countRequest); 399 totalSize = Long.parseLong(countRs.asStrings()[0]); 400 } 401 } else if (countUpTo == 0) { 402 // no count 403 totalSize = -1; // not counted 404 } else { 405 // count only if less than countUpTo 406 if (limit == 0) { 407 totalSize = projections.size(); 408 } else { 409 AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery(countUpTo + 1)); 410 // ResultSequence will be closed by Session close 411 ResultSequence countRs = session.submitRequest(countRequest); 412 totalSize = Long.parseLong(countRs.asStrings()[0]); 413 } 414 if (totalSize > countUpTo) { 415 totalSize = -2; // truncated 416 } 417 } 418 419 if (log.isTraceEnabled() && projections.size() != 0) { 420 log.trace("MarkLogic: -> " + projections.size()); 421 } 422 return new PartialList<>(projections, totalSize); 423 } catch (RequestException e) { 424 throw new NuxeoException("An exception happened during xcc call when executing '" + evaluator + "'", e); 425 } 426 } 427 428 @Override 429 public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveInSecond) { 430 cursorService.checkForTimedOutScroll(); 431 MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, null, false, rangeElementIndexes); 432 String query = builder.buildQuery().getSearchQuery(); 433 // Don't auto-close the session as we need to keep it open for next scroll 434 try { 435 Session session = xccContentSource.newSession(); 436 // Build option to scroll results and not buffer them 437 RequestOptions options = new RequestOptions(); 438 options.setCacheResult(false); 439 AdhocQuery request = session.newAdhocQuery(query, options); 440 // ResultSequence will be closed by Session close 441 ResultSequence rs = session.submitRequest(request); 442 String scrollId = cursorService.registerCursorResult( 443 new MarkLogicCursorResult(session, rs, batchSize, keepAliveInSecond)); 444 return scroll(scrollId); 445 } catch (RequestException e) { 446 throw new NuxeoException("An exception happened during xcc call", e); 447 } 448 } 449 450 @Override 451 public ScrollResult scroll(String scrollId) { 452 return cursorService.scroll(scrollId, item -> { 453 State state = MarkLogicStateDeserializer.deserialize(item.asInputStream()); 454 return state.get(KEY_ID).toString(); 455 }); 456 } 457 458 @Override 459 public Lock getLock(String id) { 460 // TODO test performance : retrieve document with read or search document with extract 461 // TODO retrieve only some field 462 // https://docs.marklogic.com/guide/search-dev/qbe#id_54044 463 State state = readState(id); 464 if (state == null) { 465 throw new DocumentNotFoundException(id); 466 } 467 String owner = (String) state.get(KEY_LOCK_OWNER); 468 if (owner == null) { 469 // not locked 470 return null; 471 } 472 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 473 return new Lock(owner, created); 474 } 475 476 @Override 477 public Lock setLock(String id, Lock lock) { 478 State state = new State(); 479 state.put(KEY_LOCK_OWNER, lock.getOwner()); 480 state.put(KEY_LOCK_CREATED, lock.getCreated()); 481 String lockString = MarkLogicStateSerializer.serialize(state); 482 if (log.isTraceEnabled()) { 483 log.trace("MarkLogic: SETLOCK " + id + ": " + lockString); 484 } 485 try (Session session = xccContentSource.newSession()) { 486 ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/set-lock.xqy"); 487 request.setNewStringVariable("uri", ID_FORMATTER.apply(id)); 488 request.setNewStringVariable("lock-string", lockString); 489 // ResultSequence will be closed by Session close 490 ResultSequence result = session.submitRequest(request); 491 State resultState = MarkLogicStateDeserializer.deserialize(result.asString()); 492 return extractLock(resultState); 493 } catch (RequestException e) { 494 if ("Document not found".equals(e.getMessage())) { 495 throw new DocumentNotFoundException(id, e); 496 } 497 throw new NuxeoException("An exception happened during xcc call", e); 498 } 499 // TODO check how the concurrent exception is raised 500 } 501 502 @Override 503 public Lock removeLock(String id, String owner) { 504 if (log.isTraceEnabled()) { 505 log.trace("MarkLogic: REMOVELOCK " + id + ": " + owner); 506 } 507 try (Session session = xccContentSource.newSession()) { 508 ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/remove-lock.xqy"); 509 request.setNewStringVariable("uri", ID_FORMATTER.apply(id)); 510 request.setNewStringVariable("owner", Strings.nullToEmpty(owner)); 511 // ResultSequence will be closed by Session close 512 ResultSequence result = session.submitRequest(request); 513 State resultState = MarkLogicStateDeserializer.deserialize(result.asString()); 514 return extractLock(resultState); 515 } catch (RequestException e) { 516 if ("Document not found".equals(e.getMessage())) { 517 throw new DocumentNotFoundException(id, e); 518 } 519 throw new NuxeoException("An exception happened during xcc call", e); 520 } 521 } 522 523 private Lock extractLock(State state) { 524 if (state.isEmpty()) { 525 return null; 526 } 527 String owner = (String) state.get(KEY_LOCK_OWNER); 528 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 529 Boolean failed = (Boolean) state.get("failed"); 530 return new Lock(owner, created, Boolean.TRUE.equals(failed)); 531 } 532 533 @Override 534 public void closeLockManager() { 535 } 536 537 @Override 538 public void clearLockManagerCaches() { 539 } 540 541 protected List<String> binaryPaths; 542 543 @Override 544 protected void initBlobsPaths() { 545 MarkLogicBlobFinder finder = new MarkLogicBlobFinder(); 546 finder.visit(); 547 binaryPaths = finder.binaryPaths; 548 } 549 550 protected static class MarkLogicBlobFinder extends BlobFinder { 551 protected List<String> binaryPaths = new ArrayList<>(); 552 553 @Override 554 protected void recordBlobPath() { 555 path.addLast(KEY_BLOB_DATA); 556 StringBuilder binaryPath = new StringBuilder(); 557 MarkLogicSchemaManager schemaManager = new MarkLogicSchemaManager(); 558 Type previousType = null; 559 for (String element : path) { 560 if (binaryPath.length() > 0) { 561 binaryPath.append('/'); 562 } 563 // Append current element to path 564 binaryPath.append(element); 565 if (previousType == null) { 566 // No previous type - it's the first element 567 previousType = schemaManager.computeField(String.join(".", path), element).getType(); 568 } else if (previousType.isComplexType()) { 569 // Previous type is a complex type - retrieve the type of current element 570 if (TypeConstants.isContentType(previousType)) { 571 // The complex type hold the binary data, it's the last element, so break the loop 572 break; 573 } 574 previousType = ((ComplexType) previousType).getField(element).getType(); 575 } 576 // Add the item array element if type is a list (here previousType is the type of current element) 577 // Here we re allocate previousType to item array type as there's no element in this.path to select item 578 if (previousType.isListType()) { 579 binaryPath.append('/').append(element).append(MarkLogicHelper.ARRAY_ITEM_KEY_SUFFIX); 580 previousType = ((ListType) previousType).getFieldType(); 581 } 582 } 583 binaryPaths.add(binaryPath.toString()); 584 path.removeLast(); 585 } 586 } 587 588 @Override 589 public void markReferencedBinaries() { 590 DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class); 591 // TODO add a query to not scan all documents 592 String query = new MarkLogicQuerySimpleBuilder(rangeElementIndexes).build(); 593 try (Stream<State> states = findAll(query, binaryPaths.toArray(new String[0]))) { 594 states.forEach(state -> markReferencedBinaries(state, blobManager)); 595 } 596 } 597 598 protected void markReferencedBinaries(State state, DocumentBlobManager blobManager) { 599 for (Entry<String, Serializable> entry : state.entrySet()) { 600 Serializable value = entry.getValue(); 601 if (value instanceof List) { 602 List<?> list = (List<?>) value; 603 for (Object v : list) { 604 if (v instanceof State) { 605 markReferencedBinaries((State) v, blobManager); 606 } else { 607 markReferencedBinary(v, blobManager); 608 } 609 } 610 } else if (value instanceof Object[]) { 611 for (Object v : (Object[]) value) { 612 markReferencedBinary(v, blobManager); 613 } 614 } else if (value instanceof State) { 615 markReferencedBinaries((State) value, blobManager); 616 } else { 617 markReferencedBinary(value, blobManager); 618 } 619 } 620 } 621 622 protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) { 623 if (!(value instanceof String)) { 624 return; 625 } 626 String key = (String) value; 627 blobManager.markReferencedBinary(key, repositoryName); 628 } 629 630 private void logQuery(String query) { 631 log.trace("MarkLogic: QUERY " + query); 632 } 633 634 private boolean exist(String ctsQuery) { 635 // first build exist query from cts query 636 String query = "xdmp:exists(" + ctsQuery + ")"; 637 if (log.isTraceEnabled()) { 638 logQuery(query); 639 } 640 // Run query 641 try (Session session = xccContentSource.newSession()) { 642 AdhocQuery request = session.newAdhocQuery(query); 643 // ResultSequence will be closed by Session close 644 ResultSequence rs = session.submitRequest(request); 645 return Boolean.parseBoolean(rs.asString()); 646 } catch (RequestException e) { 647 throw new NuxeoException("An exception happened during xcc call", e); 648 } 649 } 650 651 private State findOne(String ctsQuery) { 652 // first add limit to ctsQuery 653 String query = ctsQuery + "[1 to 1]"; 654 if (log.isTraceEnabled()) { 655 logQuery(query); 656 } 657 // Run query 658 try (Session session = xccContentSource.newSession()) { 659 AdhocQuery request = session.newAdhocQuery(query); 660 // ResultSequence will be closed by Session close 661 ResultSequence rs = session.submitRequest(request); 662 if (rs.hasNext()) { 663 return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]); 664 } 665 return null; 666 } catch (RequestException e) { 667 throw new NuxeoException("An exception happened during xcc call", e); 668 } 669 } 670 671 protected Stream<State> findAll(String ctsQuery, String... selects) { 672 String query = ctsQuery; 673 if (selects.length > 0) { 674 query = "import module namespace extract = 'http://nuxeo.com/extract' at '/ext/nuxeo/extract.xqy';\n" 675 + "let $paths := (" + Arrays.stream(selects) 676 .map(MarkLogicHelper::serializeKey) 677 .map(select -> "\"" + MarkLogicHelper.DOCUMENT_ROOT_PATH + "/" + select 678 + "\"") 679 .collect(Collectors.joining(",\n")) 680 + ")let $namespaces := ()\n" + "for $i in " + query 681 + " return extract:extract-nodes($i, $paths, $namespaces)"; 682 } 683 if (log.isTraceEnabled()) { 684 logQuery(query); 685 } 686 // Run query 687 boolean completedAbruptly = true; 688 Session session = xccContentSource.newSession(); 689 try { 690 // As we can get a lot of results don't buffer the result 691 RequestOptions options = new RequestOptions(); 692 options.setCacheResult(false); 693 AdhocQuery request = session.newAdhocQuery(query, options); 694 695 // We give 0 as characteristics because it's the value used by Iterable#spliterator() and according to 696 // StreamSupport.stream(Supplier, int, boolean) documentation, characteristics must be equal to 697 // supplier.get().characteristics() 698 Supplier<Spliterator<ResultItem>> spliteratorSupplier = () -> { 699 try { 700 // ResultSequence will be closed by Session close (closed by stream closed) 701 ResultSequence rs = session.submitRequest(request); 702 Iterable<ResultItem> items = rs::iterator; 703 return items.spliterator(); 704 } catch (RequestException e) { 705 throw new NuxeoException("An exception happened during xcc call", e); 706 } 707 }; 708 Stream<State> stream = StreamSupport.stream(spliteratorSupplier, 0, false) 709 .onClose(session::close) 710 .map(ResultItem::getItem) 711 .map(XdmItem::asInputStream) 712 .map(MarkLogicStateDeserializer::deserialize); 713 // the stream takes responsibility for closing the session 714 completedAbruptly = false; 715 return stream; 716 } finally { 717 if (completedAbruptly) { 718 session.close(); 719 } 720 } 721 } 722 723}