001/*
002 * (C) Copyright 2011-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thomas Roger <troger@nuxeo.com>
018 */
019package org.nuxeo.ecm.activity;
020
021import java.io.Serializable;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Date;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Locale;
029import java.util.Map;
030import java.util.MissingResourceException;
031import java.util.regex.Matcher;
032import java.util.regex.Pattern;
033
034import javax.persistence.EntityManager;
035import javax.persistence.Query;
036
037import org.apache.commons.lang3.StringUtils;
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.common.utils.i18n.I18NUtils;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.persistence.PersistenceProvider;
043import org.nuxeo.ecm.core.persistence.PersistenceProviderFactory;
044import org.nuxeo.ecm.core.repository.RepositoryInitializationHandler;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.model.ComponentContext;
047import org.nuxeo.runtime.model.ComponentInstance;
048import org.nuxeo.runtime.model.DefaultComponent;
049
050/**
051 * Default implementation of {@link ActivityStreamService}.
052 *
053 * @author <a href="mailto:troger@nuxeo.com">Thomas Roger</a>
054 * @since 5.5
055 */
056public class ActivityStreamServiceImpl extends DefaultComponent implements ActivityStreamService {
057
058    private static final Log log = LogFactory.getLog(ActivityStreamServiceImpl.class);
059
060    public static final String ACTIVITIES_PROVIDER = "nxactivities";
061
062    public static final String ACTIVITY_STREAM_FILTER_EP = "activityStreamFilters";
063
064    public static final String ACTIVITY_STREAMS_EP = "activityStreams";
065
066    public static final String ACTIVITY_VERBS_EP = "activityVerbs";
067
068    public static final String ACTIVITY_LINK_BUILDERS_EP = "activityLinkBuilders";
069
070    public static final String ACTIVITY_UPGRADERS_EP = "activityUpgraders";
071
072    protected final ThreadLocal<EntityManager> localEntityManager = new ThreadLocal<>();
073
074    protected final Map<String, ActivityStreamFilter> activityStreamFilters = new HashMap<>();
075
076    protected ActivityStreamRegistry activityStreamRegistry;
077
078    protected ActivityVerbRegistry activityVerbRegistry;
079
080    protected ActivityLinkBuilderRegistry activityLinkBuilderRegistry;
081
082    protected ActivityUpgraderRegistry activityUpgraderRegistry;
083
084    protected PersistenceProvider persistenceProvider;
085
086    protected RepositoryInitializationHandler initializationHandler;
087
088    public void upgradeActivities() {
089        for (final ActivityUpgrader upgrader : activityUpgraderRegistry.getOrderedActivityUpgraders()) {
090            try {
091                getOrCreatePersistenceProvider().run(false, em -> {
092                    upgradeActivities(em, upgrader);
093                });
094            } catch (NuxeoException e) {
095                log.error(String.format("Error while running '%s' activity upgrader: %s", upgrader.getName(),
096                        e.getMessage()));
097                log.debug(e, e);
098            }
099        }
100    }
101
102    protected void upgradeActivities(EntityManager em, ActivityUpgrader upgrader) {
103        try {
104            localEntityManager.set(em);
105            upgrader.doUpgrade(this);
106        } finally {
107            localEntityManager.remove();
108        }
109    }
110
111    @Override
112    public ActivitiesList query(String filterId, final Map<String, Serializable> parameters) {
113        return query(filterId, parameters, 0, 0);
114    }
115
116    @Override
117    public ActivitiesList query(String filterId, final Map<String, Serializable> parameters, final long offset,
118            final long limit) {
119        if (ALL_ACTIVITIES.equals(filterId)) {
120            return queryAll(offset, limit);
121        }
122
123        final ActivityStreamFilter filter = activityStreamFilters.get(filterId);
124        if (filter == null) {
125            throw new NuxeoException(String.format("Unable to retrieve '%s' ActivityStreamFilter", filterId));
126        }
127
128        return query(filter, parameters, offset, limit);
129    }
130
131    protected ActivitiesList query(final ActivityStreamFilter filter, final Map<String, Serializable> parameters,
132            final long offset, final long limit) {
133        return getOrCreatePersistenceProvider().run(false, em -> {
134            return query(em, filter, parameters, offset, limit);
135        });
136    }
137
138    protected ActivitiesList query(EntityManager em, ActivityStreamFilter filter, Map<String, Serializable> parameters,
139            long offset, long limit) {
140        try {
141            localEntityManager.set(em);
142            return filter.query(this, parameters, offset, limit);
143        } finally {
144            localEntityManager.remove();
145        }
146
147    }
148
149    protected ActivitiesList queryAll(final long offset, final long limit) {
150        return getOrCreatePersistenceProvider().run(false, em -> {
151            return queryAll(em, offset, limit);
152        });
153    }
154
155    @SuppressWarnings("unchecked")
156    protected ActivitiesList queryAll(EntityManager em, long offset, long limit) {
157        Query query = em.createQuery("select activity from Activity activity order by activity.id asc");
158        if (limit > 0) {
159            query.setMaxResults((int) limit);
160        }
161        if (offset > 0) {
162            query.setFirstResult((int) offset);
163        }
164        return new ActivitiesListImpl(query.getResultList());
165    }
166
167    @Override
168    public Activity addActivity(final Activity activity) {
169        if (activity.getPublishedDate() == null) {
170            activity.setPublishedDate(new Date());
171        }
172        getOrCreatePersistenceProvider().run(true, em -> {
173            addActivity(em, activity);
174        });
175        return activity;
176    }
177
178    protected void addActivity(EntityManager em, Activity activity) {
179        try {
180            localEntityManager.set(em);
181            em.persist(activity);
182            for (ActivityStreamFilter filter : activityStreamFilters.values()) {
183                if (filter.isInterestedIn(activity)) {
184                    filter.handleNewActivity(this, activity);
185                }
186            }
187        } finally {
188            localEntityManager.remove();
189        }
190    }
191
192    @Override
193    public void removeActivities(final Collection<Activity> activities) {
194        if (activities == null || activities.isEmpty()) {
195            return;
196        }
197        getOrCreatePersistenceProvider().run(true, em -> {
198            removeActivities(em, activities);
199        });
200    }
201
202    protected void removeActivities(EntityManager em, Collection<Activity> activities) {
203        try {
204            localEntityManager.set(em);
205
206            ActivitiesList l = new ActivitiesListImpl(activities);
207            for (ActivityStreamFilter filter : activityStreamFilters.values()) {
208                filter.handleRemovedActivities(this, l);
209            }
210
211            Query query = em.createQuery("delete from Activity activity where activity.id in (:ids)");
212            query.setParameter("ids", l.toActivityIds());
213            query.executeUpdate();
214        } finally {
215            localEntityManager.remove();
216        }
217    }
218
219    @Override
220    public ActivityMessage toActivityMessage(final Activity activity, Locale locale) {
221        return toActivityMessage(activity, locale, null);
222    }
223
224    @Override
225    public ActivityMessage toActivityMessage(Activity activity, Locale locale, String activityLinkBuilderName) {
226        ActivityLinkBuilder activityLinkBuilder = getActivityLinkBuilder(activityLinkBuilderName);
227
228        Map<String, String> fields = activity.toMap();
229
230        String actor = activity.getActor();
231        String displayActor = activity.getDisplayActor();
232        String displayActorLink;
233        if (ActivityHelper.isUser(actor)) {
234            displayActorLink = activityLinkBuilder.getUserProfileLink(actor, activity.getDisplayActor());
235        } else {
236            displayActorLink = activity.getDisplayActor();
237        }
238
239        List<ActivityReplyMessage> activityReplyMessages = toActivityReplyMessages(activity.getActivityReplies(),
240                locale, activityLinkBuilderName);
241
242        ActivityVerb verb = activityVerbRegistry.get(activity.getVerb());
243
244        if (verb == null || verb.getLabelKey() == null) {
245            return new ActivityMessage(activity.getId(), actor, displayActor, displayActorLink, activity.getVerb(),
246                    activity.toString(), activity.getPublishedDate(), null, activityReplyMessages);
247        }
248
249        String labelKey = verb.getLabelKey();
250        String messageTemplate;
251        try {
252            messageTemplate = I18NUtils.getMessageString("messages", labelKey, null, locale);
253        } catch (MissingResourceException e) {
254            log.error(e.getMessage());
255            log.debug(e, e);
256            // just return the labelKey if we have no resource bundle
257            return new ActivityMessage(activity.getId(), actor, displayActor, displayActorLink, activity.getVerb(),
258                    labelKey, activity.getPublishedDate(), verb.getIcon(), activityReplyMessages);
259        }
260
261        Pattern pattern = Pattern.compile("\\$\\{(.*?)\\}");
262        Matcher m = pattern.matcher(messageTemplate);
263        while (m.find()) {
264            String param = m.group().replaceAll("[\\|$\\|{\\}]", "");
265            if (fields.containsKey(param)) {
266                String value = fields.get(param);
267                final String displayValue = fields.get("display" + StringUtils.capitalize(param));
268                if (ActivityHelper.isDocument(value)) {
269                    value = activityLinkBuilder.getDocumentLink(value, displayValue);
270                } else if (ActivityHelper.isUser(value)) {
271                    value = activityLinkBuilder.getUserProfileLink(value, displayValue);
272                } else {
273                    // simple text
274                    value = ActivityMessageHelper.replaceURLsByLinks(value);
275                }
276                messageTemplate = messageTemplate.replace(m.group(), value);
277            }
278        }
279
280        return new ActivityMessage(activity.getId(), actor, displayActor, displayActorLink, activity.getVerb(),
281                messageTemplate, activity.getPublishedDate(), verb.getIcon(), activityReplyMessages);
282    }
283
284    @Override
285    public ActivityLinkBuilder getActivityLinkBuilder(String name) {
286        ActivityLinkBuilder activityLinkBuilder;
287        if (StringUtils.isBlank(name)) {
288            activityLinkBuilder = activityLinkBuilderRegistry.getDefaultActivityLinkBuilder();
289        } else {
290            activityLinkBuilder = activityLinkBuilderRegistry.get(name);
291            if (activityLinkBuilder == null) {
292                log.warn("Fallback on default Activity link builder");
293                activityLinkBuilder = activityLinkBuilderRegistry.getDefaultActivityLinkBuilder();
294            }
295        }
296        return activityLinkBuilder;
297    }
298
299    @Override
300    public ActivityReplyMessage toActivityReplyMessage(ActivityReply activityReply, Locale locale) {
301        return toActivityReplyMessage(activityReply, locale, null);
302    }
303
304    @Override
305    public ActivityReplyMessage toActivityReplyMessage(ActivityReply activityReply, Locale locale,
306            String activityLinkBuilderName) {
307        ActivityLinkBuilder activityLinkBuilder = getActivityLinkBuilder(activityLinkBuilderName);
308
309        String actor = activityReply.getActor();
310        String displayActor = activityReply.getDisplayActor();
311        String displayActorLink = activityLinkBuilder.getUserProfileLink(actor, displayActor);
312        String message = ActivityMessageHelper.replaceURLsByLinks(activityReply.getMessage());
313        return new ActivityReplyMessage(activityReply.getId(), actor, displayActor, displayActorLink, message,
314                activityReply.getPublishedDate());
315
316    }
317
318    private List<ActivityReplyMessage> toActivityReplyMessages(List<ActivityReply> replies, Locale locale,
319            String activityLinkBuilderName) {
320        List<ActivityReplyMessage> activityReplyMessages = new ArrayList<>();
321        for (ActivityReply reply : replies) {
322            activityReplyMessages.add(toActivityReplyMessage(reply, locale, activityLinkBuilderName));
323        }
324        return activityReplyMessages;
325    }
326
327    @Override
328    public ActivityStream getActivityStream(String name) {
329        return activityStreamRegistry.get(name);
330    }
331
332    @Override
333    public ActivityReply addActivityReply(Serializable activityId, ActivityReply activityReply) {
334        Activity activity = getActivity(activityId);
335        if (activity != null) {
336            List<ActivityReply> replies = activity.getActivityReplies();
337            String newReplyId = computeNewReplyId(activity);
338            activityReply.setId(newReplyId);
339            replies.add(activityReply);
340            activity.setActivityReplies(replies);
341            updateActivity(activity);
342        }
343        return activityReply;
344    }
345
346    /**
347     * @since 5.6
348     */
349    protected String computeNewReplyId(Activity activity) {
350        String replyIdPrefix = activity.getId() + "-reply-";
351        List<ActivityReply> replies = activity.getActivityReplies();
352        long maxId = 0;
353        for (ActivityReply reply : replies) {
354            String replyId = reply.getId();
355            long currentId = Long.parseLong(replyId.replace(replyIdPrefix, ""));
356            if (currentId > maxId) {
357                maxId = currentId;
358            }
359        }
360        return replyIdPrefix + (maxId + 1);
361    }
362
363    public Activity getActivity(final Serializable activityId) {
364        return getOrCreatePersistenceProvider().run(false, em -> {
365            return getActivity(em, activityId);
366        });
367    }
368
369    public ActivitiesList getActivities(final Collection<Serializable> activityIds) {
370        return getOrCreatePersistenceProvider().run(false, em -> {
371            return getActivities(em, activityIds);
372        });
373    }
374
375    @Override
376    public ActivityReply removeActivityReply(final Serializable activityId, final String activityReplyId) {
377        return getOrCreatePersistenceProvider().run(true, em -> {
378            return removeActivityReply(em, activityId, activityReplyId);
379        });
380    }
381
382    /**
383     * @since 5.6
384     */
385    protected ActivityReply removeActivityReply(EntityManager em, Serializable activityId, String activityReplyId) {
386        try {
387            localEntityManager.set(em);
388
389            Activity activity = getActivity(activityId);
390            if (activity != null) {
391                List<ActivityReply> replies = activity.getActivityReplies();
392                for (Iterator<ActivityReply> it = replies.iterator(); it.hasNext();) {
393                    ActivityReply reply = it.next();
394                    if (reply.getId().equals(activityReplyId)) {
395                        for (ActivityStreamFilter filter : activityStreamFilters.values()) {
396                            filter.handleRemovedActivityReply(this, activity, reply);
397                        }
398                        it.remove();
399                        activity.setActivityReplies(replies);
400                        updateActivity(activity);
401                        return reply;
402                    }
403                }
404            }
405            return null;
406        } finally {
407            localEntityManager.remove();
408        }
409    }
410
411    protected Activity getActivity(EntityManager em, Serializable activityId) {
412        Query query = em.createQuery("select activity from Activity activity where activity.id = :activityId");
413        query.setParameter("activityId", activityId);
414        return (Activity) query.getSingleResult();
415    }
416
417    @SuppressWarnings("unchecked")
418    protected ActivitiesList getActivities(EntityManager em, Collection<Serializable> activityIds) {
419        Query query = em.createQuery("select activity from Activity activity where activity.id in (:ids)");
420        query.setParameter("ids", activityIds);
421        return new ActivitiesListImpl(query.getResultList());
422    }
423
424    protected void updateActivity(final Activity activity) {
425        getOrCreatePersistenceProvider().run(false, em -> {
426            activity.setLastUpdatedDate(new Date());
427            return em.merge(activity);
428        });
429    }
430
431    public EntityManager getEntityManager() {
432        return localEntityManager.get();
433    }
434
435    public PersistenceProvider getOrCreatePersistenceProvider() {
436        if (persistenceProvider == null) {
437            activatePersistenceProvider();
438        }
439        return persistenceProvider;
440    }
441
442    protected void activatePersistenceProvider() {
443        Thread thread = Thread.currentThread();
444        ClassLoader last = thread.getContextClassLoader();
445        try {
446            thread.setContextClassLoader(PersistenceProvider.class.getClassLoader());
447            PersistenceProviderFactory persistenceProviderFactory = Framework.getService(
448                    PersistenceProviderFactory.class);
449            persistenceProvider = persistenceProviderFactory.newProvider(ACTIVITIES_PROVIDER);
450            persistenceProvider.openPersistenceUnit();
451        } finally {
452            thread.setContextClassLoader(last);
453        }
454    }
455
456    protected void deactivatePersistenceProvider() {
457        if (persistenceProvider != null) {
458            persistenceProvider.closePersistenceUnit();
459            persistenceProvider = null;
460        }
461    }
462
463    @Override
464    public void activate(ComponentContext context) {
465        super.activate(context);
466        activityStreamRegistry = new ActivityStreamRegistry();
467        activityVerbRegistry = new ActivityVerbRegistry();
468        activityLinkBuilderRegistry = new ActivityLinkBuilderRegistry();
469        activityUpgraderRegistry = new ActivityUpgraderRegistry();
470
471        initializationHandler = new ActivityRepositoryInitializationHandler();
472        initializationHandler.install();
473    }
474
475    @Override
476    public void deactivate(ComponentContext context) {
477        deactivatePersistenceProvider();
478
479        if (initializationHandler != null) {
480            initializationHandler.uninstall();
481        }
482
483        super.deactivate(context);
484    }
485
486    @Override
487    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
488        if (ACTIVITY_STREAM_FILTER_EP.equals(extensionPoint)) {
489            registerActivityStreamFilter((ActivityStreamFilterDescriptor) contribution);
490        } else if (ACTIVITY_STREAMS_EP.equals(extensionPoint)) {
491            registerActivityStream((ActivityStream) contribution);
492        } else if (ACTIVITY_VERBS_EP.equals(extensionPoint)) {
493            registerActivityVerb((ActivityVerb) contribution);
494        } else if (ACTIVITY_LINK_BUILDERS_EP.equals(extensionPoint)) {
495            registerActivityLinkBuilder((ActivityLinkBuilderDescriptor) contribution);
496        } else if (ACTIVITY_UPGRADERS_EP.equals(extensionPoint)) {
497            registerActivityUpgrader((ActivityUpgraderDescriptor) contribution);
498        }
499    }
500
501    private void registerActivityStreamFilter(ActivityStreamFilterDescriptor descriptor) {
502        ActivityStreamFilter filter = descriptor.getActivityStreamFilter();
503
504        String filterId = filter.getId();
505
506        boolean enabled = descriptor.isEnabled();
507        if (activityStreamFilters.containsKey(filterId)) {
508            log.info("Overriding activity stream filter with id " + filterId);
509            if (!enabled) {
510                activityStreamFilters.remove(filterId);
511                log.info("Disabled activity stream filter with id " + filterId);
512            }
513        }
514        if (enabled) {
515            log.info("Registering activity stream filter with id " + filterId);
516            activityStreamFilters.put(filterId, descriptor.getActivityStreamFilter());
517        }
518    }
519
520    private void registerActivityStream(ActivityStream activityStream) {
521        log.info(String.format("Registering activity stream '%s'", activityStream.getName()));
522        activityStreamRegistry.addContribution(activityStream);
523    }
524
525    private void registerActivityVerb(ActivityVerb activityVerb) {
526        log.info(String.format("Registering activity verb '%s'", activityVerb.getVerb()));
527        activityVerbRegistry.addContribution(activityVerb);
528    }
529
530    private void registerActivityLinkBuilder(ActivityLinkBuilderDescriptor activityLinkBuilderDescriptor) {
531        log.info(String.format("Registering activity link builder '%s'", activityLinkBuilderDescriptor.getName()));
532        activityLinkBuilderRegistry.addContribution(activityLinkBuilderDescriptor);
533    }
534
535    private void registerActivityUpgrader(ActivityUpgraderDescriptor activityUpgraderDescriptor) {
536        log.info(String.format("Registering activity upgrader '%s'", activityUpgraderDescriptor.getName()));
537        activityUpgraderRegistry.addContribution(activityUpgraderDescriptor);
538    }
539
540    @Override
541    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
542        if (ACTIVITY_STREAM_FILTER_EP.equals(extensionPoint)) {
543            unregisterActivityStreamFilter((ActivityStreamFilterDescriptor) contribution);
544        } else if (ACTIVITY_STREAMS_EP.equals(extensionPoint)) {
545            unregisterActivityStream((ActivityStream) contribution);
546        } else if (ACTIVITY_VERBS_EP.equals(extensionPoint)) {
547            unregisterActivityVerb((ActivityVerb) contribution);
548        } else if (ACTIVITY_LINK_BUILDERS_EP.equals(extensionPoint)) {
549            unregisterActivityLinkBuilder((ActivityLinkBuilderDescriptor) contribution);
550        } else if (ACTIVITY_UPGRADERS_EP.equals(extensionPoint)) {
551            unregisterActivityUpgrader((ActivityUpgraderDescriptor) contribution);
552        }
553    }
554
555    private void unregisterActivityStreamFilter(ActivityStreamFilterDescriptor descriptor) {
556        ActivityStreamFilter filter = descriptor.getActivityStreamFilter();
557        String filterId = filter.getId();
558        activityStreamFilters.remove(filterId);
559        log.info("Unregistering activity stream filter with id " + filterId);
560    }
561
562    private void unregisterActivityStream(ActivityStream activityStream) {
563        activityStreamRegistry.removeContribution(activityStream);
564        log.info(String.format("Unregistering activity stream '%s'", activityStream.getName()));
565    }
566
567    private void unregisterActivityVerb(ActivityVerb activityVerb) {
568        activityVerbRegistry.removeContribution(activityVerb);
569        log.info(String.format("Unregistering activity verb '%s'", activityVerb.getVerb()));
570    }
571
572    private void unregisterActivityLinkBuilder(ActivityLinkBuilderDescriptor activityLinkBuilderDescriptor) {
573        activityLinkBuilderRegistry.removeContribution(activityLinkBuilderDescriptor);
574        log.info(String.format("Unregistering activity link builder '%s'", activityLinkBuilderDescriptor.getName()));
575    }
576
577    private void unregisterActivityUpgrader(ActivityUpgraderDescriptor activityUpgraderDescriptor) {
578        activityUpgraderRegistry.removeContribution(activityUpgraderDescriptor);
579        log.info(String.format("Unregistering activity upgrader '%s'", activityUpgraderDescriptor.getName()));
580    }
581
582}