001/*
002 * Copyright (c) 2007-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Florent Guillaume
011 *     Thierry Martins
012 */
013package org.nuxeo.ecm.core.scheduler;
014
015import java.io.IOException;
016import java.io.InputStream;
017import java.io.Serializable;
018import java.net.URL;
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.Map;
022import java.util.Properties;
023import java.util.Set;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.nuxeo.ecm.core.api.NuxeoException;
028import org.nuxeo.runtime.RuntimeServiceEvent;
029import org.nuxeo.runtime.RuntimeServiceListener;
030import org.nuxeo.runtime.api.Framework;
031import org.nuxeo.runtime.model.ComponentContext;
032import org.nuxeo.runtime.model.DefaultComponent;
033import org.nuxeo.runtime.model.Extension;
034import org.nuxeo.runtime.model.RuntimeContext;
035import org.quartz.CronScheduleBuilder;
036import org.quartz.JobBuilder;
037import org.quartz.JobDataMap;
038import org.quartz.JobDetail;
039import org.quartz.JobKey;
040import org.quartz.ObjectAlreadyExistsException;
041import org.quartz.Scheduler;
042import org.quartz.SchedulerException;
043import org.quartz.Trigger;
044import org.quartz.TriggerBuilder;
045import org.quartz.impl.StdSchedulerFactory;
046import org.quartz.impl.matchers.GroupMatcher;
047
048/**
049 * Schedule service implementation. Since the cleanup of the quartz job is done when service is activated, ( see see
050 * https://jira.nuxeo.com/browse/NXP-7303 ) in cluster mode, the schedules contributions MUST be the same on all nodes.
051 * Due the fact that all jobs are removed when service starts on a node it may be a short period with no schedules in
052 * quartz table even other node is running.
053 */
054public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService, RuntimeServiceListener {
055
056    private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class);
057
058    protected RuntimeContext bundle;
059
060    protected Scheduler scheduler;
061
062    protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry();
063
064    /**
065     * @since 7.10
066     */
067    private Map<String, JobKey> jobKeys = new HashMap<String, JobKey>();
068
069    @Override
070    public void activate(ComponentContext context) {
071        log.debug("Activate");
072        bundle = context.getRuntimeContext();
073    }
074
075    protected void setupScheduler(ComponentContext context) throws IOException, SchedulerException {
076        StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
077        URL cfg = context.getRuntimeContext().getResource("config/quartz.properties");
078        if (cfg != null) {
079            InputStream stream = cfg.openStream();
080            try {
081                schedulerFactory.initialize(stream);
082            } finally {
083                stream.close();
084            }
085        } else {
086            // use default config (unit tests)
087            Properties props = new Properties();
088            props.put("org.quartz.scheduler.instanceName", "Quartz");
089            props.put("org.quartz.scheduler.threadName", "Quartz_Scheduler");
090            props.put("org.quartz.scheduler.instanceId", "NON_CLUSTERED");
091            props.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true");
092            props.put("org.quartz.scheduler.skipUpdateCheck", "true");
093            props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
094            props.put("org.quartz.threadPool.threadCount", "1");
095            props.put("org.quartz.threadPool.threadPriority", "4");
096            props.put("org.quartz.threadPool.makeThreadsDaemons", "true");
097            schedulerFactory.initialize(props);
098        }
099        scheduler = schedulerFactory.getScheduler();
100        scheduler.start();
101        // server = MBeanServerFactory.createMBeanServer();
102        // server.createMBean("org.quartz.ee.jmx.jboss.QuartzService",
103        // quartzObjectName);
104
105        // clean up all nuxeo jobs
106        // https://jira.nuxeo.com/browse/NXP-7303
107        GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals("nuxeo");
108        Set<JobKey> jobs = scheduler.getJobKeys(matcher);
109        scheduler.deleteJobs(new ArrayList<JobKey>(jobs));
110        for (Schedule each : registry.getSchedules()) {
111            registerSchedule(each);
112        }
113        log.info("scheduler started");
114    }
115
116    protected void shutdownScheduler() {
117        if (scheduler == null) {
118            return;
119        }
120        try {
121            scheduler.shutdown();
122        } catch (SchedulerException cause) {
123            log.error("Cannot shutdown scheduler", cause);
124        } finally {
125            scheduler = null;
126        }
127    }
128
129    @Override
130    public void deactivate(ComponentContext context) {
131        log.debug("Deactivate");
132        shutdownScheduler();
133    }
134
135    @Override
136    public void applicationStarted(ComponentContext context) {
137        Framework.addListener(this);
138        try {
139            setupScheduler(context);
140        } catch (IOException | SchedulerException e) {
141            throw new NuxeoException(e);
142        }
143    }
144
145    @Override
146    public boolean hasApplicationStarted() {
147        return scheduler != null;
148    }
149
150    @Override
151    public void registerExtension(Extension extension) {
152        Object[] contribs = extension.getContributions();
153        for (Object contrib : contribs) {
154            registerSchedule((Schedule) contrib);
155        }
156    }
157
158    @Override
159    public void unregisterExtension(Extension extension) {
160        // do nothing to do ;
161        // clean up will be done when service is activated
162        // see https://jira.nuxeo.com/browse/NXP-7303
163    }
164
165    public RuntimeContext getContext() {
166        return bundle;
167    }
168
169    @Override
170    public void registerSchedule(Schedule schedule) {
171        registerSchedule(schedule, null);
172    }
173
174    @Override
175    public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) {
176        registry.addContribution(schedule);
177        if (scheduler == null) {
178            return;
179        }
180        Schedule contributed = registry.getSchedule(schedule);
181        if (contributed != null) {
182            schedule(contributed, parameters);
183        } else {
184            unschedule(schedule.getId());
185        }
186    }
187
188    protected void schedule(Schedule schedule, Map<String, Serializable> parameters) {
189        log.info("Registering " + schedule);
190
191        JobDataMap map = new JobDataMap();
192        if (parameters != null) {
193            map.putAll(parameters);
194        }
195        JobDetail job = JobBuilder.newJob(EventJob.class)
196                .withIdentity(schedule.getId(), "nuxeo")
197                .usingJobData(map)
198                .usingJobData("eventId", schedule.getEventId())
199                .usingJobData("eventCategory", schedule.getEventCategory())
200                .usingJobData("username", schedule.getUsername())
201                .build();
202
203        Trigger trigger = TriggerBuilder.newTrigger()
204                .withIdentity(schedule.getId(), "nuxeo")
205                .withSchedule(CronScheduleBuilder.cronSchedule(schedule.getCronExpression()))
206                .build();
207
208        // This is useful when testing to avoid multiple threads:
209        // trigger = new SimpleTrigger(schedule.getId(), "nuxeo");
210
211        try {
212            scheduler.scheduleJob(job, trigger);
213            jobKeys.put(schedule.getId(), job.getKey());
214        } catch (ObjectAlreadyExistsException e) {
215            log.trace("Overriding scheduler with id: " + schedule.getId());
216            // when jobs are persisted in a database, the job should already
217            // be there
218            // remove existing job and re-schedule
219            boolean unregistred = unregisterSchedule(schedule.getId());
220            if (unregistred) {
221                try {
222                    scheduler.scheduleJob(job, trigger);
223                } catch (SchedulerException e1) {
224                    log.error(
225                            String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()),
226                            e);
227                }
228            }
229
230        } catch (SchedulerException e) {
231            log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e);
232        }
233    }
234
235    @Override
236    public boolean unregisterSchedule(String id) {
237        log.info("Unregistering schedule with id" + id);
238        Schedule schedule = registry.getSchedule(id);
239        if (schedule == null) {
240            return false;
241        }
242        registry.removeContribution(schedule, true);
243        return unschedule(id);
244    }
245
246    protected boolean unschedule(String jobId) {
247        try {
248            return scheduler.deleteJob(jobKeys.get(jobId));
249        } catch (SchedulerException e) {
250            log.error(String.format("failed to unschedule job with '%s': %s", jobId, e.getMessage()), e);
251        }
252        return false;
253    }
254
255    @Override
256    public boolean unregisterSchedule(Schedule schedule) {
257        return unregisterSchedule(schedule.getId());
258    }
259
260    @Override
261    public void handleEvent(RuntimeServiceEvent event) {
262        if (event.id != RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP) {
263            return;
264        }
265        Framework.removeListener(this);
266        shutdownScheduler();
267    }
268
269}