001/* 002 * (C) Copyright 2007-2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Thierry Martins 019 */ 020package org.nuxeo.ecm.core.scheduler; 021 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.Serializable; 025import java.net.URL; 026import java.util.ArrayList; 027import java.util.HashMap; 028import java.util.Map; 029import java.util.Properties; 030import java.util.Set; 031 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.nuxeo.ecm.core.api.NuxeoException; 035import org.nuxeo.runtime.RuntimeServiceEvent; 036import org.nuxeo.runtime.RuntimeServiceListener; 037import org.nuxeo.runtime.api.Framework; 038import org.nuxeo.runtime.model.ComponentContext; 039import org.nuxeo.runtime.model.DefaultComponent; 040import org.nuxeo.runtime.model.Extension; 041import org.nuxeo.runtime.model.RuntimeContext; 042import org.quartz.CronScheduleBuilder; 043import org.quartz.JobBuilder; 044import org.quartz.JobDataMap; 045import org.quartz.JobDetail; 046import org.quartz.JobKey; 047import org.quartz.ObjectAlreadyExistsException; 048import org.quartz.Scheduler; 049import org.quartz.SchedulerException; 050import org.quartz.Trigger; 051import org.quartz.TriggerBuilder; 052import org.quartz.impl.StdSchedulerFactory; 053import org.quartz.impl.matchers.GroupMatcher; 054 055/** 056 * Schedule service implementation. Since the cleanup of the quartz job is done when service is activated, ( see see 057 * https://jira.nuxeo.com/browse/NXP-7303 ) in cluster mode, the schedules contributions MUST be the same on all nodes. 058 * Due the fact that all jobs are removed when service starts on a node it may be a short period with no schedules in 059 * quartz table even other node is running. 060 */ 061public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService, RuntimeServiceListener { 062 063 private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class); 064 065 protected RuntimeContext context; 066 067 protected Scheduler scheduler; 068 069 protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry(); 070 071 /** 072 * @since 7.10 073 */ 074 private Map<String, JobKey> jobKeys = new HashMap<String, JobKey>(); 075 076 @Override 077 public void activate(ComponentContext context) { 078 log.debug("Activate"); 079 this.context = context.getRuntimeContext(); 080 } 081 082 protected void setupScheduler() throws IOException, SchedulerException { 083 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(); 084 URL cfg = context.getResource("config/quartz.properties"); 085 if (cfg != null) { 086 InputStream stream = cfg.openStream(); 087 try { 088 schedulerFactory.initialize(stream); 089 } finally { 090 stream.close(); 091 } 092 } else { 093 // use default config (unit tests) 094 Properties props = new Properties(); 095 props.put("org.quartz.scheduler.instanceName", "Quartz"); 096 props.put("org.quartz.scheduler.threadName", "Quartz_Scheduler"); 097 props.put("org.quartz.scheduler.instanceId", "NON_CLUSTERED"); 098 props.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true"); 099 props.put("org.quartz.scheduler.skipUpdateCheck", "true"); 100 props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool"); 101 props.put("org.quartz.threadPool.threadCount", "1"); 102 props.put("org.quartz.threadPool.threadPriority", "4"); 103 props.put("org.quartz.threadPool.makeThreadsDaemons", "true"); 104 schedulerFactory.initialize(props); 105 } 106 scheduler = schedulerFactory.getScheduler(); 107 scheduler.start(); 108 // server = MBeanServerFactory.createMBeanServer(); 109 // server.createMBean("org.quartz.ee.jmx.jboss.QuartzService", 110 // quartzObjectName); 111 112 // clean up all nuxeo jobs 113 // https://jira.nuxeo.com/browse/NXP-7303 114 GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals("nuxeo"); 115 Set<JobKey> jobs = scheduler.getJobKeys(matcher); 116 scheduler.deleteJobs(new ArrayList<JobKey>(jobs)); 117 for (Schedule each : registry.getSchedules()) { 118 registerSchedule(each); 119 } 120 log.info("scheduler started"); 121 } 122 123 protected void shutdownScheduler() { 124 if (scheduler == null) { 125 return; 126 } 127 try { 128 scheduler.shutdown(); 129 } catch (SchedulerException cause) { 130 log.error("Cannot shutdown scheduler", cause); 131 } finally { 132 scheduler = null; 133 } 134 } 135 136 @Override 137 public void deactivate(ComponentContext context) { 138 log.debug("Deactivate"); 139 shutdownScheduler(); 140 } 141 142 @Override 143 public void applicationStarted(ComponentContext context) { 144 Framework.addListener(this); 145 try { 146 setupScheduler(); 147 } catch (IOException | SchedulerException e) { 148 throw new NuxeoException(e); 149 } 150 } 151 152 @Override 153 public boolean hasApplicationStarted() { 154 return scheduler != null; 155 } 156 157 @Override 158 public void registerExtension(Extension extension) { 159 Object[] contribs = extension.getContributions(); 160 for (Object contrib : contribs) { 161 registerSchedule((Schedule) contrib); 162 } 163 } 164 165 @Override 166 public void unregisterExtension(Extension extension) { 167 // do nothing to do ; 168 // clean up will be done when service is activated 169 // see https://jira.nuxeo.com/browse/NXP-7303 170 } 171 172 public RuntimeContext getContext() { 173 return context; 174 } 175 176 @Override 177 public void registerSchedule(Schedule schedule) { 178 registerSchedule(schedule, null); 179 } 180 181 @Override 182 public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) { 183 registry.addContribution(schedule); 184 if (scheduler == null) { 185 return; 186 } 187 Schedule contributed = registry.getSchedule(schedule); 188 if (contributed != null) { 189 schedule(contributed, parameters); 190 } else { 191 unschedule(schedule.getId()); 192 } 193 } 194 195 protected void schedule(Schedule schedule, Map<String, Serializable> parameters) { 196 log.info("Registering " + schedule); 197 198 JobDataMap map = new JobDataMap(); 199 if (parameters != null) { 200 map.putAll(parameters); 201 } 202 JobDetail job = JobBuilder.newJob(EventJob.class) 203 .withIdentity(schedule.getId(), "nuxeo") 204 .usingJobData(map) 205 .usingJobData("eventId", schedule.getEventId()) 206 .usingJobData("eventCategory", schedule.getEventCategory()) 207 .usingJobData("username", schedule.getUsername()) 208 .build(); 209 210 Trigger trigger = TriggerBuilder.newTrigger() 211 .withIdentity(schedule.getId(), "nuxeo") 212 .withSchedule(CronScheduleBuilder.cronSchedule(schedule.getCronExpression())) 213 .build(); 214 215 // This is useful when testing to avoid multiple threads: 216 // trigger = new SimpleTrigger(schedule.getId(), "nuxeo"); 217 218 try { 219 scheduler.scheduleJob(job, trigger); 220 jobKeys.put(schedule.getId(), job.getKey()); 221 } catch (ObjectAlreadyExistsException e) { 222 log.trace("Overriding scheduler with id: " + schedule.getId()); 223 // when jobs are persisted in a database, the job should already 224 // be there 225 // remove existing job and re-schedule 226 boolean unregistred = unregisterSchedule(schedule.getId()); 227 if (unregistred) { 228 try { 229 scheduler.scheduleJob(job, trigger); 230 } catch (SchedulerException e1) { 231 log.error( 232 String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), 233 e); 234 } 235 } 236 237 } catch (SchedulerException e) { 238 log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e); 239 } 240 } 241 242 @Override 243 public boolean unregisterSchedule(String id) { 244 log.info("Unregistering schedule with id" + id); 245 Schedule schedule = registry.getSchedule(id); 246 if (schedule == null) { 247 return false; 248 } 249 registry.removeContribution(schedule, true); 250 return unschedule(id); 251 } 252 253 protected boolean unschedule(String jobId) { 254 try { 255 return scheduler.deleteJob(jobKeys.get(jobId)); 256 } catch (SchedulerException e) { 257 log.error(String.format("failed to unschedule job with '%s': %s", jobId, e.getMessage()), e); 258 } 259 return false; 260 } 261 262 @Override 263 public boolean unregisterSchedule(Schedule schedule) { 264 return unregisterSchedule(schedule.getId()); 265 } 266 267 @Override 268 public void handleEvent(RuntimeServiceEvent event) { 269 if (event.id != RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP) { 270 return; 271 } 272 Framework.removeListener(this); 273 shutdownScheduler(); 274 } 275 276}