001/* 002 * (C) Copyright 2007-2010 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Florent Guillaume 016 */ 017package org.nuxeo.ecm.core.scheduler; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.io.Serializable; 022import java.net.URL; 023import java.text.ParseException; 024import java.util.Map; 025import java.util.Properties; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.nuxeo.ecm.core.api.NuxeoException; 030import org.nuxeo.runtime.RuntimeServiceEvent; 031import org.nuxeo.runtime.RuntimeServiceListener; 032import org.nuxeo.runtime.api.Framework; 033import org.nuxeo.runtime.model.ComponentContext; 034import org.nuxeo.runtime.model.DefaultComponent; 035import org.nuxeo.runtime.model.Extension; 036import org.nuxeo.runtime.model.RuntimeContext; 037import org.quartz.CronTrigger; 038import org.quartz.JobDataMap; 039import org.quartz.JobDetail; 040import org.quartz.ObjectAlreadyExistsException; 041import org.quartz.Scheduler; 042import org.quartz.SchedulerException; 043import org.quartz.Trigger; 044import org.quartz.impl.StdSchedulerFactory; 045 046/** 047 * Schedule service implementation. Since the cleanup of the quartz job is done when service is activated, ( see see 048 * https://jira.nuxeo.com/browse/NXP-7303 ) in cluster mode, the schedules contributions MUST be the same on all nodes. 049 * Due the fact that all jobs are removed when service starts on a node it may be a short period with no schedules in 050 * quartz table even other node is running. 051 */ 052public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService, RuntimeServiceListener { 053 054 private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class); 055 056 protected RuntimeContext bundle; 057 058 protected Scheduler scheduler; 059 060 protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry(); 061 062 @Override 063 public void activate(ComponentContext context) { 064 log.debug("Activate"); 065 bundle = context.getRuntimeContext(); 066 } 067 068 protected void setupScheduler(ComponentContext context) throws IOException, SchedulerException { 069 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(); 070 URL cfg = context.getRuntimeContext().getResource("config/quartz.properties"); 071 if (cfg != null) { 072 InputStream stream = cfg.openStream(); 073 try { 074 schedulerFactory.initialize(stream); 075 } finally { 076 stream.close(); 077 } 078 } else { 079 // use default config (unit tests) 080 Properties props = new Properties(); 081 props.put("org.quartz.scheduler.instanceName", "Quartz"); 082 props.put("org.quartz.scheduler.threadName", "Quartz_Scheduler"); 083 props.put("org.quartz.scheduler.instanceId", "NON_CLUSTERED"); 084 props.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true"); 085 props.put("org.quartz.scheduler.skipUpdateCheck", "true"); 086 props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool"); 087 props.put("org.quartz.threadPool.threadCount", "1"); 088 props.put("org.quartz.threadPool.threadPriority", "4"); 089 props.put("org.quartz.threadPool.makeThreadsDaemons", "true"); 090 schedulerFactory.initialize(props); 091 } 092 scheduler = schedulerFactory.getScheduler(); 093 scheduler.start(); 094 // server = MBeanServerFactory.createMBeanServer(); 095 // server.createMBean("org.quartz.ee.jmx.jboss.QuartzService", 096 // quartzObjectName); 097 098 // clean up all nuxeo jobs 099 // https://jira.nuxeo.com/browse/NXP-7303 100 String[] jobs = scheduler.getJobNames("nuxeo"); 101 for (String job : jobs) { 102 unschedule(job); 103 } 104 for (Schedule each : registry.getSchedules()) { 105 registerSchedule(each); 106 } 107 log.info("scheduler started"); 108 } 109 110 protected void shutdownScheduler() { 111 if (scheduler == null) { 112 return; 113 } 114 try { 115 scheduler.shutdown(); 116 } catch (SchedulerException cause) { 117 log.error("Cannot shutdown scheduler", cause); 118 } finally { 119 scheduler = null; 120 } 121 } 122 123 @Override 124 public void deactivate(ComponentContext context) { 125 log.debug("Deactivate"); 126 shutdownScheduler(); 127 } 128 129 @Override 130 public void applicationStarted(ComponentContext context) { 131 Framework.addListener(this); 132 try { 133 setupScheduler(context); 134 } catch (IOException | SchedulerException e) { 135 throw new NuxeoException(e); 136 } 137 } 138 139 @Override 140 public boolean hasApplicationStarted() { 141 return scheduler != null; 142 } 143 144 @Override 145 public void registerExtension(Extension extension) { 146 Object[] contribs = extension.getContributions(); 147 for (Object contrib : contribs) { 148 registerSchedule((Schedule) contrib); 149 } 150 } 151 152 @Override 153 public void unregisterExtension(Extension extension) { 154 // do nothing to do ; 155 // clean up will be done when service is activated 156 // see https://jira.nuxeo.com/browse/NXP-7303 157 } 158 159 public RuntimeContext getContext() { 160 return bundle; 161 } 162 163 @Override 164 public void registerSchedule(Schedule schedule) { 165 registerSchedule(schedule, null); 166 } 167 168 @Override 169 public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) { 170 registry.addContribution(schedule); 171 if (scheduler == null) { 172 return; 173 } 174 Schedule contributed = registry.getSchedule(schedule); 175 if (contributed != null) { 176 schedule(contributed, parameters); 177 } else { 178 unschedule(schedule.getId()); 179 } 180 } 181 182 protected void schedule(Schedule schedule, Map<String, Serializable> parameters) { 183 log.info("Registering " + schedule); 184 JobDetail job = new JobDetail(schedule.getId(), "nuxeo", EventJob.class); 185 JobDataMap map = job.getJobDataMap(); 186 map.put("eventId", schedule.getEventId()); 187 map.put("eventCategory", schedule.getEventCategory()); 188 map.put("username", schedule.getUsername()); 189 190 if (parameters != null) { 191 map.putAll(parameters); 192 } 193 194 Trigger trigger; 195 try { 196 trigger = new CronTrigger(schedule.getId(), "nuxeo", schedule.getCronExpression()); 197 } catch (ParseException e) { 198 log.error(String.format("invalid cron expresion '%s' for schedule '%s'", schedule.getCronExpression(), 199 schedule.getId()), e); 200 return; 201 } 202 // This is useful when testing to avoid multiple threads: 203 // trigger = new SimpleTrigger(schedule.getId(), "nuxeo"); 204 205 try { 206 scheduler.scheduleJob(job, trigger); 207 } catch (ObjectAlreadyExistsException e) { 208 log.trace("Overriding scheduler with id: " + schedule.getId()); 209 // when jobs are persisted in a database, the job should already 210 // be there 211 // remove existing job and re-schedule 212 boolean unregistred = unregisterSchedule(schedule.getId()); 213 if (unregistred) { 214 try { 215 scheduler.scheduleJob(job, trigger); 216 } catch (SchedulerException e1) { 217 log.error( 218 String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), 219 e); 220 } 221 } 222 223 } catch (SchedulerException e) { 224 log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e); 225 } 226 } 227 228 @Override 229 public boolean unregisterSchedule(String id) { 230 log.info("Unregistering schedule with id" + id); 231 Schedule schedule = registry.getSchedule(id); 232 if (schedule == null) { 233 return false; 234 } 235 registry.removeContribution(schedule, true); 236 return unschedule(id); 237 } 238 239 protected boolean unschedule(String jobId) { 240 try { 241 return scheduler.deleteJob(jobId, "nuxeo"); 242 } catch (SchedulerException e) { 243 log.error(String.format("failed to unschedule job with '%s': %s", jobId, e.getMessage()), e); 244 } 245 return false; 246 } 247 248 @Override 249 public boolean unregisterSchedule(Schedule schedule) { 250 return unregisterSchedule(schedule.getId()); 251 } 252 253 @Override 254 public void handleEvent(RuntimeServiceEvent event) { 255 if (event.id != RuntimeServiceEvent.RUNTIME_ABOUT_TO_STOP) { 256 return; 257 } 258 Framework.removeListener(this); 259 shutdownScheduler(); 260 } 261 262}