/*
 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Bogdan Stefanescu
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.test;

import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.CoreSessionService;
import org.nuxeo.ecm.core.api.CoreSessionService.CoreSessionRegistrationInfo;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentRef;
import org.nuxeo.ecm.core.api.IdRef;
import org.nuxeo.ecm.core.api.IterableQueryResult;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.NuxeoPrincipal;
import org.nuxeo.ecm.core.api.PathRef;
import org.nuxeo.ecm.core.api.impl.UserPrincipal;
import org.nuxeo.ecm.core.api.security.ACP;
import org.nuxeo.ecm.core.query.QueryParseException;
import org.nuxeo.ecm.core.query.sql.NXQL;
import org.nuxeo.ecm.core.repository.RepositoryService;
import org.nuxeo.ecm.core.test.TransactionalFeature.Waiter;
import org.nuxeo.ecm.core.test.annotations.Granularity;
import org.nuxeo.ecm.core.test.annotations.RepositoryConfig;
import org.nuxeo.ecm.core.test.annotations.RepositoryInit;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.jtajca.NuxeoContainer;
import org.nuxeo.runtime.model.URLStreamRef;
import org.nuxeo.runtime.reload.ReloadService;
import org.nuxeo.runtime.test.runner.Defaults;
import org.nuxeo.runtime.test.runner.Deploy;
import org.nuxeo.runtime.test.runner.Features;
import org.nuxeo.runtime.test.runner.FeaturesRunner;
import org.nuxeo.runtime.test.runner.HotDeployer;
import org.nuxeo.runtime.test.runner.LocalDeploy;
import org.nuxeo.runtime.test.runner.RuntimeFeature;
import org.nuxeo.runtime.test.runner.RuntimeHarness;
import org.nuxeo.runtime.test.runner.SimpleFeature;
import org.nuxeo.runtime.transaction.TransactionHelper;

import com.google.inject.Binder;

/**
 * The core feature provides a default {@link CoreSession} that can be injected.
 * <p>
 * In addition, by injecting the feature itself, some helper methods are available to open new sessions.
 */
@Deploy({ "org.nuxeo.runtime.management", //
        "org.nuxeo.runtime.metrics", //
        "org.nuxeo.runtime.reload", // required by #CoreDeployer
        "org.nuxeo.ecm.core.schema", //
        "org.nuxeo.ecm.core.query", //
        "org.nuxeo.ecm.core.api", //
        "org.nuxeo.ecm.core.event", //
        "org.nuxeo.ecm.core", //
        "org.nuxeo.ecm.core.cache", //
        "org.nuxeo.ecm.core.test", //
        "org.nuxeo.ecm.core.mimetype", //
        "org.nuxeo.ecm.core.convert", //
        "org.nuxeo.ecm.core.convert.plugins", //
        "org.nuxeo.ecm.core.storage", //
        "org.nuxeo.ecm.core.storage.sql", //
        "org.nuxeo.ecm.core.storage.sql.test", //
        "org.nuxeo.ecm.core.storage.dbs", //
        "org.nuxeo.ecm.core.storage.mem", //
        "org.nuxeo.ecm.core.storage.mongodb", //
        "org.nuxeo.ecm.platform.commandline.executor", //
        "org.nuxeo.ecm.platform.el", //
        "org.nuxeo.ecm.core.io", //
})
@Features({ RuntimeFeature.class, TransactionalFeature.class })
@LocalDeploy("org.nuxeo.ecm.core.event:test-queuing.xml")
public class CoreFeature extends SimpleFeature {

    /** ACP of the root document, captured by {@link #initializeSession} and restored by {@link #cleanupSession}. */
    protected ACP rootAcp;

    /**
     * Waiter registered on the {@link TransactionalFeature} that waits for the {@link WorkManager} to complete its
     * works.
     */
    public class WorksWaiter implements Waiter {

        /**
         * Waits for the work manager to complete until the given deadline.
         *
         * @param deadline the absolute deadline, in milliseconds on the {@link System#currentTimeMillis()} scale
         * @return {@code true} if the work manager reported completion, {@code false} on timeout (in which case the
         *         state of the work queues is logged)
         */
        @Override
        public boolean await(long deadline) throws InterruptedException {
            WorkManager workManager = Framework.getService(WorkManager.class);
            if (workManager.awaitCompletion(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
                return true;
            }
            logInfos(workManager);
            return false;
        }

        /**
         * Logs, at error level, the metrics and the work ids of every queue of the given work manager.
         *
         * @param workManager the work manager whose queues are dumped
         */
        protected void logInfos(WorkManager workManager) {
            StringBuilder sb = new StringBuilder().append("Timed out while waiting for works").append(" ");
            for (String queueId : workManager.getWorkQueueIds()) {
                sb.append(System.lineSeparator());
                sb.append(workManager.getMetrics(queueId));
                sb.append(",works=");
                sb.append(String.join(",", workManager.listWorkIds(queueId, null)));
            }
            // the Throwable is only there to record where the timeout was observed
            log.error(sb.toString(), new Throwable("stack trace"));
        }

    }

    private static final Log log = LogFactory.getLog(CoreFeature.class);

    protected StorageConfiguration storageConfiguration;

    protected RepositoryInit repositoryInit;

    protected Granularity granularity;

    // this value gets injected
    protected CoreSession session;

    /** Set once {@link #cleanupSession} has run; makes the next {@link #initializeSession} re-init repositories. */
    protected boolean cleaned;

    protected TransactionalFeature txFeature;

    /**
     * @return the storage configuration created by {@link #initialize}
     */
    public StorageConfiguration getStorageConfiguration() {
        return storageConfiguration;
    }

    /**
     * Registers the hot-deploy handler and the works waiter, then reads the {@link RepositoryConfig} annotation (or
     * its defaults) to instantiate the {@link RepositoryInit} and determine the cleanup granularity.
     *
     * @throws NuxeoException if the configured {@link RepositoryInit} cannot be instantiated
     */
    @Override
    public void initialize(FeaturesRunner runner) {
        runner.getFeature(RuntimeFeature.class).registerHandler(new CoreDeployer());

        storageConfiguration = new StorageConfiguration(this);
        txFeature = runner.getFeature(TransactionalFeature.class);
        txFeature.addWaiter(new WorksWaiter());
        // init from RepositoryConfig annotations
        RepositoryConfig repositoryConfig = runner.getConfig(RepositoryConfig.class);
        if (repositoryConfig == null) {
            repositoryConfig = Defaults.of(RepositoryConfig.class);
        }
        try {
            // go through the constructor rather than Class#newInstance, which would let checked exceptions thrown
            // by the constructor escape undeclared
            repositoryInit = repositoryConfig.init().getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new NuxeoException(e);
        }
        Granularity cleanup = repositoryConfig.cleanup();
        granularity = cleanup == Granularity.UNDEFINED ? Granularity.CLASS : cleanup;
    }

    /**
     * @return the cleanup granularity; {@link Granularity#CLASS} when the configuration left it undefined
     */
    public Granularity getGranularity() {
        return granularity;
    }

    /**
     * Initializes the storage configuration, deploys its external bundles, then deploys the blob manager and
     * repository contributions.
     *
     * @throws NuxeoException if a bundle or a contribution cannot be deployed
     */
    @Override
    public void start(FeaturesRunner runner) {
        try {
            RuntimeHarness harness = runner.getFeature(RuntimeFeature.class).getHarness();
            storageConfiguration.init();
            for (String bundle : storageConfiguration.getExternalBundles()) {
                try {
                    harness.deployBundle(bundle);
                } catch (Exception e) {
                    throw new NuxeoException(e);
                }
            }
            URL blobContribUrl = storageConfiguration.getBlobManagerContrib(runner);
            harness.getContext().deploy(new URLStreamRef(blobContribUrl));
            URL repoContribUrl = storageConfiguration.getRepositoryContrib(runner);
            harness.getContext().deploy(new URLStreamRef(repoContribUrl));
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Waits for pending asynchronous work and, unless the granularity is {@link Granularity#METHOD}, initializes the
     * session inside a dedicated transaction.
     */
    @Override
    public void beforeRun(FeaturesRunner runner) throws InterruptedException {
        // wait for async tasks that may have been triggered by
        // RuntimeFeature (typically repo initialization)
        txFeature.nextTransaction(10, TimeUnit.SECONDS);
        if (granularity != Granularity.METHOD) {
            // we need a transaction to properly initialize the session
            // but it hasn't been started yet by TransactionalFeature
            TransactionHelper.startTransaction();
            boolean initialized = false;
            try {
                initializeSession(runner);
                initialized = true;
            } finally {
                if (!initialized) {
                    // do not commit a half-initialized repository
                    TransactionHelper.setTransactionRollbackOnly();
                }
                // always end the transaction so that a failure does not leave it open on this thread
                TransactionHelper.commitOrRollbackTransaction();
            }
        }
    }

    /**
     * Binds {@link CoreSession} to a provider returning the current value of {@link #session}.
     */
    @Override
    public void configure(FeaturesRunner runner, Binder binder) {
        binder.bind(CoreSession.class).toProvider(() -> session);
    }

    /**
     * Cleans up after the run, releases the session and then checks for sessions still registered in the
     * {@link CoreSessionService}.
     *
     * @throws AssertionError if sessions are still registered; each one is closed and its registration info (or the
     *             failure raised while closing it) is attached as a suppressed exception
     */
    @Override
    public void afterRun(FeaturesRunner runner) {
        waitForAsyncCompletion(); // fulltext and various workers
        if (granularity != Granularity.METHOD) {
            cleanupSession(runner);
        }
        if (session != null) {
            releaseCoreSession();
        }

        List<CoreSessionRegistrationInfo> leakedInfos = Framework.getService(CoreSessionService.class)
                                                                 .getCoreSessionRegistrationInfos();
        if (leakedInfos.isEmpty()) {
            return;
        }
        AssertionError leakedErrors = new AssertionError(String.format("leaked %d sessions", leakedInfos.size()));
        for (CoreSessionRegistrationInfo info : leakedInfos) {
            try {
                info.getCoreSession().close();
                leakedErrors.addSuppressed(info);
            } catch (RuntimeException cause) {
                leakedErrors.addSuppressed(cause);
            }
        }
        throw leakedErrors;
    }

    /**
     * Initializes the session before each test when the granularity is {@link Granularity#METHOD}.
     */
    @Override
    public void beforeSetup(FeaturesRunner runner) {
        if (granularity == Granularity.METHOD) {
            initializeSession(runner);
        }
    }

    /**
     * Cleans up the repository after each test when the granularity is {@link Granularity#METHOD}; otherwise only
     * waits for asynchronous work.
     */
    @Override
    public void afterTeardown(FeaturesRunner runner) {
        if (granularity == Granularity.METHOD) {
            cleanupSession(runner);
        } else {
            waitForAsyncCompletion();
        }
    }

    /**
     * Delegates to {@link TransactionalFeature#nextTransaction()}.
     */
    public void waitForAsyncCompletion() {
        txFeature.nextTransaction();
    }

    /**
     * Empties the repository: in a new transaction, removes proxies, the children of the root and any remaining
     * document, then restores the root ACP saved by {@link #initializeSession}. A session is opened first if none is
     * currently held, and it is released at the end.
     * <p>
     * A {@link NuxeoException} raised during the cleanup is logged, not rethrown.
     */
    protected void cleanupSession(FeaturesRunner runner) {
        waitForAsyncCompletion();
        if (session == null) {
            createCoreSession();
        }
        TransactionHelper.runInNewTransaction(() -> {
            try {
                log.trace("remove everything except root");
                // remove proxies first, as we cannot remove a target if there's a proxy pointing to it
                try (IterableQueryResult results = session.queryAndFetch(
                        "SELECT ecm:uuid FROM Document WHERE ecm:isProxy = 1", NXQL.NXQL)) {
                    batchRemoveDocuments(results);
                } catch (QueryParseException e) {
                    // ignore, proxies disabled
                }
                // remove non-proxies
                session.removeChildren(new PathRef("/"));
                log.trace(
                        "remove orphan versions as OrphanVersionRemoverListener is not triggered by CoreSession#removeChildren");
                // remove remaining placeless documents
                try (IterableQueryResult results = session.queryAndFetch("SELECT ecm:uuid FROM Document, Relation",
                        NXQL.NXQL)) {
                    batchRemoveDocuments(results);
                }
                // set original ACP on root
                DocumentModel root = session.getRootDocument();
                root.setACP(rootAcp, true);

                session.save();
                waitForAsyncCompletion();
                if (!session.query("SELECT * FROM Document, Relation").isEmpty()) {
                    log.error("Fail to cleanupSession, repository will not be empty for the next test.");
                }
            } catch (NuxeoException e) {
                log.error("Unable to reset repository", e);
            } finally {
                CoreScope.INSTANCE.exit();
            }
        });
        releaseCoreSession();
        cleaned = true;
    }

    /**
     * Removes the documents whose {@code ecm:uuid} is listed in the given query result, in batches of 100, skipping
     * the root document.
     *
     * @param results rows exposing an {@code ecm:uuid} column
     */
    protected void batchRemoveDocuments(IterableQueryResult results) {
        String rootDocumentId = session.getRootDocument().getId();
        List<DocumentRef> ids = new ArrayList<>();
        for (Map<String, Serializable> result : results) {
            String id = (String) result.get("ecm:uuid");
            if (id.equals(rootDocumentId)) {
                continue;
            }
            ids.add(new IdRef(id));
            if (ids.size() >= 100) {
                batchRemoveDocuments(ids);
                ids.clear();
            }
        }
        if (!ids.isEmpty()) {
            batchRemoveDocuments(ids);
        }
    }

    /**
     * Removes the given documents. Documents that no longer exist are skipped; those reported as removable are
     * removed one by one, the others are handed together to {@link CoreSession#removeDocuments} at the end.
     *
     * @param ids the references of the documents to remove
     */
    protected void batchRemoveDocuments(List<DocumentRef> ids) {
        List<DocumentRef> deferredIds = new ArrayList<>();
        for (DocumentRef id : ids) {
            if (!session.exists(id)) {
                continue;
            }
            if (session.canRemoveDocument(id)) {
                session.removeDocument(id);
            } else {
                deferredIds.add(id);
            }
        }
        session.removeDocuments(deferredIds.toArray(new DocumentRef[0]));
    }

    /**
     * Opens the session, runs the configured {@link RepositoryInit} if any, and records the root ACP. When a previous
     * {@link #cleanupSession} has run, the repositories are re-initialized first.
     */
    protected void initializeSession(FeaturesRunner runner) {
        if (cleaned) {
            // reinitialize repositories content
            RepositoryService repositoryService = Framework.getLocalService(RepositoryService.class);
            repositoryService.initRepositories();
            cleaned = false;
        }
        CoreScope.INSTANCE.enter();
        createCoreSession();
        if (repositoryInit != null) {
            repositoryInit.populate(session);
            session.save();
            waitForAsyncCompletion();
        }
        // save current root acp
        DocumentModel root = session.getRootDocument();
        rootAcp = root.getACP();
    }

    /**
     * @return the repository name given by the storage configuration
     */
    public String getRepositoryName() {
        return getStorageConfiguration().getRepositoryName();
    }

    /**
     * Opens a new session on the test repository for the given user name. The session is not tracked by this feature.
     */
    public CoreSession openCoreSession(String username) {
        return CoreInstance.openCoreSession(getRepositoryName(), username);
    }

    /**
     * Opens a new session on the test repository for the given principal. The session is not tracked by this feature.
     */
    public CoreSession openCoreSession(NuxeoPrincipal principal) {
        return CoreInstance.openCoreSession(getRepositoryName(), principal);
    }

    /**
     * Opens a new session on the test repository through {@link CoreInstance#openCoreSession(String)}. The session is
     * not tracked by this feature.
     */
    public CoreSession openCoreSession() {
        return CoreInstance.openCoreSession(getRepositoryName());
    }

    /**
     * Opens a new session on the test repository through {@link CoreInstance#openCoreSessionSystem(String)}. The
     * session is not tracked by this feature.
     */
    public CoreSession openCoreSessionSystem() {
        return CoreInstance.openCoreSessionSystem(getRepositoryName());
    }

    /**
     * Opens a session for an {@code "Administrator"} principal and stores it as the feature's current session,
     * replacing (without closing) any session previously held.
     *
     * @return the newly opened session
     */
    public CoreSession createCoreSession() {
        UserPrincipal principal = new UserPrincipal("Administrator", new ArrayList<>(), false, true);
        session = CoreInstance.openCoreSession(getRepositoryName(), principal);
        return session;
    }

    /**
     * @return the feature's current session, or {@code null} if none is open
     */
    public CoreSession getCoreSession() {
        return session;
    }

    /**
     * Closes and forgets the feature's current session. Does nothing when no session is open.
     */
    public void releaseCoreSession() {
        if (session == null) {
            return;
        }
        session.close();
        session = null;
    }

    /**
     * Releases the current session, waits for asynchronous work, resets the connection manager and opens a fresh
     * session.
     *
     * @return the newly opened session
     */
    public CoreSession reopenCoreSession() {
        releaseCoreSession();
        waitForAsyncCompletion();
        // flush JCA cache to acquire a new low-level session
        NuxeoContainer.resetConnectionManager();
        createCoreSession();
        return session;
    }

    /**
     * Hot-deploy handler that releases the session before delegating to the next handler, then reloads the
     * repository and opens a new session.
     */
    public class CoreDeployer extends HotDeployer.ActionHandler {

        @Override
        public void exec(String action, String... args) throws Exception {
            waitForAsyncCompletion();
            releaseCoreSession();
            next.exec(action, args);
            Framework.getService(ReloadService.class).reloadRepository();
            createCoreSession();

        }

    }

}