001/*
002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Florent Guillaume
019 *     Kevin Leturc <kleturc@nuxeo.com>
020 */
021package org.nuxeo.ecm.core.test;
022
023import java.io.IOException;
024import java.io.Serializable;
025import java.net.URL;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.TimeUnit;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.ecm.core.api.CloseableCoreSession;
034import org.nuxeo.ecm.core.api.CoreInstance;
035import org.nuxeo.ecm.core.api.CoreSession;
036import org.nuxeo.ecm.core.api.CoreSessionService;
037import org.nuxeo.ecm.core.api.CoreSessionService.CoreSessionRegistrationInfo;
038import org.nuxeo.ecm.core.api.DocumentModel;
039import org.nuxeo.ecm.core.api.DocumentRef;
040import org.nuxeo.ecm.core.api.IdRef;
041import org.nuxeo.ecm.core.api.IterableQueryResult;
042import org.nuxeo.ecm.core.api.NuxeoException;
043import org.nuxeo.ecm.core.api.NuxeoPrincipal;
044import org.nuxeo.ecm.core.api.PathRef;
045import org.nuxeo.ecm.core.api.impl.UserPrincipal;
046import org.nuxeo.ecm.core.api.security.ACP;
047import org.nuxeo.ecm.core.query.QueryParseException;
048import org.nuxeo.ecm.core.query.sql.NXQL;
049import org.nuxeo.ecm.core.repository.RepositoryService;
050import org.nuxeo.ecm.core.test.annotations.Granularity;
051import org.nuxeo.ecm.core.test.annotations.RepositoryConfig;
052import org.nuxeo.ecm.core.test.annotations.RepositoryInit;
053import org.nuxeo.ecm.core.work.api.WorkManager;
054import org.nuxeo.runtime.api.Framework;
055import org.nuxeo.runtime.jtajca.NuxeoContainer;
056import org.nuxeo.runtime.model.URLStreamRef;
057import org.nuxeo.runtime.test.runner.Defaults;
058import org.nuxeo.runtime.test.runner.Deploy;
059import org.nuxeo.runtime.test.runner.Features;
060import org.nuxeo.runtime.test.runner.FeaturesRunner;
061import org.nuxeo.runtime.test.runner.HotDeployer;
062import org.nuxeo.runtime.test.runner.RuntimeFeature;
063import org.nuxeo.runtime.test.runner.RuntimeHarness;
064import org.nuxeo.runtime.test.runner.SimpleFeature;
065import org.nuxeo.runtime.test.runner.TransactionalFeature;
066import org.nuxeo.runtime.test.runner.TransactionalFeature.Waiter;
067import org.nuxeo.runtime.transaction.TransactionHelper;
068
069import com.google.inject.Binder;
070
071/**
072 * The core feature provides a default {@link CoreSession} that can be injected.
073 * <p>
074 * In addition, by injecting the feature itself, some helper methods are available to open new sessions.
075 */
076@Deploy("org.nuxeo.runtime.management")
077@Deploy("org.nuxeo.runtime.metrics")
078@Deploy("org.nuxeo.runtime.reload")
079@Deploy("org.nuxeo.runtime.kv")
080@Deploy("org.nuxeo.runtime.pubsub")
081@Deploy("org.nuxeo.runtime.mongodb")
082@Deploy("org.nuxeo.runtime.migration")
083@Deploy("org.nuxeo.runtime.stream")
084@Deploy("org.nuxeo.ecm.core.schema")
085@Deploy("org.nuxeo.ecm.core.query")
086@Deploy("org.nuxeo.ecm.core.api")
087@Deploy("org.nuxeo.ecm.core.event")
088@Deploy("org.nuxeo.ecm.core")
089@Deploy("org.nuxeo.ecm.core.io")
090@Deploy("org.nuxeo.ecm.core.cache")
091@Deploy("org.nuxeo.ecm.core.test")
092@Deploy("org.nuxeo.ecm.core.mimetype")
093@Deploy("org.nuxeo.ecm.core.convert")
094@Deploy("org.nuxeo.ecm.core.convert.plugins")
095@Deploy("org.nuxeo.ecm.core.storage")
096@Deploy("org.nuxeo.ecm.core.storage.sql")
097@Deploy("org.nuxeo.ecm.core.storage.sql.test")
098@Deploy("org.nuxeo.ecm.core.storage.dbs")
099@Deploy("org.nuxeo.ecm.core.storage.mem")
100@Deploy("org.nuxeo.ecm.core.storage.mongodb")
101@Deploy("org.nuxeo.ecm.platform.commandline.executor")
102@Deploy("org.nuxeo.ecm.platform.el")
103@Deploy("org.nuxeo.ecm.core.event:test-queuing.xml")
104@RepositoryConfig(cleanup = Granularity.METHOD)
105@Features({ RuntimeFeature.class, TransactionalFeature.class })
106public class CoreFeature extends SimpleFeature {
107
108    protected ACP rootAcp;
109
110    public class WorksWaiter implements Waiter {
111        @Override
112        public boolean await(long deadline) throws InterruptedException {
113            WorkManager workManager = Framework.getService(WorkManager.class);
114            if (workManager.awaitCompletion(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
115                return true;
116            }
117            logInfos(workManager);
118            return false;
119        }
120
121        protected void logInfos(WorkManager workManager) {
122            StringBuilder sb = new StringBuilder().append("Timed out while waiting for works").append(" ");
123            for (String queueId : workManager.getWorkQueueIds()) {
124                sb.append(System.lineSeparator());
125                sb.append(workManager.getMetrics(queueId));
126            }
127            log.error(sb.toString(), new Throwable("stack trace"));
128        }
129
130    }
131
132    private static final Log log = LogFactory.getLog(CoreFeature.class);
133
134    protected StorageConfiguration storageConfiguration;
135
136    protected RepositoryInit repositoryInit;
137
138    protected Granularity granularity;
139
140    // this value gets injected
141    protected CoreSession session;
142
143    protected boolean cleaned;
144
145    protected TransactionalFeature txFeature;
146
147    public StorageConfiguration getStorageConfiguration() {
148        return storageConfiguration;
149    }
150
151    @Override
152    public void initialize(FeaturesRunner runner) {
153        runner.getFeature(RuntimeFeature.class).registerHandler(new CoreDeployer());
154
155        storageConfiguration = new StorageConfiguration(this);
156        txFeature = runner.getFeature(TransactionalFeature.class);
157        txFeature.addWaiter(new WorksWaiter());
158        // init from RepositoryConfig annotations
159        RepositoryConfig repositoryConfig = runner.getConfig(RepositoryConfig.class);
160        if (repositoryConfig == null) {
161            repositoryConfig = Defaults.of(RepositoryConfig.class);
162        }
163        try {
164            repositoryInit = repositoryConfig.init().newInstance();
165        } catch (ReflectiveOperationException e) {
166            throw new NuxeoException(e);
167        }
168        Granularity cleanup = repositoryConfig.cleanup();
169        granularity = cleanup == Granularity.UNDEFINED ? Granularity.CLASS : cleanup;
170    }
171
172    public Granularity getGranularity() {
173        return granularity;
174    }
175
176    @Override
177    public void start(FeaturesRunner runner) {
178        try {
179            RuntimeHarness harness = runner.getFeature(RuntimeFeature.class).getHarness();
180            storageConfiguration.init();
181            for (String bundle : storageConfiguration.getExternalBundles()) {
182                try {
183                    harness.deployBundle(bundle);
184                } catch (Exception e) {
185                    throw new NuxeoException(e);
186                }
187            }
188            URL blobContribUrl = storageConfiguration.getBlobManagerContrib(runner);
189            harness.getContext().deploy(new URLStreamRef(blobContribUrl));
190            URL repoContribUrl = storageConfiguration.getRepositoryContrib(runner);
191            harness.getContext().deploy(new URLStreamRef(repoContribUrl));
192        } catch (IOException e) {
193            throw new NuxeoException(e);
194        }
195    }
196
197    @Override
198    public void beforeRun(FeaturesRunner runner) throws InterruptedException {
199        // wait for async tasks that may have been triggered by
200        // RuntimeFeature (typically repo initialization)
201        txFeature.nextTransaction(10, TimeUnit.SECONDS);
202        if (granularity != Granularity.METHOD) {
203            // we need a transaction to properly initialize the session
204            // but it hasn't been started yet by TransactionalFeature
205            TransactionHelper.startTransaction();
206            initializeSession(runner);
207            TransactionHelper.commitOrRollbackTransaction();
208        }
209    }
210
211    @Override
212    public void configure(FeaturesRunner runner, Binder binder) {
213        binder.bind(CoreSession.class).toProvider(() -> session);
214    }
215
216    @Override
217    public void afterRun(FeaturesRunner runner) {
218        waitForAsyncCompletion(); // fulltext and various workers
219        if (granularity != Granularity.METHOD) {
220            cleanupSession(runner);
221        }
222        if (session != null) {
223            releaseCoreSession();
224        }
225
226        List<CoreSessionRegistrationInfo> leakedInfos = Framework.getService(CoreSessionService.class)
227                                                                 .getCoreSessionRegistrationInfos();
228        if (leakedInfos.size() == 0) {
229            return;
230        }
231        AssertionError leakedErrors = new AssertionError(String.format("leaked %d sessions", leakedInfos.size()));
232        for (CoreSessionRegistrationInfo info : leakedInfos) {
233            try {
234                ((CloseableCoreSession) info.getCoreSession()).close();
235                leakedErrors.addSuppressed(info);
236            } catch (RuntimeException cause) {
237                leakedErrors.addSuppressed(cause);
238            }
239        }
240        throw leakedErrors;
241    }
242
243    @Override
244    public void beforeSetup(FeaturesRunner runner) {
245        if (granularity == Granularity.METHOD) {
246            initializeSession(runner);
247        }
248    }
249
250    @Override
251    public void afterTeardown(FeaturesRunner runner) {
252        if (granularity == Granularity.METHOD) {
253            cleanupSession(runner);
254        } else {
255            waitForAsyncCompletion();
256        }
257    }
258
259    public void waitForAsyncCompletion() {
260        txFeature.nextTransaction();
261    }
262
263    protected void cleanupSession(FeaturesRunner runner) {
264        waitForAsyncCompletion();
265        if (session == null) {
266            createCoreSession();
267        }
268        TransactionHelper.runInNewTransaction(() -> {
269            try {
270                log.trace("remove everything except root");
271                // remove proxies first, as we cannot remove a target if there's a proxy pointing to it
272                try (IterableQueryResult results = session.queryAndFetch(
273                        "SELECT ecm:uuid FROM Document WHERE ecm:isProxy = 1", NXQL.NXQL)) {
274                    batchRemoveDocuments(results);
275                } catch (QueryParseException e) {
276                    // ignore, proxies disabled
277                }
278                // remove non-proxies
279                session.removeChildren(new PathRef("/"));
280                log.trace(
281                        "remove orphan versions as OrphanVersionRemoverListener is not triggered by CoreSession#removeChildren");
282                // remove remaining placeless documents
283                try (IterableQueryResult results = session.queryAndFetch("SELECT ecm:uuid FROM Document, Relation",
284                        NXQL.NXQL)) {
285                    batchRemoveDocuments(results);
286                }
287                // set original ACP on root
288                DocumentModel root = session.getRootDocument();
289                root.setACP(rootAcp, true);
290
291                session.save();
292                waitForAsyncCompletion();
293                if (!session.query("SELECT * FROM Document, Relation").isEmpty()) {
294                    log.error("Fail to cleanupSession, repository will not be empty for the next test.");
295                }
296            } catch (NuxeoException e) {
297                log.error("Unable to reset repository", e);
298            } finally {
299                CoreScope.INSTANCE.exit();
300            }
301        });
302        releaseCoreSession();
303        cleaned = true;
304    }
305
306    protected void batchRemoveDocuments(IterableQueryResult results) {
307        String rootDocumentId = session.getRootDocument().getId();
308        List<DocumentRef> ids = new ArrayList<>();
309        for (Map<String, Serializable> result : results) {
310            String id = (String) result.get("ecm:uuid");
311            if (id.equals(rootDocumentId)) {
312                continue;
313            }
314            ids.add(new IdRef(id));
315            if (ids.size() >= 100) {
316                batchRemoveDocuments(ids);
317                ids.clear();
318            }
319        }
320        if (!ids.isEmpty()) {
321            batchRemoveDocuments(ids);
322        }
323    }
324
325    protected void batchRemoveDocuments(List<DocumentRef> ids) {
326        List<DocumentRef> deferredIds = new ArrayList<>();
327        for (DocumentRef id : ids) {
328            if (!session.exists(id)) {
329                continue;
330            }
331            if (session.canRemoveDocument(id)) {
332                session.removeDocument(id);
333            } else {
334                deferredIds.add(id);
335            }
336        }
337        session.removeDocuments(deferredIds.toArray(new DocumentRef[0]));
338    }
339
340    protected void initializeSession(FeaturesRunner runner) {
341        if (cleaned) {
342            // reinitialize repositories content
343            RepositoryService repositoryService = Framework.getService(RepositoryService.class);
344            repositoryService.initRepositories();
345            cleaned = false;
346        }
347        CoreScope.INSTANCE.enter();
348        createCoreSession();
349        if (repositoryInit != null) {
350            repositoryInit.populate(session);
351            session.save();
352            waitForAsyncCompletion();
353        }
354        // save current root acp
355        DocumentModel root = session.getRootDocument();
356        rootAcp = root.getACP();
357    }
358
359    public String getRepositoryName() {
360        return getStorageConfiguration().getRepositoryName();
361    }
362
363    public CloseableCoreSession openCoreSession(String username) {
364        return CoreInstance.openCoreSession(getRepositoryName(), username);
365    }
366
367    public CloseableCoreSession openCoreSession(NuxeoPrincipal principal) {
368        return CoreInstance.openCoreSession(getRepositoryName(), principal);
369    }
370
371    public CloseableCoreSession openCoreSession() {
372        return CoreInstance.openCoreSession(getRepositoryName());
373    }
374
375    public CloseableCoreSession openCoreSessionSystem() {
376        return CoreInstance.openCoreSessionSystem(getRepositoryName());
377    }
378
379    public CloseableCoreSession createCoreSession() {
380        UserPrincipal principal = new UserPrincipal("Administrator", new ArrayList<>(), false, true);
381        session = CoreInstance.openCoreSession(getRepositoryName(), principal);
382        return (CloseableCoreSession) session;
383    }
384
385    public CoreSession getCoreSession() {
386        return session;
387    }
388
389    public void releaseCoreSession() {
390        ((CloseableCoreSession) session).close();
391        session = null;
392    }
393
394    public CoreSession reopenCoreSession() {
395        releaseCoreSession();
396        waitForAsyncCompletion();
397        // flush JCA cache to acquire a new low-level session
398        NuxeoContainer.resetConnectionManager();
399        createCoreSession();
400        return session;
401    }
402
403    public class CoreDeployer extends HotDeployer.ActionHandler {
404
405        @Override
406        public void exec(String action, String... agrs) throws Exception {
407            waitForAsyncCompletion();
408            releaseCoreSession();
409            next.exec(action, agrs);
410            createCoreSession();
411        }
412
413    }
414
415}