001/*
002 * (C) Copyright 2007-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Thierry Martins
019 */
020package org.nuxeo.ecm.core.scheduler;
021
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.Serializable;
025import java.net.URL;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.Map;
029import java.util.Properties;
030import java.util.Set;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.api.NuxeoException;
035import org.nuxeo.runtime.model.ComponentContext;
036import org.nuxeo.runtime.model.DefaultComponent;
037import org.nuxeo.runtime.model.Extension;
038import org.nuxeo.runtime.model.RuntimeContext;
039import org.quartz.CronScheduleBuilder;
040import org.quartz.JobBuilder;
041import org.quartz.JobDataMap;
042import org.quartz.JobDetail;
043import org.quartz.JobKey;
044import org.quartz.ObjectAlreadyExistsException;
045import org.quartz.Scheduler;
046import org.quartz.SchedulerException;
047import org.quartz.Trigger;
048import org.quartz.TriggerBuilder;
049import org.quartz.impl.StdSchedulerFactory;
050import org.quartz.impl.jdbcjobstore.LockException;
051import org.quartz.impl.matchers.GroupMatcher;
052
053/**
054 * Schedule service implementation. Since the cleanup of the quartz job is done when service is activated, ( see see
055 * https://jira.nuxeo.com/browse/NXP-7303 ) in cluster mode, the schedules contributions MUST be the same on all nodes.
056 * Due the fact that all jobs are removed when service starts on a node it may be a short period with no schedules in
057 * quartz table even other node is running.
058 */
059public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService {
060
061    private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class);
062
063    protected RuntimeContext context;
064
065    protected Scheduler scheduler;
066
067    protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry();
068
069    /**
070     * @since 7.10
071     */
072    private Map<String, JobKey> jobKeys = new HashMap<>();
073
074    @Override
075    public void activate(ComponentContext context) {
076        log.debug("Activate");
077        this.context = context.getRuntimeContext();
078    }
079
080    protected void setupScheduler() throws IOException, SchedulerException {
081        StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
082        URL cfg = context.getResource("config/quartz.properties");
083        if (cfg != null) {
084            try (InputStream stream = cfg.openStream()) {
085                schedulerFactory.initialize(stream);
086            }
087        } else {
088            // use default config (unit tests)
089            Properties props = new Properties();
090            props.put("org.quartz.scheduler.instanceName", "Quartz");
091            props.put("org.quartz.scheduler.threadName", "Quartz_Scheduler");
092            props.put("org.quartz.scheduler.instanceId", "NON_CLUSTERED");
093            props.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true");
094            props.put("org.quartz.scheduler.skipUpdateCheck", "true");
095            props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
096            props.put("org.quartz.threadPool.threadCount", "1");
097            props.put("org.quartz.threadPool.threadPriority", "4");
098            props.put("org.quartz.threadPool.makeThreadsDaemons", "true");
099            schedulerFactory.initialize(props);
100        }
101        scheduler = schedulerFactory.getScheduler();
102        scheduler.start();
103        // server = MBeanServerFactory.createMBeanServer();
104        // server.createMBean("org.quartz.ee.jmx.jboss.QuartzService",
105        // quartzObjectName);
106
107        // clean up all nuxeo jobs
108        // https://jira.nuxeo.com/browse/NXP-7303
109        GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals("nuxeo");
110        Set<JobKey> jobs = scheduler.getJobKeys(matcher);
111        try {
112            scheduler.deleteJobs(new ArrayList<>(jobs)); // raise a lock error in case of concurrencies
113            for (Schedule each : registry.getSchedules()) {
114                registerSchedule(each);
115            }
116        } catch (LockException cause) {
117            log.warn("scheduler already re-initializing, another cluster node concurrent startup ?", cause);
118        }
119        log.info("scheduler started");
120    }
121
122    protected void shutdownScheduler() {
123        if (scheduler == null) {
124            return;
125        }
126        try {
127            scheduler.shutdown();
128        } catch (SchedulerException cause) {
129            log.error("Cannot shutdown scheduler", cause);
130        } finally {
131            scheduler = null;
132        }
133    }
134
135    @Override
136    public void deactivate(ComponentContext context) {
137        log.debug("Deactivate");
138        shutdownScheduler();
139    }
140
141    @Override
142    public void start(ComponentContext context) {
143        try {
144            setupScheduler();
145        } catch (IOException | SchedulerException e) {
146            throw new NuxeoException(e);
147        }
148    }
149
150    @Override
151    public void stop(ComponentContext context) {
152        try {
153            scheduler.standby();
154        } catch (SchedulerException cause) {
155            log.error("Cannot put scheduler in stand by mode", cause);
156        }
157    }
158
159    @Override
160    public boolean hasApplicationStarted() {
161        return scheduler != null;
162    }
163
164    @Override
165    public void registerExtension(Extension extension) {
166        Object[] contribs = extension.getContributions();
167        for (Object contrib : contribs) {
168            registerSchedule((Schedule) contrib);
169        }
170    }
171
172    @Override
173    public void unregisterExtension(Extension extension) {
174        // do nothing to do ;
175        // clean up will be done when service is activated
176        // see https://jira.nuxeo.com/browse/NXP-7303
177    }
178
179    public RuntimeContext getContext() {
180        return context;
181    }
182
183    @Override
184    public void registerSchedule(Schedule schedule) {
185        registerSchedule(schedule, null);
186    }
187
188    @Override
189    public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) {
190        registry.addContribution(schedule);
191        if (scheduler == null) {
192            return;
193        }
194        Schedule contributed = registry.getSchedule(schedule);
195        if (contributed != null) {
196            schedule(contributed, parameters);
197        } else {
198            unschedule(schedule.getId());
199        }
200    }
201
202    protected void schedule(Schedule schedule, Map<String, Serializable> parameters) {
203        log.info("Registering " + schedule);
204
205        JobDataMap map = new JobDataMap();
206        if (parameters != null) {
207            map.putAll(parameters);
208        }
209        JobDetail job = JobBuilder.newJob(EventJob.class)
210                .withIdentity(schedule.getId(), "nuxeo")
211                .usingJobData(map)
212                .usingJobData("eventId", schedule.getEventId())
213                .usingJobData("eventCategory", schedule.getEventCategory())
214                .usingJobData("username", schedule.getUsername())
215                .build();
216
217        Trigger trigger = TriggerBuilder.newTrigger()
218                .withIdentity(schedule.getId(), "nuxeo")
219                .withSchedule(CronScheduleBuilder.cronSchedule(schedule.getCronExpression()))
220                .build();
221
222        // This is useful when testing to avoid multiple threads:
223        // trigger = new SimpleTrigger(schedule.getId(), "nuxeo");
224
225        try {
226            scheduler.scheduleJob(job, trigger);
227            jobKeys.put(schedule.getId(), job.getKey());
228        } catch (ObjectAlreadyExistsException e) {
229            log.trace("Overriding scheduler with id: " + schedule.getId());
230            // when jobs are persisted in a database, the job should already
231            // be there
232            // remove existing job and re-schedule
233            boolean unregistred = unregisterSchedule(schedule.getId());
234            if (unregistred) {
235                try {
236                    scheduler.scheduleJob(job, trigger);
237                } catch (SchedulerException e1) {
238                    log.error(
239                            String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()),
240                            e);
241                }
242            }
243
244        } catch (SchedulerException e) {
245            log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e);
246        }
247    }
248
249    @Override
250    public boolean unregisterSchedule(String id) {
251        log.info("Unregistering schedule with id" + id);
252        Schedule schedule = registry.getSchedule(id);
253        if (schedule == null) {
254            return false;
255        }
256        registry.removeContribution(schedule, true);
257        return unschedule(id);
258    }
259
260    protected boolean unschedule(String jobId) {
261        try {
262            return scheduler.deleteJob(jobKeys.get(jobId));
263        } catch (SchedulerException e) {
264            log.error(String.format("failed to unschedule job with '%s': %s", jobId, e.getMessage()), e);
265        }
266        return false;
267    }
268
269    @Override
270    public boolean unregisterSchedule(Schedule schedule) {
271        return unregisterSchedule(schedule.getId());
272    }
273
274}