001/* 002 * Copyright (c) 2007-2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Florent Guillaume 011 * Thierry Martins 012 */ 013package org.nuxeo.ecm.core.scheduler; 014 015import java.io.IOException; 016import java.io.InputStream; 017import java.io.Serializable; 018import java.net.URL; 019import java.util.ArrayList; 020import java.util.HashMap; 021import java.util.Map; 022import java.util.Properties; 023import java.util.Set; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.nuxeo.ecm.core.api.NuxeoException; 028import org.nuxeo.runtime.RuntimeServiceEvent; 029import org.nuxeo.runtime.RuntimeServiceListener; 030import org.nuxeo.runtime.api.Framework; 031import org.nuxeo.runtime.model.ComponentContext; 032import org.nuxeo.runtime.model.DefaultComponent; 033import org.nuxeo.runtime.model.Extension; 034import org.nuxeo.runtime.model.RuntimeContext; 035import org.quartz.CronScheduleBuilder; 036import org.quartz.JobBuilder; 037import org.quartz.JobDataMap; 038import org.quartz.JobDetail; 039import org.quartz.JobKey; 040import org.quartz.ObjectAlreadyExistsException; 041import org.quartz.Scheduler; 042import org.quartz.SchedulerException; 043import org.quartz.Trigger; 044import org.quartz.TriggerBuilder; 045import org.quartz.impl.StdSchedulerFactory; 046import org.quartz.impl.matchers.GroupMatcher; 047 048/** 049 * Schedule service implementation. Since the cleanup of the quartz job is done when service is activated, ( see see 050 * https://jira.nuxeo.com/browse/NXP-7303 ) in cluster mode, the schedules contributions MUST be the same on all nodes. 051 * Due the fact that all jobs are removed when service starts on a node it may be a short period with no schedules in 052 * quartz table even other node is running. 053 */ 054public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService, RuntimeServiceListener { 055 056 private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class); 057 058 protected RuntimeContext bundle; 059 060 protected Scheduler scheduler; 061 062 protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry(); 063 064 /** 065 * @since 7.10 066 */ 067 private Map<String, JobKey> jobKeys = new HashMap<String, JobKey>(); 068 069 @Override 070 public void activate(ComponentContext context) { 071 log.debug("Activate"); 072 bundle = context.getRuntimeContext(); 073 } 074 075 protected void setupScheduler(ComponentContext context) throws IOException, SchedulerException { 076 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(); 077 URL cfg = context.getRuntimeContext().getResource("config/quartz.properties"); 078 if (cfg != null) { 079 InputStream stream = cfg.openStream(); 080 try { 081 schedulerFactory.initialize(stream); 082 } finally { 083 stream.close(); 084 } 085 } else { 086 // use default config (unit tests) 087 Properties props = new Properties(); 088 props.put("org.quartz.scheduler.instanceName", "Quartz"); 089 props.put("org.quartz.scheduler.threadName", "Quartz_Scheduler"); 090 props.put("org.quartz.scheduler.instanceId", "NON_CLUSTERED"); 091 props.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true"); 092 props.put("org.quartz.scheduler.skipUpdateCheck", "true"); 093 props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool"); 094 props.put("org.quartz.threadPool.threadCount", "1"); 095 props.put("org.quartz.threadPool.threadPriority", "4"); 096 props.put("org.quartz.threadPool.makeThreadsDaemons", "true"); 097 schedulerFactory.initialize(props); 098 } 099 scheduler = schedulerFactory.getScheduler(); 100 scheduler.start(); 101 // server = MBeanServerFactory.createMBeanServer(); 102 // server.createMBean("org.quartz.ee.jmx.jboss.QuartzService", 103 // quartzObjectName); 104 105 // clean up all nuxeo jobs 106 // https://jira.nuxeo.com/browse/NXP-7303 107 GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals("nuxeo"); 108 Set<JobKey> jobs = scheduler.getJobKeys(matcher); 109 scheduler.deleteJobs(new ArrayList<JobKey>(jobs)); 110 for (Schedule each : registry.getSchedules()) { 111 registerSchedule(each); 112 } 113 log.info("scheduler started"); 114 } 115 116 protected void shutdownScheduler() { 117 if (scheduler == null) { 118 return; 119 } 120 try { 121 scheduler.shutdown(); 122 } catch (SchedulerException cause) { 123 log.error("Cannot shutdown scheduler", cause); 124 } finally { 125 scheduler = null; 126 } 127 } 128 129 @Override 130 public void deactivate(ComponentContext context) { 131 log.debug("Deactivate"); 132 shutdownScheduler(); 133 } 134 135 @Override 136 public void applicationStarted(ComponentContext context) { 137 Framework.addListener(this); 138 try { 139 setupScheduler(context); 140 } catch (IOException | SchedulerException e) { 141 throw new NuxeoException(e); 142 } 143 } 144 145 @Override 146 public boolean hasApplicationStarted() { 147 return scheduler != null; 148 } 149 150 @Override 151 public void registerExtension(Extension extension) { 152 Object[] contribs = extension.getContributions(); 153 for (Object contrib : contribs) { 154 registerSchedule((Schedule) contrib); 155 } 156 } 157 158 @Override 159 public void unregisterExtension(Extension extension) { 160 // do nothing to do ; 161 // clean up will be done when service is activated 162 // see https://jira.nuxeo.com/browse/NXP-7303 163 } 164 165 public RuntimeContext getContext() { 166 return bundle; 167 } 168 169 @Override 170 public void registerSchedule(Schedule schedule) { 171 registerSchedule(schedule, null); 172 } 173 174 @Override 175 public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) { 176 registry.addContribution(schedule); 177 if (scheduler == null) { 178 return; 179 } 180 Schedule contributed = registry.getSchedule(schedule); 181 if (contributed != null) { 182 schedule(contributed, parameters); 183 } else { 184 unschedule(schedule.getId()); 185 } 186 } 187 188 protected void schedule(Schedule schedule, Map<String, Serializable> parameters) { 189 log.info("Registering " + schedule); 190 191 JobDataMap map = new JobDataMap(); 192 if (parameters != null) { 193 map.putAll(parameters); 194 } 195 JobDetail job = JobBuilder.newJob(EventJob.class) 196 .withIdentity(schedule.getId(), "nuxeo") 197 .usingJobData(map) 198 .usingJobData("eventId", schedule.getEventId()) 199 .usingJobData("eventCategory", schedule.getEventCategory()) 200 .usingJobData("username", schedule.getUsername()) 201 .build(); 202 203 Trigger trigger = TriggerBuilder.newTrigger() 204 .withIdentity(schedule.getId(), "nuxeo") 205 .withSchedule(CronScheduleBuilder.cronSchedule(schedule.getCronExpression())) 206 .build(); 207 208 // This is useful when testing to avoid multiple threads: 209 // trigger = new SimpleTrigger(schedule.getId(), "nuxeo"); 210 211 try { 212 scheduler.scheduleJob(job, trigger); 213 jobKeys.put(schedule.getId(), job.getKey()); 214 } catch (ObjectAlreadyExistsException e) { 215 log.trace("Overriding scheduler with id: " + schedule.getId()); 216 // when jobs are persisted in a database, the job should already 217 // be there 218 // remove existing job and re-schedule 219 boolean unregistred = unregisterSchedule(schedule.getId()); 220 if (unregistred) { 221 try { 222 scheduler.scheduleJob(job, trigger); 223 } catch (SchedulerException e1) { 224 log.error( 225 String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), 226 e); 227 } 228 } 229 230 } catch (SchedulerException e) { 231 log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e); 232 } 233 } 234 235 @Override 236 public boolean unregisterSchedule(String id) { 237 log.info("Unregistering schedule with id" + id); 238 Schedule schedule = registry.getSchedule(id); 239 if (schedule == null) { 240 return false; 241 } 242 registry.removeContribution(schedule, true); 243 return unschedule(id); 244 } 245 246 protected boolean unschedule(String jobId) { 247 try { 248 return scheduler.deleteJob(jobKeys.get(jobId)); 249 } catch (SchedulerException e) { 250 log.error(String.format("failed to unschedule job with '%s': %s", jobId, e.getMessage()), e); 251 } 252 return false; 253 } 254 255 @Override 256 public boolean unregisterSchedule(Schedule schedule) { 257 return unregisterSchedule(schedule.getId()); 258 } 259 260 @Override 261 public void handleEvent(RuntimeServiceEvent event) { 262 if (event.id != RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP) { 263 return; 264 } 265 Framework.removeListener(this); 266 shutdownScheduler(); 267 } 268 269}