/*
 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Kevin Leturc
 */
package org.nuxeo.ecm.storage.marklogic;

import static java.lang.Boolean.TRUE;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import javax.resource.spi.ConnectionManager;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.Lock;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PartialList;
import org.nuxeo.ecm.core.model.LockManager;
import org.nuxeo.ecm.core.model.Repository;
import org.nuxeo.ecm.core.query.QueryParseException;
import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
import org.nuxeo.ecm.core.storage.State;
import org.nuxeo.ecm.core.storage.State.StateDiff;
import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.DatabaseClientFactory.Authentication;
import com.marklogic.client.FailedRequestException;
import com.marklogic.client.ResourceNotFoundException;
import com.marklogic.client.admin.ServerConfigurationManager;
import com.marklogic.client.admin.ServerConfigurationManager.UpdatePolicy;
import com.marklogic.client.document.DocumentDescriptor;
import com.marklogic.client.document.DocumentMetadataPatchBuilder.PatchHandle;
import com.marklogic.client.document.DocumentPage;
import com.marklogic.client.document.DocumentRecord;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.document.XMLDocumentManager;
import com.marklogic.client.query.RawQueryDefinition;

/**
 * MarkLogic implementation of a {@link Repository}.
 * <p>
 * Each document state is stored as one XML document whose URI is derived from the state id (see
 * {@link #ID_FORMATTER}). Locks are stored inside the document state itself and are updated through versioned
 * patches.
 *
 * @since 8.3
 */
public class MarkLogicRepository extends DBSRepositoryBase {

    private static final Log log = LogFactory.getLog(MarkLogicRepository.class);

    /** Maps a Nuxeo document id to the URI of its MarkLogic XML document. */
    private static final Function<String, String> ID_FORMATTER = id -> String.format("/%s.xml", id);

    /** Database name used when the descriptor does not specify one. */
    public static final String DB_DEFAULT = "nuxeo";

    /** Index of the first result in MarkLogic searches, which are 1-based. */
    private static final long FIRST_RESULT = 1;

    protected DatabaseClient markLogicClient;

    /**
     * Creates the repository, opens the MarkLogic client and initializes the repository root if needed.
     *
     * @param cm the connection manager handed to the base repository
     * @param descriptor the repository descriptor holding the connection settings
     */
    public MarkLogicRepository(ConnectionManager cm, MarkLogicRepositoryDescriptor descriptor) {
        super(cm, descriptor.name, descriptor);
        markLogicClient = newMarkLogicClient(descriptor);
        initRepository();
    }

    @Override
    public List<IdType> getAllowedIdTypes() {
        return Collections.singletonList(IdType.varchar);
    }

    /**
     * Shuts down the base repository, then releases the MarkLogic client.
     */
    @Override
    public void shutdown() {
        try {
            super.shutdown();
        } finally {
            // release the client even when the base shutdown fails
            markLogicClient.release();
        }
    }

    /**
     * Builds a MarkLogic client from the given descriptor. Digest authentication is used when both user and password
     * are provided, otherwise an unauthenticated client is created.
     * <p>
     * Used also by unit tests.
     *
     * @param descriptor the repository descriptor
     * @return a new database client
     * @throws NuxeoException if the host or the port is missing from the descriptor
     */
    public static DatabaseClient newMarkLogicClient(MarkLogicRepositoryDescriptor descriptor) {
        String host = descriptor.host;
        Integer port = descriptor.port;
        if (StringUtils.isBlank(host) || port == null) {
            throw new NuxeoException("Missing <host> or <port> in MarkLogic repository descriptor");
        }
        String dbname = StringUtils.defaultIfBlank(descriptor.dbname, DB_DEFAULT);
        String user = descriptor.user;
        String password = descriptor.password;
        if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)) {
            return DatabaseClientFactory.newClient(host, port, dbname, user, password, Authentication.DIGEST);
        }
        return DatabaseClientFactory.newClient(host, port, dbname);
    }

    /**
     * Configures the server update policy and creates the root document when it does not exist yet.
     */
    protected void initRepository() {
        // Activate Optimistic Locking
        // https://docs.marklogic.com/guide/java/transactions#id_81051
        ServerConfigurationManager configMgr = markLogicClient.newServerConfigManager();
        configMgr.readConfiguration();
        configMgr.setUpdatePolicy(UpdatePolicy.VERSION_OPTIONAL);
        // write the server configuration to the database
        configMgr.writeConfiguration();
        if (readState(getRootId()) == null) {
            initRoot();
        }
    }

    @Override
    protected void initBlobsPaths() {
        // Not implemented yet: intentionally a no-op.
    }

    @Override
    public String generateNewId() {
        return UUID.randomUUID().toString();
    }

    /**
     * Reads the state of one document.
     *
     * @param id the document id
     * @return the state, or {@code null} when MarkLogic reports that the document does not exist
     */
    @Override
    public State readState(String id) {
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: READ " + id);
        }
        try {
            return markLogicClient.newXMLDocumentManager().read(ID_FORMATTER.apply(id), new StateHandle()).get();
        } catch (ResourceNotFoundException e) {
            return null;
        }
    }

    /**
     * Reads the states of several documents in one bulk request.
     *
     * @param ids the document ids
     * @return the states found for the given ids
     */
    @Override
    public List<State> readStates(List<String> ids) {
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: READ " + ids);
        }
        if (ids.isEmpty()) {
            // nothing to read, avoid a bulk request without any URI
            return new ArrayList<>();
        }
        String[] markLogicIds = ids.stream().map(ID_FORMATTER).toArray(String[]::new);
        // the page holds server resources, close it once its content has been consumed
        try (DocumentPage page = markLogicClient.newXMLDocumentManager().read(markLogicIds)) {
            return StreamSupport.stream(page.spliterator(), false)
                                .map(document -> document.getContent(new StateHandle()).get())
                                .collect(Collectors.toList());
        }
    }

    /**
     * Writes one new document state.
     *
     * @param state the state to write, which must contain its id under {@code KEY_ID}
     */
    @Override
    public void createState(State state) {
        String id = state.get(KEY_ID).toString();
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: CREATE " + id + ": " + state);
        }
        markLogicClient.newXMLDocumentManager().write(ID_FORMATTER.apply(id), new StateHandle(state));
    }

    /**
     * Writes several new document states in one bulk request.
     *
     * @param states the states to write, each containing its id under {@code KEY_ID}
     */
    @Override
    public void createStates(List<State> states) {
        if (states.isEmpty()) {
            // nothing to write, avoid sending an empty write set
            return;
        }
        XMLDocumentManager docManager = markLogicClient.newXMLDocumentManager();
        DocumentWriteSet writeSet = docManager.newWriteSet();
        for (State state : states) {
            String id = state.get(KEY_ID).toString();
            writeSet.add(ID_FORMATTER.apply(id), new StateHandle(state));
        }
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: CREATE ["
                    + states.stream().map(state -> state.get(KEY_ID).toString()).collect(Collectors.joining(", "))
                    + "]: " + states);
        }
        docManager.write(writeSet);
    }

    /**
     * Applies a diff to an existing document through a MarkLogic patch.
     *
     * @param id the document id
     * @param diff the changes to apply
     */
    @Override
    public void updateState(String id, StateDiff diff) {
        XMLDocumentManager docManager = markLogicClient.newXMLDocumentManager();
        PatchHandle patch = new MarkLogicStateUpdateBuilder(docManager::newPatchBuilder).apply(diff);
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: UPDATE " + id + ": " + patch.toString());
        }
        docManager.patch(ID_FORMATTER.apply(id), patch);
    }

    /**
     * Deletes the documents with the given ids.
     *
     * @param ids the ids of the documents to delete
     */
    @Override
    public void deleteStates(Set<String> ids) {
        if (log.isTraceEnabled()) {
            log.trace("MarkLogic: DELETE " + ids);
        }
        if (ids.isEmpty()) {
            // nothing to delete, avoid a bulk request without any URI
            return;
        }
        String[] markLogicIds = ids.stream().map(ID_FORMATTER).toArray(String[]::new);
        markLogicClient.newXMLDocumentManager().delete(markLogicIds);
    }

    @Override
    public State readChildState(String parentId, String name, Set<String> ignored) {
        RawQueryDefinition query = getChildQuery(parentId, name, ignored);
        return findOne(query);
    }

    @Override
    public boolean hasChild(String parentId, String name, Set<String> ignored) {
        RawQueryDefinition query = getChildQuery(parentId, name, ignored);
        return exist(query);
    }

    /** Builds the query matching the child with the given name under the given parent, excluding ignored ids. */
    private RawQueryDefinition getChildQuery(String parentId, String name, Set<String> ignored) {
        return new MarkLogicQuerySimpleBuilder(markLogicClient.newQueryManager()).eq(KEY_PARENT_ID, parentId)
                                                                                 .eq(KEY_NAME, name)
                                                                                 .notIn(KEY_ID, ignored)
                                                                                 .build();
    }

    @Override
    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
        return queryKeyValue(key, value, ignored, this::findAll);
    }

    @Override
    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(markLogicClient.newQueryManager());
        builder.eq(key1, value1);
        builder.eq(key2, value2);
        builder.notIn(KEY_ID, ignored);
        return findAll(builder.build());
    }

    /**
     * Collects the ids, and optionally the proxy relations, of every document whose {@code key} equals
     * {@code value}. All result pages are consumed.
     *
     * @param key the key to match
     * @param value the expected value
     * @param ids the set filled with the ids of the matching documents
     * @param proxyTargets if not {@code null}, filled with proxy id -> target id for matching proxies
     * @param targetProxies if not {@code null}, filled with document id -> proxy ids for matching documents that
     *            have proxy ids
     */
    @Override
    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
            Map<String, Object[]> targetProxies) {
        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(markLogicClient.newQueryManager());
        builder.eq(key, value);
        builder.select(KEY_ID);
        builder.select(KEY_IS_PROXY);
        builder.select(KEY_PROXY_TARGET_ID);
        builder.select(KEY_PROXY_IDS);
        RawQueryDefinition query = builder.build();
        if (log.isTraceEnabled()) {
            logQuery(query);
        }

        forEachState(query, state -> {
            String id = (String) state.get(KEY_ID);
            ids.add(id);
            if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
                String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
                proxyTargets.put(id, targetId);
            }
            if (targetProxies != null) {
                Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
                if (proxyIds != null) {
                    targetProxies.put(id, proxyIds);
                }
            }
        });
    }

    @Override
    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
        return queryKeyValue(key, value, ignored, this::exist);
    }

    /**
     * Runs a query and returns one page of flattened results.
     *
     * @param evaluator the expression evaluator describing the query
     * @param orderByClause the ordering to apply
     * @param distinctDocuments whether the query asks for distinct documents
     * @param limit the maximum number of documents to fetch, {@code 0} falls back to a page length of 50
     * @param offset the 0-based index of the first document to fetch
     * @param countUpTo {@code -1} to count everything, {@code 0} to skip counting (total size {@code -1}),
     *            otherwise the threshold above which the total size is reported as truncated ({@code -2})
     * @return the projections together with the total size
     * @throws QueryParseException if the server rejects the request
     */
    @Override
    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
        MarkLogicQueryBuilder builder = new MarkLogicQueryBuilder(markLogicClient.newQueryManager(), evaluator,
                orderByClause, distinctDocuments);
        RawQueryDefinition query = builder.buildQuery();
        // Don't do manual projection if there are no projection wildcards, as this brings no new
        // information and is costly. The only difference is several identical rows instead of one.
        boolean manualProjection = builder.doManualProjection();
        if (manualProjection) {
            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
            // so we need the full state from the database
            evaluator.parse();
        }
        if (log.isTraceEnabled()) {
            logQuery(query, limit, offset);
        }
        XMLDocumentManager docManager = markLogicClient.newXMLDocumentManager();
        // NOTE(review): with limit == 0 only the first page of 50 documents is read - confirm whether
        // callers expect every result in that case.
        docManager.setPageLength(limit == 0 ? 50 : limit);
        // the offset is 0-based whereas the MarkLogic search start is 1-based
        long start = offset + FIRST_RESULT;
        try (DocumentPage page = docManager.search(query, start)) {
            List<Map<String, Serializable>> projections = new ArrayList<>((int) page.size());
            for (DocumentRecord record : page) {
                State state = record.getContent(new StateHandle()).get();
                if (manualProjection) {
                    projections.addAll(evaluator.matches(state));
                } else {
                    projections.add(DBSStateFlattener.flatten(state));
                }
            }
            long totalSize;
            if (countUpTo == 0) {
                // no count
                totalSize = -1; // not counted
            } else {
                totalSize = limit == 0 ? projections.size() : page.getTotalSize();
                // count only if less than countUpTo, -1 meaning count full size
                if (countUpTo != -1 && totalSize > countUpTo) {
                    totalSize = -2; // truncated
                }
            }

            if (log.isTraceEnabled() && projections.size() != 0) {
                log.trace("MarkLogic: -> " + projections.size());
            }
            return new PartialList<>(projections, totalSize);
        } catch (FailedRequestException fre) {
            throw new QueryParseException("Request was rejected by server", fre);
        }
    }

    /**
     * Gets the lock stored on a document.
     *
     * @param id the document id
     * @return the lock, or {@code null} if the document is not locked
     * @throws DocumentNotFoundException if the document does not exist
     */
    @Override
    public Lock getLock(String id) {
        // TODO test performance : retrieve document with read or search document with extract
        // TODO retrieve only some field
        // https://docs.marklogic.com/guide/search-dev/qbe#id_54044
        State state = readState(id);
        if (state == null) {
            throw new DocumentNotFoundException(id);
        }
        return extractLock(state).orElse(null);
    }

    /**
     * Sets a lock on a document unless it is already locked.
     *
     * @param id the document id
     * @param lock the lock to set
     * @return {@code null} if the lock was set, otherwise the lock already present on the document
     * @throws DocumentNotFoundException if the document does not exist
     * @throws ConcurrentUpdateException if the update was rejected and no lock is found on the document afterwards
     */
    @Override
    public Lock setLock(String id, Lock lock) {
        // Here we use Optimistic Locking to set the lock
        // https://docs.marklogic.com/guide/java/transactions#id_81051
        XMLDocumentManager docManager = markLogicClient.newXMLDocumentManager();
        DocumentDescriptor descriptor = docManager.newDescriptor(ID_FORMATTER.apply(id));
        // TODO test performance : retrieve document with read or search document with extract
        // TODO retrieve only some field
        // https://docs.marklogic.com/guide/search-dev/qbe#id_54044
        try {
            if (log.isTraceEnabled()) {
                log.trace("MarkLogic: READ " + id);
            }
            State state = docManager.read(descriptor, new StateHandle()).get();
            Optional<Lock> oldLock = extractLock(state);
            if (oldLock.isPresent()) {
                // Lock owner already set
                return oldLock.get();
            }
            // Set the lock
            PatchHandle patch = new MarkLogicLockUpdateBuilder(docManager::newPatchBuilder).set(lock);
            if (log.isTraceEnabled()) {
                log.trace("MarkLogic: UPDATE " + id + ": " + patch.toString());
            }
            docManager.patch(descriptor, patch);
            // doc is now locked
            return null;
        } catch (ResourceNotFoundException e) {
            // Document not found
            throw new DocumentNotFoundException(id, e);
        } catch (FailedRequestException e) {
            // There was a race condition - another lock was set
            State current = readState(id);
            if (current == null) {
                // the document was removed in the meantime
                throw new DocumentNotFoundException(id, e);
            }
            return extractLock(current).orElseThrow(() -> new ConcurrentUpdateException("Lock " + id));
        }
    }

    /**
     * Removes the lock of a document if the given owner is allowed to remove it.
     *
     * @param id the document id
     * @param owner the owner asking for the removal
     * @return {@code null} if the document was not locked, the removed lock if the removal was done, or a lock
     *         flagged as failed when the existing lock cannot be removed by {@code owner}
     * @throws DocumentNotFoundException if the document does not exist
     */
    @Override
    public Lock removeLock(String id, String owner) {
        // Here we use Optimistic Locking to remove the lock
        // https://docs.marklogic.com/guide/java/transactions#id_81051
        XMLDocumentManager docManager = markLogicClient.newXMLDocumentManager();
        DocumentDescriptor descriptor = docManager.newDescriptor(ID_FORMATTER.apply(id));
        // TODO test performance : retrieve document with read or search document with extract
        // TODO retrieve only some field
        // https://docs.marklogic.com/guide/search-dev/qbe#id_54044
        try {
            if (log.isTraceEnabled()) {
                log.trace("MarkLogic: READ " + id);
            }
            // Retrieve state of document
            State state = docManager.read(descriptor, new StateHandle()).get();
            Optional<Lock> oldLockOpt = extractLock(state);
            if (!oldLockOpt.isPresent()) {
                // document was not locked
                return null;
            }
            // A Lock exist on document
            Lock oldLock = oldLockOpt.get();
            if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) {
                // existing mismatched lock, flag failure
                return new Lock(oldLock.getOwner(), oldLock.getCreated(), true);
            }
            // Delete the lock
            PatchHandle patch = new MarkLogicLockUpdateBuilder(docManager::newPatchBuilder).delete();
            if (log.isTraceEnabled()) {
                log.trace("MarkLogic: UPDATE " + id + ": " + patch.toString());
            }
            docManager.patch(descriptor, patch);
            // Return previous lock
            return oldLock;
        } catch (ResourceNotFoundException e) {
            // Document not found
            throw new DocumentNotFoundException(id, e);
        }
    }

    /** Extracts the lock stored in the given state, empty when the state has no lock owner. */
    private Optional<Lock> extractLock(State state) {
        String owner = (String) state.get(KEY_LOCK_OWNER);
        if (owner == null) {
            return Optional.empty();
        }
        Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED);
        return Optional.of(new Lock(owner, oldCreated));
    }

    @Override
    public void closeLockManager() {
    }

    @Override
    public void clearLockManagerCaches() {
    }

    @Override
    public void markReferencedBinaries() {
        throw new IllegalStateException("Not implemented yet");
    }

    /** Builds the "key equals value, id not ignored" query and hands it to the given executor. */
    private <T> T queryKeyValue(String key, Object value, Set<String> ignored,
            Function<RawQueryDefinition, T> executor) {
        MarkLogicQuerySimpleBuilder builder = new MarkLogicQuerySimpleBuilder(markLogicClient.newQueryManager());
        builder.eq(key, value);
        builder.notIn(KEY_ID, ignored);
        return executor.apply(builder.build());
    }

    /** Tells whether at least one document matches the query. */
    private boolean exist(RawQueryDefinition query) {
        if (log.isTraceEnabled()) {
            logQuery(query);
        }
        return markLogicClient.newQueryManager().findOne(query) != null;
    }

    /** Returns the first state matching the query, or {@code null} when there is none. */
    private State findOne(RawQueryDefinition query) {
        if (log.isTraceEnabled()) {
            logQuery(query);
        }
        XMLDocumentManager docManager = markLogicClient.newXMLDocumentManager();
        docManager.setPageLength(1);
        try (DocumentPage page = docManager.search(query, FIRST_RESULT)) {
            if (page.hasNext()) {
                return page.nextContent(new StateHandle()).get();
            }
            return null;
        }
    }

    /** Returns every state matching the query, reading all result pages. */
    private List<State> findAll(RawQueryDefinition query) {
        if (log.isTraceEnabled()) {
            logQuery(query);
        }
        List<State> states = new ArrayList<>();
        forEachState(query, states::add);
        return states;
    }

    /**
     * Feeds every state matching the query to the consumer, page after page. Each page is closed before the next
     * one is requested.
     */
    private void forEachState(RawQueryDefinition query, Consumer<State> consumer) {
        XMLDocumentManager docManager = markLogicClient.newXMLDocumentManager();
        long start = FIRST_RESULT;
        boolean hasMore;
        do {
            try (DocumentPage page = docManager.search(query, start)) {
                for (DocumentRecord record : page) {
                    consumer.accept(record.getContent(new StateHandle()).get());
                }
                hasMore = page.hasNextPage();
                start += page.getPageSize();
            }
        } while (hasMore);
    }

    private void logQuery(RawQueryDefinition query) {
        log.trace("MarkLogic: QUERY " + query.getHandle());
    }

    private void logQuery(RawQueryDefinition query, int limit, int offset) {
        log.trace("MarkLogic: QUERY " + query.getHandle() + " OFFSET " + offset + " LIMIT " + limit);
    }

}