001/* 002 * (C) Copyright 2016-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc 018 */ 019package org.nuxeo.ecm.core.storage.marklogic; 020 021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 028 029import java.io.Serializable; 030import java.security.GeneralSecurityException; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Calendar; 034import java.util.Collection; 035import java.util.Collections; 036import java.util.List; 037import java.util.Map; 038import java.util.Map.Entry; 039import java.util.Set; 040import java.util.Spliterator; 041import java.util.UUID; 042import java.util.function.Function; 043import java.util.function.Supplier; 044import java.util.stream.Collector; 045import java.util.stream.Collectors; 046import java.util.stream.Stream; 047import java.util.stream.StreamSupport; 048 049import javax.net.ssl.SSLContext; 050import javax.resource.spi.ConnectionManager; 051 052import org.apache.commons.lang3.StringUtils; 053import org.apache.commons.logging.Log; 054import org.apache.commons.logging.LogFactory; 055import org.nuxeo.ecm.core.api.CursorService; 056import org.nuxeo.ecm.core.api.DocumentNotFoundException; 057import org.nuxeo.ecm.core.api.Lock; 058import org.nuxeo.ecm.core.api.NuxeoException; 059import org.nuxeo.ecm.core.api.PartialList; 060import org.nuxeo.ecm.core.api.ScrollResult; 061import org.nuxeo.ecm.core.blob.DocumentBlobManager; 062import org.nuxeo.ecm.core.model.Repository; 063import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 064import org.nuxeo.ecm.core.schema.TypeConstants; 065import org.nuxeo.ecm.core.schema.types.ComplexType; 066import org.nuxeo.ecm.core.schema.types.ListType; 067import org.nuxeo.ecm.core.schema.types.Type; 068import org.nuxeo.ecm.core.storage.State; 069import org.nuxeo.ecm.core.storage.State.StateDiff; 070import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 071import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 072import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener; 073import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 074import org.nuxeo.ecm.core.storage.marklogic.MarkLogicQueryBuilder.MarkLogicQuery; 075import org.nuxeo.runtime.api.Framework; 076 077import com.google.common.base.Strings; 078import com.marklogic.xcc.AdhocQuery; 079import com.marklogic.xcc.Content; 080import com.marklogic.xcc.ContentFactory; 081import com.marklogic.xcc.ContentSource; 082import com.marklogic.xcc.ContentSourceFactory; 083import com.marklogic.xcc.ModuleInvoke; 084import com.marklogic.xcc.RequestOptions; 085import com.marklogic.xcc.ResultItem; 086import com.marklogic.xcc.ResultSequence; 087import com.marklogic.xcc.SecurityOptions; 088import com.marklogic.xcc.Session; 089import com.marklogic.xcc.exceptions.RequestException; 090import com.marklogic.xcc.types.XdmItem; 091 092/** 093 * MarkLogic implementation of a {@link Repository}. 094 * 095 * @since 8.3 096 */ 097public class MarkLogicRepository extends DBSRepositoryBase { 098 099 private static final Log log = LogFactory.getLog(MarkLogicRepository.class); 100 101 private static final Function<String, String> ID_FORMATTER = id -> String.format("/%s.xml", id); 102 103 public static final String DB_DEFAULT = "nuxeo"; 104 105 protected ContentSource xccContentSource; 106 107 /** Last value used from the in-memory sequence. Used by unit tests. */ 108 protected long sequenceLastValue; 109 110 protected final List<MarkLogicRangeElementIndexDescriptor> rangeElementIndexes; 111 112 protected final CursorService<ResultSequence, ResultItem, String> cursorService; 113 114 public MarkLogicRepository(ConnectionManager cm, MarkLogicRepositoryDescriptor descriptor) { 115 super(cm, descriptor.name, descriptor); 116 xccContentSource = newMarkLogicContentSource(descriptor); 117 rangeElementIndexes = descriptor.rangeElementIndexes.stream() 118 .map(MarkLogicRangeElementIndexDescriptor::new) 119 .collect(Collectors.toList()); 120 cursorService = new CursorService<>(item -> { 121 State state = MarkLogicStateDeserializer.deserialize(item.asInputStream()); 122 return state.get(KEY_ID).toString(); 123 }); 124 initRepository(); 125 } 126 127 @Override 128 public List<IdType> getAllowedIdTypes() { 129 return Collections.singletonList(IdType.varchar); 130 } 131 132 // used also by unit tests 133 public static ContentSource newMarkLogicContentSource(MarkLogicRepositoryDescriptor descriptor) { 134 String host = descriptor.host; 135 Integer port = descriptor.port; 136 if (StringUtils.isBlank(host) || port == null) { 137 throw new NuxeoException("Missing <host> or <port> in MarkLogic repository descriptor"); 138 } 139 String dbname = StringUtils.defaultIfBlank(descriptor.dbname, DB_DEFAULT); 140 String user = descriptor.user; 141 String password = descriptor.password; 142 // handle SSL 143 SecurityOptions securityOptions = descriptor.sslEnabled ? newTrustOptions() : null; 144 return ContentSourceFactory.newContentSource(host, port.intValue(), user, password, dbname, securityOptions); 145 } 146 147 protected static SecurityOptions newTrustOptions() { 148 try { 149 SSLContext sslContext = SSLContext.getDefault(); 150 return new SecurityOptions(sslContext); 151 } catch (GeneralSecurityException e) { 152 throw new NuxeoException("Unable to initialize Security options for MarkLogic connection.", e); 153 } 154 } 155 156 protected void initRepository() { 157 if (readState(getRootId()) == null) { 158 initRoot(); 159 } 160 } 161 162 @Override 163 public String generateNewId() { 164 if (DEBUG_UUIDS) { 165 Long id = getNextSequenceId(); 166 return "UUID_" + id; 167 } 168 return UUID.randomUUID().toString(); 169 } 170 171 // Used by unit tests 172 protected synchronized Long getNextSequenceId() { 173 sequenceLastValue++; 174 return Long.valueOf(sequenceLastValue); 175 } 176 177 @Override 178 public void shutdown() { 179 super.shutdown(); 180 cursorService.clear(); 181 } 182 183 @Override 184 public State readState(String id) { 185 return readPartialState(id, null); 186 } 187 188 @Override 189 public State readPartialState(String id, Collection<String> keys) { 190 if (log.isTraceEnabled()) { 191 log.trace("MarkLogic: READ " + id); 192 } 193 try (Session session = xccContentSource.newSession()) { 194 String query = "fn:doc('" + ID_FORMATTER.apply(id) + "')"; 195 if (keys != null && !keys.isEmpty()) { 196 query = getProjectedQuery(query, keys); 197 } 198 AdhocQuery request = session.newAdhocQuery(query); 199 // ResultSequence will be closed by Session close 200 ResultSequence rs = session.submitRequest(request); 201 if (rs.hasNext()) { 202 return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]); 203 } 204 return null; 205 } catch (RequestException e) { 206 throw new NuxeoException("An exception happened during xcc call", e); 207 } 208 } 209 210 @Override 211 public List<State> readStates(List<String> ids) { 212 if (log.isTraceEnabled()) { 213 log.trace("MarkLogic: READ " + ids); 214 } 215 try (Session session = xccContentSource.newSession()) { 216 String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect( 217 Collectors.joining(",", "fn:doc((", "))")); 218 AdhocQuery request = session.newAdhocQuery(query); 219 // ResultSequence will be closed by Session close 220 ResultSequence rs = session.submitRequest(request); 221 return Arrays.stream(rs.asStrings()) 222 .map(MarkLogicStateDeserializer::deserialize) 223 .collect(Collectors.toList()); 224 } catch (RequestException e) { 225 throw new NuxeoException("An exception happened during xcc call", e); 226 } 227 } 228 229 @Override 230 public void createState(State state) { 231 String id = state.get(KEY_ID).toString(); 232 if (log.isTraceEnabled()) { 233 log.trace("MarkLogic: CREATE " + id + ": " + state); 234 } 235 try (Session session = xccContentSource.newSession()) { 236 session.insertContent(convert(state)); 237 } catch (RequestException e) { 238 throw new NuxeoException("An exception happened during xcc call", e); 239 } 240 } 241 242 @Override 243 public void createStates(List<State> states) { 244 if (log.isTraceEnabled()) { 245 log.trace("MarkLogic: CREATE [" 246 + states.stream().map(state -> state.get(KEY_ID).toString()).collect(Collectors.joining(", ")) 247 + "]: " + states); 248 } 249 try (Session session = xccContentSource.newSession()) { 250 Content[] contents = states.stream().map(this::convert).toArray(Content[]::new); 251 session.insertContent(contents); 252 } catch (RequestException e) { 253 throw new NuxeoException("An exception happened during xcc call", e); 254 } 255 } 256 257 private Content convert(State state) { 258 String id = state.get(KEY_ID).toString(); 259 return ContentFactory.newContent(ID_FORMATTER.apply(id), MarkLogicStateSerializer.serialize(state), null); 260 } 261 262 @Override 263 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 264 // TODO changeTokenUpdater 265 String patch = MarkLogicStateSerializer.serialize(diff); 266 if (log.isTraceEnabled()) { 267 log.trace("MarkLogic: UPDATE " + id + ": " + patch); 268 } 269 try (Session session = xccContentSource.newSession()) { 270 ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/patch.xqy"); 271 request.setNewStringVariable("uri", ID_FORMATTER.apply(id)); 272 request.setNewStringVariable("patch-string", patch); 273 // ResultSequence will be closed by Session close 274 session.submitRequest(request); 275 } catch (RequestException e) { 276 throw new NuxeoException("An exception happened during xcc call", e); 277 } 278 } 279 280 @Override 281 public void deleteStates(Set<String> ids) { 282 if (log.isTraceEnabled()) { 283 log.trace("MarkLogic: DELETE " + ids); 284 } 285 try (Session session = xccContentSource.newSession()) { 286 String query = ids.stream().map(ID_FORMATTER).map(id -> "'" + id + "'").collect( 287 Collectors.joining(",", "xdmp:document-delete((", "))")); 288 AdhocQuery request = session.newAdhocQuery(query); 289 // ResultSequence will be closed by Session close 290 session.submitRequest(request); 291 } catch (RequestException e) { 292 throw new NuxeoException("An exception happened during xcc call", e); 293 } 294 } 295 296 @Override 297 public State readChildState(String parentId, String name, Set<String> ignored) { 298 String query = getChildQuery(parentId, name, ignored); 299 return findOne(query); 300 } 301 302 @Override 303 public boolean hasChild(String parentId, String name, Set<String> ignored) { 304 String query = getChildQuery(parentId, name, ignored); 305 return exist(query); 306 } 307 308 private String getChildQuery(String parentId, String name, Set<String> ignored) { 309 return new MarkLogicQuerySimpleBuilder(rangeElementIndexes).eq(KEY_PARENT_ID, parentId) 310 .eq(KEY_NAME, name) 311 .notIn(KEY_ID, ignored) 312 .build(); 313 } 314 315 @Override 316 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 317 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 318 builder.eq(key, value); 319 builder.notIn(KEY_ID, ignored); 320 try (Stream<State> states = findAll(builder.build())) { 321 return states.collect(Collectors.toList()); 322 } 323 } 324 325 @Override 326 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 327 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 328 builder.eq(key1, value1); 329 builder.eq(key2, value2); 330 builder.notIn(KEY_ID, ignored); 331 try (Stream<State> states = findAll(builder.build())) { 332 return states.collect(Collectors.toList()); 333 } 334 } 335 336 @Override 337 public Stream<State> getDescendants(String rootId, Set<String> keys) { 338 return getDescendants(rootId, keys, 0); 339 } 340 341 @Override 342 public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) { 343 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 344 builder.eq(KEY_ANCESTOR_IDS, rootId).limit(limit); 345 List<String> selects = new ArrayList<>(); 346 selects.add(KEY_ID); 347 selects.addAll(keys); 348 return findAll(builder.build(), selects.toArray(new String[0])); 349 } 350 351 @Override 352 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 353 MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes); 354 builder.eq(key, value); 355 builder.notIn(KEY_ID, ignored); 356 return exist(builder.build()); 357 } 358 359 @Override 360 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 361 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 362 MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, orderByClause, distinctDocuments, 363 rangeElementIndexes); 364 MarkLogicQuery query = builder.buildQuery(); 365 // Don't do manual projection if there are no projection wildcards, as this brings no new 366 // information and is costly. The only difference is several identical rows instead of one. 367 boolean manualProjection = builder.doManualProjection(); 368 if (manualProjection) { 369 // we'll do post-treatment to re-evaluate the query to get proper wildcard projections 370 // so we need the full state from the database 371 evaluator.parse(); 372 } 373 String searchQuery = query.getSearchQuery(limit, offset); 374 if (log.isTraceEnabled()) { 375 logQuery(searchQuery); 376 } 377 // Run query 378 try (Session session = xccContentSource.newSession()) { 379 AdhocQuery request = session.newAdhocQuery(searchQuery); 380 // ResultSequence will be closed by Session close 381 ResultSequence rs = session.submitRequest(request); 382 383 List<Map<String, Serializable>> projections = new ArrayList<>(limit); 384 DBSStateFlattener flattener = new DBSStateFlattener(); 385 for (String rsItem : rs.asStrings()) { 386 State state = MarkLogicStateDeserializer.deserialize(rsItem); 387 if (manualProjection) { 388 projections.addAll(evaluator.matches(state)); 389 } else { 390 projections.add(flattener.flatten(state)); 391 } 392 } 393 long totalSize; 394 if (countUpTo == -1) { 395 // count full size 396 if (limit == 0) { 397 totalSize = projections.size(); 398 } else if (manualProjection) { 399 totalSize = -1; // unknown due to manual projection 400 } else { 401 AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery()); 402 // ResultSequence will be closed by Session close 403 ResultSequence countRs = session.submitRequest(countRequest); 404 totalSize = Long.parseLong(countRs.asStrings()[0]); 405 } 406 } else if (countUpTo == 0) { 407 // no count 408 totalSize = -1; // not counted 409 } else { 410 // count only if less than countUpTo 411 if (limit == 0) { 412 totalSize = projections.size(); 413 } else if (manualProjection) { 414 totalSize = -1; // unknown due to manual projection 415 } else { 416 AdhocQuery countRequest = session.newAdhocQuery(query.getCountQuery(countUpTo + 1)); 417 // ResultSequence will be closed by Session close 418 ResultSequence countRs = session.submitRequest(countRequest); 419 totalSize = Long.parseLong(countRs.asStrings()[0]); 420 } 421 if (totalSize > countUpTo) { 422 totalSize = -2; // truncated 423 } 424 } 425 426 if (log.isTraceEnabled() && projections.size() != 0) { 427 log.trace("MarkLogic: -> " + projections.size()); 428 } 429 return new PartialList<>(projections, totalSize); 430 } catch (RequestException e) { 431 throw new NuxeoException("An exception happened during xcc call when executing '" + evaluator + "'", e); 432 } 433 } 434 435 @Override 436 public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveInSecond) { 437 cursorService.checkForTimedOutScroll(); 438 MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, null, false, rangeElementIndexes); 439 String query = builder.buildQuery().getSearchQuery(); 440 // Don't auto-close the session as we need to keep it open for next scroll 441 try { 442 Session session = xccContentSource.newSession(); // kept open by cursor, do not close 443 // Build option to scroll results and not buffer them 444 RequestOptions options = new RequestOptions(); 445 options.setCacheResult(false); 446 AdhocQuery request = session.newAdhocQuery(query, options); 447 // ResultSequence will be closed by Session close 448 ResultSequence rs = session.submitRequest(request); 449 String scrollId = cursorService.registerCursorResult( 450 new MarkLogicCursorResult(session, rs, batchSize, keepAliveInSecond)); 451 return scroll(scrollId); 452 } catch (RequestException e) { 453 throw new NuxeoException("An exception happened during xcc call", e); 454 } 455 } 456 457 @Override 458 public ScrollResult<String> scroll(String scrollId) { 459 return cursorService.scroll(scrollId); 460 } 461 462 @Override 463 public Lock getLock(String id) { 464 // TODO test performance : retrieve document with read or search document with extract 465 // TODO retrieve only some field 466 // https://docs.marklogic.com/guide/search-dev/qbe#id_54044 467 State state = readState(id); 468 if (state == null) { 469 throw new DocumentNotFoundException(id); 470 } 471 String owner = (String) state.get(KEY_LOCK_OWNER); 472 if (owner == null) { 473 // not locked 474 return null; 475 } 476 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 477 return new Lock(owner, created); 478 } 479 480 @Override 481 public Lock setLock(String id, Lock lock) { 482 State state = new State(); 483 state.put(KEY_LOCK_OWNER, lock.getOwner()); 484 state.put(KEY_LOCK_CREATED, lock.getCreated()); 485 String lockString = MarkLogicStateSerializer.serialize(state); 486 if (log.isTraceEnabled()) { 487 log.trace("MarkLogic: SETLOCK " + id + ": " + lockString); 488 } 489 try (Session session = xccContentSource.newSession()) { 490 ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/set-lock.xqy"); 491 request.setNewStringVariable("uri", ID_FORMATTER.apply(id)); 492 request.setNewStringVariable("lock-string", lockString); 493 // ResultSequence will be closed by Session close 494 ResultSequence result = session.submitRequest(request); 495 State resultState = MarkLogicStateDeserializer.deserialize(result.asString()); 496 return extractLock(resultState); 497 } catch (RequestException e) { 498 if ("Document not found".equals(e.getMessage())) { 499 throw new DocumentNotFoundException(id, e); 500 } 501 throw new NuxeoException("An exception happened during xcc call", e); 502 } 503 // TODO check how the concurrent exception is raised 504 } 505 506 @Override 507 public Lock removeLock(String id, String owner) { 508 if (log.isTraceEnabled()) { 509 log.trace("MarkLogic: REMOVELOCK " + id + ": " + owner); 510 } 511 try (Session session = xccContentSource.newSession()) { 512 ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/remove-lock.xqy"); 513 request.setNewStringVariable("uri", ID_FORMATTER.apply(id)); 514 request.setNewStringVariable("owner", Strings.nullToEmpty(owner)); 515 // ResultSequence will be closed by Session close 516 ResultSequence result = session.submitRequest(request); 517 State resultState = MarkLogicStateDeserializer.deserialize(result.asString()); 518 return extractLock(resultState); 519 } catch (RequestException e) { 520 if ("Document not found".equals(e.getMessage())) { 521 throw new DocumentNotFoundException(id, e); 522 } 523 throw new NuxeoException("An exception happened during xcc call", e); 524 } 525 } 526 527 private Lock extractLock(State state) { 528 if (state.isEmpty()) { 529 return null; 530 } 531 String owner = (String) state.get(KEY_LOCK_OWNER); 532 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 533 Boolean failed = (Boolean) state.get("failed"); 534 return new Lock(owner, created, Boolean.TRUE.equals(failed)); 535 } 536 537 @Override 538 public void closeLockManager() { 539 } 540 541 @Override 542 public void clearLockManagerCaches() { 543 } 544 545 protected List<String> binaryPaths; 546 547 @Override 548 protected void initBlobsPaths() { 549 MarkLogicBlobFinder finder = new MarkLogicBlobFinder(); 550 finder.visit(); 551 binaryPaths = finder.binaryPaths; 552 } 553 554 protected static class MarkLogicBlobFinder extends BlobFinder { 555 protected List<String> binaryPaths = new ArrayList<>(); 556 557 @Override 558 protected void recordBlobPath() { 559 path.addLast(KEY_BLOB_DATA); 560 StringBuilder binaryPath = new StringBuilder(); 561 MarkLogicSchemaManager schemaManager = new MarkLogicSchemaManager(); 562 Type previousType = null; 563 for (String element : path) { 564 if (binaryPath.length() > 0) { 565 binaryPath.append('/'); 566 } 567 // Append current element to path 568 binaryPath.append(element); 569 if (previousType == null) { 570 // No previous type - it's the first element 571 previousType = schemaManager.computeField(String.join(".", path), element).getType(); 572 } else if (previousType.isComplexType()) { 573 // Previous type is a complex type - retrieve the type of current element 574 if (TypeConstants.isContentType(previousType)) { 575 // The complex type hold the binary data, it's the last element, so break the loop 576 break; 577 } 578 previousType = ((ComplexType) previousType).getField(element).getType(); 579 } 580 // Add the item array element if type is a list (here previousType is the type of current element) 581 // Here we re allocate previousType to item array type as there's no element in this.path to select item 582 if (previousType.isListType()) { 583 binaryPath.append('/').append(element).append(MarkLogicHelper.ARRAY_ITEM_KEY_SUFFIX); 584 previousType = ((ListType) previousType).getFieldType(); 585 } 586 } 587 binaryPaths.add(binaryPath.toString()); 588 path.removeLast(); 589 } 590 } 591 592 @Override 593 public void markReferencedBinaries() { 594 DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class); 595 // TODO add a query to not scan all documents 596 String query = new MarkLogicQuerySimpleBuilder(rangeElementIndexes).build(); 597 try (Stream<State> states = findAll(query, binaryPaths.toArray(new String[0]))) { 598 states.forEach(state -> markReferencedBinaries(state, blobManager)); 599 } 600 } 601 602 protected void markReferencedBinaries(State state, DocumentBlobManager blobManager) { 603 for (Entry<String, Serializable> entry : state.entrySet()) { 604 Serializable value = entry.getValue(); 605 if (value instanceof List) { 606 List<?> list = (List<?>) value; 607 for (Object v : list) { 608 if (v instanceof State) { 609 markReferencedBinaries((State) v, blobManager); 610 } else { 611 markReferencedBinary(v, blobManager); 612 } 613 } 614 } else if (value instanceof Object[]) { 615 for (Object v : (Object[]) value) { 616 markReferencedBinary(v, blobManager); 617 } 618 } else if (value instanceof State) { 619 markReferencedBinaries((State) value, blobManager); 620 } else { 621 markReferencedBinary(value, blobManager); 622 } 623 } 624 } 625 626 protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) { 627 if (!(value instanceof String)) { 628 return; 629 } 630 String key = (String) value; 631 blobManager.markReferencedBinary(key, repositoryName); 632 } 633 634 private void logQuery(String query) { 635 log.trace("MarkLogic: QUERY " + query); 636 } 637 638 private boolean exist(String ctsQuery) { 639 // first build exist query from cts query 640 String query = "xdmp:exists(" + ctsQuery + ")"; 641 if (log.isTraceEnabled()) { 642 logQuery(query); 643 } 644 // Run query 645 try (Session session = xccContentSource.newSession()) { 646 AdhocQuery request = session.newAdhocQuery(query); 647 // ResultSequence will be closed by Session close 648 ResultSequence rs = session.submitRequest(request); 649 return Boolean.parseBoolean(rs.asString()); 650 } catch (RequestException e) { 651 throw new NuxeoException("An exception happened during xcc call", e); 652 } 653 } 654 655 private State findOne(String ctsQuery) { 656 // first add limit to ctsQuery 657 String query = ctsQuery + "[1 to 1]"; 658 if (log.isTraceEnabled()) { 659 logQuery(query); 660 } 661 // Run query 662 try (Session session = xccContentSource.newSession()) { 663 AdhocQuery request = session.newAdhocQuery(query); 664 // ResultSequence will be closed by Session close 665 ResultSequence rs = session.submitRequest(request); 666 if (rs.hasNext()) { 667 return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]); 668 } 669 return null; 670 } catch (RequestException e) { 671 throw new NuxeoException("An exception happened during xcc call", e); 672 } 673 } 674 675 protected Stream<State> findAll(String ctsQuery, String... selects) { 676 String query = ctsQuery; 677 if (selects.length > 0) { 678 query = getProjectedQuery(query, Arrays.asList(selects)); 679 } 680 if (log.isTraceEnabled()) { 681 logQuery(query); 682 } 683 // Run query 684 boolean completedAbruptly = true; 685 Session session = xccContentSource.newSession(); // kept open by stream or closed on exception 686 try { 687 // As we can get a lot of results don't buffer the result 688 RequestOptions options = new RequestOptions(); 689 options.setCacheResult(false); 690 AdhocQuery request = session.newAdhocQuery(query, options); 691 692 // We give 0 as characteristics because it's the value used by Iterable#spliterator() and according to 693 // StreamSupport.stream(Supplier, int, boolean) documentation, characteristics must be equal to 694 // supplier.get().characteristics() 695 Supplier<Spliterator<ResultItem>> spliteratorSupplier = () -> { 696 try { 697 // ResultSequence will be closed by Session close (closed by stream closed) 698 ResultSequence rs = session.submitRequest(request); 699 Iterable<ResultItem> items = rs::iterator; 700 return items.spliterator(); 701 } catch (RequestException e) { 702 throw new NuxeoException("An exception happened during xcc call", e); 703 } 704 }; 705 Stream<State> stream = StreamSupport.stream(spliteratorSupplier, 0, false) 706 .onClose(session::close) 707 .map(ResultItem::getItem) 708 .map(XdmItem::asInputStream) 709 .map(MarkLogicStateDeserializer::deserialize); 710 // the stream takes responsibility for closing the session 711 completedAbruptly = false; 712 return stream; 713 } finally { 714 if (completedAbruptly) { 715 session.close(); 716 } 717 } 718 } 719 720 protected static final String PATH_COLLECTOR_BEGIN = "\"" + MarkLogicHelper.DOCUMENT_ROOT_PATH + "/"; 721 722 protected static final String PATH_COLLECTOR_END = "\""; 723 724 protected static final String PATH_COLLECTOR_SEP = ",\n"; 725 726 protected static final Collector<CharSequence, ?, String> PATH_COLLECTOR = Collectors.joining( 727 PATH_COLLECTOR_END + PATH_COLLECTOR_SEP + PATH_COLLECTOR_BEGIN, PATH_COLLECTOR_BEGIN, PATH_COLLECTOR_END); 728 729 protected String getProjectedQuery(String query, Collection<String> keys) { 730 String paths = keys.stream().map(MarkLogicHelper::serializeKey).collect(PATH_COLLECTOR); 731 return "import module namespace extract = 'http://nuxeo.com/extract' at '/ext/nuxeo/extract.xqy';\n" 732 + "let $paths := (" + paths + ")\n" // 733 + "let $namespaces := ()\n" // 734 + "for $i in " + query + " return extract:extract-nodes($i, $paths, $namespaces)"; 735 } 736 737}