001/*
002 * (C) Copyright 2007-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Thierry Martins
019 */
020package org.nuxeo.ecm.core.scheduler;
021
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.Serializable;
025import java.net.URL;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.Map;
029import java.util.Properties;
030import java.util.Set;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.api.NuxeoException;
035import org.nuxeo.runtime.RuntimeServiceEvent;
036import org.nuxeo.runtime.RuntimeServiceListener;
037import org.nuxeo.runtime.api.Framework;
038import org.nuxeo.runtime.model.ComponentContext;
039import org.nuxeo.runtime.model.DefaultComponent;
040import org.nuxeo.runtime.model.Extension;
041import org.nuxeo.runtime.model.RuntimeContext;
042import org.quartz.CronScheduleBuilder;
043import org.quartz.JobBuilder;
044import org.quartz.JobDataMap;
045import org.quartz.JobDetail;
046import org.quartz.JobKey;
047import org.quartz.ObjectAlreadyExistsException;
048import org.quartz.Scheduler;
049import org.quartz.SchedulerException;
050import org.quartz.Trigger;
051import org.quartz.TriggerBuilder;
052import org.quartz.impl.StdSchedulerFactory;
053import org.quartz.impl.matchers.GroupMatcher;
054
055/**
056 * Schedule service implementation. Since the cleanup of the quartz job is done when service is activated, ( see see
057 * https://jira.nuxeo.com/browse/NXP-7303 ) in cluster mode, the schedules contributions MUST be the same on all nodes.
058 * Due the fact that all jobs are removed when service starts on a node it may be a short period with no schedules in
059 * quartz table even other node is running.
060 */
061public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService, RuntimeServiceListener {
062
063    private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class);
064
065    protected RuntimeContext context;
066
067    protected Scheduler scheduler;
068
069    protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry();
070
071    /**
072     * @since 7.10
073     */
074    private Map<String, JobKey> jobKeys = new HashMap<String, JobKey>();
075
076    @Override
077    public void activate(ComponentContext context) {
078        log.debug("Activate");
079        this.context = context.getRuntimeContext();
080    }
081
082    protected void setupScheduler() throws IOException, SchedulerException {
083        StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
084        URL cfg = context.getResource("config/quartz.properties");
085        if (cfg != null) {
086            InputStream stream = cfg.openStream();
087            try {
088                schedulerFactory.initialize(stream);
089            } finally {
090                stream.close();
091            }
092        } else {
093            // use default config (unit tests)
094            Properties props = new Properties();
095            props.put("org.quartz.scheduler.instanceName", "Quartz");
096            props.put("org.quartz.scheduler.threadName", "Quartz_Scheduler");
097            props.put("org.quartz.scheduler.instanceId", "NON_CLUSTERED");
098            props.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true");
099            props.put("org.quartz.scheduler.skipUpdateCheck", "true");
100            props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
101            props.put("org.quartz.threadPool.threadCount", "1");
102            props.put("org.quartz.threadPool.threadPriority", "4");
103            props.put("org.quartz.threadPool.makeThreadsDaemons", "true");
104            schedulerFactory.initialize(props);
105        }
106        scheduler = schedulerFactory.getScheduler();
107        scheduler.start();
108        // server = MBeanServerFactory.createMBeanServer();
109        // server.createMBean("org.quartz.ee.jmx.jboss.QuartzService",
110        // quartzObjectName);
111
112        // clean up all nuxeo jobs
113        // https://jira.nuxeo.com/browse/NXP-7303
114        GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals("nuxeo");
115        Set<JobKey> jobs = scheduler.getJobKeys(matcher);
116        scheduler.deleteJobs(new ArrayList<JobKey>(jobs));
117        for (Schedule each : registry.getSchedules()) {
118            registerSchedule(each);
119        }
120        log.info("scheduler started");
121    }
122
123    protected void shutdownScheduler() {
124        if (scheduler == null) {
125            return;
126        }
127        try {
128            scheduler.shutdown();
129        } catch (SchedulerException cause) {
130            log.error("Cannot shutdown scheduler", cause);
131        } finally {
132            scheduler = null;
133        }
134    }
135
136    @Override
137    public void deactivate(ComponentContext context) {
138        log.debug("Deactivate");
139        shutdownScheduler();
140    }
141
142    @Override
143    public void applicationStarted(ComponentContext context) {
144        Framework.addListener(this);
145        try {
146            setupScheduler();
147        } catch (IOException | SchedulerException e) {
148            throw new NuxeoException(e);
149        }
150    }
151
152    @Override
153    public boolean hasApplicationStarted() {
154        return scheduler != null;
155    }
156
157    @Override
158    public void registerExtension(Extension extension) {
159        Object[] contribs = extension.getContributions();
160        for (Object contrib : contribs) {
161            registerSchedule((Schedule) contrib);
162        }
163    }
164
165    @Override
166    public void unregisterExtension(Extension extension) {
167        // do nothing to do ;
168        // clean up will be done when service is activated
169        // see https://jira.nuxeo.com/browse/NXP-7303
170    }
171
172    public RuntimeContext getContext() {
173        return context;
174    }
175
176    @Override
177    public void registerSchedule(Schedule schedule) {
178        registerSchedule(schedule, null);
179    }
180
181    @Override
182    public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) {
183        registry.addContribution(schedule);
184        if (scheduler == null) {
185            return;
186        }
187        Schedule contributed = registry.getSchedule(schedule);
188        if (contributed != null) {
189            schedule(contributed, parameters);
190        } else {
191            unschedule(schedule.getId());
192        }
193    }
194
195    protected void schedule(Schedule schedule, Map<String, Serializable> parameters) {
196        log.info("Registering " + schedule);
197
198        JobDataMap map = new JobDataMap();
199        if (parameters != null) {
200            map.putAll(parameters);
201        }
202        JobDetail job = JobBuilder.newJob(EventJob.class)
203                .withIdentity(schedule.getId(), "nuxeo")
204                .usingJobData(map)
205                .usingJobData("eventId", schedule.getEventId())
206                .usingJobData("eventCategory", schedule.getEventCategory())
207                .usingJobData("username", schedule.getUsername())
208                .build();
209
210        Trigger trigger = TriggerBuilder.newTrigger()
211                .withIdentity(schedule.getId(), "nuxeo")
212                .withSchedule(CronScheduleBuilder.cronSchedule(schedule.getCronExpression()))
213                .build();
214
215        // This is useful when testing to avoid multiple threads:
216        // trigger = new SimpleTrigger(schedule.getId(), "nuxeo");
217
218        try {
219            scheduler.scheduleJob(job, trigger);
220            jobKeys.put(schedule.getId(), job.getKey());
221        } catch (ObjectAlreadyExistsException e) {
222            log.trace("Overriding scheduler with id: " + schedule.getId());
223            // when jobs are persisted in a database, the job should already
224            // be there
225            // remove existing job and re-schedule
226            boolean unregistred = unregisterSchedule(schedule.getId());
227            if (unregistred) {
228                try {
229                    scheduler.scheduleJob(job, trigger);
230                } catch (SchedulerException e1) {
231                    log.error(
232                            String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()),
233                            e);
234                }
235            }
236
237        } catch (SchedulerException e) {
238            log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e);
239        }
240    }
241
242    @Override
243    public boolean unregisterSchedule(String id) {
244        log.info("Unregistering schedule with id" + id);
245        Schedule schedule = registry.getSchedule(id);
246        if (schedule == null) {
247            return false;
248        }
249        registry.removeContribution(schedule, true);
250        return unschedule(id);
251    }
252
253    protected boolean unschedule(String jobId) {
254        try {
255            return scheduler.deleteJob(jobKeys.get(jobId));
256        } catch (SchedulerException e) {
257            log.error(String.format("failed to unschedule job with '%s': %s", jobId, e.getMessage()), e);
258        }
259        return false;
260    }
261
262    @Override
263    public boolean unregisterSchedule(Schedule schedule) {
264        return unregisterSchedule(schedule.getId());
265    }
266
267    @Override
268    public void handleEvent(RuntimeServiceEvent event) {
269        if (event.id != RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP) {
270            return;
271        }
272        Framework.removeListener(this);
273        shutdownScheduler();
274    }
275
276}