001/* 002 * (C) Copyright 2007-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Thierry Martins 019 * Florent Munch 020 */ 021package org.nuxeo.ecm.core.scheduler; 022 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.Serializable; 028import java.util.ArrayList; 029import java.util.HashMap; 030import java.util.Map; 031import java.util.Properties; 032import java.util.Set; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.common.Environment; 037import org.nuxeo.ecm.core.api.NuxeoException; 038import org.nuxeo.runtime.model.ComponentContext; 039import org.nuxeo.runtime.model.DefaultComponent; 040import org.nuxeo.runtime.model.Extension; 041import org.nuxeo.runtime.model.RuntimeContext; 042import org.quartz.JobDetail; 043import org.quartz.JobKey; 044import org.quartz.ObjectAlreadyExistsException; 045import org.quartz.Scheduler; 046import org.quartz.SchedulerException; 047import org.quartz.Trigger; 048import org.quartz.impl.StdSchedulerFactory; 049import org.quartz.impl.jdbcjobstore.LockException; 050import org.quartz.impl.matchers.GroupMatcher; 051 052/** 053 * Schedule service implementation. Since the cleanup of the quartz job is done when service is activated, ( see see 054 * https://jira.nuxeo.com/browse/NXP-7303 ) in cluster mode, the schedules contributions MUST be the same on all nodes. 055 * Due the fact that all jobs are removed when service starts on a node it may be a short period with no schedules in 056 * quartz table even other node is running. 057 */ 058public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService { 059 060 private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class); 061 062 protected RuntimeContext context; 063 064 protected Scheduler scheduler; 065 066 protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry(); 067 068 /** 069 * @since 7.10 070 */ 071 private Map<String, JobKey> jobKeys = new HashMap<>(); 072 073 @Override 074 public void activate(ComponentContext context) { 075 log.debug("Activate"); 076 this.context = context.getRuntimeContext(); 077 } 078 079 protected void setupScheduler() throws IOException, SchedulerException { 080 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(); 081 File file = new File(Environment.getDefault().getConfig(), "quartz.properties"); 082 if (file.exists()) { 083 try (InputStream stream = new FileInputStream(file)) { 084 schedulerFactory.initialize(stream); 085 } 086 } else { 087 // use default config (unit tests) 088 Properties props = new Properties(); 089 props.put("org.quartz.scheduler.instanceName", "Quartz"); 090 props.put("org.quartz.scheduler.threadName", "Quartz_Scheduler"); 091 props.put("org.quartz.scheduler.instanceId", "NON_CLUSTERED"); 092 props.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true"); 093 props.put("org.quartz.scheduler.skipUpdateCheck", "true"); 094 props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool"); 095 props.put("org.quartz.threadPool.threadCount", "1"); 096 props.put("org.quartz.threadPool.threadPriority", "4"); 097 props.put("org.quartz.threadPool.makeThreadsDaemons", "true"); 098 schedulerFactory.initialize(props); 099 } 100 scheduler = schedulerFactory.getScheduler(); 101 scheduler.start(); 102 // server = MBeanServerFactory.createMBeanServer(); 103 // server.createMBean("org.quartz.ee.jmx.jboss.QuartzService", 104 // quartzObjectName); 105 106 // clean up all nuxeo jobs 107 // https://jira.nuxeo.com/browse/NXP-7303 108 GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals("nuxeo"); 109 Set<JobKey> jobs = scheduler.getJobKeys(matcher); 110 try { 111 scheduler.deleteJobs(new ArrayList<>(jobs)); // raise a lock error in case of concurrencies 112 for (Schedule each : registry.getSchedules()) { 113 registerSchedule(each); 114 } 115 } catch (LockException cause) { 116 log.warn("scheduler already re-initializing, another cluster node concurrent startup ?", cause); 117 } 118 log.info("scheduler started"); 119 } 120 121 protected void shutdownScheduler() { 122 if (scheduler == null) { 123 return; 124 } 125 try { 126 scheduler.shutdown(); 127 } catch (SchedulerException cause) { 128 log.error("Cannot shutdown scheduler", cause); 129 } finally { 130 scheduler = null; 131 } 132 } 133 134 @Override 135 public void deactivate(ComponentContext context) { 136 log.debug("Deactivate"); 137 shutdownScheduler(); 138 } 139 140 @Override 141 public void start(ComponentContext context) { 142 try { 143 setupScheduler(); 144 } catch (IOException | SchedulerException e) { 145 throw new NuxeoException(e); 146 } 147 } 148 149 @Override 150 public void stop(ComponentContext context) { 151 try { 152 scheduler.standby(); 153 } catch (SchedulerException cause) { 154 log.error("Cannot put scheduler in stand by mode", cause); 155 } 156 } 157 158 @Override 159 public boolean hasApplicationStarted() { 160 return scheduler != null; 161 } 162 163 @Override 164 public void registerExtension(Extension extension) { 165 Object[] contribs = extension.getContributions(); 166 for (Object contrib : contribs) { 167 registerSchedule((Schedule) contrib); 168 } 169 } 170 171 @Override 172 public void unregisterExtension(Extension extension) { 173 // do nothing to do ; 174 // clean up will be done when service is activated 175 // see https://jira.nuxeo.com/browse/NXP-7303 176 } 177 178 public RuntimeContext getContext() { 179 return context; 180 } 181 182 @Override 183 public void registerSchedule(Schedule schedule) { 184 registerSchedule(schedule, null); 185 } 186 187 @Override 188 public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) { 189 registry.addContribution(schedule); 190 if (scheduler == null) { 191 return; 192 } 193 Schedule contributed = registry.getSchedule(schedule); 194 if (contributed != null) { 195 schedule(contributed, parameters); 196 } else { 197 unschedule(schedule.getId()); 198 } 199 } 200 201 protected void schedule(Schedule schedule, Map<String, Serializable> parameters) { 202 log.info("Registering " + schedule); 203 204 EventJobFactory jobFactory = schedule.getJobFactory(); 205 JobDetail job = jobFactory.buildJob(schedule, parameters).build(); 206 Trigger trigger = jobFactory.buildTrigger(schedule).build(); 207 208 // This is useful when testing to avoid multiple threads: 209 // trigger = new SimpleTrigger(schedule.getId(), "nuxeo"); 210 211 try { 212 scheduler.scheduleJob(job, trigger); 213 jobKeys.put(schedule.getId(), job.getKey()); 214 } catch (ObjectAlreadyExistsException e) { 215 log.trace("Overriding scheduler with id: " + schedule.getId()); 216 // when jobs are persisted in a database, the job should already 217 // be there 218 // remove existing job and re-schedule 219 boolean unregistred = unregisterSchedule(schedule.getId()); 220 if (unregistred) { 221 try { 222 scheduler.scheduleJob(job, trigger); 223 } catch (SchedulerException e1) { 224 log.error( 225 String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), 226 e); 227 } 228 } 229 230 } catch (SchedulerException e) { 231 log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e); 232 } 233 } 234 235 @Override 236 public boolean unregisterSchedule(String id) { 237 log.info("Unregistering schedule with id" + id); 238 Schedule schedule = registry.getSchedule(id); 239 if (schedule == null) { 240 return false; 241 } 242 registry.removeContribution(schedule, true); 243 return unschedule(id); 244 } 245 246 protected boolean unschedule(String jobId) { 247 try { 248 return scheduler.deleteJob(jobKeys.get(jobId)); 249 } catch (SchedulerException e) { 250 log.error(String.format("failed to unschedule job with '%s': %s", jobId, e.getMessage()), e); 251 } 252 return false; 253 } 254 255 @Override 256 public boolean unregisterSchedule(Schedule schedule) { 257 return unregisterSchedule(schedule.getId()); 258 } 259 260}