001/*
002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Florent Guillaume
019 *     Kevin Leturc <kleturc@nuxeo.com>
020 */
021package org.nuxeo.ecm.core.test;
022
023import java.io.IOException;
024import java.io.Serializable;
025import java.net.URL;
026import java.util.ArrayList;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.concurrent.TimeUnit;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.api.CoreInstance;
035import org.nuxeo.ecm.core.api.CoreSession;
036import org.nuxeo.ecm.core.api.CoreSessionService;
037import org.nuxeo.ecm.core.api.CoreSessionService.CoreSessionRegistrationInfo;
038import org.nuxeo.ecm.core.api.DocumentModel;
039import org.nuxeo.ecm.core.api.DocumentRef;
040import org.nuxeo.ecm.core.api.IdRef;
041import org.nuxeo.ecm.core.api.IterableQueryResult;
042import org.nuxeo.ecm.core.api.NuxeoException;
043import org.nuxeo.ecm.core.api.NuxeoPrincipal;
044import org.nuxeo.ecm.core.api.PathRef;
045import org.nuxeo.ecm.core.api.impl.UserPrincipal;
046import org.nuxeo.ecm.core.api.security.ACP;
047import org.nuxeo.ecm.core.query.QueryParseException;
048import org.nuxeo.ecm.core.query.sql.NXQL;
049import org.nuxeo.ecm.core.repository.RepositoryService;
050import org.nuxeo.ecm.core.test.TransactionalFeature.Waiter;
051import org.nuxeo.ecm.core.test.annotations.Granularity;
052import org.nuxeo.ecm.core.test.annotations.RepositoryConfig;
053import org.nuxeo.ecm.core.test.annotations.RepositoryInit;
054import org.nuxeo.ecm.core.work.api.WorkManager;
055import org.nuxeo.runtime.api.Framework;
056import org.nuxeo.runtime.jtajca.NuxeoContainer;
057import org.nuxeo.runtime.model.URLStreamRef;
058import org.nuxeo.runtime.test.runner.Defaults;
059import org.nuxeo.runtime.test.runner.Deploy;
060import org.nuxeo.runtime.test.runner.Features;
061import org.nuxeo.runtime.test.runner.FeaturesRunner;
062import org.nuxeo.runtime.test.runner.HotDeployer;
063import org.nuxeo.runtime.test.runner.LocalDeploy;
064import org.nuxeo.runtime.test.runner.RuntimeFeature;
065import org.nuxeo.runtime.test.runner.RuntimeHarness;
066import org.nuxeo.runtime.test.runner.SimpleFeature;
067import org.nuxeo.runtime.transaction.TransactionHelper;
068
069import com.google.inject.Binder;
070
071/**
072 * The core feature provides a default {@link CoreSession} that can be injected.
073 * <p>
074 * In addition, by injecting the feature itself, some helper methods are available to open new sessions.
075 */
076@Deploy({ "org.nuxeo.runtime.management", //
077        "org.nuxeo.runtime.metrics", //
078        "org.nuxeo.runtime.reload", // required by #CoreDeployer
079        "org.nuxeo.runtime.kv", //
080        "org.nuxeo.runtime.pubsub", //
081        "org.nuxeo.runtime.mongodb", //
082        "org.nuxeo.runtime.migration", //
083        "org.nuxeo.ecm.core.schema", //
084        "org.nuxeo.ecm.core.query", //
085        "org.nuxeo.ecm.core.api", //
086        "org.nuxeo.ecm.core.event", //
087        "org.nuxeo.ecm.core", //
088        "org.nuxeo.ecm.core.cache", //
089        "org.nuxeo.ecm.core.test", //
090        "org.nuxeo.ecm.core.mimetype", //
091        "org.nuxeo.ecm.core.convert", //
092        "org.nuxeo.ecm.core.convert.plugins", //
093        "org.nuxeo.ecm.core.storage", //
094        "org.nuxeo.ecm.core.storage.sql", //
095        "org.nuxeo.ecm.core.storage.sql.test", //
096        "org.nuxeo.ecm.core.storage.dbs", //
097        "org.nuxeo.ecm.core.storage.mem", //
098        "org.nuxeo.ecm.core.storage.mongodb", //
099        "org.nuxeo.ecm.platform.commandline.executor", //
100        "org.nuxeo.ecm.platform.el", //
101        "org.nuxeo.ecm.core.io", //
102})
103@Features({ RuntimeFeature.class, TransactionalFeature.class })
104@LocalDeploy("org.nuxeo.ecm.core.event:test-queuing.xml")
105public class CoreFeature extends SimpleFeature {
106
107    protected ACP rootAcp;
108
109    public class WorksWaiter implements Waiter {
110        @Override
111        public boolean await(long deadline) throws InterruptedException {
112            WorkManager workManager = Framework.getService(WorkManager.class);
113            if (workManager.awaitCompletion(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
114                return true;
115            }
116            logInfos(workManager);
117            return false;
118        }
119
120        protected void logInfos(WorkManager workManager) {
121            StringBuilder sb = new StringBuilder().append("Timed out while waiting for works").append(" ");
122            Iterator<String> queueids = workManager.getWorkQueueIds().iterator();
123            while (queueids.hasNext()) {
124                sb.append(System.lineSeparator());
125                String queueid = queueids.next();
126                sb.append(workManager.getMetrics(queueid));
127                sb.append(",works=");
128                Iterator<String> works = workManager.listWorkIds(queueid, null).iterator();
129                while (works.hasNext()) {
130                    sb.append(works.next());
131                    if (works.hasNext()) {
132                        sb.append(",");
133                    }
134                }
135            }
136            log.error(sb.toString(), new Throwable("stack trace"));
137        }
138
139    }
140
141    private static final Log log = LogFactory.getLog(CoreFeature.class);
142
143    protected StorageConfiguration storageConfiguration;
144
145    protected RepositoryInit repositoryInit;
146
147    protected Granularity granularity;
148
149    // this value gets injected
150    protected CoreSession session;
151
152    protected boolean cleaned;
153
154    protected TransactionalFeature txFeature;
155
156    public StorageConfiguration getStorageConfiguration() {
157        return storageConfiguration;
158    }
159
160    @Override
161    public void initialize(FeaturesRunner runner) {
162        runner.getFeature(RuntimeFeature.class).registerHandler(new CoreDeployer());
163
164        storageConfiguration = new StorageConfiguration(this);
165        txFeature = runner.getFeature(TransactionalFeature.class);
166        txFeature.addWaiter(new WorksWaiter());
167        // init from RepositoryConfig annotations
168        RepositoryConfig repositoryConfig = runner.getConfig(RepositoryConfig.class);
169        if (repositoryConfig == null) {
170            repositoryConfig = Defaults.of(RepositoryConfig.class);
171        }
172        try {
173            repositoryInit = repositoryConfig.init().newInstance();
174        } catch (ReflectiveOperationException e) {
175            throw new NuxeoException(e);
176        }
177        Granularity cleanup = repositoryConfig.cleanup();
178        granularity = cleanup == Granularity.UNDEFINED ? Granularity.CLASS : cleanup;
179    }
180
181    public Granularity getGranularity() {
182        return granularity;
183    }
184
185    @Override
186    public void start(FeaturesRunner runner) {
187        try {
188            RuntimeHarness harness = runner.getFeature(RuntimeFeature.class).getHarness();
189            storageConfiguration.init();
190            for (String bundle : storageConfiguration.getExternalBundles()) {
191                try {
192                    harness.deployBundle(bundle);
193                } catch (Exception e) {
194                    throw new NuxeoException(e);
195                }
196            }
197            URL blobContribUrl = storageConfiguration.getBlobManagerContrib(runner);
198            harness.getContext().deploy(new URLStreamRef(blobContribUrl));
199            URL repoContribUrl = storageConfiguration.getRepositoryContrib(runner);
200            harness.getContext().deploy(new URLStreamRef(repoContribUrl));
201        } catch (IOException e) {
202            throw new NuxeoException(e);
203        }
204    }
205
206    @Override
207    public void beforeRun(FeaturesRunner runner) throws InterruptedException {
208        // wait for async tasks that may have been triggered by
209        // RuntimeFeature (typically repo initialization)
210        txFeature.nextTransaction(10, TimeUnit.SECONDS);
211        if (granularity != Granularity.METHOD) {
212            // we need a transaction to properly initialize the session
213            // but it hasn't been started yet by TransactionalFeature
214            TransactionHelper.startTransaction();
215            initializeSession(runner);
216            TransactionHelper.commitOrRollbackTransaction();
217        }
218    }
219
220    @Override
221    public void configure(FeaturesRunner runner, Binder binder) {
222        binder.bind(CoreSession.class).toProvider(() -> session);
223    }
224
225    @Override
226    public void afterRun(FeaturesRunner runner) {
227        waitForAsyncCompletion(); // fulltext and various workers
228        if (granularity != Granularity.METHOD) {
229            cleanupSession(runner);
230        }
231        if (session != null) {
232            releaseCoreSession();
233        }
234
235        List<CoreSessionRegistrationInfo> leakedInfos = Framework.getService(CoreSessionService.class)
236                                                                 .getCoreSessionRegistrationInfos();
237        if (leakedInfos.size() == 0) {
238            return;
239        }
240        AssertionError leakedErrors = new AssertionError(String.format("leaked %d sessions", leakedInfos.size()));
241        for (CoreSessionRegistrationInfo info : leakedInfos) {
242            try {
243                info.getCoreSession().close();
244                leakedErrors.addSuppressed(info);
245            } catch (RuntimeException cause) {
246                leakedErrors.addSuppressed(cause);
247            }
248        }
249        throw leakedErrors;
250    }
251
252    @Override
253    public void beforeSetup(FeaturesRunner runner) {
254        if (granularity == Granularity.METHOD) {
255            initializeSession(runner);
256        }
257    }
258
259    @Override
260    public void afterTeardown(FeaturesRunner runner) {
261        if (granularity == Granularity.METHOD) {
262            cleanupSession(runner);
263        } else {
264            waitForAsyncCompletion();
265        }
266    }
267
268    public void waitForAsyncCompletion() {
269        txFeature.nextTransaction();
270    }
271
272    protected void cleanupSession(FeaturesRunner runner) {
273        waitForAsyncCompletion();
274        if (session == null) {
275            createCoreSession();
276        }
277        TransactionHelper.runInNewTransaction(() -> {
278            try {
279                log.trace("remove everything except root");
280                // remove proxies first, as we cannot remove a target if there's a proxy pointing to it
281                try (IterableQueryResult results = session.queryAndFetch(
282                        "SELECT ecm:uuid FROM Document WHERE ecm:isProxy = 1", NXQL.NXQL)) {
283                    batchRemoveDocuments(results);
284                } catch (QueryParseException e) {
285                    // ignore, proxies disabled
286                }
287                // remove non-proxies
288                session.removeChildren(new PathRef("/"));
289                log.trace(
290                        "remove orphan versions as OrphanVersionRemoverListener is not triggered by CoreSession#removeChildren");
291                // remove remaining placeless documents
292                try (IterableQueryResult results = session.queryAndFetch("SELECT ecm:uuid FROM Document, Relation",
293                        NXQL.NXQL)) {
294                    batchRemoveDocuments(results);
295                }
296                // set original ACP on root
297                DocumentModel root = session.getRootDocument();
298                root.setACP(rootAcp, true);
299
300                session.save();
301                waitForAsyncCompletion();
302                if (!session.query("SELECT * FROM Document, Relation").isEmpty()) {
303                    log.error("Fail to cleanupSession, repository will not be empty for the next test.");
304                }
305            } catch (NuxeoException e) {
306                log.error("Unable to reset repository", e);
307            } finally {
308                CoreScope.INSTANCE.exit();
309            }
310        });
311        releaseCoreSession();
312        cleaned = true;
313    }
314
315    protected void batchRemoveDocuments(IterableQueryResult results) {
316        String rootDocumentId = session.getRootDocument().getId();
317        List<DocumentRef> ids = new ArrayList<>();
318        for (Map<String, Serializable> result : results) {
319            String id = (String) result.get("ecm:uuid");
320            if (id.equals(rootDocumentId)) {
321                continue;
322            }
323            ids.add(new IdRef(id));
324            if (ids.size() >= 100) {
325                batchRemoveDocuments(ids);
326                ids.clear();
327            }
328        }
329        if (!ids.isEmpty()) {
330            batchRemoveDocuments(ids);
331        }
332    }
333
334    protected void batchRemoveDocuments(List<DocumentRef> ids) {
335        List<DocumentRef> deferredIds = new ArrayList<>();
336        for (DocumentRef id : ids) {
337            if (!session.exists(id)) {
338                continue;
339            }
340            if (session.canRemoveDocument(id)) {
341                session.removeDocument(id);
342            } else {
343                deferredIds.add(id);
344            }
345        }
346        session.removeDocuments(deferredIds.toArray(new DocumentRef[0]));
347    }
348
349    protected void initializeSession(FeaturesRunner runner) {
350        if (cleaned) {
351            // reinitialize repositories content
352            RepositoryService repositoryService = Framework.getService(RepositoryService.class);
353            repositoryService.initRepositories();
354            cleaned = false;
355        }
356        CoreScope.INSTANCE.enter();
357        createCoreSession();
358        if (repositoryInit != null) {
359            repositoryInit.populate(session);
360            session.save();
361            waitForAsyncCompletion();
362        }
363        // save current root acp
364        DocumentModel root = session.getRootDocument();
365        rootAcp = root.getACP();
366    }
367
368    public String getRepositoryName() {
369        return getStorageConfiguration().getRepositoryName();
370    }
371
372    public CoreSession openCoreSession(String username) {
373        return CoreInstance.openCoreSession(getRepositoryName(), username);
374    }
375
376    public CoreSession openCoreSession(NuxeoPrincipal principal) {
377        return CoreInstance.openCoreSession(getRepositoryName(), principal);
378    }
379
380    public CoreSession openCoreSession() {
381        return CoreInstance.openCoreSession(getRepositoryName());
382    }
383
384    public CoreSession openCoreSessionSystem() {
385        return CoreInstance.openCoreSessionSystem(getRepositoryName());
386    }
387
388    public CoreSession createCoreSession() {
389        UserPrincipal principal = new UserPrincipal("Administrator", new ArrayList<>(), false, true);
390        session = CoreInstance.openCoreSession(getRepositoryName(), principal);
391        return session;
392    }
393
394    public CoreSession getCoreSession() {
395        return session;
396    }
397
398    public void releaseCoreSession() {
399        session.close();
400        session = null;
401    }
402
403    public CoreSession reopenCoreSession() {
404        releaseCoreSession();
405        waitForAsyncCompletion();
406        // flush JCA cache to acquire a new low-level session
407        NuxeoContainer.resetConnectionManager();
408        createCoreSession();
409        return session;
410    }
411
412    public class CoreDeployer extends HotDeployer.ActionHandler {
413
414        @Override
415        public void exec(String action, String... agrs) throws Exception {
416            waitForAsyncCompletion();
417            releaseCoreSession();
418            next.exec(action, agrs);
419            createCoreSession();
420        }
421
422    }
423
424}