001/*
002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Florent Guillaume
019 *     Kevin Leturc <kleturc@nuxeo.com>
020 */
021package org.nuxeo.ecm.core.test;
022
023import java.io.IOException;
024import java.io.Serializable;
025import java.net.URL;
026import java.util.ArrayList;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.concurrent.TimeUnit;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.api.CloseableCoreSession;
035import org.nuxeo.ecm.core.api.CoreInstance;
036import org.nuxeo.ecm.core.api.CoreSession;
037import org.nuxeo.ecm.core.api.CoreSessionService;
038import org.nuxeo.ecm.core.api.CoreSessionService.CoreSessionRegistrationInfo;
039import org.nuxeo.ecm.core.api.DocumentModel;
040import org.nuxeo.ecm.core.api.DocumentRef;
041import org.nuxeo.ecm.core.api.IdRef;
042import org.nuxeo.ecm.core.api.IterableQueryResult;
043import org.nuxeo.ecm.core.api.NuxeoException;
044import org.nuxeo.ecm.core.api.NuxeoPrincipal;
045import org.nuxeo.ecm.core.api.PathRef;
046import org.nuxeo.ecm.core.api.impl.UserPrincipal;
047import org.nuxeo.ecm.core.api.security.ACP;
048import org.nuxeo.ecm.core.query.QueryParseException;
049import org.nuxeo.ecm.core.query.sql.NXQL;
050import org.nuxeo.ecm.core.repository.RepositoryService;
051import org.nuxeo.ecm.core.test.TransactionalFeature.Waiter;
052import org.nuxeo.ecm.core.test.annotations.Granularity;
053import org.nuxeo.ecm.core.test.annotations.RepositoryConfig;
054import org.nuxeo.ecm.core.test.annotations.RepositoryInit;
055import org.nuxeo.ecm.core.work.api.WorkManager;
056import org.nuxeo.runtime.api.Framework;
057import org.nuxeo.runtime.jtajca.NuxeoContainer;
058import org.nuxeo.runtime.model.URLStreamRef;
059import org.nuxeo.runtime.test.runner.Defaults;
060import org.nuxeo.runtime.test.runner.Deploy;
061import org.nuxeo.runtime.test.runner.Features;
062import org.nuxeo.runtime.test.runner.FeaturesRunner;
063import org.nuxeo.runtime.test.runner.HotDeployer;
064import org.nuxeo.runtime.test.runner.RuntimeFeature;
065import org.nuxeo.runtime.test.runner.RuntimeHarness;
066import org.nuxeo.runtime.test.runner.SimpleFeature;
067import org.nuxeo.runtime.transaction.TransactionHelper;
068
069import com.google.inject.Binder;
070
071/**
072 * The core feature provides a default {@link CoreSession} that can be injected.
073 * <p>
074 * In addition, by injecting the feature itself, some helper methods are available to open new sessions.
075 */
076@Deploy("org.nuxeo.runtime.management")
077@Deploy("org.nuxeo.runtime.metrics")
078@Deploy("org.nuxeo.runtime.reload")
079@Deploy("org.nuxeo.runtime.kv")
080@Deploy("org.nuxeo.runtime.pubsub")
081@Deploy("org.nuxeo.runtime.mongodb")
082@Deploy("org.nuxeo.runtime.migration")
083@Deploy("org.nuxeo.ecm.core.schema")
084@Deploy("org.nuxeo.ecm.core.query")
085@Deploy("org.nuxeo.ecm.core.api")
086@Deploy("org.nuxeo.ecm.core.event")
087@Deploy("org.nuxeo.ecm.core")
088@Deploy("org.nuxeo.ecm.core.cache")
089@Deploy("org.nuxeo.ecm.core.test")
090@Deploy("org.nuxeo.ecm.core.mimetype")
091@Deploy("org.nuxeo.ecm.core.convert")
092@Deploy("org.nuxeo.ecm.core.convert.plugins")
093@Deploy("org.nuxeo.ecm.core.storage")
094@Deploy("org.nuxeo.ecm.core.storage.sql")
095@Deploy("org.nuxeo.ecm.core.storage.sql.test")
096@Deploy("org.nuxeo.ecm.core.storage.dbs")
097@Deploy("org.nuxeo.ecm.core.storage.mem")
098@Deploy("org.nuxeo.ecm.core.storage.mongodb")
099@Deploy("org.nuxeo.ecm.platform.commandline.executor")
100@Deploy("org.nuxeo.ecm.platform.el")
101@Deploy("org.nuxeo.ecm.core.io")
102@Features({ RuntimeFeature.class, TransactionalFeature.class })
103@Deploy("org.nuxeo.ecm.core.event:test-queuing.xml")
104public class CoreFeature extends SimpleFeature {
105
106    protected ACP rootAcp;
107
108    public class WorksWaiter implements Waiter {
109        @Override
110        public boolean await(long deadline) throws InterruptedException {
111            WorkManager workManager = Framework.getService(WorkManager.class);
112            if (workManager.awaitCompletion(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
113                return true;
114            }
115            logInfos(workManager);
116            return false;
117        }
118
119        protected void logInfos(WorkManager workManager) {
120            StringBuilder sb = new StringBuilder().append("Timed out while waiting for works").append(" ");
121            Iterator<String> queueids = workManager.getWorkQueueIds().iterator();
122            while (queueids.hasNext()) {
123                sb.append(System.lineSeparator());
124                String queueid = queueids.next();
125                sb.append(workManager.getMetrics(queueid));
126                sb.append(",works=");
127                Iterator<String> works = workManager.listWorkIds(queueid, null).iterator();
128                while (works.hasNext()) {
129                    sb.append(works.next());
130                    if (works.hasNext()) {
131                        sb.append(",");
132                    }
133                }
134            }
135            log.error(sb.toString(), new Throwable("stack trace"));
136        }
137
138    }
139
140    private static final Log log = LogFactory.getLog(CoreFeature.class);
141
142    protected StorageConfiguration storageConfiguration;
143
144    protected RepositoryInit repositoryInit;
145
146    protected Granularity granularity;
147
148    // this value gets injected
149    protected CoreSession session;
150
151    protected boolean cleaned;
152
153    protected TransactionalFeature txFeature;
154
155    public StorageConfiguration getStorageConfiguration() {
156        return storageConfiguration;
157    }
158
159    @Override
160    public void initialize(FeaturesRunner runner) {
161        runner.getFeature(RuntimeFeature.class).registerHandler(new CoreDeployer());
162
163        storageConfiguration = new StorageConfiguration(this);
164        txFeature = runner.getFeature(TransactionalFeature.class);
165        txFeature.addWaiter(new WorksWaiter());
166        // init from RepositoryConfig annotations
167        RepositoryConfig repositoryConfig = runner.getConfig(RepositoryConfig.class);
168        if (repositoryConfig == null) {
169            repositoryConfig = Defaults.of(RepositoryConfig.class);
170        }
171        try {
172            repositoryInit = repositoryConfig.init().newInstance();
173        } catch (ReflectiveOperationException e) {
174            throw new NuxeoException(e);
175        }
176        Granularity cleanup = repositoryConfig.cleanup();
177        granularity = cleanup == Granularity.UNDEFINED ? Granularity.CLASS : cleanup;
178    }
179
180    public Granularity getGranularity() {
181        return granularity;
182    }
183
184    @Override
185    public void start(FeaturesRunner runner) {
186        try {
187            RuntimeHarness harness = runner.getFeature(RuntimeFeature.class).getHarness();
188            storageConfiguration.init();
189            for (String bundle : storageConfiguration.getExternalBundles()) {
190                try {
191                    harness.deployBundle(bundle);
192                } catch (Exception e) {
193                    throw new NuxeoException(e);
194                }
195            }
196            URL blobContribUrl = storageConfiguration.getBlobManagerContrib(runner);
197            harness.getContext().deploy(new URLStreamRef(blobContribUrl));
198            URL repoContribUrl = storageConfiguration.getRepositoryContrib(runner);
199            harness.getContext().deploy(new URLStreamRef(repoContribUrl));
200        } catch (IOException e) {
201            throw new NuxeoException(e);
202        }
203    }
204
205    @Override
206    public void beforeRun(FeaturesRunner runner) throws InterruptedException {
207        // wait for async tasks that may have been triggered by
208        // RuntimeFeature (typically repo initialization)
209        txFeature.nextTransaction(10, TimeUnit.SECONDS);
210        if (granularity != Granularity.METHOD) {
211            // we need a transaction to properly initialize the session
212            // but it hasn't been started yet by TransactionalFeature
213            TransactionHelper.startTransaction();
214            initializeSession(runner);
215            TransactionHelper.commitOrRollbackTransaction();
216        }
217    }
218
219    @Override
220    public void configure(FeaturesRunner runner, Binder binder) {
221        binder.bind(CoreSession.class).toProvider(() -> session);
222    }
223
224    @Override
225    public void afterRun(FeaturesRunner runner) {
226        waitForAsyncCompletion(); // fulltext and various workers
227        if (granularity != Granularity.METHOD) {
228            cleanupSession(runner);
229        }
230        if (session != null) {
231            releaseCoreSession();
232        }
233
234        List<CoreSessionRegistrationInfo> leakedInfos = Framework.getService(CoreSessionService.class)
235                                                                 .getCoreSessionRegistrationInfos();
236        if (leakedInfos.size() == 0) {
237            return;
238        }
239        AssertionError leakedErrors = new AssertionError(String.format("leaked %d sessions", leakedInfos.size()));
240        for (CoreSessionRegistrationInfo info : leakedInfos) {
241            try {
242                ((CloseableCoreSession) info.getCoreSession()).close();
243                leakedErrors.addSuppressed(info);
244            } catch (RuntimeException cause) {
245                leakedErrors.addSuppressed(cause);
246            }
247        }
248        throw leakedErrors;
249    }
250
251    @Override
252    public void beforeSetup(FeaturesRunner runner) {
253        if (granularity == Granularity.METHOD) {
254            initializeSession(runner);
255        }
256    }
257
258    @Override
259    public void afterTeardown(FeaturesRunner runner) {
260        if (granularity == Granularity.METHOD) {
261            cleanupSession(runner);
262        } else {
263            waitForAsyncCompletion();
264        }
265    }
266
267    public void waitForAsyncCompletion() {
268        txFeature.nextTransaction();
269    }
270
271    protected void cleanupSession(FeaturesRunner runner) {
272        waitForAsyncCompletion();
273        if (session == null) {
274            createCoreSession();
275        }
276        TransactionHelper.runInNewTransaction(() -> {
277            try {
278                log.trace("remove everything except root");
279                // remove proxies first, as we cannot remove a target if there's a proxy pointing to it
280                try (IterableQueryResult results = session.queryAndFetch(
281                        "SELECT ecm:uuid FROM Document WHERE ecm:isProxy = 1", NXQL.NXQL)) {
282                    batchRemoveDocuments(results);
283                } catch (QueryParseException e) {
284                    // ignore, proxies disabled
285                }
286                // remove non-proxies
287                session.removeChildren(new PathRef("/"));
288                log.trace(
289                        "remove orphan versions as OrphanVersionRemoverListener is not triggered by CoreSession#removeChildren");
290                // remove remaining placeless documents
291                try (IterableQueryResult results = session.queryAndFetch("SELECT ecm:uuid FROM Document, Relation",
292                        NXQL.NXQL)) {
293                    batchRemoveDocuments(results);
294                }
295                // set original ACP on root
296                DocumentModel root = session.getRootDocument();
297                root.setACP(rootAcp, true);
298
299                session.save();
300                waitForAsyncCompletion();
301                if (!session.query("SELECT * FROM Document, Relation").isEmpty()) {
302                    log.error("Fail to cleanupSession, repository will not be empty for the next test.");
303                }
304            } catch (NuxeoException e) {
305                log.error("Unable to reset repository", e);
306            } finally {
307                CoreScope.INSTANCE.exit();
308            }
309        });
310        releaseCoreSession();
311        cleaned = true;
312    }
313
314    protected void batchRemoveDocuments(IterableQueryResult results) {
315        String rootDocumentId = session.getRootDocument().getId();
316        List<DocumentRef> ids = new ArrayList<>();
317        for (Map<String, Serializable> result : results) {
318            String id = (String) result.get("ecm:uuid");
319            if (id.equals(rootDocumentId)) {
320                continue;
321            }
322            ids.add(new IdRef(id));
323            if (ids.size() >= 100) {
324                batchRemoveDocuments(ids);
325                ids.clear();
326            }
327        }
328        if (!ids.isEmpty()) {
329            batchRemoveDocuments(ids);
330        }
331    }
332
333    protected void batchRemoveDocuments(List<DocumentRef> ids) {
334        List<DocumentRef> deferredIds = new ArrayList<>();
335        for (DocumentRef id : ids) {
336            if (!session.exists(id)) {
337                continue;
338            }
339            if (session.canRemoveDocument(id)) {
340                session.removeDocument(id);
341            } else {
342                deferredIds.add(id);
343            }
344        }
345        session.removeDocuments(deferredIds.toArray(new DocumentRef[0]));
346    }
347
348    protected void initializeSession(FeaturesRunner runner) {
349        if (cleaned) {
350            // reinitialize repositories content
351            RepositoryService repositoryService = Framework.getService(RepositoryService.class);
352            repositoryService.initRepositories();
353            cleaned = false;
354        }
355        CoreScope.INSTANCE.enter();
356        createCoreSession();
357        if (repositoryInit != null) {
358            repositoryInit.populate(session);
359            session.save();
360            waitForAsyncCompletion();
361        }
362        // save current root acp
363        DocumentModel root = session.getRootDocument();
364        rootAcp = root.getACP();
365    }
366
367    public String getRepositoryName() {
368        return getStorageConfiguration().getRepositoryName();
369    }
370
371    public CloseableCoreSession openCoreSession(String username) {
372        return CoreInstance.openCoreSession(getRepositoryName(), username);
373    }
374
375    public CloseableCoreSession openCoreSession(NuxeoPrincipal principal) {
376        return CoreInstance.openCoreSession(getRepositoryName(), principal);
377    }
378
379    public CloseableCoreSession openCoreSession() {
380        return CoreInstance.openCoreSession(getRepositoryName());
381    }
382
383    public CloseableCoreSession openCoreSessionSystem() {
384        return CoreInstance.openCoreSessionSystem(getRepositoryName());
385    }
386
387    public CloseableCoreSession createCoreSession() {
388        UserPrincipal principal = new UserPrincipal("Administrator", new ArrayList<>(), false, true);
389        session = CoreInstance.openCoreSession(getRepositoryName(), principal);
390        return (CloseableCoreSession) session;
391    }
392
393    public CoreSession getCoreSession() {
394        return session;
395    }
396
397    public void releaseCoreSession() {
398        ((CloseableCoreSession) session).close();
399        session = null;
400    }
401
402    public CoreSession reopenCoreSession() {
403        releaseCoreSession();
404        waitForAsyncCompletion();
405        // flush JCA cache to acquire a new low-level session
406        NuxeoContainer.resetConnectionManager();
407        createCoreSession();
408        return session;
409    }
410
411    public class CoreDeployer extends HotDeployer.ActionHandler {
412
413        @Override
414        public void exec(String action, String... agrs) throws Exception {
415            waitForAsyncCompletion();
416            releaseCoreSession();
417            next.exec(action, agrs);
418            createCoreSession();
419        }
420
421    }
422
423}