001/* 002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Bogdan Stefanescu 018 * Florent Guillaume 019 * Kevin Leturc <kleturc@nuxeo.com> 020 */ 021package org.nuxeo.ecm.core.test; 022 023import java.io.IOException; 024import java.io.Serializable; 025import java.net.URL; 026import java.util.ArrayList; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Map; 030import java.util.concurrent.TimeUnit; 031 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.nuxeo.ecm.core.api.CloseableCoreSession; 035import org.nuxeo.ecm.core.api.CoreInstance; 036import org.nuxeo.ecm.core.api.CoreSession; 037import org.nuxeo.ecm.core.api.CoreSessionService; 038import org.nuxeo.ecm.core.api.CoreSessionService.CoreSessionRegistrationInfo; 039import org.nuxeo.ecm.core.api.DocumentModel; 040import org.nuxeo.ecm.core.api.DocumentRef; 041import org.nuxeo.ecm.core.api.IdRef; 042import org.nuxeo.ecm.core.api.IterableQueryResult; 043import org.nuxeo.ecm.core.api.NuxeoException; 044import org.nuxeo.ecm.core.api.NuxeoPrincipal; 045import org.nuxeo.ecm.core.api.PathRef; 046import org.nuxeo.ecm.core.api.impl.UserPrincipal; 047import org.nuxeo.ecm.core.api.security.ACP; 048import org.nuxeo.ecm.core.query.QueryParseException; 049import org.nuxeo.ecm.core.query.sql.NXQL; 050import org.nuxeo.ecm.core.repository.RepositoryService; 051import org.nuxeo.ecm.core.test.TransactionalFeature.Waiter; 052import org.nuxeo.ecm.core.test.annotations.Granularity; 053import org.nuxeo.ecm.core.test.annotations.RepositoryConfig; 054import org.nuxeo.ecm.core.test.annotations.RepositoryInit; 055import org.nuxeo.ecm.core.work.api.WorkManager; 056import org.nuxeo.runtime.api.Framework; 057import org.nuxeo.runtime.jtajca.NuxeoContainer; 058import org.nuxeo.runtime.model.URLStreamRef; 059import org.nuxeo.runtime.test.runner.Defaults; 060import org.nuxeo.runtime.test.runner.Deploy; 061import org.nuxeo.runtime.test.runner.Features; 062import org.nuxeo.runtime.test.runner.FeaturesRunner; 063import org.nuxeo.runtime.test.runner.HotDeployer; 064import org.nuxeo.runtime.test.runner.RuntimeFeature; 065import org.nuxeo.runtime.test.runner.RuntimeHarness; 066import org.nuxeo.runtime.test.runner.SimpleFeature; 067import org.nuxeo.runtime.transaction.TransactionHelper; 068 069import com.google.inject.Binder; 070 071/** 072 * The core feature provides a default {@link CoreSession} that can be injected. 073 * <p> 074 * In addition, by injecting the feature itself, some helper methods are available to open new sessions. 075 */ 076@Deploy("org.nuxeo.runtime.management") 077@Deploy("org.nuxeo.runtime.metrics") 078@Deploy("org.nuxeo.runtime.reload") 079@Deploy("org.nuxeo.runtime.kv") 080@Deploy("org.nuxeo.runtime.pubsub") 081@Deploy("org.nuxeo.runtime.mongodb") 082@Deploy("org.nuxeo.runtime.migration") 083@Deploy("org.nuxeo.ecm.core.schema") 084@Deploy("org.nuxeo.ecm.core.query") 085@Deploy("org.nuxeo.ecm.core.api") 086@Deploy("org.nuxeo.ecm.core.event") 087@Deploy("org.nuxeo.ecm.core") 088@Deploy("org.nuxeo.ecm.core.cache") 089@Deploy("org.nuxeo.ecm.core.test") 090@Deploy("org.nuxeo.ecm.core.mimetype") 091@Deploy("org.nuxeo.ecm.core.convert") 092@Deploy("org.nuxeo.ecm.core.convert.plugins") 093@Deploy("org.nuxeo.ecm.core.storage") 094@Deploy("org.nuxeo.ecm.core.storage.sql") 095@Deploy("org.nuxeo.ecm.core.storage.sql.test") 096@Deploy("org.nuxeo.ecm.core.storage.dbs") 097@Deploy("org.nuxeo.ecm.core.storage.mem") 098@Deploy("org.nuxeo.ecm.core.storage.mongodb") 099@Deploy("org.nuxeo.ecm.platform.commandline.executor") 100@Deploy("org.nuxeo.ecm.platform.el") 101@Deploy("org.nuxeo.ecm.core.io") 102@Features({ RuntimeFeature.class, TransactionalFeature.class }) 103@Deploy("org.nuxeo.ecm.core.event:test-queuing.xml") 104public class CoreFeature extends SimpleFeature { 105 106 protected ACP rootAcp; 107 108 public class WorksWaiter implements Waiter { 109 @Override 110 public boolean await(long deadline) throws InterruptedException { 111 WorkManager workManager = Framework.getService(WorkManager.class); 112 if (workManager.awaitCompletion(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) { 113 return true; 114 } 115 logInfos(workManager); 116 return false; 117 } 118 119 protected void logInfos(WorkManager workManager) { 120 StringBuilder sb = new StringBuilder().append("Timed out while waiting for works").append(" "); 121 Iterator<String> queueids = workManager.getWorkQueueIds().iterator(); 122 while (queueids.hasNext()) { 123 sb.append(System.lineSeparator()); 124 String queueid = queueids.next(); 125 sb.append(workManager.getMetrics(queueid)); 126 sb.append(",works="); 127 Iterator<String> works = workManager.listWorkIds(queueid, null).iterator(); 128 while (works.hasNext()) { 129 sb.append(works.next()); 130 if (works.hasNext()) { 131 sb.append(","); 132 } 133 } 134 } 135 log.error(sb.toString(), new Throwable("stack trace")); 136 } 137 138 } 139 140 private static final Log log = LogFactory.getLog(CoreFeature.class); 141 142 protected StorageConfiguration storageConfiguration; 143 144 protected RepositoryInit repositoryInit; 145 146 protected Granularity granularity; 147 148 // this value gets injected 149 protected CoreSession session; 150 151 protected boolean cleaned; 152 153 protected TransactionalFeature txFeature; 154 155 public StorageConfiguration getStorageConfiguration() { 156 return storageConfiguration; 157 } 158 159 @Override 160 public void initialize(FeaturesRunner runner) { 161 runner.getFeature(RuntimeFeature.class).registerHandler(new CoreDeployer()); 162 163 storageConfiguration = new StorageConfiguration(this); 164 txFeature = runner.getFeature(TransactionalFeature.class); 165 txFeature.addWaiter(new WorksWaiter()); 166 // init from RepositoryConfig annotations 167 RepositoryConfig repositoryConfig = runner.getConfig(RepositoryConfig.class); 168 if (repositoryConfig == null) { 169 repositoryConfig = Defaults.of(RepositoryConfig.class); 170 } 171 try { 172 repositoryInit = repositoryConfig.init().newInstance(); 173 } catch (ReflectiveOperationException e) { 174 throw new NuxeoException(e); 175 } 176 Granularity cleanup = repositoryConfig.cleanup(); 177 granularity = cleanup == Granularity.UNDEFINED ? Granularity.CLASS : cleanup; 178 } 179 180 public Granularity getGranularity() { 181 return granularity; 182 } 183 184 @Override 185 public void start(FeaturesRunner runner) { 186 try { 187 RuntimeHarness harness = runner.getFeature(RuntimeFeature.class).getHarness(); 188 storageConfiguration.init(); 189 for (String bundle : storageConfiguration.getExternalBundles()) { 190 try { 191 harness.deployBundle(bundle); 192 } catch (Exception e) { 193 throw new NuxeoException(e); 194 } 195 } 196 URL blobContribUrl = storageConfiguration.getBlobManagerContrib(runner); 197 harness.getContext().deploy(new URLStreamRef(blobContribUrl)); 198 URL repoContribUrl = storageConfiguration.getRepositoryContrib(runner); 199 harness.getContext().deploy(new URLStreamRef(repoContribUrl)); 200 } catch (IOException e) { 201 throw new NuxeoException(e); 202 } 203 } 204 205 @Override 206 public void beforeRun(FeaturesRunner runner) throws InterruptedException { 207 // wait for async tasks that may have been triggered by 208 // RuntimeFeature (typically repo initialization) 209 txFeature.nextTransaction(10, TimeUnit.SECONDS); 210 if (granularity != Granularity.METHOD) { 211 // we need a transaction to properly initialize the session 212 // but it hasn't been started yet by TransactionalFeature 213 TransactionHelper.startTransaction(); 214 initializeSession(runner); 215 TransactionHelper.commitOrRollbackTransaction(); 216 } 217 } 218 219 @Override 220 public void configure(FeaturesRunner runner, Binder binder) { 221 binder.bind(CoreSession.class).toProvider(() -> session); 222 } 223 224 @Override 225 public void afterRun(FeaturesRunner runner) { 226 waitForAsyncCompletion(); // fulltext and various workers 227 if (granularity != Granularity.METHOD) { 228 cleanupSession(runner); 229 } 230 if (session != null) { 231 releaseCoreSession(); 232 } 233 234 List<CoreSessionRegistrationInfo> leakedInfos = Framework.getService(CoreSessionService.class) 235 .getCoreSessionRegistrationInfos(); 236 if (leakedInfos.size() == 0) { 237 return; 238 } 239 AssertionError leakedErrors = new AssertionError(String.format("leaked %d sessions", leakedInfos.size())); 240 for (CoreSessionRegistrationInfo info : leakedInfos) { 241 try { 242 ((CloseableCoreSession) info.getCoreSession()).close(); 243 leakedErrors.addSuppressed(info); 244 } catch (RuntimeException cause) { 245 leakedErrors.addSuppressed(cause); 246 } 247 } 248 throw leakedErrors; 249 } 250 251 @Override 252 public void beforeSetup(FeaturesRunner runner) { 253 if (granularity == Granularity.METHOD) { 254 initializeSession(runner); 255 } 256 } 257 258 @Override 259 public void afterTeardown(FeaturesRunner runner) { 260 if (granularity == Granularity.METHOD) { 261 cleanupSession(runner); 262 } else { 263 waitForAsyncCompletion(); 264 } 265 } 266 267 public void waitForAsyncCompletion() { 268 txFeature.nextTransaction(); 269 } 270 271 protected void cleanupSession(FeaturesRunner runner) { 272 waitForAsyncCompletion(); 273 if (session == null) { 274 createCoreSession(); 275 } 276 TransactionHelper.runInNewTransaction(() -> { 277 try { 278 log.trace("remove everything except root"); 279 // remove proxies first, as we cannot remove a target if there's a proxy pointing to it 280 try (IterableQueryResult results = session.queryAndFetch( 281 "SELECT ecm:uuid FROM Document WHERE ecm:isProxy = 1", NXQL.NXQL)) { 282 batchRemoveDocuments(results); 283 } catch (QueryParseException e) { 284 // ignore, proxies disabled 285 } 286 // remove non-proxies 287 session.removeChildren(new PathRef("/")); 288 log.trace( 289 "remove orphan versions as OrphanVersionRemoverListener is not triggered by CoreSession#removeChildren"); 290 // remove remaining placeless documents 291 try (IterableQueryResult results = session.queryAndFetch("SELECT ecm:uuid FROM Document, Relation", 292 NXQL.NXQL)) { 293 batchRemoveDocuments(results); 294 } 295 // set original ACP on root 296 DocumentModel root = session.getRootDocument(); 297 root.setACP(rootAcp, true); 298 299 session.save(); 300 waitForAsyncCompletion(); 301 if (!session.query("SELECT * FROM Document, Relation").isEmpty()) { 302 log.error("Fail to cleanupSession, repository will not be empty for the next test."); 303 } 304 } catch (NuxeoException e) { 305 log.error("Unable to reset repository", e); 306 } finally { 307 CoreScope.INSTANCE.exit(); 308 } 309 }); 310 releaseCoreSession(); 311 cleaned = true; 312 } 313 314 protected void batchRemoveDocuments(IterableQueryResult results) { 315 String rootDocumentId = session.getRootDocument().getId(); 316 List<DocumentRef> ids = new ArrayList<>(); 317 for (Map<String, Serializable> result : results) { 318 String id = (String) result.get("ecm:uuid"); 319 if (id.equals(rootDocumentId)) { 320 continue; 321 } 322 ids.add(new IdRef(id)); 323 if (ids.size() >= 100) { 324 batchRemoveDocuments(ids); 325 ids.clear(); 326 } 327 } 328 if (!ids.isEmpty()) { 329 batchRemoveDocuments(ids); 330 } 331 } 332 333 protected void batchRemoveDocuments(List<DocumentRef> ids) { 334 List<DocumentRef> deferredIds = new ArrayList<>(); 335 for (DocumentRef id : ids) { 336 if (!session.exists(id)) { 337 continue; 338 } 339 if (session.canRemoveDocument(id)) { 340 session.removeDocument(id); 341 } else { 342 deferredIds.add(id); 343 } 344 } 345 session.removeDocuments(deferredIds.toArray(new DocumentRef[0])); 346 } 347 348 protected void initializeSession(FeaturesRunner runner) { 349 if (cleaned) { 350 // reinitialize repositories content 351 RepositoryService repositoryService = Framework.getService(RepositoryService.class); 352 repositoryService.initRepositories(); 353 cleaned = false; 354 } 355 CoreScope.INSTANCE.enter(); 356 createCoreSession(); 357 if (repositoryInit != null) { 358 repositoryInit.populate(session); 359 session.save(); 360 waitForAsyncCompletion(); 361 } 362 // save current root acp 363 DocumentModel root = session.getRootDocument(); 364 rootAcp = root.getACP(); 365 } 366 367 public String getRepositoryName() { 368 return getStorageConfiguration().getRepositoryName(); 369 } 370 371 public CloseableCoreSession openCoreSession(String username) { 372 return CoreInstance.openCoreSession(getRepositoryName(), username); 373 } 374 375 public CloseableCoreSession openCoreSession(NuxeoPrincipal principal) { 376 return CoreInstance.openCoreSession(getRepositoryName(), principal); 377 } 378 379 public CloseableCoreSession openCoreSession() { 380 return CoreInstance.openCoreSession(getRepositoryName()); 381 } 382 383 public CloseableCoreSession openCoreSessionSystem() { 384 return CoreInstance.openCoreSessionSystem(getRepositoryName()); 385 } 386 387 public CloseableCoreSession createCoreSession() { 388 UserPrincipal principal = new UserPrincipal("Administrator", new ArrayList<>(), false, true); 389 session = CoreInstance.openCoreSession(getRepositoryName(), principal); 390 return (CloseableCoreSession) session; 391 } 392 393 public CoreSession getCoreSession() { 394 return session; 395 } 396 397 public void releaseCoreSession() { 398 ((CloseableCoreSession) session).close(); 399 session = null; 400 } 401 402 public CoreSession reopenCoreSession() { 403 releaseCoreSession(); 404 waitForAsyncCompletion(); 405 // flush JCA cache to acquire a new low-level session 406 NuxeoContainer.resetConnectionManager(); 407 createCoreSession(); 408 return session; 409 } 410 411 public class CoreDeployer extends HotDeployer.ActionHandler { 412 413 @Override 414 public void exec(String action, String... agrs) throws Exception { 415 waitForAsyncCompletion(); 416 releaseCoreSession(); 417 next.exec(action, agrs); 418 createCoreSession(); 419 } 420 421 } 422 423}