001/*
002 * (C) Copyright 2007-2010 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Florent Guillaume
016 */
017package org.nuxeo.ecm.core.scheduler;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.Serializable;
022import java.net.URL;
023import java.text.ParseException;
024import java.util.Map;
025import java.util.Properties;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.ecm.core.api.NuxeoException;
030import org.nuxeo.runtime.RuntimeServiceEvent;
031import org.nuxeo.runtime.RuntimeServiceListener;
032import org.nuxeo.runtime.api.Framework;
033import org.nuxeo.runtime.model.ComponentContext;
034import org.nuxeo.runtime.model.DefaultComponent;
035import org.nuxeo.runtime.model.Extension;
036import org.nuxeo.runtime.model.RuntimeContext;
037import org.quartz.CronTrigger;
038import org.quartz.JobDataMap;
039import org.quartz.JobDetail;
040import org.quartz.ObjectAlreadyExistsException;
041import org.quartz.Scheduler;
042import org.quartz.SchedulerException;
043import org.quartz.Trigger;
044import org.quartz.impl.StdSchedulerFactory;
045
046/**
047 * Schedule service implementation. Since the cleanup of the quartz job is done when service is activated, ( see see
048 * https://jira.nuxeo.com/browse/NXP-7303 ) in cluster mode, the schedules contributions MUST be the same on all nodes.
049 * Due the fact that all jobs are removed when service starts on a node it may be a short period with no schedules in
050 * quartz table even other node is running.
051 */
052public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService, RuntimeServiceListener {
053
054    private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class);
055
056    protected RuntimeContext bundle;
057
058    protected Scheduler scheduler;
059
060    protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry();
061
062    @Override
063    public void activate(ComponentContext context) {
064        log.debug("Activate");
065        bundle = context.getRuntimeContext();
066    }
067
068    protected void setupScheduler(ComponentContext context) throws IOException, SchedulerException {
069        StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
070        URL cfg = context.getRuntimeContext().getResource("config/quartz.properties");
071        if (cfg != null) {
072            InputStream stream = cfg.openStream();
073            try {
074                schedulerFactory.initialize(stream);
075            } finally {
076                stream.close();
077            }
078        } else {
079            // use default config (unit tests)
080            Properties props = new Properties();
081            props.put("org.quartz.scheduler.instanceName", "Quartz");
082            props.put("org.quartz.scheduler.threadName", "Quartz_Scheduler");
083            props.put("org.quartz.scheduler.instanceId", "NON_CLUSTERED");
084            props.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true");
085            props.put("org.quartz.scheduler.skipUpdateCheck", "true");
086            props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
087            props.put("org.quartz.threadPool.threadCount", "1");
088            props.put("org.quartz.threadPool.threadPriority", "4");
089            props.put("org.quartz.threadPool.makeThreadsDaemons", "true");
090            schedulerFactory.initialize(props);
091        }
092        scheduler = schedulerFactory.getScheduler();
093        scheduler.start();
094        // server = MBeanServerFactory.createMBeanServer();
095        // server.createMBean("org.quartz.ee.jmx.jboss.QuartzService",
096        // quartzObjectName);
097
098        // clean up all nuxeo jobs
099        // https://jira.nuxeo.com/browse/NXP-7303
100        String[] jobs = scheduler.getJobNames("nuxeo");
101        for (String job : jobs) {
102            unschedule(job);
103        }
104        for (Schedule each : registry.getSchedules()) {
105            registerSchedule(each);
106        }
107        log.info("scheduler started");
108    }
109
110    protected void shutdownScheduler() {
111        if (scheduler == null) {
112            return;
113        }
114        try {
115            scheduler.shutdown();
116        } catch (SchedulerException cause) {
117            log.error("Cannot shutdown scheduler", cause);
118        } finally {
119            scheduler = null;
120        }
121    }
122
123    @Override
124    public void deactivate(ComponentContext context) {
125        log.debug("Deactivate");
126        shutdownScheduler();
127    }
128
129    @Override
130    public void applicationStarted(ComponentContext context) {
131        Framework.addListener(this);
132        try {
133            setupScheduler(context);
134        } catch (IOException | SchedulerException e) {
135            throw new NuxeoException(e);
136        }
137    }
138
139    @Override
140    public boolean hasApplicationStarted() {
141        return scheduler != null;
142    }
143
144    @Override
145    public void registerExtension(Extension extension) {
146        Object[] contribs = extension.getContributions();
147        for (Object contrib : contribs) {
148            registerSchedule((Schedule) contrib);
149        }
150    }
151
152    @Override
153    public void unregisterExtension(Extension extension) {
154        // do nothing to do ;
155        // clean up will be done when service is activated
156        // see https://jira.nuxeo.com/browse/NXP-7303
157    }
158
159    public RuntimeContext getContext() {
160        return bundle;
161    }
162
163    @Override
164    public void registerSchedule(Schedule schedule) {
165        registerSchedule(schedule, null);
166    }
167
168    @Override
169    public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) {
170        registry.addContribution(schedule);
171        if (scheduler == null) {
172            return;
173        }
174        Schedule contributed = registry.getSchedule(schedule);
175        if (contributed != null) {
176            schedule(contributed, parameters);
177        } else {
178            unschedule(schedule.getId());
179        }
180    }
181
182    protected void schedule(Schedule schedule, Map<String, Serializable> parameters) {
183        log.info("Registering " + schedule);
184        JobDetail job = new JobDetail(schedule.getId(), "nuxeo", EventJob.class);
185        JobDataMap map = job.getJobDataMap();
186        map.put("eventId", schedule.getEventId());
187        map.put("eventCategory", schedule.getEventCategory());
188        map.put("username", schedule.getUsername());
189
190        if (parameters != null) {
191            map.putAll(parameters);
192        }
193
194        Trigger trigger;
195        try {
196            trigger = new CronTrigger(schedule.getId(), "nuxeo", schedule.getCronExpression());
197        } catch (ParseException e) {
198            log.error(String.format("invalid cron expresion '%s' for schedule '%s'", schedule.getCronExpression(),
199                    schedule.getId()), e);
200            return;
201        }
202        // This is useful when testing to avoid multiple threads:
203        // trigger = new SimpleTrigger(schedule.getId(), "nuxeo");
204
205        try {
206            scheduler.scheduleJob(job, trigger);
207        } catch (ObjectAlreadyExistsException e) {
208            log.trace("Overriding scheduler with id: " + schedule.getId());
209            // when jobs are persisted in a database, the job should already
210            // be there
211            // remove existing job and re-schedule
212            boolean unregistred = unregisterSchedule(schedule.getId());
213            if (unregistred) {
214                try {
215                    scheduler.scheduleJob(job, trigger);
216                } catch (SchedulerException e1) {
217                    log.error(
218                            String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()),
219                            e);
220                }
221            }
222
223        } catch (SchedulerException e) {
224            log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e);
225        }
226    }
227
228    @Override
229    public boolean unregisterSchedule(String id) {
230        log.info("Unregistering schedule with id" + id);
231        Schedule schedule = registry.getSchedule(id);
232        if (schedule == null) {
233            return false;
234        }
235        registry.removeContribution(schedule, true);
236        return unschedule(id);
237    }
238
239    protected boolean unschedule(String jobId) {
240        try {
241            return scheduler.deleteJob(jobId, "nuxeo");
242        } catch (SchedulerException e) {
243            log.error(String.format("failed to unschedule job with '%s': %s", jobId, e.getMessage()), e);
244        }
245        return false;
246    }
247
248    @Override
249    public boolean unregisterSchedule(Schedule schedule) {
250        return unregisterSchedule(schedule.getId());
251    }
252
253    @Override
254    public void handleEvent(RuntimeServiceEvent event) {
255        if (event.id != RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP) {
256            return;
257        }
258        Framework.removeListener(this);
259        shutdownScheduler();
260    }
261
262}