/*
 * (C) Copyright 2007-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 *     Thierry Martins
 *     Florent Munch
 */
package org.nuxeo.ecm.core.scheduler;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.Environment;
import org.nuxeo.common.utils.DurationUtils;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.cluster.ClusterService;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.model.Extension;
import org.nuxeo.runtime.model.RuntimeContext;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.ObjectAlreadyExistsException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.jdbcjobstore.LockException;
import org.quartz.impl.matchers.GroupMatcher;

/**
 * Schedule service implementation.
 * <p>
 * The cleanup of the Quartz jobs is done when the scheduler is set up at component start (see
 * https://jira.nuxeo.com/browse/NXP-7303), so in cluster mode the schedule contributions MUST be the same on all
 * nodes. Because all jobs are removed when the service starts on a node, there may be a short period with no
 * schedules in the Quartz table even though another node is running.
 */
public class SchedulerServiceImpl extends DefaultComponent implements SchedulerService {

    private static final Log log = LogFactory.getLog(SchedulerServiceImpl.class);

    /**
     * Name of the framework property holding the duration passed to the cluster service when starting the scheduler.
     *
     * @since 11.1
     */
    public static final String CLUSTER_START_DURATION_PROP = "org.nuxeo.scheduler.cluster.start.duration";

    /**
     * Duration used when {@link #CLUSTER_START_DURATION_PROP} is not usable.
     *
     * @since 11.1
     */
    public static final Duration CLUSTER_START_DURATION_DEFAULT = Duration.ofMinutes(1);

    protected RuntimeContext context;

    /** The Quartz scheduler; {@code null} until {@link #setupScheduler()} ran and after {@link #shutdownScheduler()}. */
    protected Scheduler scheduler;

    protected final ScheduleExtensionRegistry registry = new ScheduleExtensionRegistry();

    /**
     * Keys of the jobs successfully scheduled by this instance, indexed by schedule id.
     *
     * @since 7.10
     */
    private final Map<String, JobKey> jobKeys = new HashMap<>();

    /**
     * Stores the runtime context of the component. The scheduler itself is only created in
     * {@link #start(ComponentContext)}.
     */
    @Override
    public void activate(ComponentContext context) {
        log.debug("Activate");
        this.context = context.getRuntimeContext();
    }

    /**
     * Creates and starts the Quartz scheduler, then deletes every job of the {@code nuxeo} group and registers again
     * the schedules held by the registry.
     * <p>
     * The configuration is read from {@code quartz.properties} in the configuration directory of the default
     * {@link Environment} when that file exists; otherwise a built-in, non-clustered configuration is used.
     *
     * @throws IOException if the configuration file cannot be read
     * @throws SchedulerException if Quartz fails to initialize, start or list its jobs
     */
    protected void setupScheduler() throws IOException, SchedulerException {
        StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
        File file = new File(Environment.getDefault().getConfig(), "quartz.properties");
        if (file.exists()) {
            try (InputStream stream = new FileInputStream(file)) {
                schedulerFactory.initialize(stream);
            }
        } else {
            // use default config (unit tests)
            Properties props = new Properties();
            props.put("org.quartz.scheduler.instanceName", "Quartz");
            props.put("org.quartz.scheduler.threadName", "Quartz_Scheduler");
            props.put("org.quartz.scheduler.instanceId", "NON_CLUSTERED");
            props.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true");
            props.put("org.quartz.scheduler.skipUpdateCheck", "true");
            props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
            props.put("org.quartz.threadPool.threadCount", "1");
            props.put("org.quartz.threadPool.threadPriority", "4");
            props.put("org.quartz.threadPool.makeThreadsDaemons", "true");
            schedulerFactory.initialize(props);
        }
        scheduler = schedulerFactory.getScheduler();
        scheduler.start();

        // clean up all nuxeo jobs
        // https://jira.nuxeo.com/browse/NXP-7303
        GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals("nuxeo");
        Set<JobKey> jobs = scheduler.getJobKeys(matcher);
        try {
            scheduler.deleteJobs(new ArrayList<>(jobs)); // raise a lock error in case of concurrencies
            for (Schedule each : registry.getSchedules()) {
                registerSchedule(each);
            }
        } catch (LockException cause) {
            // best effort: the scheduler stays started, only the cleanup/re-registration is skipped
            log.warn("scheduler already re-initializing, another cluster node concurrent startup ?", cause);
        }
        log.info("scheduler started");
    }

    /**
     * Shuts the Quartz scheduler down, if any, and always resets the {@link #scheduler} field to {@code null}, even
     * when the shutdown fails (the failure is logged).
     */
    protected void shutdownScheduler() {
        if (scheduler == null) {
            return;
        }
        try {
            scheduler.shutdown();
        } catch (SchedulerException cause) {
            log.error("Cannot shutdown scheduler", cause);
        } finally {
            scheduler = null;
        }
    }

    /**
     * Shuts the scheduler down.
     */
    @Override
    public void deactivate(ComponentContext context) {
        log.debug("Deactivate");
        shutdownScheduler();
    }

    /**
     * Starts the scheduler through {@link #startScheduler()}.
     */
    @Override
    public void start(ComponentContext context) {
        startScheduler();
    }

    /**
     * Runs {@link #setupScheduler()} through {@link ClusterService#runAtomically} under the key
     * {@code start-scheduler}.
     * <p>
     * The duration given to the cluster service comes from the {@link #CLUSTER_START_DURATION_PROP} framework
     * property, falling back on {@link #CLUSTER_START_DURATION_DEFAULT}; the poll delay is one second.
     * NOTE(review): presumably this prevents several cluster nodes from doing the job cleanup at the same time;
     * confirm against the cluster service documentation.
     *
     * @throws NuxeoException wrapping any {@link IOException} or {@link SchedulerException} raised by the setup
     */
    protected void startScheduler() {
        ClusterService clusterService = Framework.getService(ClusterService.class);
        String prop = Framework.getProperty(CLUSTER_START_DURATION_PROP);
        Duration duration = DurationUtils.parsePositive(prop, CLUSTER_START_DURATION_DEFAULT);
        Duration pollDelay = Duration.ofSeconds(1);
        clusterService.runAtomically("start-scheduler", duration, pollDelay, () -> {
            try {
                setupScheduler();
            } catch (IOException | SchedulerException e) {
                throw new NuxeoException(e);
            }
        });
    }

    /**
     * Puts the scheduler in standby mode. Does nothing when no scheduler exists; a Quartz failure is logged and not
     * propagated.
     */
    @Override
    public void stop(ComponentContext context) {
        if (scheduler == null) {
            // the scheduler was never created (e.g. its setup failed) or was already shut down
            return;
        }
        try {
            scheduler.standby();
        } catch (SchedulerException cause) {
            log.error("Cannot put scheduler in stand by mode", cause);
        }
    }

    /**
     * @return {@code true} when a scheduler instance currently exists
     */
    @Override
    public boolean hasApplicationStarted() {
        return scheduler != null;
    }

    /**
     * Registers every contribution of the extension as a {@link Schedule}.
     */
    @Override
    public void registerExtension(Extension extension) {
        Object[] contribs = extension.getContributions();
        for (Object contrib : contribs) {
            registerSchedule((Schedule) contrib);
        }
    }

    @Override
    public void unregisterExtension(Extension extension) {
        // nothing to do:
        // the cleanup is done when the scheduler is set up at service start
        // see https://jira.nuxeo.com/browse/NXP-7303
    }

    /**
     * @return the runtime context stored by {@link #activate(ComponentContext)}
     */
    public RuntimeContext getContext() {
        return context;
    }

    /**
     * Registers a schedule without job parameters.
     */
    @Override
    public void registerSchedule(Schedule schedule) {
        registerSchedule(schedule, null);
    }

    /**
     * Adds the schedule to the registry and, when the scheduler exists, applies the registry's resulting state.
     * <p>
     * While the scheduler does not exist yet the schedule is only recorded; {@link #setupScheduler()} registers the
     * recorded schedules once the scheduler is started. Otherwise the schedule returned by the registry is
     * scheduled, or, when the registry returns none for it, the job recorded under that id is unscheduled.
     *
     * @param schedule the schedule to register
     * @param parameters the parameters handed to the job factory when building the job, may be {@code null}
     */
    @Override
    public void registerSchedule(Schedule schedule, Map<String, Serializable> parameters) {
        registry.addContribution(schedule);
        if (scheduler == null) {
            return;
        }
        Schedule contributed = registry.getSchedule(schedule);
        if (contributed != null) {
            schedule(contributed, parameters);
        } else {
            unschedule(schedule.getId());
        }
    }

    /**
     * Builds the job and trigger of the schedule with its job factory and hands them to the scheduler.
     * <p>
     * When the scheduler reports that the job already exists, the existing job is deleted and the scheduling is
     * attempted once more. Any other {@link SchedulerException} is logged and not propagated.
     *
     * @param schedule the schedule to submit
     * @param parameters the parameters handed to the job factory when building the job, may be {@code null}
     */
    protected void schedule(Schedule schedule, Map<String, Serializable> parameters) {
        log.info("Registering " + schedule);

        EventJobFactory jobFactory = schedule.getJobFactory();
        JobDetail job = jobFactory.buildJob(schedule, parameters).build();
        Trigger trigger = jobFactory.buildTrigger(schedule).build();

        // This is useful when testing to avoid multiple threads:
        // trigger = new SimpleTrigger(schedule.getId(), "nuxeo");

        try {
            try {
                schedule(schedule.getId(), job, trigger);
            } catch (ObjectAlreadyExistsException e) {
                // when jobs are persisted in a database, the job should already be there
                // remove existing job and re-schedule
                log.trace("Overriding scheduler with id: " + schedule.getId());
                boolean unregistered = unschedule(schedule.getId(), job.getKey());
                if (unregistered) {
                    schedule(schedule.getId(), job, trigger);
                }
            }
        } catch (SchedulerException e) {
            log.error(String.format("failed to schedule job with id '%s': %s", schedule.getId(), e.getMessage()), e);
        }
    }

    /**
     * Removes the schedule with the given id from the registry and unschedules its job.
     *
     * @param id the schedule id
     * @return {@code false} when the registry knows no schedule with this id, otherwise the result of unscheduling
     *         the job recorded under this id
     */
    @Override
    public boolean unregisterSchedule(String id) {
        log.info("Unregistering schedule with id: " + id);
        Schedule schedule = registry.getSchedule(id);
        if (schedule == null) {
            return false;
        }
        registry.removeContribution(schedule, true);
        return unschedule(id);
    }

    /**
     * Submits the job with its trigger to the scheduler and remembers the job key under the given id.
     *
     * @throws SchedulerException if the scheduler rejects the job; in that case no job key is recorded
     */
    protected void schedule(String id, JobDetail job, Trigger trigger) throws SchedulerException {
        scheduler.scheduleJob(job, trigger);
        // record the jobKey only if scheduleJob didn't throw
        jobKeys.put(id, job.getKey());
    }

    /**
     * Forgets the job key recorded under the given id and deletes the matching job from the scheduler.
     *
     * @return the result of {@link #unschedule(String, JobKey)} for the recorded key
     */
    protected boolean unschedule(String id) {
        return unschedule(id, jobKeys.remove(id));
    }

    /**
     * Deletes the job with the given key from the scheduler.
     *
     * @param id the schedule id, only used in log messages
     * @param jobKey the key of the job to delete, may be {@code null}
     * @return {@code false} when there is no job key or no scheduler, {@code true} when the deletion raised a
     *         {@link SchedulerException} (the job is then considered removed), otherwise what the scheduler returned
     */
    protected boolean unschedule(String id, JobKey jobKey) {
        if (jobKey == null) {
            // shouldn't happen but let's be robust
            log.warn("Unscheduling null jobKey for id: " + id);
            return false;
        }
        if (scheduler == null) {
            // reachable when a schedule is unregistered after the scheduler was shut down
            log.warn("Cannot unschedule job, scheduler is not available, for id: " + id);
            return false;
        }
        try {
            return scheduler.deleteJob(jobKey);
        } catch (SchedulerException e) {
            log.error(String.format("failed to unschedule job with '%s': %s", id, e.getMessage()), e);
            return true; // didn't exist so consider it removed (maybe concurrency)
        }
    }

    /**
     * Unregisters the schedule by its id.
     */
    @Override
    public boolean unregisterSchedule(Schedule schedule) {
        return unregisterSchedule(schedule.getId());
    }

}