001/*
002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.test;
021
022import java.io.IOException;
023import java.io.Serializable;
024import java.net.URL;
025import java.util.ArrayList;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.TimeUnit;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.ecm.core.api.CoreInstance;
034import org.nuxeo.ecm.core.api.CoreSession;
035import org.nuxeo.ecm.core.api.CoreSessionService;
036import org.nuxeo.ecm.core.api.CoreSessionService.CoreSessionRegistrationInfo;
037import org.nuxeo.ecm.core.api.DocumentModel;
038import org.nuxeo.ecm.core.api.DocumentRef;
039import org.nuxeo.ecm.core.api.IdRef;
040import org.nuxeo.ecm.core.api.IterableQueryResult;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.api.NuxeoPrincipal;
043import org.nuxeo.ecm.core.api.PathRef;
044import org.nuxeo.ecm.core.api.impl.UserPrincipal;
045import org.nuxeo.ecm.core.api.security.ACP;
046import org.nuxeo.ecm.core.query.QueryParseException;
047import org.nuxeo.ecm.core.query.sql.NXQL;
048import org.nuxeo.ecm.core.repository.RepositoryService;
049import org.nuxeo.ecm.core.test.TransactionalFeature.Waiter;
050import org.nuxeo.ecm.core.test.annotations.Granularity;
051import org.nuxeo.ecm.core.test.annotations.RepositoryConfig;
052import org.nuxeo.ecm.core.test.annotations.RepositoryInit;
053import org.nuxeo.ecm.core.work.api.WorkManager;
054import org.nuxeo.runtime.api.Framework;
055import org.nuxeo.runtime.jtajca.NuxeoContainer;
056import org.nuxeo.runtime.model.URLStreamRef;
057import org.nuxeo.runtime.reload.ReloadService;
058import org.nuxeo.runtime.test.runner.Defaults;
059import org.nuxeo.runtime.test.runner.Deploy;
060import org.nuxeo.runtime.test.runner.Features;
061import org.nuxeo.runtime.test.runner.FeaturesRunner;
062import org.nuxeo.runtime.test.runner.HotDeployer;
063import org.nuxeo.runtime.test.runner.LocalDeploy;
064import org.nuxeo.runtime.test.runner.RuntimeFeature;
065import org.nuxeo.runtime.test.runner.RuntimeHarness;
066import org.nuxeo.runtime.test.runner.SimpleFeature;
067import org.nuxeo.runtime.transaction.TransactionHelper;
068
069import com.google.inject.Binder;
070
071/**
072 * The core feature provides a default {@link CoreSession} that can be injected.
073 * <p>
074 * In addition, by injecting the feature itself, some helper methods are available to open new sessions.
075 */
076@Deploy({ "org.nuxeo.runtime.management", //
077        "org.nuxeo.runtime.metrics", //
078        "org.nuxeo.runtime.reload", // required by #CoreDeployer
079        "org.nuxeo.ecm.core.schema", //
080        "org.nuxeo.ecm.core.query", //
081        "org.nuxeo.ecm.core.api", //
082        "org.nuxeo.ecm.core.event", //
083        "org.nuxeo.ecm.core", //
084        "org.nuxeo.ecm.core.cache", //
085        "org.nuxeo.ecm.core.test", //
086        "org.nuxeo.ecm.core.mimetype", //
087        "org.nuxeo.ecm.core.convert", //
088        "org.nuxeo.ecm.core.convert.plugins", //
089        "org.nuxeo.ecm.core.storage", //
090        "org.nuxeo.ecm.core.storage.sql", //
091        "org.nuxeo.ecm.core.storage.sql.test", //
092        "org.nuxeo.ecm.core.storage.dbs", //
093        "org.nuxeo.ecm.core.storage.mem", //
094        "org.nuxeo.ecm.core.storage.mongodb", //
095        "org.nuxeo.ecm.platform.commandline.executor", //
096        "org.nuxeo.ecm.platform.el", //
097        "org.nuxeo.ecm.core.io", //
098})
099@Features({ RuntimeFeature.class, TransactionalFeature.class })
100@LocalDeploy("org.nuxeo.ecm.core.event:test-queuing.xml")
101public class CoreFeature extends SimpleFeature {
102
    // ACP of the repository root document: saved at the end of initializeSession
    // and put back on the root by cleanupSession
    protected ACP rootAcp;
104
105    public class WorksWaiter implements Waiter {
106        @Override
107        public boolean await(long deadline) throws InterruptedException {
108            WorkManager workManager = Framework.getService(WorkManager.class);
109            if (workManager.awaitCompletion(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
110                return true;
111            }
112            logInfos(workManager);
113            return false;
114        }
115
116        protected void logInfos(WorkManager workManager) {
117            StringBuilder sb = new StringBuilder().append("Timed out while waiting for works").append(" ");
118            Iterator<String> queueids = workManager.getWorkQueueIds().iterator();
119            while (queueids.hasNext()) {
120                sb.append(System.lineSeparator());
121                String queueid = queueids.next();
122                sb.append(workManager.getMetrics(queueid));
123                sb.append(",works=");
124                Iterator<String> works = workManager.listWorkIds(queueid, null).iterator();
125                while (works.hasNext()) {
126                    sb.append(works.next());
127                    if (works.hasNext()) {
128                        sb.append(",");
129                    }
130                }
131            }
132            log.error(sb.toString(), new Throwable("stack trace"));
133        }
134
135    }
136
    // logger shared by the feature and its inner classes
    private static final Log log = LogFactory.getLog(CoreFeature.class);

    // created in initialize(), initialized in start() where it also provides the bundles and contributions to deploy
    protected StorageConfiguration storageConfiguration;

    // instantiated in initialize() from RepositoryConfig#init, used by initializeSession() to populate the repository
    protected RepositoryInit repositoryInit;

    // cleanup granularity read from RepositoryConfig#cleanup, CLASS when left UNDEFINED
    protected Granularity granularity;

    // current default session; this value gets injected (through the provider bound in configure())
    protected CoreSession session;

    // set by cleanupSession(); makes the next initializeSession() re-initialize the repositories
    protected boolean cleaned;

    // looked up in initialize(), used to wait for asynchronous completion
    protected TransactionalFeature txFeature;
151
152    public StorageConfiguration getStorageConfiguration() {
153        return storageConfiguration;
154    }
155
156    @Override
157    public void initialize(FeaturesRunner runner) {
158        runner.getFeature(RuntimeFeature.class).registerHandler(new CoreDeployer());
159
160        storageConfiguration = new StorageConfiguration(this);
161        txFeature = runner.getFeature(TransactionalFeature.class);
162        txFeature.addWaiter(new WorksWaiter());
163        // init from RepositoryConfig annotations
164        RepositoryConfig repositoryConfig = runner.getConfig(RepositoryConfig.class);
165        if (repositoryConfig == null) {
166            repositoryConfig = Defaults.of(RepositoryConfig.class);
167        }
168        try {
169            repositoryInit = repositoryConfig.init().newInstance();
170        } catch (ReflectiveOperationException e) {
171            throw new NuxeoException(e);
172        }
173        Granularity cleanup = repositoryConfig.cleanup();
174        granularity = cleanup == Granularity.UNDEFINED ? Granularity.CLASS : cleanup;
175    }
176
177    public Granularity getGranularity() {
178        return granularity;
179    }
180
181    @Override
182    public void start(FeaturesRunner runner) {
183        try {
184            RuntimeHarness harness = runner.getFeature(RuntimeFeature.class).getHarness();
185            storageConfiguration.init();
186            for (String bundle : storageConfiguration.getExternalBundles()) {
187                try {
188                    harness.deployBundle(bundle);
189                } catch (Exception e) {
190                    throw new NuxeoException(e);
191                }
192            }
193            URL blobContribUrl = storageConfiguration.getBlobManagerContrib(runner);
194            harness.getContext().deploy(new URLStreamRef(blobContribUrl));
195            URL repoContribUrl = storageConfiguration.getRepositoryContrib(runner);
196            harness.getContext().deploy(new URLStreamRef(repoContribUrl));
197        } catch (IOException e) {
198            throw new NuxeoException(e);
199        }
200    }
201
202    @Override
203    public void beforeRun(FeaturesRunner runner) throws InterruptedException {
204        // wait for async tasks that may have been triggered by
205        // RuntimeFeature (typically repo initialization)
206        txFeature.nextTransaction(10, TimeUnit.SECONDS);
207        if (granularity != Granularity.METHOD) {
208            // we need a transaction to properly initialize the session
209            // but it hasn't been started yet by TransactionalFeature
210            TransactionHelper.startTransaction();
211            initializeSession(runner);
212            TransactionHelper.commitOrRollbackTransaction();
213        }
214    }
215
216    @Override
217    public void configure(FeaturesRunner runner, Binder binder) {
218        binder.bind(CoreSession.class).toProvider(() -> session);
219    }
220
221    @Override
222    public void afterRun(FeaturesRunner runner) {
223        waitForAsyncCompletion(); // fulltext and various workers
224        if (granularity != Granularity.METHOD) {
225            cleanupSession(runner);
226        }
227        if (session != null) {
228            releaseCoreSession();
229        }
230
231        List<CoreSessionRegistrationInfo> leakedInfos = Framework.getService(CoreSessionService.class)
232                                                                 .getCoreSessionRegistrationInfos();
233        if (leakedInfos.size() == 0) {
234            return;
235        }
236        AssertionError leakedErrors = new AssertionError(String.format("leaked %d sessions", leakedInfos.size()));
237        for (CoreSessionRegistrationInfo info : leakedInfos) {
238            try {
239                info.getCoreSession().close();
240                leakedErrors.addSuppressed(info);
241            } catch (RuntimeException cause) {
242                leakedErrors.addSuppressed(cause);
243            }
244        }
245        throw leakedErrors;
246    }
247
248    @Override
249    public void beforeSetup(FeaturesRunner runner) {
250        if (granularity == Granularity.METHOD) {
251            initializeSession(runner);
252        }
253    }
254
255    @Override
256    public void afterTeardown(FeaturesRunner runner) {
257        if (granularity == Granularity.METHOD) {
258            cleanupSession(runner);
259        } else {
260            waitForAsyncCompletion();
261        }
262    }
263
264    public void waitForAsyncCompletion() {
265        txFeature.nextTransaction();
266    }
267
    /**
     * Empties the repository and releases the feature's session.
     * <p>
     * After waiting for asynchronous completion (and opening a session if there is none), runs in a new transaction:
     * removal of proxies, removal of the children of the root, removal of the remaining documents, reset of the root
     * ACP to the one saved by {@link #initializeSession}, save, and a final check that the repository is empty.
     * <p>
     * A {@link NuxeoException} raised during the cleanup is logged, not rethrown. The {@link CoreScope} is exited in
     * all cases. Finally the session is released and the feature is flagged as cleaned.
     *
     * @param runner the features runner (not used by this implementation)
     */
    protected void cleanupSession(FeaturesRunner runner) {
        waitForAsyncCompletion();
        if (session == null) {
            // no current session, open one to do the cleanup
            createCoreSession();
        }
        TransactionHelper.runInNewTransaction(() -> {
            try {
                log.trace("remove everything except root");
                // remove proxies first, as we cannot remove a target if there's a proxy pointing to it
                try (IterableQueryResult results = session
                        .queryAndFetch("SELECT ecm:uuid FROM Document WHERE ecm:isProxy = 1", NXQL.NXQL)) {
                    batchRemoveDocuments(results);
                } catch (QueryParseException e) {
                    // ignore, proxies disabled
                }
                // remove non-proxies
                session.removeChildren(new PathRef("/"));
                log.trace(
                        "remove orphan versions as OrphanVersionRemoverListener is not triggered by CoreSession#removeChildren");
                // remove remaining placeless documents
                try (IterableQueryResult results = session.queryAndFetch("SELECT ecm:uuid FROM Document, Relation",
                        NXQL.NXQL)) {
                    batchRemoveDocuments(results);
                }
                // set original ACP on root
                DocumentModel root = session.getRootDocument();
                root.setACP(rootAcp, true);

                session.save();
                waitForAsyncCompletion();
                // anything still returned here will be visible to the next test, only report it
                if (!session.query("SELECT * FROM Document, Relation").isEmpty()) {
                    log.error("Fail to cleanupSession, repository will not be empty for the next test.");
                }
            } catch (NuxeoException e) {
                // cleanup failures are logged only, the session is still released below
                log.error("Unable to reset repository", e);
            } finally {
                // counterpart of the CoreScope.INSTANCE.enter() done in initializeSession
                CoreScope.INSTANCE.exit();
            }
        });
        releaseCoreSession();
        // the next initializeSession will re-initialize the repositories
        cleaned = true;
    }
310
311    protected void batchRemoveDocuments(IterableQueryResult results) {
312        String rootDocumentId = session.getRootDocument().getId();
313        List<DocumentRef> ids = new ArrayList<>();
314        for (Map<String, Serializable> result : results) {
315            String id = (String) result.get("ecm:uuid");
316            if (id.equals(rootDocumentId)) {
317                continue;
318            }
319            ids.add(new IdRef(id));
320            if (ids.size() >= 100) {
321                batchRemoveDocuments(ids);
322                ids.clear();
323            }
324        }
325        if (!ids.isEmpty()) {
326            batchRemoveDocuments(ids);
327        }
328    }
329
330    protected void batchRemoveDocuments(List<DocumentRef> ids) {
331        List<DocumentRef> deferredIds = new ArrayList<>();
332        for (DocumentRef id : ids) {
333            if (!session.exists(id)) {
334                continue;
335            }
336            if (session.canRemoveDocument(id)) {
337                session.removeDocument(id);
338            } else {
339                deferredIds.add(id);
340            }
341        }
342        session.removeDocuments(deferredIds.toArray(new DocumentRef[0]));
343    }
344
345    protected void initializeSession(FeaturesRunner runner) {
346        if (cleaned) {
347            // reinitialize repositories content
348            RepositoryService repositoryService = Framework.getLocalService(RepositoryService.class);
349            repositoryService.initRepositories();
350            cleaned = false;
351        }
352        CoreScope.INSTANCE.enter();
353        createCoreSession();
354        if (repositoryInit != null) {
355            repositoryInit.populate(session);
356            session.save();
357            waitForAsyncCompletion();
358        }
359        // save current root acp
360        DocumentModel root = session.getRootDocument();
361        rootAcp = root.getACP();
362    }
363
364    public String getRepositoryName() {
365        return getStorageConfiguration().getRepositoryName();
366    }
367
368    public CoreSession openCoreSession(String username) {
369        return CoreInstance.openCoreSession(getRepositoryName(), username);
370    }
371
372    public CoreSession openCoreSession(NuxeoPrincipal principal) {
373        return CoreInstance.openCoreSession(getRepositoryName(), principal);
374    }
375
376    public CoreSession openCoreSession() {
377        return CoreInstance.openCoreSession(getRepositoryName());
378    }
379
380    public CoreSession openCoreSessionSystem() {
381        return CoreInstance.openCoreSessionSystem(getRepositoryName());
382    }
383
384    public CoreSession createCoreSession() {
385        UserPrincipal principal = new UserPrincipal("Administrator", new ArrayList<>(), false, true);
386        session = CoreInstance.openCoreSession(getRepositoryName(), principal);
387        return session;
388    }
389
390    public CoreSession getCoreSession() {
391        return session;
392    }
393
394    public void releaseCoreSession() {
395        session.close();
396        session = null;
397    }
398
399    public CoreSession reopenCoreSession() {
400        releaseCoreSession();
401        waitForAsyncCompletion();
402        // flush JCA cache to acquire a new low-level session
403        NuxeoContainer.resetConnectionManager();
404        createCoreSession();
405        return session;
406    }
407
408    public class CoreDeployer extends HotDeployer.ActionHandler {
409
410                @Override
411                public void exec(String action, String... agrs) throws Exception {
412                        waitForAsyncCompletion();
413                releaseCoreSession();
414                        next.exec(action, agrs);
415                        Framework.getService(ReloadService.class).reloadRepository();
416                createCoreSession();
417
418                }
419
420    }
421
422}