001/*
002 * (C) Copyright 2007-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Thierry Martins
019 *     Florent Munch
020 */
021package org.nuxeo.ecm.core.scheduler;
022
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.Serializable;
028import java.time.Duration;
029import java.util.ArrayList;
030import java.util.HashMap;
031import java.util.Map;
032import java.util.Properties;
033import java.util.Set;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.common.Environment;
038import org.nuxeo.common.utils.DurationUtils;
039import org.nuxeo.ecm.core.api.NuxeoException;
040import org.nuxeo.runtime.api.Framework;
041import org.nuxeo.runtime.cluster.ClusterService;
042import org.nuxeo.runtime.model.ComponentContext;
043import org.nuxeo.runtime.model.DefaultComponent;
044import org.nuxeo.runtime.model.Extension;
045import org.nuxeo.runtime.model.RuntimeContext;
046import org.quartz.JobDetail;
047import org.quartz.JobKey;
048import org.quartz.ObjectAlreadyExistsException;
049import org.quartz.Scheduler;
050import org.quartz.SchedulerException;
051import org.quartz.Trigger;
052import org.quartz.impl.StdSchedulerFactory;
053import org.quartz.impl.jdbcjobstore.LockException;
054import org.quartz.impl.matchers.GroupMatcher;
055
/**
 * Schedule service implementation. Since the cleanup of the Quartz jobs is done when the service starts (see
 * https://jira.nuxeo.com/browse/NXP-7303), in cluster mode the schedule contributions MUST be the same on all nodes.
 * Because all jobs are removed when the service starts on a node, there may be a short period with no schedules in
 * the Quartz tables even if another node is running.
 */
062public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService {
063
064    private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class);
065
066    /** @since 11.1 */
067    public static final String CLUSTER_START_DURATION_PROP = "org.nuxeo.scheduler.cluster.start.duration";
068
069    /** @since 11.1 */
070    public static final Duration CLUSTER_START_DURATION_DEFAULT = Duration.ofMinutes(1);
071
072    protected RuntimeContext context;
073
074    protected Scheduler scheduler;
075
076    protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry();
077
078    /**
079     * @since 7.10
080     */
081    private Map<String, JobKey> jobKeys = new HashMap<>();
082
083    @Override
084    public void activate(ComponentContext context) {
085        log.debug("Activate");
086        this.context = context.getRuntimeContext();
087    }
088
089    protected void setupScheduler() throws IOException, SchedulerException {
090        StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
091        File file = new File(Environment.getDefault().getConfig(), "quartz.properties");
092        if (file.exists()) {
093            try (InputStream stream = new FileInputStream(file)) {
094                schedulerFactory.initialize(stream);
095            }
096        } else {
097            // use default config (unit tests)
098            Properties props = new Properties();
099            props.put("org.quartz.scheduler.instanceName", "Quartz");
100            props.put("org.quartz.scheduler.threadName", "Quartz_Scheduler");
101            props.put("org.quartz.scheduler.instanceId", "NON_CLUSTERED");
102            props.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true");
103            props.put("org.quartz.scheduler.skipUpdateCheck", "true");
104            props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
105            props.put("org.quartz.threadPool.threadCount", "1");
106            props.put("org.quartz.threadPool.threadPriority", "4");
107            props.put("org.quartz.threadPool.makeThreadsDaemons", "true");
108            schedulerFactory.initialize(props);
109        }
110        scheduler = schedulerFactory.getScheduler();
111        scheduler.start();
112        // server = MBeanServerFactory.createMBeanServer();
113        // server.createMBean("org.quartz.ee.jmx.jboss.QuartzService",
114        // quartzObjectName);
115
116        // clean up all nuxeo jobs
117        // https://jira.nuxeo.com/browse/NXP-7303
118        GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals("nuxeo");
119        Set<JobKey> jobs = scheduler.getJobKeys(matcher);
120        try {
121            scheduler.deleteJobs(new ArrayList<>(jobs)); // raise a lock error in case of concurrencies
122            for (Schedule each : registry.getSchedules()) {
123                registerSchedule(each);
124            }
125        } catch (LockException cause) {
126            log.warn("scheduler already re-initializing, another cluster node concurrent startup ?", cause);
127        }
128        log.info("scheduler started");
129    }
130
131    protected void shutdownScheduler() {
132        if (scheduler == null) {
133            return;
134        }
135        try {
136            scheduler.shutdown();
137        } catch (SchedulerException cause) {
138            log.error("Cannot shutdown scheduler", cause);
139        } finally {
140            scheduler = null;
141        }
142    }
143
144    @Override
145    public void deactivate(ComponentContext context) {
146        log.debug("Deactivate");
147        shutdownScheduler();
148    }
149
150    @Override
151    public void start(ComponentContext context) {
152        startScheduler();
153    }
154
155    protected void startScheduler() {
156        ClusterService clusterService = Framework.getService(ClusterService.class);
157        String prop = Framework.getProperty(CLUSTER_START_DURATION_PROP);
158        Duration duration = DurationUtils.parsePositive(prop, CLUSTER_START_DURATION_DEFAULT);
159        Duration pollDelay = Duration.ofSeconds(1);
160        clusterService.runAtomically("start-scheduler", duration, pollDelay, () -> {
161            try {
162                setupScheduler();
163            } catch (IOException | SchedulerException e) {
164                throw new NuxeoException(e);
165            }
166        });
167    }
168
169    @Override
170    public void stop(ComponentContext context) {
171        try {
172            scheduler.standby();
173        } catch (SchedulerException cause) {
174            log.error("Cannot put scheduler in stand by mode", cause);
175        }
176    }
177
178    @Override
179    public boolean hasApplicationStarted() {
180        return scheduler != null;
181    }
182
183    @Override
184    public void registerExtension(Extension extension) {
185        Object[] contribs = extension.getContributions();
186        for (Object contrib : contribs) {
187            registerSchedule((Schedule) contrib);
188        }
189    }
190
191    @Override
192    public void unregisterExtension(Extension extension) {
193        // do nothing to do ;
194        // clean up will be done when service is activated
195        // see https://jira.nuxeo.com/browse/NXP-7303
196    }
197
198    public RuntimeContext getContext() {
199        return context;
200    }
201
202    @Override
203    public void registerSchedule(Schedule schedule) {
204        registerSchedule(schedule, null);
205    }
206
207    @Override
208    public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) {
209        registry.addContribution(schedule);
210        if (scheduler == null) {
211            return;
212        }
213        Schedule contributed = registry.getSchedule(schedule);
214        if (contributed != null) {
215            schedule(contributed, parameters);
216        } else {
217            unschedule(schedule.getId());
218        }
219    }
220
221    protected void schedule(Schedule schedule, Map<String, Serializable> parameters) {
222        log.info("Registering " + schedule);
223
224        EventJobFactory jobFactory = schedule.getJobFactory();
225        JobDetail job = jobFactory.buildJob(schedule, parameters).build();
226        Trigger trigger = jobFactory.buildTrigger(schedule).build();
227
228        // This is useful when testing to avoid multiple threads:
229        // trigger = new SimpleTrigger(schedule.getId(), "nuxeo");
230
231        try {
232            try {
233                schedule(schedule.getId(), job, trigger);
234            } catch (ObjectAlreadyExistsException e) {
235                // when jobs are persisted in a database, the job should already be there
236                // remove existing job and re-schedule
237                log.trace("Overriding scheduler with id: " + schedule.getId());
238                boolean unregistered = unschedule(schedule.getId(), job.getKey());
239                if (unregistered) {
240                    schedule(schedule.getId(), job, trigger);
241                }
242            }
243        } catch (SchedulerException e) {
244            log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e);
245        }
246    }
247
248    @Override
249    public boolean unregisterSchedule(String id) {
250        log.info("Unregistering schedule with id" + id);
251        Schedule schedule = registry.getSchedule(id);
252        if (schedule == null) {
253            return false;
254        }
255        registry.removeContribution(schedule, true);
256        return unschedule(id);
257    }
258
259    protected void schedule(String id, JobDetail job, Trigger trigger) throws SchedulerException {
260        scheduler.scheduleJob(job, trigger);
261        // record the jobKey only if scheduleJob didn't throw
262        jobKeys.put(id, job.getKey());
263    }
264
265    protected boolean unschedule(String id) {
266        return unschedule(id, jobKeys.remove(id));
267    }
268
269    protected boolean unschedule(String id, JobKey jobKey) {
270        if (jobKey == null) {
271            // shouldn't happen but let's be robust
272            log.warn("Unscheduling null jobKey for id: " + id);
273            return false;
274        }
275        try {
276            return scheduler.deleteJob(jobKey);
277        } catch (SchedulerException e) {
278            log.error(String.format("failed to unschedule job with '%s': %s", id, e.getMessage()), e);
279            return true; // didn't exist so consider it removed (maybe concurrency)
280        }
281    }
282
283    @Override
284    public boolean unregisterSchedule(Schedule schedule) {
285        return unregisterSchedule(schedule.getId());
286    }
287
288}