序
本文主要研究下quartz的QuartzSchedulerThread的调度以及quartz的性能问题。
SchedulerFactoryBean
spring-context-support-4.3.7.RELEASE-sources.jar!/org/springframework/scheduling/quartz/SchedulerFactoryBean.java
@Override public void afterPropertiesSet() throws Exception { if (this.dataSource == null && this.nonTransactionalDataSource != null) { this.dataSource = this.nonTransactionalDataSource; } if (this.applicationContext != null && this.resourceLoader == null) { this.resourceLoader = this.applicationContext; } // Create SchedulerFactory instance... SchedulerFactory schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass); initSchedulerFactory(schedulerFactory); if (this.resourceLoader != null) { // Make given ResourceLoader available for SchedulerFactory configuration. configTimeResourceLoaderHolder.set(this.resourceLoader); } if (this.taskExecutor != null) { // Make given TaskExecutor available for SchedulerFactory configuration. configTimeTaskExecutorHolder.set(this.taskExecutor); } if (this.dataSource != null) { // Make given DataSource available for SchedulerFactory configuration. configTimeDataSourceHolder.set(this.dataSource); } if (this.nonTransactionalDataSource != null) { // Make given non-transactional DataSource available for SchedulerFactory configuration. configTimeNonTransactionalDataSourceHolder.set(this.nonTransactionalDataSource); } // Get Scheduler instance from SchedulerFactory. try { this.scheduler = createScheduler(schedulerFactory, this.schedulerName); populateSchedulerContext(); if (!this.jobFactorySet && !(this.scheduler instanceof RemoteScheduler)) { // Use AdaptableJobFactory as default for a local Scheduler, unless when // explicitly given a null value through the "jobFactory" bean property. this.jobFactory = new AdaptableJobFactory(); } if (this.jobFactory != null) { if (this.jobFactory instanceof SchedulerContextAware) { ((SchedulerContextAware) this.jobFactory).setSchedulerContext(this.scheduler.getContext()); } this.scheduler.setJobFactory(this.jobFactory); } } finally { if (this.resourceLoader != null) { configTimeResourceLoaderHolder.remove(); } if (this.taskExecutor != null) { configTimeTaskExecutorHolder.remove(); } if (this.dataSource != null) { configTimeDataSourceHolder.remove(); } if (this.nonTransactionalDataSource != null) { configTimeNonTransactionalDataSourceHolder.remove(); } } registerListeners(); registerJobsAndTriggers(); }
这里在afterPropertiesSet的时候,createScheduler(schedulerFactory, this.schedulerName);
SchedulerFactoryBean.createScheduler
protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName) throws SchedulerException { // Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading. Thread currentThread = Thread.currentThread(); ClassLoader threadContextClassLoader = currentThread.getContextClassLoader(); boolean overrideClassLoader = (this.resourceLoader != null && !this.resourceLoader.getClassLoader().equals(threadContextClassLoader)); if (overrideClassLoader) { currentThread.setContextClassLoader(this.resourceLoader.getClassLoader()); } try { SchedulerRepository repository = SchedulerRepository.getInstance(); synchronized (repository) { Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null); Scheduler newScheduler = schedulerFactory.getScheduler(); if (newScheduler == existingScheduler) { throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " + "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!"); } if (!this.exposeSchedulerInRepository) { // Need to remove it in this case, since Quartz shares the Scheduler instance by default! SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName()); } return newScheduler; } } finally { if (overrideClassLoader) { // Reset original thread context ClassLoader. currentThread.setContextClassLoader(threadContextClassLoader); } } }
这里调用schedulerFactory.getScheduler()来创建
quartz-2.3.0-sources.jar!/org/quartz/impl/StdSchedulerFactory.java
public Scheduler getScheduler() throws SchedulerException { if (cfg == null) { initialize(); } SchedulerRepository schedRep = SchedulerRepository.getInstance(); Scheduler sched = schedRep.lookup(getSchedulerName()); if (sched != null) { if (sched.isShutdown()) { schedRep.remove(getSchedulerName()); } else { return sched; } } sched = instantiate(); return sched; }
这里调用instantiate来初始化。里头初始化了一个QuartzScheduler
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
QuartzScheduler
quartz-2.3.0-sources.jar!/org/quartz/core/QuartzScheduler.java
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval) throws SchedulerException { this.resources = resources; if (resources.getJobStore() instanceof JobListener) { addInternalJobListener((JobListener)resources.getJobStore()); } this.schedThread = new QuartzSchedulerThread(this, resources); ThreadExecutor schedThreadExecutor = resources.getThreadExecutor(); schedThreadExecutor.execute(this.schedThread); if (idleWaitTime > 0) { this.schedThread.setIdleWaitTime(idleWaitTime); } jobMgr = new ExecutingJobsManager(); addInternalJobListener(jobMgr); errLogger = new ErrorLogger(); addInternalSchedulerListener(errLogger); signaler = new SchedulerSignalerImpl(this, this.schedThread); getLog().info("Quartz Scheduler v." + getVersion() + " created."); }
这个又初始化了QuartzSchedulerThread
QuartzSchedulerThread
这个是调度的核心类
quartz-2.3.0-sources.jar!/org/quartz/core/QuartzSchedulerThread.java
public class QuartzSchedulerThread extends Thread { //...... /** ** The main processing loop of the
*/ @Override public void run() { int acquiresFailed = 0; while (!halted.get()) { try { // check if we're supposed to pause... synchronized (sigLock) { while (paused && !halted.get()) { try { // wait until togglePause(false) is called... sigLock.wait(1000L); } catch (InterruptedException ignore) { } // reset failure counter when paused, so that we don't // wait again after unpausing acquiresFailed = 0; } if (halted.get()) { break; } } // wait a bit, if reading from job store is consistently // failing (e.g. DB is down or restarting).. if (acquiresFailed > 1) { try { long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed); Thread.sleep(delay); } catch (Exception ignore) { } } int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads... ListQuartzSchedulerThread
. *triggers; long now = System.currentTimeMillis(); clearSignaledSchedulingChange(); try { triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); acquiresFailed = 0; if (log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers"); } catch (JobPersistenceException jpe) { if (acquiresFailed == 0) { qs.notifySchedulerListenersError( "An error occurred while scanning for the next triggers to fire.", jpe); } if (acquiresFailed < Integer.MAX_VALUE) acquiresFailed++; continue; } catch (RuntimeException e) { if (acquiresFailed == 0) { getLog().error("quartzSchedulerThreadLoop: RuntimeException " +e.getMessage(), e); } if (acquiresFailed < Integer.MAX_VALUE) acquiresFailed++; continue; } if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; while(timeUntilTrigger > 2) { synchronized (sigLock) { if (halted.get()) { break; } if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { // we could have blocked a long while // on 'synchronize', so we must recompute now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; if(timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger); } catch (InterruptedException ignore) { } } } if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } // this happens if releaseIfScheduleChangedSignificantly decided to release triggers if(triggers.isEmpty()) continue; // set triggers to 'executing' List bndles = new ArrayList (); boolean goAhead = true; synchronized(sigLock) { goAhead = !halted.get(); } if(goAhead) { try { List res = qsRsrcs.getJobStore().triggersFired(triggers); if(res != null) bndles = res; } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occurred while firing triggers '" + triggers + "'", se); //QTZ-179 : a problem occurred interacting with the triggers from the db //we release them and loop again for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } continue; } } for (int i = 0; i < bndles.size(); i++) { TriggerFiredResult result = bndles.get(i); TriggerFiredBundle bndle = result.getTriggerFiredBundle(); Exception exception = result.getException(); if (exception instanceof RuntimeException) { getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception); qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } // it's possible to get 'null' if the triggers was paused, // blocked, or other similar occurrences that prevent it being // fired at this time... or if the scheduler was shutdown (halted) if (bndle == null) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue; } if (qsRsrcs.getThreadPool().runInThread(shell) == false) { // this case should never happen, as it is indicative of the // scheduler being shutdown or a bug in the thread pool or // a thread pool being used concurrently - which the docs // say not to do... getLog().error("ThreadPool.runInThread() return false!"); qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); } } continue; // while (!halted) } } else { // if(availThreadCount > 0) // should never happen, if threadPool.blockForAvailableThreads() follows contract continue; // while (!halted) } long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime - now; synchronized(sigLock) { try { if(!halted.get()) { // QTZ-336 A job might have been completed in the mean time and we might have // missed the scheduled changed signal by not waiting for the notify() yet // Check that before waiting for too long in case this very job needs to be // scheduled very soon if (!isScheduleChanged()) { sigLock.wait(timeUntilContinue); } } } catch (InterruptedException ignore) { } } } catch(RuntimeException re) { getLog().error("Runtime error occurred in main trigger firing loop.", re); } } // while (!halted) // drop references to scheduler stuff to aid garbage collection... qs = null; qsRsrcs = null; }}
这里有几个关键的信息
blockForAvailableThreads
就是qsRsrcs.getThreadPool().blockForAvailableThreads(),如果线程池满了的话,则会阻塞,因而会影响调度的准确性。
acquireNextTriggers
triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize())
timeUntilContinue
这里进行等待
long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime - now; synchronized(sigLock) { try { if(!halted.get()) { // QTZ-336 A job might have been completed in the mean time and we might have // missed the scheduled changed signal by not waiting for the notify() yet // Check that before waiting for too long in case this very job needs to be // scheduled very soon if (!isScheduleChanged()) { sigLock.wait(timeUntilContinue); } } } catch (InterruptedException ignore) { } }
而getRandomizedIdleWaitTime如下
private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L; private long idleWaitTime = DEFAULT_IDLE_WAIT_TIME; private long getRandomizedIdleWaitTime() { return idleWaitTime - random.nextInt(idleWaitVariablness); }
idleWaitTime默认是30秒,通过org.quartz.scheduler.idleWaitTime这个来配置
computeDelayForRepeatedErrors
// wait a bit, if reading from job store is consistently // failing (e.g. DB is down or restarting).. if (acquiresFailed > 1) { try { long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed); Thread.sleep(delay); } catch (Exception ignore) { } }
这里优化了对数据库挂的时候,避免频繁轮询的问题
private static final long MIN_DELAY = 20; private static final long MAX_DELAY = 600000; private static long computeDelayForRepeatedErrors(JobStore jobStore, int acquiresFailed) { long delay; try { delay = jobStore.getAcquireRetryDelay(acquiresFailed); } catch (Exception ignored) { // we're trying to be useful in case of error states, not cause // additional errors.. delay = 100; } // sanity check per getAcquireRetryDelay specification if (delay < MIN_DELAY) delay = MIN_DELAY; if (delay > MAX_DELAY) delay = MAX_DELAY; return delay; }
connectionProvider.class
比如
org.quartz.jobStore.dataSource=myDataSourceNameorg.quartz.dataSource.myDataSourceName.connectionProvider.class=org.quartz.utils.HikariCpPoolingConnectionProviderorg.quartz.dataSource.myDataSourceName.driver=org.postgresql.Driverorg.quartz.dataSource.myDataSourceName.URL=jdbc:postgresql://192.168.99.100:5432/postgresorg.quartz.dataSource.myDataSourceName.user:postgresorg.quartz.dataSource.myDataSourceName.password:postgresorg.quartz.dataSource.myDataSourceName.maxConnection:30org.quartz.dataSource.myDataSourceName.validationQuery: select 1
这样设置,貌似报错
Caused by: org.quartz.SchedulerException: ConnectionProvider class 'org.quartz.utils.HikariCpPoolingConnectionProvider' could not be instantiated. at org.quartz.impl.StdSchedulerFactory.instantiate(StdSchedulerFactory.java:936) ~[quartz-2.3.0.jar:na] at org.quartz.impl.StdSchedulerFactory.getScheduler(StdSchedulerFactory.java:1559) ~[quartz-2.3.0.jar:na]
quartz-2.3.0-sources.jar!/org/quartz/impl/StdSchedulerFactory.java
ConnectionProvider cp = null; try { cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance(); } catch (Exception e) { initException = new SchedulerException("ConnectionProvider class '" + cpClass + "' could not be instantiated.", e); throw initException; }
不过我是在spring中直接配置连接池的,详见,因而就不管了
性能问题
批量参数
采用jdbc store的话,定时轮询数据库,比较消耗时间,一种解决方案就是尽量批量查询,这里可以设置两个参数如下
org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow=1 ## 时间窗口,默认0org.quartz.scheduler.batchTriggerAcquisitionMaxCount=5 ##这里默认是1,可以设置跟thread pool的core size相等
quartz-2.3.0-sources.jar!/org/quartz/core/QuartzSchedulerResources.java
batchTriggerAcquisitionMaxCount对应的就是QuartzSchedulerResources中的maxBatchSize
batchTriggerAcquisitionFireAheadTimeWindow对应的就是QuartzSchedulerResources中的batchTimeWindow
triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
这里实际取availThreadCount与maxBatchSize的最小值作为maxCount
最后作用的是selectTriggerToAcquirequartz-2.3.0-sources.jar!/org/quartz/impl/jdbcjobstore/JobStoreSupport.java
protected ListacquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow) throws JobPersistenceException { if (timeWindow < 0) { throw new IllegalArgumentException(); } List acquiredTriggers = new ArrayList (); Set acquiredJobKeysForNoConcurrentExec = new HashSet (); final int MAX_DO_LOOP_RETRY = 3; int currentLoopCount = 0; do { currentLoopCount ++; try { List keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount); // No trigger is ready to fire yet. if (keys == null || keys.size() == 0) return acquiredTriggers; long batchEnd = noLaterThan; // ... // if we didn't end up with any trigger to fire from that first // batch, try again for another batch. We allow with a max retry count. if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) { continue; } // We are done with the while loop. break; } catch (Exception e) { throw new JobPersistenceException( "Couldn't acquire next trigger: " + e.getMessage(), e); } } while (true); // Return the acquired trigger list return acquiredTriggers; }
sql实例
quartz-2.3.0-sources.jar!/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java
public ListselectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount) throws SQLException { PreparedStatement ps = null; ResultSet rs = null; List nextTriggers = new LinkedList (); try { ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE)); // Set max rows to retrieve if (maxCount < 1) maxCount = 1; // we want at least one trigger back. ps.setMaxRows(maxCount); // Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need. // Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception! ps.setFetchSize(maxCount); ps.setString(1, STATE_WAITING); ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan))); ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan))); rs = ps.executeQuery(); while (rs.next() && nextTriggers.size() <= maxCount) { nextTriggers.add(triggerKey( rs.getString(COL_TRIGGER_NAME), rs.getString(COL_TRIGGER_GROUP))); } return nextTriggers; } finally { closeResultSet(rs); closeStatement(ps); } }
这里的maxCount对应ps.setMaxRows(maxCount)
noLaterThan = noLaterThan + timeWindow ; noEarlierThan = getMisfireTime() 分别作用在NEXT_FIRE_TIME的范围上NEXT_FIRE_TIME <= noLaterThan && NEXT_FIRE_TIME >= noEarlierThan查询sql实例
SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM QRTZ_TRIGGERS WHERE SCHED_NAME = 'schedulerFactoryBean' AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
小结
由于是定时轮询trigger,那么在采用jdbc store的话,则这里的轮询可能是个不小的开销,对数据库有潜在的压力,另外在数据库挂掉的时候,可能也有问题,虽然做了延迟等待。另外尽量配置连接池,还有如果需要的话,调整batch参数。