/*
 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Kevin Leturc
 */
package org.nuxeo.ecm.core.storage.marklogic;

import static java.lang.Boolean.TRUE;
import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.resource.spi.ConnectionManager;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.Lock;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PartialList;
import org.nuxeo.ecm.core.api.ScrollResult;
import org.nuxeo.ecm.core.api.ScrollResultImpl;
import org.nuxeo.ecm.core.model.Repository;
import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
import org.nuxeo.ecm.core.storage.State;
import org.nuxeo.ecm.core.storage.State.StateDiff;
import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
import org.nuxeo.ecm.core.storage.marklogic.MarkLogicQueryBuilder.MarkLogicQuery;

import com.google.common.base.Strings;
import com.marklogic.xcc.AdhocQuery;
import com.marklogic.xcc.Content;
import com.marklogic.xcc.ContentFactory;
import com.marklogic.xcc.ContentSource;
import com.marklogic.xcc.ContentSourceFactory;
import com.marklogic.xcc.ModuleInvoke;
import com.marklogic.xcc.ResultSequence;
import com.marklogic.xcc.Session;
import com.marklogic.xcc.exceptions.RequestException;

/**
 * MarkLogic implementation of a {@link Repository}.
 * <p>
 * Every operation opens a short-lived XCC {@link Session} on {@link #xccContentSource}; any
 * {@link RequestException} raised by XCC is rethrown wrapped in a {@link NuxeoException}.
 *
 * @since 8.3
 */
public class MarkLogicRepository extends DBSRepositoryBase {

    private static final Log log = LogFactory.getLog(MarkLogicRepository.class);

    /** Maps a document id to the URI under which its state is stored ({@code /<id>.xml}). */
    private static final Function<String, String> ID_FORMATTER = id -> String.format("/%s.xml", id);

    /** Database name used when the descriptor does not provide one. */
    public static final String DB_DEFAULT = "nuxeo";

    /** Scroll id handed out by {@link #scroll(DBSExpressionEvaluator, int, int)}, which returns a single batch. */
    protected static final String NOSCROLL_ID = "noscroll";

    protected ContentSource xccContentSource;

    /** Last value used from the in-memory sequence. Used by unit tests. */
    protected long sequenceLastValue;

    protected List<MarkLogicRangeElementIndexDescriptor> rangeElementIndexes;

    /**
     * Creates the repository: builds the XCC content source, copies the range element index descriptors and
     * creates the root document if it cannot be read yet.
     *
     * @param cm the connection manager, passed to the super constructor
     * @param descriptor the repository descriptor (connection settings and range element indexes)
     */
    public MarkLogicRepository(ConnectionManager cm, MarkLogicRepositoryDescriptor descriptor) {
        super(cm, descriptor.name, descriptor);
        xccContentSource = newMarkLogicContentSource(descriptor);
        rangeElementIndexes = descriptor.rangeElementIndexes.stream()
                                                            .map(MarkLogicRangeElementIndexDescriptor::new)
                                                            .collect(Collectors.toList());
        initRepository();
    }

    /**
     * @return a singleton list containing {@link IdType#varchar}
     */
    @Override
    public List<IdType> getAllowedIdTypes() {
        return Collections.singletonList(IdType.varchar);
    }

    /**
     * Builds a XCC content source from the given descriptor. Used also by unit tests.
     *
     * @param descriptor the repository descriptor providing host, port, user, password and database name
     * @return a new content source; the database name falls back to {@link #DB_DEFAULT} when blank
     * @throws NuxeoException if the host is blank or the port is missing
     */
    public static ContentSource newMarkLogicContentSource(MarkLogicRepositoryDescriptor descriptor) {
        String host = descriptor.host;
        Integer port = descriptor.port;
        if (StringUtils.isBlank(host) || port == null) {
            throw new NuxeoException("Missing <host> or <port> in MarkLogic repository descriptor");
        }
        String dbname = StringUtils.defaultIfBlank(descriptor.dbname, DB_DEFAULT);
        String user = descriptor.user;
        String password = descriptor.password;
        return ContentSourceFactory.newContentSource(host, port, user, password, dbname);
    }

    /** Creates the root document when no state can be read for the root id. */
    protected void initRepository() {
        if (readState(getRootId()) == null) {
            initRoot();
        }
    }

    /** Does nothing: blob path initialization is not implemented yet for MarkLogic. */
    @Override
    protected void initBlobsPaths() {
        // not implemented yet; deliberately a no-op rather than a failure
    }

    /**
     * @return {@code "UUID_" + n} taken from the in-memory sequence when {@code DEBUG_UUIDS} is set, otherwise a
     *         random UUID string
     */
    @Override
    public String generateNewId() {
        if (DEBUG_UUIDS) {
            Long id = getNextSequenceId();
            return "UUID_" + id;
        }
        return UUID.randomUUID().toString();
    }

    /** Increments the in-memory sequence and returns its new value. Used by unit tests. */
    protected synchronized Long getNextSequenceId() {
        sequenceLastValue++;
        return Long.valueOf(sequenceLastValue);
    }

    /**
     * Reads the state stored under the URI of the given id.
     *
     * @param id the document id
     * @return the deserialized state, or {@code null} if the query returned nothing
     * @throws NuxeoException if the XCC request fails
     */
    @Override
    public State readState(String id) {
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: READ " + id);
        }
        try (Session session = xccContentSource.newSession()) {
            String query = "fn:doc(" + toUriLiteral(id) + ")";
            AdhocQuery request = session.newAdhocQuery(query);
            // ResultSequence will be closed by Session close
            ResultSequence rs = session.submitRequest(request);
            if (rs.hasNext()) {
                return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]);
            }
            return null;
        } catch (RequestException e) {
            throw new NuxeoException("An exception happened during xcc call", e);
        }
    }

    /**
     * Reads the states of several documents with a single {@code fn:doc} call.
     *
     * @param ids the document ids
     * @return one deserialized state per item returned by the query
     * @throws NuxeoException if the XCC request fails
     */
    @Override
    public List<State> readStates(List<String> ids) {
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: READ " + ids);
        }
        try (Session session = xccContentSource.newSession()) {
            String query = ids.stream()
                              .map(MarkLogicRepository::toUriLiteral)
                              .collect(Collectors.joining(",", "fn:doc((", "))"));
            AdhocQuery request = session.newAdhocQuery(query);
            // ResultSequence will be closed by Session close
            ResultSequence rs = session.submitRequest(request);
            return Arrays.stream(rs.asStrings())
                         .map(MarkLogicStateDeserializer::deserialize)
                         .collect(Collectors.toList());
        } catch (RequestException e) {
            throw new NuxeoException("An exception happened during xcc call", e);
        }
    }

    /**
     * Inserts the given state as a new document.
     *
     * @param state the state to store; its {@code KEY_ID} value determines the document URI
     * @throws NuxeoException if the XCC request fails
     */
    @Override
    public void createState(State state) {
        String id = state.get(KEY_ID).toString();
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: CREATE " + id + ": " + state);
        }
        try (Session session = xccContentSource.newSession()) {
            session.insertContent(convert(state));
        } catch (RequestException e) {
            throw new NuxeoException("An exception happened during xcc call", e);
        }
    }

    /**
     * Inserts the given states as new documents in a single {@code insertContent} call.
     *
     * @param states the states to store
     * @throws NuxeoException if the XCC request fails
     */
    @Override
    public void createStates(List<State> states) {
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: CREATE ["
                    + states.stream().map(state -> state.get(KEY_ID).toString()).collect(Collectors.joining(", "))
                    + "]: " + states);
        }
        try (Session session = xccContentSource.newSession()) {
            Content[] contents = states.stream().map(this::convert).toArray(Content[]::new);
            session.insertContent(contents);
        } catch (RequestException e) {
            throw new NuxeoException("An exception happened during xcc call", e);
        }
    }

    /** Serializes the state into a XCC content whose URI is derived from the state's {@code KEY_ID}. */
    private Content convert(State state) {
        String id = state.get(KEY_ID).toString();
        return ContentFactory.newContent(ID_FORMATTER.apply(id), MarkLogicStateSerializer.serialize(state), null);
    }

    /**
     * Applies a diff to a stored document by invoking the {@code /ext/nuxeo/patch.xqy} module.
     *
     * @param id the document id
     * @param diff the diff, sent serialized in the {@code patch-string} variable
     * @throws NuxeoException if the XCC request fails
     */
    @Override
    public void updateState(String id, StateDiff diff) {
        String patch = MarkLogicStateSerializer.serialize(diff);
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: UPDATE " + id + ": " + patch);
        }
        try (Session session = xccContentSource.newSession()) {
            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/patch.xqy");
            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
            request.setNewStringVariable("patch-string", patch);
            // ResultSequence will be closed by Session close
            session.submitRequest(request);
        } catch (RequestException e) {
            throw new NuxeoException("An exception happened during xcc call", e);
        }
    }

    /**
     * Deletes the documents stored under the URIs of the given ids with {@code xdmp:document-delete}.
     *
     * @param ids the ids of the documents to delete
     * @throws NuxeoException if the XCC request fails
     */
    @Override
    public void deleteStates(Set<String> ids) {
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: DELETE " + ids);
        }
        try (Session session = xccContentSource.newSession()) {
            String query = ids.stream()
                              .map(MarkLogicRepository::toUriLiteral)
                              .collect(Collectors.joining(",", "xdmp:document-delete((", "))"));
            AdhocQuery request = session.newAdhocQuery(query);
            // ResultSequence will be closed by Session close
            session.submitRequest(request);
        } catch (RequestException e) {
            throw new NuxeoException("An exception happened during xcc call", e);
        }
    }

    /**
     * Formats the document URI for the given id as a single-quoted XQuery string literal.
     * <p>
     * Inside an XQuery string literal {@code &} introduces an entity reference and the delimiting quote has to be
     * doubled, so both are escaped here. Without this an id containing one of those characters would produce a
     * malformed (or altered) query; ids without them yield exactly the same text as plain concatenation.
     */
    private static String toUriLiteral(String id) {
        String uri = ID_FORMATTER.apply(id);
        // '&' must be handled first so that it is the only source of "&amp;" in the result
        return "'" + uri.replace("&", "&amp;").replace("'", "''") + "'";
    }

    /**
     * @return the first state matching the parent id and name while not having one of the ignored ids, or
     *         {@code null} if the query returned nothing
     */
    @Override
    public State readChildState(String parentId, String name, Set<String> ignored) {
        String query = getChildQuery(parentId, name, ignored);
        return findOne(query);
    }

    /**
     * @return whether a state matches the parent id and name while not having one of the ignored ids
     */
    @Override
    public boolean hasChild(String parentId, String name, Set<String> ignored) {
        String query = getChildQuery(parentId, name, ignored);
        return exist(query);
    }

    /** Builds the query selecting on parent id and name and excluding the ignored ids. */
    private String getChildQuery(String parentId, String name, Set<String> ignored) {
        return new MarkLogicQuerySimpleBuilder(rangeElementIndexes).eq(KEY_PARENT_ID, parentId)
                                                                   .eq(KEY_NAME, name)
                                                                   .notIn(KEY_ID, ignored)
                                                                   .build();
    }

    /**
     * @return all states whose {@code key} equals {@code value}, excluding the ignored ids
     */
    @Override
    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
        builder.eq(key, value);
        builder.notIn(KEY_ID, ignored);
        return findAll(builder.build());
    }

    /**
     * @return all states matching both key/value pairs, excluding the ignored ids
     */
    @Override
    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
        builder.eq(key1, value1);
        builder.eq(key2, value2);
        builder.notIn(KEY_ID, ignored);
        return findAll(builder.build());
    }

    /**
     * Queries the states whose {@code key} equals {@code value} and reports their ids and proxy information
     * through the given collections. Only the id and proxy related keys are fetched.
     *
     * @param key the key to match
     * @param value the value to match
     * @param ids receives the id of every matching state
     * @param proxyTargets if not {@code null}, receives id -> target id for every matching proxy
     * @param targetProxies if not {@code null}, receives id -> proxy ids for every matching state having proxy ids
     */
    @Override
    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
            Map<String, Object[]> targetProxies) {
        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
        builder.eq(key, value);
        for (State state : findAll(builder.build(), KEY_ID, KEY_IS_PROXY, KEY_PROXY_TARGET_ID, KEY_PROXY_IDS)) {
            String id = (String) state.get(KEY_ID);
            ids.add(id);
            if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
                String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
                proxyTargets.put(id, targetId);
            }
            if (targetProxies != null) {
                Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
                if (proxyIds != null) {
                    targetProxies.put(id, proxyIds);
                }
            }
        }
    }

    /**
     * @return whether a state whose {@code key} equals {@code value} exists, excluding the ignored ids
     */
    @Override
    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(rangeElementIndexes);
        builder.eq(key, value);
        builder.notIn(KEY_ID, ignored);
        return exist(builder.build());
    }

    /**
     * Runs the query described by the evaluator and returns the projections of the matching states.
     *
     * @param evaluator the expression evaluator describing the query
     * @param orderByClause the ordering, handed to the query builder
     * @param distinctDocuments handed to the query builder
     * @param limit the page size handed to the search query; when {@code 0} the number of projections is used as
     *            total size instead of running a count query
     * @param offset the offset handed to the search query
     * @param countUpTo {@code -1} to count all results, {@code 0} to skip counting, otherwise the threshold above
     *            which the total size is reported as truncated
     * @return the projections with their total size: {@code -1} when not counted, {@code -2} when truncated
     * @throws NuxeoException if a XCC request fails
     */
    @Override
    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, orderByClause, distinctDocuments,
                rangeElementIndexes);
        MarkLogicQuery query = builder.buildQuery();
        // Don't do manual projection if there are no projection wildcards, as this brings no new
        // information and is costly. The only difference is several identical rows instead of one.
        boolean manualProjection = builder.doManualProjection();
        if (manualProjection) {
            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
            // so we need the full state from the database
            evaluator.parse();
        }
        String searchQuery = query.getSearchQuery(limit, offset);
        if (log.isTraceEnabled()) {
            logQuery(searchQuery);
        }
        // Run query
        try (Session session = xccContentSource.newSession()) {
            AdhocQuery request = session.newAdhocQuery(searchQuery);
            // ResultSequence will be closed by Session close
            ResultSequence rs = session.submitRequest(request);

            List<Map<String, Serializable>> projections = new ArrayList<>(limit);
            for (String rsItem : rs.asStrings()) {
                State state = MarkLogicStateDeserializer.deserialize(rsItem);
                if (manualProjection) {
                    projections.addAll(evaluator.matches(state));
                } else {
                    projections.add(DBSStateFlattener.flatten(state));
                }
            }
            long totalSize;
            if (countUpTo == -1) {
                // count full size
                if (limit == 0) {
                    totalSize = projections.size();
                } else {
                    totalSize = count(session, query.getCountQuery());
                }
            } else if (countUpTo == 0) {
                // no count
                totalSize = -1; // not counted
            } else {
                // count only if less than countUpTo
                if (limit == 0) {
                    totalSize = projections.size();
                } else {
                    totalSize = count(session, query.getCountQuery(countUpTo + 1));
                }
                if (totalSize > countUpTo) {
                    totalSize = -2; // truncated
                }
            }

            if (log.isTraceEnabled() && projections.size() != 0) {
                log.trace("MarkLogic: -> " + projections.size());
            }
            return new PartialList<>(projections, totalSize);
        } catch (RequestException e) {
            throw new NuxeoException("An exception happened during xcc call", e);
        }
    }

    /** Runs the given count query on the session and parses its first result item as a long. */
    private static long count(Session session, String countQuery) throws RequestException {
        AdhocQuery countRequest = session.newAdhocQuery(countQuery);
        // ResultSequence will be closed by Session close
        ResultSequence countRs = session.submitRequest(countRequest);
        return Long.parseLong(countRs.asStrings()[0]);
    }

    /**
     * Returns the ids of all matching documents in one batch, as real scrolling is not implemented yet.
     * {@code batchSize} and {@code keepAliveInSecond} are not used.
     *
     * @return a scroll result carrying {@link #NOSCROLL_ID} and every matching id
     * @throws NuxeoException if the XCC request fails
     */
    @Override
    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveInSecond) {
        // Not yet implemented, return all result in one shot for now
        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(evaluator, null, false, rangeElementIndexes);
        String query = builder.buildQuery().getSearchQuery();
        // Run query
        try (Session session = xccContentSource.newSession()) {
            AdhocQuery request = session.newAdhocQuery(query);
            // ResultSequence will be closed by Session close
            ResultSequence rs = session.submitRequest(request);
            return Arrays.stream(rs.asStrings())
                         .map(MarkLogicStateDeserializer::deserialize)
                         .map(state -> state.get(KEY_ID).toString())
                         .collect(Collectors.collectingAndThen(Collectors.toList(),
                                 ids -> new ScrollResultImpl(NOSCROLL_ID, ids)));
        } catch (RequestException e) {
            throw new NuxeoException("An exception happened during xcc call", e);
        }
    }

    /**
     * @return an empty result for {@link #NOSCROLL_ID}, as the first call already returned everything
     * @throws NuxeoException for any other scroll id
     */
    @Override
    public ScrollResult scroll(String scrollId) {
        if (NOSCROLL_ID.equals(scrollId)) {
            // there is only one batch
            return emptyResult();
        }
        throw new NuxeoException("Unknown or timed out scrollId");
    }

    /**
     * Reads the lock stored on the document state.
     *
     * @param id the document id
     * @return the lock built from the stored owner and creation date, or {@code null} if no owner is stored
     * @throws DocumentNotFoundException if no state can be read for the id
     */
    @Override
    public Lock getLock(String id) {
        // TODO test performance : retrieve document with read or search document with extract
        // TODO retrieve only some field
        // https://docs.marklogic.com/guide/search-dev/qbe#id_54044
        State state = readState(id);
        if (state == null) {
            throw new DocumentNotFoundException(id);
        }
        String owner = (String) state.get(KEY_LOCK_OWNER);
        if (owner == null) {
            // not locked
            return null;
        }
        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
        return new Lock(owner, created);
    }

    /**
     * Sets a lock by invoking the {@code /ext/nuxeo/set-lock.xqy} module with the serialized owner and creation
     * date.
     *
     * @param id the document id
     * @param lock the lock to set
     * @return the lock extracted from the module response, or {@code null} if the response state is empty
     * @throws DocumentNotFoundException if the request fails with the message {@code "Document not found"}
     * @throws NuxeoException if the XCC request fails for another reason
     */
    @Override
    public Lock setLock(String id, Lock lock) {
        State state = new State();
        state.put(KEY_LOCK_OWNER, lock.getOwner());
        state.put(KEY_LOCK_CREATED, lock.getCreated());
        String lockString = MarkLogicStateSerializer.serialize(state);
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: SETLOCK " + id + ": " + lockString);
        }
        // TODO check how the concurrent exception is raised
        try (Session session = xccContentSource.newSession()) {
            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/set-lock.xqy");
            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
            request.setNewStringVariable("lock-string", lockString);
            // ResultSequence will be closed by Session close
            ResultSequence result = session.submitRequest(request);
            State resultState = MarkLogicStateDeserializer.deserialize(result.asString());
            return extractLock(resultState);
        } catch (RequestException e) {
            if ("Document not found".equals(e.getMessage())) {
                throw new DocumentNotFoundException(id, e);
            }
            throw new NuxeoException("An exception happened during xcc call", e);
        }
    }

    /**
     * Removes a lock by invoking the {@code /ext/nuxeo/remove-lock.xqy} module.
     *
     * @param id the document id
     * @param owner the owner sent to the module; {@code null} is sent as an empty string
     * @return the lock extracted from the module response, or {@code null} if the response state is empty
     * @throws DocumentNotFoundException if the request fails with the message {@code "Document not found"}
     * @throws NuxeoException if the XCC request fails for another reason
     */
    @Override
    public Lock removeLock(String id, String owner) {
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: REMOVELOCK " + id + ": " + owner);
        }
        try (Session session = xccContentSource.newSession()) {
            ModuleInvoke request = session.newModuleInvoke("/ext/nuxeo/remove-lock.xqy");
            request.setNewStringVariable("uri", ID_FORMATTER.apply(id));
            request.setNewStringVariable("owner", Strings.nullToEmpty(owner));
            // ResultSequence will be closed by Session close
            ResultSequence result = session.submitRequest(request);
            State resultState = MarkLogicStateDeserializer.deserialize(result.asString());
            return extractLock(resultState);
        } catch (RequestException e) {
            if ("Document not found".equals(e.getMessage())) {
                throw new DocumentNotFoundException(id, e);
            }
            throw new NuxeoException("An exception happened during xcc call", e);
        }
    }

    /**
     * Builds a lock from a lock module response.
     *
     * @return {@code null} for an empty state, otherwise a lock carrying the owner, the creation date and the
     *         {@code failed} flag read from the state
     */
    private Lock extractLock(State state) {
        if (state.isEmpty()) {
            return null;
        }
        String owner = (String) state.get(KEY_LOCK_OWNER);
        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
        Boolean failed = (Boolean) state.get("failed");
        return new Lock(owner, created, Boolean.TRUE.equals(failed));
    }

    /** Does nothing. */
    @Override
    public void closeLockManager() {
    }

    /** Does nothing. */
    @Override
    public void clearLockManagerCaches() {
    }

    /**
     * Not implemented yet.
     *
     * @throws IllegalStateException always
     */
    @Override
    public void markReferencedBinaries() {
        throw new IllegalStateException("Not implemented yet");
    }

    /** Logs the query at trace level; callers check that trace is enabled first. */
    private void logQuery(String query) {
        log.trace("MarkLogic: QUERY " + query);
    }

    /**
     * Wraps the given cts query in {@code xdmp:exists} and runs it.
     *
     * @return the result parsed as a boolean
     */
    private boolean exist(String ctsQuery) {
        // first build exist query from cts query
        String query = "xdmp:exists(" + ctsQuery + ")";
        if (log.isTraceEnabled()) {
            logQuery(query);
        }
        // Run query
        try (Session session = xccContentSource.newSession()) {
            AdhocQuery request = session.newAdhocQuery(query);
            // ResultSequence will be closed by Session close
            ResultSequence rs = session.submitRequest(request);
            return Boolean.parseBoolean(rs.asString());
        } catch (RequestException e) {
            throw new NuxeoException("An exception happened during xcc call", e);
        }
    }

    /**
     * Runs the given cts query restricted to its first result.
     *
     * @return the deserialized state, or {@code null} if the query returned nothing
     */
    private State findOne(String ctsQuery) {
        // first add limit to ctsQuery
        String query = ctsQuery + "[1 to 1]";
        if (log.isTraceEnabled()) {
            logQuery(query);
        }
        // Run query
        try (Session session = xccContentSource.newSession()) {
            AdhocQuery request = session.newAdhocQuery(query);
            // ResultSequence will be closed by Session close
            ResultSequence rs = session.submitRequest(request);
            if (rs.hasNext()) {
                return MarkLogicStateDeserializer.deserialize(rs.asStrings()[0]);
            }
            return null;
        } catch (RequestException e) {
            throw new NuxeoException("An exception happened during xcc call", e);
        }
    }

    /**
     * Runs the given cts query and deserializes every result.
     *
     * @param ctsQuery the query to run
     * @param selects when not empty, each result is rebuilt keeping the document attributes and only the child
     *            elements whose local name is one of the serialized select keys
     * @return the deserialized states
     */
    private List<State> findAll(String ctsQuery, String... selects) {
        String query = ctsQuery;
        if (selects.length > 0) {
            query = "for $i in " + query
                    + " return document {element document{$i/document/@*,$i/document/*[ fn:local-name(.) = ("
                    + Arrays.stream(selects)
                            .map(MarkLogicHelper::serializeKey)
                            .map(select -> "\"" + select + "\"")
                            .collect(Collectors.joining(","))
                    + ")]}}";
        }
        if (log.isTraceEnabled()) {
            logQuery(query);
        }
        // Run query
        try (Session session = xccContentSource.newSession()) {
            AdhocQuery request = session.newAdhocQuery(query);
            // ResultSequence will be closed by Session close
            ResultSequence rs = session.submitRequest(request);
            return Arrays.stream(rs.asStrings())
                         .map(MarkLogicStateDeserializer::deserialize)
                         .collect(Collectors.toList());
        } catch (RequestException e) {
            throw new NuxeoException("An exception happened during xcc call", e);
        }
    }

}