001/*
002 * (C) Copyright 2007-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Thierry Martins
019 */
020package org.nuxeo.ecm.core.scheduler;
021
022import java.io.File;
023import java.io.FileInputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.Serializable;
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.Map;
030import java.util.Properties;
031import java.util.Set;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.common.Environment;
036import org.nuxeo.ecm.core.api.NuxeoException;
037import org.nuxeo.runtime.model.ComponentContext;
038import org.nuxeo.runtime.model.DefaultComponent;
039import org.nuxeo.runtime.model.Extension;
040import org.nuxeo.runtime.model.RuntimeContext;
041import org.quartz.CronScheduleBuilder;
042import org.quartz.JobBuilder;
043import org.quartz.JobDataMap;
044import org.quartz.JobDetail;
045import org.quartz.JobKey;
046import org.quartz.ObjectAlreadyExistsException;
047import org.quartz.Scheduler;
048import org.quartz.SchedulerException;
049import org.quartz.Trigger;
050import org.quartz.TriggerBuilder;
051import org.quartz.impl.StdSchedulerFactory;
052import org.quartz.impl.jdbcjobstore.LockException;
053import org.quartz.impl.matchers.GroupMatcher;
054
055/**
056 * Schedule service implementation. Since the cleanup of the quartz job is done when service is activated, ( see see
057 * https://jira.nuxeo.com/browse/NXP-7303 ) in cluster mode, the schedules contributions MUST be the same on all nodes.
058 * Due the fact that all jobs are removed when service starts on a node it may be a short period with no schedules in
059 * quartz table even other node is running.
060 */
061public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService {
062
063    private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class);
064
065    protected RuntimeContext context;
066
067    protected Scheduler scheduler;
068
069    protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry();
070
071    /**
072     * @since 7.10
073     */
074    private Map<String, JobKey> jobKeys = new HashMap<>();
075
076    @Override
077    public void activate(ComponentContext context) {
078        log.debug("Activate");
079        this.context = context.getRuntimeContext();
080    }
081
082    protected void setupScheduler() throws IOException, SchedulerException {
083        StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
084        File file = new File(Environment.getDefault().getConfig(), "quartz.properties");
085        if (file.exists()) {
086            try (InputStream stream = new FileInputStream(file)) {
087                schedulerFactory.initialize(stream);
088            }
089        } else {
090            // use default config (unit tests)
091            Properties props = new Properties();
092            props.put("org.quartz.scheduler.instanceName", "Quartz");
093            props.put("org.quartz.scheduler.threadName", "Quartz_Scheduler");
094            props.put("org.quartz.scheduler.instanceId", "NON_CLUSTERED");
095            props.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true");
096            props.put("org.quartz.scheduler.skipUpdateCheck", "true");
097            props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
098            props.put("org.quartz.threadPool.threadCount", "1");
099            props.put("org.quartz.threadPool.threadPriority", "4");
100            props.put("org.quartz.threadPool.makeThreadsDaemons", "true");
101            schedulerFactory.initialize(props);
102        }
103        scheduler = schedulerFactory.getScheduler();
104        scheduler.start();
105        // server = MBeanServerFactory.createMBeanServer();
106        // server.createMBean("org.quartz.ee.jmx.jboss.QuartzService",
107        // quartzObjectName);
108
109        // clean up all nuxeo jobs
110        // https://jira.nuxeo.com/browse/NXP-7303
111        GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals("nuxeo");
112        Set<JobKey> jobs = scheduler.getJobKeys(matcher);
113        try {
114            scheduler.deleteJobs(new ArrayList<>(jobs)); // raise a lock error in case of concurrencies
115            for (Schedule each : registry.getSchedules()) {
116                registerSchedule(each);
117            }
118        } catch (LockException cause) {
119            log.warn("scheduler already re-initializing, another cluster node concurrent startup ?", cause);
120        }
121        log.info("scheduler started");
122    }
123
124    protected void shutdownScheduler() {
125        if (scheduler == null) {
126            return;
127        }
128        try {
129            scheduler.shutdown();
130        } catch (SchedulerException cause) {
131            log.error("Cannot shutdown scheduler", cause);
132        } finally {
133            scheduler = null;
134        }
135    }
136
137    @Override
138    public void deactivate(ComponentContext context) {
139        log.debug("Deactivate");
140        shutdownScheduler();
141    }
142
143    @Override
144    public void start(ComponentContext context) {
145        try {
146            setupScheduler();
147        } catch (IOException | SchedulerException e) {
148            throw new NuxeoException(e);
149        }
150    }
151
152    @Override
153    public void stop(ComponentContext context) {
154        try {
155            scheduler.standby();
156        } catch (SchedulerException cause) {
157            log.error("Cannot put scheduler in stand by mode", cause);
158        }
159    }
160
161    @Override
162    public boolean hasApplicationStarted() {
163        return scheduler != null;
164    }
165
166    @Override
167    public void registerExtension(Extension extension) {
168        Object[] contribs = extension.getContributions();
169        for (Object contrib : contribs) {
170            registerSchedule((Schedule) contrib);
171        }
172    }
173
174    @Override
175    public void unregisterExtension(Extension extension) {
176        // do nothing to do ;
177        // clean up will be done when service is activated
178        // see https://jira.nuxeo.com/browse/NXP-7303
179    }
180
181    public RuntimeContext getContext() {
182        return context;
183    }
184
185    @Override
186    public void registerSchedule(Schedule schedule) {
187        registerSchedule(schedule, null);
188    }
189
190    @Override
191    public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) {
192        registry.addContribution(schedule);
193        if (scheduler == null) {
194            return;
195        }
196        Schedule contributed = registry.getSchedule(schedule);
197        if (contributed != null) {
198            schedule(contributed, parameters);
199        } else {
200            unschedule(schedule.getId());
201        }
202    }
203
204    protected void schedule(Schedule schedule, Map<String, Serializable> parameters) {
205        log.info("Registering " + schedule);
206
207        JobDataMap map = new JobDataMap();
208        if (parameters != null) {
209            map.putAll(parameters);
210        }
211        JobDetail job = JobBuilder.newJob(EventJob.class)
212                .withIdentity(schedule.getId(), "nuxeo")
213                .usingJobData(map)
214                .usingJobData("eventId", schedule.getEventId())
215                .usingJobData("eventCategory", schedule.getEventCategory())
216                .usingJobData("username", schedule.getUsername())
217                .build();
218
219        Trigger trigger = TriggerBuilder.newTrigger()
220                .withIdentity(schedule.getId(), "nuxeo")
221                .withSchedule(CronScheduleBuilder.cronSchedule(schedule.getCronExpression()))
222                .build();
223
224        // This is useful when testing to avoid multiple threads:
225        // trigger = new SimpleTrigger(schedule.getId(), "nuxeo");
226
227        try {
228            scheduler.scheduleJob(job, trigger);
229            jobKeys.put(schedule.getId(), job.getKey());
230        } catch (ObjectAlreadyExistsException e) {
231            log.trace("Overriding scheduler with id: " + schedule.getId());
232            // when jobs are persisted in a database, the job should already
233            // be there
234            // remove existing job and re-schedule
235            boolean unregistred = unregisterSchedule(schedule.getId());
236            if (unregistred) {
237                try {
238                    scheduler.scheduleJob(job, trigger);
239                } catch (SchedulerException e1) {
240                    log.error(
241                            String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()),
242                            e);
243                }
244            }
245
246        } catch (SchedulerException e) {
247            log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e);
248        }
249    }
250
251    @Override
252    public boolean unregisterSchedule(String id) {
253        log.info("Unregistering schedule with id" + id);
254        Schedule schedule = registry.getSchedule(id);
255        if (schedule == null) {
256            return false;
257        }
258        registry.removeContribution(schedule, true);
259        return unschedule(id);
260    }
261
262    protected boolean unschedule(String jobId) {
263        try {
264            return scheduler.deleteJob(jobKeys.get(jobId));
265        } catch (SchedulerException e) {
266            log.error(String.format("failed to unschedule job with '%s': %s", jobId, e.getMessage()), e);
267        }
268        return false;
269    }
270
271    @Override
272    public boolean unregisterSchedule(Schedule schedule) {
273        return unregisterSchedule(schedule.getId());
274    }
275
276}