001/*
002 * (C) Copyright 2007-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Thierry Martins
019 *     Florent Munch
020 */
021package org.nuxeo.ecm.core.scheduler;
022
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.Serializable;
028import java.util.ArrayList;
029import java.util.HashMap;
030import java.util.Map;
031import java.util.Properties;
032import java.util.Set;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.common.Environment;
037import org.nuxeo.ecm.core.api.NuxeoException;
038import org.nuxeo.runtime.model.ComponentContext;
039import org.nuxeo.runtime.model.DefaultComponent;
040import org.nuxeo.runtime.model.Extension;
041import org.nuxeo.runtime.model.RuntimeContext;
042import org.quartz.JobDetail;
043import org.quartz.JobKey;
044import org.quartz.ObjectAlreadyExistsException;
045import org.quartz.Scheduler;
046import org.quartz.SchedulerException;
047import org.quartz.Trigger;
048import org.quartz.impl.StdSchedulerFactory;
049import org.quartz.impl.jdbcjobstore.LockException;
050import org.quartz.impl.matchers.GroupMatcher;
051
052/**
053 * Schedule service implementation. Since the cleanup of the quartz job is done when service is activated, ( see see
054 * https://jira.nuxeo.com/browse/NXP-7303 ) in cluster mode, the schedules contributions MUST be the same on all nodes.
055 * Due the fact that all jobs are removed when service starts on a node it may be a short period with no schedules in
056 * quartz table even other node is running.
057 */
058public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService {
059
060    private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class);
061
062    protected RuntimeContext context;
063
064    protected Scheduler scheduler;
065
066    protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry();
067
068    /**
069     * @since 7.10
070     */
071    private Map<String, JobKey> jobKeys = new HashMap<>();
072
073    @Override
074    public void activate(ComponentContext context) {
075        log.debug("Activate");
076        this.context = context.getRuntimeContext();
077    }
078
079    protected void setupScheduler() throws IOException, SchedulerException {
080        StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
081        File file = new File(Environment.getDefault().getConfig(), "quartz.properties");
082        if (file.exists()) {
083            try (InputStream stream = new FileInputStream(file)) {
084                schedulerFactory.initialize(stream);
085            }
086        } else {
087            // use default config (unit tests)
088            Properties props = new Properties();
089            props.put("org.quartz.scheduler.instanceName", "Quartz");
090            props.put("org.quartz.scheduler.threadName", "Quartz_Scheduler");
091            props.put("org.quartz.scheduler.instanceId", "NON_CLUSTERED");
092            props.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true");
093            props.put("org.quartz.scheduler.skipUpdateCheck", "true");
094            props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
095            props.put("org.quartz.threadPool.threadCount", "1");
096            props.put("org.quartz.threadPool.threadPriority", "4");
097            props.put("org.quartz.threadPool.makeThreadsDaemons", "true");
098            schedulerFactory.initialize(props);
099        }
100        scheduler = schedulerFactory.getScheduler();
101        scheduler.start();
102        // server = MBeanServerFactory.createMBeanServer();
103        // server.createMBean("org.quartz.ee.jmx.jboss.QuartzService",
104        // quartzObjectName);
105
106        // clean up all nuxeo jobs
107        // https://jira.nuxeo.com/browse/NXP-7303
108        GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals("nuxeo");
109        Set<JobKey> jobs = scheduler.getJobKeys(matcher);
110        try {
111            scheduler.deleteJobs(new ArrayList<>(jobs)); // raise a lock error in case of concurrencies
112            for (Schedule each : registry.getSchedules()) {
113                registerSchedule(each);
114            }
115        } catch (LockException cause) {
116            log.warn("scheduler already re-initializing, another cluster node concurrent startup ?", cause);
117        }
118        log.info("scheduler started");
119    }
120
121    protected void shutdownScheduler() {
122        if (scheduler == null) {
123            return;
124        }
125        try {
126            scheduler.shutdown();
127        } catch (SchedulerException cause) {
128            log.error("Cannot shutdown scheduler", cause);
129        } finally {
130            scheduler = null;
131        }
132    }
133
134    @Override
135    public void deactivate(ComponentContext context) {
136        log.debug("Deactivate");
137        shutdownScheduler();
138    }
139
140    @Override
141    public void start(ComponentContext context) {
142        try {
143            setupScheduler();
144        } catch (IOException | SchedulerException e) {
145            throw new NuxeoException(e);
146        }
147    }
148
149    @Override
150    public void stop(ComponentContext context) {
151        try {
152            scheduler.standby();
153        } catch (SchedulerException cause) {
154            log.error("Cannot put scheduler in stand by mode", cause);
155        }
156    }
157
158    @Override
159    public boolean hasApplicationStarted() {
160        return scheduler != null;
161    }
162
163    @Override
164    public void registerExtension(Extension extension) {
165        Object[] contribs = extension.getContributions();
166        for (Object contrib : contribs) {
167            registerSchedule((Schedule) contrib);
168        }
169    }
170
171    @Override
172    public void unregisterExtension(Extension extension) {
173        // do nothing to do ;
174        // clean up will be done when service is activated
175        // see https://jira.nuxeo.com/browse/NXP-7303
176    }
177
178    public RuntimeContext getContext() {
179        return context;
180    }
181
182    @Override
183    public void registerSchedule(Schedule schedule) {
184        registerSchedule(schedule, null);
185    }
186
187    @Override
188    public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) {
189        registry.addContribution(schedule);
190        if (scheduler == null) {
191            return;
192        }
193        Schedule contributed = registry.getSchedule(schedule);
194        if (contributed != null) {
195            schedule(contributed, parameters);
196        } else {
197            unschedule(schedule.getId());
198        }
199    }
200
201    protected void schedule(Schedule schedule, Map<String, Serializable> parameters) {
202        log.info("Registering " + schedule);
203
204        EventJobFactory jobFactory = schedule.getJobFactory();
205        JobDetail job = jobFactory.buildJob(schedule, parameters).build();
206        Trigger trigger = jobFactory.buildTrigger(schedule).build();
207
208        // This is useful when testing to avoid multiple threads:
209        // trigger = new SimpleTrigger(schedule.getId(), "nuxeo");
210
211        try {
212            scheduler.scheduleJob(job, trigger);
213            jobKeys.put(schedule.getId(), job.getKey());
214        } catch (ObjectAlreadyExistsException e) {
215            log.trace("Overriding scheduler with id: " + schedule.getId());
216            // when jobs are persisted in a database, the job should already
217            // be there
218            // remove existing job and re-schedule
219            boolean unregistred = unregisterSchedule(schedule.getId());
220            if (unregistred) {
221                try {
222                    scheduler.scheduleJob(job, trigger);
223                } catch (SchedulerException e1) {
224                    log.error(
225                            String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()),
226                            e);
227                }
228            }
229
230        } catch (SchedulerException e) {
231            log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e);
232        }
233    }
234
235    @Override
236    public boolean unregisterSchedule(String id) {
237        log.info("Unregistering schedule with id" + id);
238        Schedule schedule = registry.getSchedule(id);
239        if (schedule == null) {
240            return false;
241        }
242        registry.removeContribution(schedule, true);
243        return unschedule(id);
244    }
245
246    protected boolean unschedule(String jobId) {
247        try {
248            return scheduler.deleteJob(jobKeys.get(jobId));
249        } catch (SchedulerException e) {
250            log.error(String.format("failed to unschedule job with '%s': %s", jobId, e.getMessage()), e);
251        }
252        return false;
253    }
254
255    @Override
256    public boolean unregisterSchedule(Schedule schedule) {
257        return unregisterSchedule(schedule.getId());
258    }
259
260}