001/* 002 * (C) Copyright 2007-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Thierry Martins 019 */ 020package org.nuxeo.ecm.core.scheduler; 021 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.Serializable; 025import java.net.URL; 026import java.util.ArrayList; 027import java.util.HashMap; 028import java.util.Map; 029import java.util.Properties; 030import java.util.Set; 031 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.nuxeo.ecm.core.api.NuxeoException; 035import org.nuxeo.runtime.model.ComponentContext; 036import org.nuxeo.runtime.model.DefaultComponent; 037import org.nuxeo.runtime.model.Extension; 038import org.nuxeo.runtime.model.RuntimeContext; 039import org.quartz.CronScheduleBuilder; 040import org.quartz.JobBuilder; 041import org.quartz.JobDataMap; 042import org.quartz.JobDetail; 043import org.quartz.JobKey; 044import org.quartz.ObjectAlreadyExistsException; 045import org.quartz.Scheduler; 046import org.quartz.SchedulerException; 047import org.quartz.Trigger; 048import org.quartz.TriggerBuilder; 049import org.quartz.impl.StdSchedulerFactory; 050import org.quartz.impl.jdbcjobstore.LockException; 051import org.quartz.impl.matchers.GroupMatcher; 052 053/** 054 * Schedule service implementation. Since the cleanup of the quartz job is done when service is activated, ( see see 055 * https://jira.nuxeo.com/browse/NXP-7303 ) in cluster mode, the schedules contributions MUST be the same on all nodes. 056 * Due the fact that all jobs are removed when service starts on a node it may be a short period with no schedules in 057 * quartz table even other node is running. 058 */ 059public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService { 060 061 private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class); 062 063 protected RuntimeContext context; 064 065 protected Scheduler scheduler; 066 067 protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry(); 068 069 /** 070 * @since 7.10 071 */ 072 private Map<String, JobKey> jobKeys = new HashMap<>(); 073 074 @Override 075 public void activate(ComponentContext context) { 076 log.debug("Activate"); 077 this.context = context.getRuntimeContext(); 078 } 079 080 protected void setupScheduler() throws IOException, SchedulerException { 081 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(); 082 URL cfg = context.getResource("config/quartz.properties"); 083 if (cfg != null) { 084 try (InputStream stream = cfg.openStream()) { 085 schedulerFactory.initialize(stream); 086 } 087 } else { 088 // use default config (unit tests) 089 Properties props = new Properties(); 090 props.put("org.quartz.scheduler.instanceName", "Quartz"); 091 props.put("org.quartz.scheduler.threadName", "Quartz_Scheduler"); 092 props.put("org.quartz.scheduler.instanceId", "NON_CLUSTERED"); 093 props.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true"); 094 props.put("org.quartz.scheduler.skipUpdateCheck", "true"); 095 props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool"); 096 props.put("org.quartz.threadPool.threadCount", "1"); 097 props.put("org.quartz.threadPool.threadPriority", "4"); 098 props.put("org.quartz.threadPool.makeThreadsDaemons", "true"); 099 schedulerFactory.initialize(props); 100 } 101 scheduler = schedulerFactory.getScheduler(); 102 scheduler.start(); 103 // server = MBeanServerFactory.createMBeanServer(); 104 // server.createMBean("org.quartz.ee.jmx.jboss.QuartzService", 105 // quartzObjectName); 106 107 // clean up all nuxeo jobs 108 // https://jira.nuxeo.com/browse/NXP-7303 109 GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals("nuxeo"); 110 Set<JobKey> jobs = scheduler.getJobKeys(matcher); 111 try { 112 scheduler.deleteJobs(new ArrayList<>(jobs)); // raise a lock error in case of concurrencies 113 for (Schedule each : registry.getSchedules()) { 114 registerSchedule(each); 115 } 116 } catch (LockException cause) { 117 log.warn("scheduler already re-initializing, another cluster node concurrent startup ?", cause); 118 } 119 log.info("scheduler started"); 120 } 121 122 protected void shutdownScheduler() { 123 if (scheduler == null) { 124 return; 125 } 126 try { 127 scheduler.shutdown(); 128 } catch (SchedulerException cause) { 129 log.error("Cannot shutdown scheduler", cause); 130 } finally { 131 scheduler = null; 132 } 133 } 134 135 @Override 136 public void deactivate(ComponentContext context) { 137 log.debug("Deactivate"); 138 shutdownScheduler(); 139 } 140 141 @Override 142 public void start(ComponentContext context) { 143 try { 144 setupScheduler(); 145 } catch (IOException | SchedulerException e) { 146 throw new NuxeoException(e); 147 } 148 } 149 150 @Override 151 public void stop(ComponentContext context) { 152 try { 153 scheduler.standby(); 154 } catch (SchedulerException cause) { 155 log.error("Cannot put scheduler in stand by mode", cause); 156 } 157 } 158 159 @Override 160 public boolean hasApplicationStarted() { 161 return scheduler != null; 162 } 163 164 @Override 165 public void registerExtension(Extension extension) { 166 Object[] contribs = extension.getContributions(); 167 for (Object contrib : contribs) { 168 registerSchedule((Schedule) contrib); 169 } 170 } 171 172 @Override 173 public void unregisterExtension(Extension extension) { 174 // do nothing to do ; 175 // clean up will be done when service is activated 176 // see https://jira.nuxeo.com/browse/NXP-7303 177 } 178 179 public RuntimeContext getContext() { 180 return context; 181 } 182 183 @Override 184 public void registerSchedule(Schedule schedule) { 185 registerSchedule(schedule, null); 186 } 187 188 @Override 189 public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) { 190 registry.addContribution(schedule); 191 if (scheduler == null) { 192 return; 193 } 194 Schedule contributed = registry.getSchedule(schedule); 195 if (contributed != null) { 196 schedule(contributed, parameters); 197 } else { 198 unschedule(schedule.getId()); 199 } 200 } 201 202 protected void schedule(Schedule schedule, Map<String, Serializable> parameters) { 203 log.info("Registering " + schedule); 204 205 JobDataMap map = new JobDataMap(); 206 if (parameters != null) { 207 map.putAll(parameters); 208 } 209 JobDetail job = JobBuilder.newJob(EventJob.class) 210 .withIdentity(schedule.getId(), "nuxeo") 211 .usingJobData(map) 212 .usingJobData("eventId", schedule.getEventId()) 213 .usingJobData("eventCategory", schedule.getEventCategory()) 214 .usingJobData("username", schedule.getUsername()) 215 .build(); 216 217 Trigger trigger = TriggerBuilder.newTrigger() 218 .withIdentity(schedule.getId(), "nuxeo") 219 .withSchedule(CronScheduleBuilder.cronSchedule(schedule.getCronExpression())) 220 .build(); 221 222 // This is useful when testing to avoid multiple threads: 223 // trigger = new SimpleTrigger(schedule.getId(), "nuxeo"); 224 225 try { 226 scheduler.scheduleJob(job, trigger); 227 jobKeys.put(schedule.getId(), job.getKey()); 228 } catch (ObjectAlreadyExistsException e) { 229 log.trace("Overriding scheduler with id: " + schedule.getId()); 230 // when jobs are persisted in a database, the job should already 231 // be there 232 // remove existing job and re-schedule 233 boolean unregistred = unregisterSchedule(schedule.getId()); 234 if (unregistred) { 235 try { 236 scheduler.scheduleJob(job, trigger); 237 } catch (SchedulerException e1) { 238 log.error( 239 String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), 240 e); 241 } 242 } 243 244 } catch (SchedulerException e) { 245 log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e); 246 } 247 } 248 249 @Override 250 public boolean unregisterSchedule(String id) { 251 log.info("Unregistering schedule with id" + id); 252 Schedule schedule = registry.getSchedule(id); 253 if (schedule == null) { 254 return false; 255 } 256 registry.removeContribution(schedule, true); 257 return unschedule(id); 258 } 259 260 protected boolean unschedule(String jobId) { 261 try { 262 return scheduler.deleteJob(jobKeys.get(jobId)); 263 } catch (SchedulerException e) { 264 log.error(String.format("failed to unschedule job with '%s': %s", jobId, e.getMessage()), e); 265 } 266 return false; 267 } 268 269 @Override 270 public boolean unregisterSchedule(Schedule schedule) { 271 return unregisterSchedule(schedule.getId()); 272 } 273 274}