001/*
002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 */
019package org.nuxeo.ecm.platform.audit.service;
020
021import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID;
022import static org.nuxeo.ecm.platform.audit.service.LogEntryProvider.createProvider;
023
024import java.io.IOException;
025import java.io.Serializable;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.Date;
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Map;
033import java.util.function.Consumer;
034import java.util.function.Function;
035
036import org.nuxeo.ecm.core.api.CursorResult;
037import org.nuxeo.ecm.core.api.CursorService;
038import org.nuxeo.ecm.core.api.NuxeoException;
039import org.nuxeo.ecm.core.api.ScrollResult;
040import org.nuxeo.ecm.core.persistence.PersistenceProvider;
041import org.nuxeo.ecm.core.persistence.PersistenceProviderFactory;
042import org.nuxeo.ecm.core.query.sql.model.OrderByExpr;
043import org.nuxeo.ecm.platform.audit.api.AuditQueryBuilder;
044import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
045import org.nuxeo.ecm.platform.audit.api.FilterMapEntry;
046import org.nuxeo.ecm.platform.audit.api.LogEntry;
047import org.nuxeo.ecm.platform.audit.api.OrderByExprs;
048import org.nuxeo.ecm.platform.audit.impl.ExtendedInfoImpl;
049import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
050import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
051import org.nuxeo.runtime.api.Framework;
052import org.nuxeo.runtime.model.DefaultComponent;
053import org.nuxeo.runtime.transaction.TransactionHelper;
054
055import com.fasterxml.jackson.databind.ObjectMapper;
056
057/**
058 * Contains the Hibernate based (legacy) implementation
059 *
060 * @author tiry
061 */
062public class DefaultAuditBackend extends AbstractAuditBackend {
063
064    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
065
066    protected PersistenceProvider persistenceProvider;
067
068    protected CursorService<Iterator<LogEntry>, LogEntry, String> cursorService;
069
070    public DefaultAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) {
071        super(component, config);
072        activatePersistenceProvider();
073    }
074
075    /**
076     * @since 9.3
077     */
078    public DefaultAuditBackend() {
079        super();
080    }
081
082    @Override
083    public int getApplicationStartedOrder() {
084        DefaultComponent component = (DefaultComponent) Framework.getRuntime().getComponent(
085                "org.nuxeo.ecm.core.persistence.PersistenceComponent");
086        return component.getApplicationStartedOrder() + 1;
087    }
088
089    @Override
090    public void onApplicationStarted() {
091        activatePersistenceProvider();
092        cursorService = new CursorService<>(entry -> {
093            try {
094                return OBJECT_MAPPER.writeValueAsString(entry);
095            } catch (IOException e) {
096                throw new NuxeoException("Unable to serialize entry");
097            }
098        });
099    }
100
101    @Override
102    public void onApplicationStopped() {
103        try {
104            persistenceProvider.closePersistenceUnit();
105        } finally {
106            persistenceProvider = null;
107        }
108    }
109
110    // public for testing purpose !
111    public PersistenceProvider getOrCreatePersistenceProvider() {
112        if (persistenceProvider == null) {
113            activatePersistenceProvider();
114        }
115        return persistenceProvider;
116    }
117
118    protected void activatePersistenceProvider() {
119        Thread thread = Thread.currentThread();
120        ClassLoader last = thread.getContextClassLoader();
121        try {
122            thread.setContextClassLoader(PersistenceProvider.class.getClassLoader());
123            PersistenceProviderFactory persistenceProviderFactory = Framework.getService(
124                    PersistenceProviderFactory.class);
125            persistenceProvider = persistenceProviderFactory.newProvider("nxaudit-logs");
126            persistenceProvider.openPersistenceUnit();
127        } finally {
128            thread.setContextClassLoader(last);
129        }
130    }
131
132    protected <T> T apply(boolean needActivateSession, Function<LogEntryProvider, T> function) {
133        return getOrCreatePersistenceProvider().run(Boolean.valueOf(needActivateSession), em -> {
134            return function.apply(createProvider(em));
135        });
136    }
137
138    protected void accept(boolean needActivateSession, Consumer<LogEntryProvider> consumer) {
139        getOrCreatePersistenceProvider().run(Boolean.valueOf(needActivateSession), em -> {
140            consumer.accept(createProvider(em));
141        });
142    }
143
144    @Override
145    public void addLogEntries(final List<LogEntry> entries) {
146        if (entries.isEmpty()) {
147            return;
148        }
149        TransactionHelper.runInTransaction(() -> accept(true, provider -> provider.addLogEntries(entries)));
150    }
151
152    @Override
153    public List<LogEntry> getLogEntriesFor(final String uuid, final String repositoryId) {
154        return apply(false, provider -> provider.getLogEntriesFor(uuid, repositoryId));
155    }
156
157    @Override
158    public List<LogEntry> getLogEntriesFor(final String uuid) {
159        return apply(false, provider -> provider.getLogEntriesFor(uuid));
160    }
161
162    @Override
163    public List<LogEntry> getLogEntriesFor(final String uuid, final Map<String, FilterMapEntry> filterMap,
164            final boolean doDefaultSort) {
165        return apply(false, provider -> provider.getLogEntriesFor(uuid, filterMap, doDefaultSort));
166    }
167
168    @Override
169    public LogEntry getLogEntryByID(final long id) {
170        return apply(false, provider -> provider.getLogEntryByID(id));
171    }
172
173    @Override
174    public List<LogEntry> nativeQueryLogs(final String whereClause, final int pageNb, final int pageSize) {
175        return apply(false, provider -> provider.nativeQueryLogs(whereClause, pageNb, pageSize));
176    }
177
178    @Override
179    public List<?> nativeQuery(final String query, final int pageNb, final int pageSize) {
180        return apply(false, provider -> provider.nativeQuery(query, pageNb, pageSize));
181    }
182
183    @Override
184    public List<?> nativeQuery(final String query, final Map<String, Object> params, final int pageNb,
185            final int pageSize) {
186        return apply(false, provider -> provider.nativeQuery(query, params, pageNb, pageSize));
187    }
188
189    @Override
190    public List<LogEntry> queryLogs(AuditQueryBuilder builder) {
191        return apply(false, provider -> provider.queryLogs(builder));
192    }
193
194    @Override
195    public List<LogEntry> queryLogs(final String[] eventIds, final String dateRange) {
196        return apply(false, provider -> provider.queryLogs(eventIds, dateRange));
197    }
198
199    @Override
200    public List<LogEntry> queryLogsByPage(final String[] eventIds, final Date limit, final String[] category,
201            final String path, final int pageNb, final int pageSize) {
202        return apply(false, provider -> provider.queryLogsByPage(eventIds, limit, category, path, pageNb, pageSize));
203    }
204
205    @Override
206    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
207        return apply(false, provider -> syncLogCreationEntries(provider, repoId, path, recurs));
208    }
209
210    @Override
211    public Long getEventsCount(final String eventId) {
212        return apply(false, provider -> provider.countEventsById(eventId));
213    }
214
215    public List<String> getLoggedEventIds() {
216        return apply(false, LogEntryProvider::findEventIds);
217    }
218
219    @Override
220    public ExtendedInfo newExtendedInfo(Serializable value) {
221        return ExtendedInfoImpl.createExtendedInfo(value);
222    }
223
224    @Override
225    public long getLatestLogId(String repositoryId, String... eventIds) {
226        Map<String, Object> params = getParams(eventIds);
227        String paramNames = getParamNames(eventIds);
228        params.put("repoId", repositoryId);
229        String query = String.format("FROM LogEntry log" //
230                + " WHERE log.eventId IN (%s)" //
231                + "   AND log.repositoryId = :repoId" //
232                + " ORDER BY log.id DESC", paramNames);
233        @SuppressWarnings("unchecked")
234        List<LogEntry> entries = (List<LogEntry>) nativeQuery(query, params, 1, 1);
235        return entries.isEmpty() ? 0 : entries.get(0).getId();
236    }
237
238    @Override
239    @SuppressWarnings("unchecked")
240    public List<LogEntry> getLogEntriesAfter(long logIdOffset, int limit, String repositoryId, String... eventIds) {
241        Map<String, Object> params = getParams(eventIds);
242        String paramNames = getParamNames(eventIds);
243        params.put("repoId", repositoryId);
244        params.put("minId", Long.valueOf(logIdOffset));
245        String query = String.format("FROM LogEntry log" //
246                + " WHERE log.id >= :minId" //
247                + "   AND log.eventId IN (%s)" //
248                + "   AND log.repositoryId = :repoId" //
249                + " ORDER BY log.id", paramNames);
250        return (List<LogEntry>) nativeQuery(query, params, 1, limit);
251    }
252
253    protected String getParamNames(String[] eventId) {
254        List<String> ret = new ArrayList<>(eventId.length);
255        for (String event : eventId) {
256            ret.add(":ev" + event);
257        }
258        return String.join(",", ret);
259    }
260
261    protected Map<String, Object> getParams(String[] eventId) {
262        HashMap<String, Object> ret = new HashMap<>(eventId.length);
263        for (String event : eventId) {
264            ret.put("ev" + event, event);
265        }
266        return ret;
267    }
268
269    @Override
270    public void append(List<String> jsonEntries) {
271        List<LogEntry> entries = new ArrayList<>();
272        for (String json : jsonEntries) {
273            try {
274                LogEntryImpl entry = OBJECT_MAPPER.readValue(json, LogEntryImpl.class);
275                if (entry.getId() == 0) {
276                    throw new NuxeoException("A json entry has an empty id. entry=" + json);
277                }
278                entries.add(entry);
279            } catch (IOException e) {
280                throw new NuxeoException("Unable to deserialize json entries", e);
281            }
282        }
283        accept(false, provider -> provider.append(entries));
284    }
285
286    @Override
287    public ScrollResult<String> scroll(AuditQueryBuilder builder, int batchSize, int keepAliveSeconds) {
288        // as we're using pages to scroll audit, we need to add an order to make results across pages deterministic
289        builder.orders(OrderByExprs.asc(LOG_ID), builder.orders().toArray(new OrderByExpr[0]));
290        String scrollId = cursorService.registerCursorResult(
291                new SQLAuditCursorResult(builder, batchSize, keepAliveSeconds));
292        return scroll(scrollId);
293    }
294
295    @Override
296    public ScrollResult<String> scroll(String scrollId) {
297        return cursorService.scroll(scrollId);
298    }
299
300    public class SQLAuditCursorResult extends CursorResult<Iterator<LogEntry>, LogEntry> {
301
302        protected final AuditQueryBuilder builder;
303
304        protected long pageNb;
305
306        protected boolean end;
307
308        public SQLAuditCursorResult(AuditQueryBuilder builder, int batchSize, int keepAliveSeconds) {
309            super(Collections.emptyIterator(), batchSize, keepAliveSeconds);
310            this.builder = builder;
311            this.pageNb = 0;
312        }
313
314        @Override
315        public boolean hasNext() {
316            if (cursor == null || end) {
317                return false;
318            } else if (cursor.hasNext()) {
319                return true;
320            } else {
321                runNextPage();
322                return !end;
323            }
324        }
325
326        @Override
327        public LogEntry next() {
328            if (cursor != null && !cursor.hasNext() && !end) {
329                // try to run a next scroll
330                runNextPage();
331            }
332            return super.next();
333        }
334
335        protected void runNextPage() {
336            builder.offset(pageNb++ * batchSize).limit(batchSize);
337            cursor = queryLogs(builder).iterator();
338            end = !cursor.hasNext();
339        }
340
341        @Override
342        public void close() {
343            end = true;
344            // Call super close to clear cursor
345            super.close();
346        }
347
348    }
349
350}