/*
 * (C) Copyright 2007-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 *     Thierry Martins
 *     Florent Munch
 */
package org.nuxeo.ecm.core.scheduler;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.Environment;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.model.Extension;
import org.nuxeo.runtime.model.RuntimeContext;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.ObjectAlreadyExistsException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.jdbcjobstore.LockException;
import org.quartz.impl.matchers.GroupMatcher;

/**
 * Schedule service implementation.
 * <p>
 * The cleanup of the Quartz jobs is done when the scheduler is set up (see
 * https://jira.nuxeo.com/browse/NXP-7303), so in cluster mode the schedule contributions MUST be the same on all
 * nodes. Due to the fact that all jobs of the "nuxeo" group are removed when the service starts on a node, there may
 * be a short period with no schedules in the Quartz table even though another node is running.
 */
public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService {

    private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class);

    protected RuntimeContext context;

    /** The Quartz scheduler; {@code null} until {@link #setupScheduler()} succeeds and after shutdown. */
    protected Scheduler scheduler;

    protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry();

    /**
     * Job keys of the jobs successfully handed to the scheduler, indexed by schedule id.
     *
     * @since 7.10
     */
    private final Map<String, JobKey> jobKeys = new HashMap<>();

    /**
     * Remembers the runtime context of the component.
     */
    @Override
    public void activate(ComponentContext context) {
        log.debug("Activate");
        this.context = context.getRuntimeContext();
    }

    /**
     * Creates and starts the Quartz scheduler, deletes the existing jobs of the "nuxeo" group and re-registers every
     * schedule known to the registry.
     * <p>
     * The configuration is read from {@code quartz.properties} in the configuration directory when that file exists;
     * otherwise a built-in default configuration is used.
     *
     * @throws IOException if the configuration file cannot be read
     * @throws SchedulerException if the scheduler cannot be initialized, started or cleaned up
     */
    protected void setupScheduler() throws IOException, SchedulerException {
        StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
        File file = new File(Environment.getDefault().getConfig(), "quartz.properties");
        if (file.exists()) {
            try (InputStream stream = new FileInputStream(file)) {
                schedulerFactory.initialize(stream);
            }
        } else {
            // use default config (unit tests)
            Properties props = new Properties();
            props.setProperty("org.quartz.scheduler.instanceName", "Quartz");
            props.setProperty("org.quartz.scheduler.threadName", "Quartz_Scheduler");
            props.setProperty("org.quartz.scheduler.instanceId", "NON_CLUSTERED");
            props.setProperty("org.quartz.scheduler.makeSchedulerThreadDaemon", "true");
            props.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");
            props.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
            props.setProperty("org.quartz.threadPool.threadCount", "1");
            props.setProperty("org.quartz.threadPool.threadPriority", "4");
            props.setProperty("org.quartz.threadPool.makeThreadsDaemons", "true");
            schedulerFactory.initialize(props);
        }
        scheduler = schedulerFactory.getScheduler();
        scheduler.start();
        // server = MBeanServerFactory.createMBeanServer();
        // server.createMBean("org.quartz.ee.jmx.jboss.QuartzService",
        // quartzObjectName);

        // clean up all nuxeo jobs
        // https://jira.nuxeo.com/browse/NXP-7303
        GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals("nuxeo");
        Set<JobKey> jobs = scheduler.getJobKeys(matcher);
        try {
            scheduler.deleteJobs(new ArrayList<>(jobs)); // raise a lock error in case of concurrencies
            for (Schedule each : registry.getSchedules()) {
                registerSchedule(each);
            }
        } catch (LockException cause) {
            log.warn("scheduler already re-initializing, another cluster node concurrent startup ?", cause);
        }
        log.info("scheduler started");
    }

    /**
     * Shuts the scheduler down, if there is one, and forgets it. A failure to shut down is logged, not propagated.
     */
    protected void shutdownScheduler() {
        if (scheduler == null) {
            return;
        }
        try {
            scheduler.shutdown();
        } catch (SchedulerException cause) {
            log.error("Cannot shutdown scheduler", cause);
        } finally {
            scheduler = null;
        }
    }

    @Override
    public void deactivate(ComponentContext context) {
        log.debug("Deactivate");
        shutdownScheduler();
    }

    /**
     * Sets up the scheduler.
     *
     * @throws NuxeoException wrapping any {@link IOException} or {@link SchedulerException} raised during setup
     */
    @Override
    public void start(ComponentContext context) {
        try {
            setupScheduler();
        } catch (IOException | SchedulerException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Puts the scheduler in standby mode. Does nothing when no scheduler exists (setup failed or never ran, or the
     * scheduler was already shut down). A failure to enter standby is logged, not propagated.
     */
    @Override
    public void stop(ComponentContext context) {
        if (scheduler == null) {
            // nothing to pause; avoids a NullPointerException when start() did not complete
            return;
        }
        try {
            scheduler.standby();
        } catch (SchedulerException cause) {
            log.error("Cannot put scheduler in stand by mode", cause);
        }
    }

    /**
     * @return {@code true} once a scheduler instance has been created and not yet shut down
     */
    @Override
    public boolean hasApplicationStarted() {
        return scheduler != null;
    }

    /**
     * Registers every contribution of the extension as a {@link Schedule}.
     */
    @Override
    public void registerExtension(Extension extension) {
        Object[] contribs = extension.getContributions();
        for (Object contrib : contribs) {
            registerSchedule((Schedule) contrib);
        }
    }

    @Override
    public void unregisterExtension(Extension extension) {
        // nothing to do:
        // clean up will be done when the scheduler is set up
        // see https://jira.nuxeo.com/browse/NXP-7303
    }

    public RuntimeContext getContext() {
        return context;
    }

    @Override
    public void registerSchedule(Schedule schedule) {
        registerSchedule(schedule, null);
    }

    /**
     * Adds the schedule to the registry and, when the scheduler exists, applies the registry's resulting view of it:
     * the schedule returned by the registry is scheduled, or the job is unscheduled when the registry returns none.
     *
     * @param schedule the schedule to register
     * @param parameters parameters passed to the job factory when building the job, may be {@code null}
     */
    @Override
    public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) {
        registry.addContribution(schedule);
        if (scheduler == null) {
            // not started yet: setupScheduler() will register everything found in the registry
            return;
        }
        Schedule contributed = registry.getSchedule(schedule);
        if (contributed != null) {
            schedule(contributed, parameters);
        } else {
            // NOTE(review): presumably the registry hides disabled contributions - confirm against
            // ScheduleExtensionRegistry
            unschedule(schedule.getId());
        }
    }

    /**
     * Builds the job and trigger of the schedule and hands them to the scheduler. When a job with the same key
     * already exists it is deleted and the new one is scheduled in its place. Scheduler failures are logged, not
     * propagated.
     */
    protected void schedule(Schedule schedule, Map<String, Serializable> parameters) {
        log.info("Registering " + schedule);

        EventJobFactory jobFactory = schedule.getJobFactory();
        JobDetail job = jobFactory.buildJob(schedule, parameters).build();
        Trigger trigger = jobFactory.buildTrigger(schedule).build();

        // This is useful when testing to avoid multiple threads:
        // trigger = new SimpleTrigger(schedule.getId(), "nuxeo");

        try {
            try {
                schedule(schedule.getId(), job, trigger);
            } catch (ObjectAlreadyExistsException e) {
                // when jobs are persisted in a database, the job should already be there
                // remove existing job and re-schedule
                log.trace("Overriding scheduler with id: " + schedule.getId());
                boolean unregistered = unschedule(schedule.getId(), job.getKey());
                if (unregistered) {
                    schedule(schedule.getId(), job, trigger);
                }
            }
        } catch (SchedulerException e) {
            log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e);
        }
    }

    /**
     * Removes the schedule with the given id from the registry and unschedules its job.
     *
     * @return {@code false} when no schedule with this id is registered, otherwise the result of unscheduling
     */
    @Override
    public boolean unregisterSchedule(String id) {
        log.info("Unregistering schedule with id " + id);
        Schedule schedule = registry.getSchedule(id);
        if (schedule == null) {
            return false;
        }
        registry.removeContribution(schedule, true);
        return unschedule(id);
    }

    /**
     * Schedules the job and records its key under the given schedule id.
     *
     * @throws SchedulerException if the scheduler rejects the job; the key is not recorded in that case
     */
    protected void schedule(String id, JobDetail job, Trigger trigger) throws SchedulerException {
        scheduler.scheduleJob(job, trigger);
        // record the jobKey only if scheduleJob didn't throw
        jobKeys.put(id, job.getKey());
    }

    /**
     * Forgets the job key recorded for the schedule id and deletes the corresponding job.
     */
    protected boolean unschedule(String id) {
        return unschedule(id, jobKeys.remove(id));
    }

    /**
     * Deletes the job identified by the key from the scheduler.
     *
     * @param id the schedule id, used for log messages only
     * @param jobKey the key of the job to delete, may be {@code null}
     * @return {@code false} when the key is {@code null} or no scheduler exists, {@code true} when deletion raised a
     *         {@link SchedulerException}, otherwise the result of {@link Scheduler#deleteJob(JobKey)}
     */
    protected boolean unschedule(String id, JobKey jobKey) {
        if (jobKey == null) {
            // shouldn't happen but let's be robust
            log.warn("Unscheduling null jobKey for id: " + id);
            return false;
        }
        if (scheduler == null) {
            // scheduler already shut down: there is no job left to delete
            return false;
        }
        try {
            return scheduler.deleteJob(jobKey);
        } catch (SchedulerException e) {
            log.error(String.format("failed to unschedule job with '%s': %s", id, e.getMessage()), e);
            return true; // didn't exist so consider it removed (maybe concurrency)
        }
    }

    @Override
    public boolean unregisterSchedule(Schedule schedule) {
        return unregisterSchedule(schedule.getId());
    }

}