博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊quartz的调度及性能
阅读量:5835 次
发布时间:2019-06-18

本文共 25317 字,大约阅读时间需要 84 分钟。

本文主要研究下quartz的QuartzSchedulerThread的调度以及quartz的性能问题。

SchedulerFactoryBean

spring-context-support-4.3.7.RELEASE-sources.jar!/org/springframework/scheduling/quartz/SchedulerFactoryBean.java

@Override    public void afterPropertiesSet() throws Exception {        if (this.dataSource == null && this.nonTransactionalDataSource != null) {            this.dataSource = this.nonTransactionalDataSource;        }        if (this.applicationContext != null && this.resourceLoader == null) {            this.resourceLoader = this.applicationContext;        }        // Create SchedulerFactory instance...        SchedulerFactory schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass);        initSchedulerFactory(schedulerFactory);        if (this.resourceLoader != null) {            // Make given ResourceLoader available for SchedulerFactory configuration.            configTimeResourceLoaderHolder.set(this.resourceLoader);        }        if (this.taskExecutor != null) {            // Make given TaskExecutor available for SchedulerFactory configuration.            configTimeTaskExecutorHolder.set(this.taskExecutor);        }        if (this.dataSource != null) {            // Make given DataSource available for SchedulerFactory configuration.            configTimeDataSourceHolder.set(this.dataSource);        }        if (this.nonTransactionalDataSource != null) {            // Make given non-transactional DataSource available for SchedulerFactory configuration.            configTimeNonTransactionalDataSourceHolder.set(this.nonTransactionalDataSource);        }        // Get Scheduler instance from SchedulerFactory.        try {            this.scheduler = createScheduler(schedulerFactory, this.schedulerName);            populateSchedulerContext();            if (!this.jobFactorySet && !(this.scheduler instanceof RemoteScheduler)) {                // Use AdaptableJobFactory as default for a local Scheduler, unless when                // explicitly given a null value through the "jobFactory" bean property.                this.jobFactory = new AdaptableJobFactory();            }            if (this.jobFactory != null) {                if (this.jobFactory instanceof SchedulerContextAware) {                    ((SchedulerContextAware) this.jobFactory).setSchedulerContext(this.scheduler.getContext());                }                this.scheduler.setJobFactory(this.jobFactory);            }        }        finally {            if (this.resourceLoader != null) {                configTimeResourceLoaderHolder.remove();            }            if (this.taskExecutor != null) {                configTimeTaskExecutorHolder.remove();            }            if (this.dataSource != null) {                configTimeDataSourceHolder.remove();            }            if (this.nonTransactionalDataSource != null) {                configTimeNonTransactionalDataSourceHolder.remove();            }        }        registerListeners();        registerJobsAndTriggers();    }

这里在afterPropertiesSet的时候,createScheduler(schedulerFactory, this.schedulerName);

SchedulerFactoryBean.createScheduler

protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)            throws SchedulerException {        // Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading.        Thread currentThread = Thread.currentThread();        ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();        boolean overrideClassLoader = (this.resourceLoader != null &&                !this.resourceLoader.getClassLoader().equals(threadContextClassLoader));        if (overrideClassLoader) {            currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());        }        try {            SchedulerRepository repository = SchedulerRepository.getInstance();            synchronized (repository) {                Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);                Scheduler newScheduler = schedulerFactory.getScheduler();                if (newScheduler == existingScheduler) {                    throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +                            "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");                }                if (!this.exposeSchedulerInRepository) {                    // Need to remove it in this case, since Quartz shares the Scheduler instance by default!                    SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());                }                return newScheduler;            }        }        finally {            if (overrideClassLoader) {                // Reset original thread context ClassLoader.                currentThread.setContextClassLoader(threadContextClassLoader);            }        }    }

这里调用schedulerFactory.getScheduler()来创建

quartz-2.3.0-sources.jar!/org/quartz/impl/StdSchedulerFactory.java

public Scheduler getScheduler() throws SchedulerException {        if (cfg == null) {            initialize();        }        SchedulerRepository schedRep = SchedulerRepository.getInstance();        Scheduler sched = schedRep.lookup(getSchedulerName());        if (sched != null) {            if (sched.isShutdown()) {                schedRep.remove(getSchedulerName());            } else {                return sched;            }        }        sched = instantiate();        return sched;    }

这里调用instantiate来初始化。里头初始化了一个QuartzScheduler

qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

QuartzScheduler

quartz-2.3.0-sources.jar!/org/quartz/core/QuartzScheduler.java

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)        throws SchedulerException {        this.resources = resources;        if (resources.getJobStore() instanceof JobListener) {            addInternalJobListener((JobListener)resources.getJobStore());        }        this.schedThread = new QuartzSchedulerThread(this, resources);        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();        schedThreadExecutor.execute(this.schedThread);        if (idleWaitTime > 0) {            this.schedThread.setIdleWaitTime(idleWaitTime);        }        jobMgr = new ExecutingJobsManager();        addInternalJobListener(jobMgr);        errLogger = new ErrorLogger();        addInternalSchedulerListener(errLogger);        signaler = new SchedulerSignalerImpl(this, this.schedThread);                getLog().info("Quartz Scheduler v." + getVersion() + " created.");    }

这个又初始化了QuartzSchedulerThread

QuartzSchedulerThread

这个是调度的核心类

quartz-2.3.0-sources.jar!/org/quartz/core/QuartzSchedulerThread.java

public class QuartzSchedulerThread extends Thread {    //......    /**     * 

* The main processing loop of the QuartzSchedulerThread. *

*/ @Override public void run() { int acquiresFailed = 0; while (!halted.get()) { try { // check if we're supposed to pause... synchronized (sigLock) { while (paused && !halted.get()) { try { // wait until togglePause(false) is called... sigLock.wait(1000L); } catch (InterruptedException ignore) { } // reset failure counter when paused, so that we don't // wait again after unpausing acquiresFailed = 0; } if (halted.get()) { break; } } // wait a bit, if reading from job store is consistently // failing (e.g. DB is down or restarting).. if (acquiresFailed > 1) { try { long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed); Thread.sleep(delay); } catch (Exception ignore) { } } int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads... List
triggers; long now = System.currentTimeMillis(); clearSignaledSchedulingChange(); try { triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); acquiresFailed = 0; if (log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers"); } catch (JobPersistenceException jpe) { if (acquiresFailed == 0) { qs.notifySchedulerListenersError( "An error occurred while scanning for the next triggers to fire.", jpe); } if (acquiresFailed < Integer.MAX_VALUE) acquiresFailed++; continue; } catch (RuntimeException e) { if (acquiresFailed == 0) { getLog().error("quartzSchedulerThreadLoop: RuntimeException " +e.getMessage(), e); } if (acquiresFailed < Integer.MAX_VALUE) acquiresFailed++; continue; } if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; while(timeUntilTrigger > 2) { synchronized (sigLock) { if (halted.get()) { break; } if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { // we could have blocked a long while // on 'synchronize', so we must recompute now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; if(timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger); } catch (InterruptedException ignore) { } } } if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } // this happens if releaseIfScheduleChangedSignificantly decided to release triggers if(triggers.isEmpty()) continue; // set triggers to 'executing' List
bndles = new ArrayList
(); boolean goAhead = true; synchronized(sigLock) { goAhead = !halted.get(); } if(goAhead) { try { List
res = qsRsrcs.getJobStore().triggersFired(triggers); if(res != null) bndles = res; } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occurred while firing triggers '" + triggers + "'", se); //QTZ-179 : a problem occurred interacting with the triggers from the db //we release them and loop again for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } continue; } } for (int i = 0; i < bndles.size(); i++) { TriggerFiredResult result = bndles.get(i); TriggerFiredBundle bndle = result.getTriggerFiredBundle(); Exception exception = result.getException(); if (exception instanceof RuntimeException) { getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception); qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } // it's possible to get 'null' if the triggers was paused, // blocked, or other similar occurrences that prevent it being // fired at this time... or if the scheduler was shutdown (halted) if (bndle == null) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue; } if (qsRsrcs.getThreadPool().runInThread(shell) == false) { // this case should never happen, as it is indicative of the // scheduler being shutdown or a bug in the thread pool or // a thread pool being used concurrently - which the docs // say not to do... getLog().error("ThreadPool.runInThread() return false!"); qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); } } continue; // while (!halted) } } else { // if(availThreadCount > 0) // should never happen, if threadPool.blockForAvailableThreads() follows contract continue; // while (!halted) } long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime - now; synchronized(sigLock) { try { if(!halted.get()) { // QTZ-336 A job might have been completed in the mean time and we might have // missed the scheduled changed signal by not waiting for the notify() yet // Check that before waiting for too long in case this very job needs to be // scheduled very soon if (!isScheduleChanged()) { sigLock.wait(timeUntilContinue); } } } catch (InterruptedException ignore) { } } } catch(RuntimeException re) { getLog().error("Runtime error occurred in main trigger firing loop.", re); } } // while (!halted) // drop references to scheduler stuff to aid garbage collection... qs = null; qsRsrcs = null; }}

这里有几个关键的信息

blockForAvailableThreads

就是qsRsrcs.getThreadPool().blockForAvailableThreads(),如果线程池满了的话,则会阻塞,因而会影响调度的准确性。

acquireNextTriggers

triggers = qsRsrcs.getJobStore().acquireNextTriggers(                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize())

timeUntilContinue

这里进行等待

long now = System.currentTimeMillis();                long waitTime = now + getRandomizedIdleWaitTime();                long timeUntilContinue = waitTime - now;                synchronized(sigLock) {                    try {                      if(!halted.get()) {                        // QTZ-336 A job might have been completed in the mean time and we might have                        // missed the scheduled changed signal by not waiting for the notify() yet                        // Check that before waiting for too long in case this very job needs to be                        // scheduled very soon                        if (!isScheduleChanged()) {                          sigLock.wait(timeUntilContinue);                        }                      }                    } catch (InterruptedException ignore) {                    }                }

而getRandomizedIdleWaitTime如下

private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L;    private long idleWaitTime = DEFAULT_IDLE_WAIT_TIME;    private long getRandomizedIdleWaitTime() {        return idleWaitTime - random.nextInt(idleWaitVariablness);    }

idleWaitTime默认是30秒,通过org.quartz.scheduler.idleWaitTime这个来配置

computeDelayForRepeatedErrors

// wait a bit, if reading from job store is consistently                // failing (e.g. DB is down or restarting)..                if (acquiresFailed > 1) {                    try {                        long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);                        Thread.sleep(delay);                    } catch (Exception ignore) {                    }                }

这里优化了对数据库挂的时候,避免频繁轮询的问题

private static final long MIN_DELAY = 20;    private static final long MAX_DELAY = 600000;    private static long computeDelayForRepeatedErrors(JobStore jobStore, int acquiresFailed) {        long delay;        try {            delay = jobStore.getAcquireRetryDelay(acquiresFailed);        } catch (Exception ignored) {            // we're trying to be useful in case of error states, not cause            // additional errors..            delay = 100;        }        // sanity check per getAcquireRetryDelay specification        if (delay < MIN_DELAY)            delay = MIN_DELAY;        if (delay > MAX_DELAY)            delay = MAX_DELAY;        return delay;    }

connectionProvider.class

比如

org.quartz.jobStore.dataSource=myDataSourceNameorg.quartz.dataSource.myDataSourceName.connectionProvider.class=org.quartz.utils.HikariCpPoolingConnectionProviderorg.quartz.dataSource.myDataSourceName.driver=org.postgresql.Driverorg.quartz.dataSource.myDataSourceName.URL=jdbc:postgresql://192.168.99.100:5432/postgresorg.quartz.dataSource.myDataSourceName.user:postgresorg.quartz.dataSource.myDataSourceName.password:postgresorg.quartz.dataSource.myDataSourceName.maxConnection:30org.quartz.dataSource.myDataSourceName.validationQuery: select 1

这样设置,貌似报错

Caused by: org.quartz.SchedulerException: ConnectionProvider class 'org.quartz.utils.HikariCpPoolingConnectionProvider' could not be instantiated.    at org.quartz.impl.StdSchedulerFactory.instantiate(StdSchedulerFactory.java:936) ~[quartz-2.3.0.jar:na]    at org.quartz.impl.StdSchedulerFactory.getScheduler(StdSchedulerFactory.java:1559) ~[quartz-2.3.0.jar:na]

quartz-2.3.0-sources.jar!/org/quartz/impl/StdSchedulerFactory.java

ConnectionProvider cp = null;                try {                    cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();                } catch (Exception e) {                    initException = new SchedulerException("ConnectionProvider class '" + cpClass                            + "' could not be instantiated.", e);                    throw initException;                }

不过我是在spring中直接配置连接池的,详见,因而就不管了

性能问题

批量参数

采用jdbc store的话,定时轮询数据库,比较消耗时间,一种解决方案就是尽量批量查询,这里可以设置两个参数如下

org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow=1 ## 时间窗口,默认0org.quartz.scheduler.batchTriggerAcquisitionMaxCount=5 ##这里默认是1,可以设置跟thread pool的core size相等

quartz-2.3.0-sources.jar!/org/quartz/core/QuartzSchedulerResources.java

batchTriggerAcquisitionMaxCount对应的就是QuartzSchedulerResources中的maxBatchSize

batchTriggerAcquisitionFireAheadTimeWindow对应的就是QuartzSchedulerResources中的batchTimeWindow

triggers = qsRsrcs.getJobStore().acquireNextTriggers(                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

这里实际取availThreadCount与maxBatchSize的最小值作为maxCount

最后作用的是selectTriggerToAcquire
quartz-2.3.0-sources.jar!/org/quartz/impl/jdbcjobstore/JobStoreSupport.java

protected List
acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow) throws JobPersistenceException { if (timeWindow < 0) { throw new IllegalArgumentException(); } List
acquiredTriggers = new ArrayList
(); Set
acquiredJobKeysForNoConcurrentExec = new HashSet
(); final int MAX_DO_LOOP_RETRY = 3; int currentLoopCount = 0; do { currentLoopCount ++; try { List
keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount); // No trigger is ready to fire yet. if (keys == null || keys.size() == 0) return acquiredTriggers; long batchEnd = noLaterThan; // ... // if we didn't end up with any trigger to fire from that first // batch, try again for another batch. We allow with a max retry count. if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) { continue; } // We are done with the while loop. break; } catch (Exception e) { throw new JobPersistenceException( "Couldn't acquire next trigger: " + e.getMessage(), e); } } while (true); // Return the acquired trigger list return acquiredTriggers; }

sql实例

quartz-2.3.0-sources.jar!/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java

public List
selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount) throws SQLException { PreparedStatement ps = null; ResultSet rs = null; List
nextTriggers = new LinkedList
(); try { ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE)); // Set max rows to retrieve if (maxCount < 1) maxCount = 1; // we want at least one trigger back. ps.setMaxRows(maxCount); // Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need. // Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception! ps.setFetchSize(maxCount); ps.setString(1, STATE_WAITING); ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan))); ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan))); rs = ps.executeQuery(); while (rs.next() && nextTriggers.size() <= maxCount) { nextTriggers.add(triggerKey( rs.getString(COL_TRIGGER_NAME), rs.getString(COL_TRIGGER_GROUP))); } return nextTriggers; } finally { closeResultSet(rs); closeStatement(ps); } }

这里的maxCount对应ps.setMaxRows(maxCount)

noLaterThan = noLaterThan + timeWindow ; noEarlierThan = getMisfireTime() 分别作用在NEXT_FIRE_TIME的范围上
NEXT_FIRE_TIME <= noLaterThan && NEXT_FIRE_TIME >= noEarlierThan

查询sql实例

SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM QRTZ_TRIGGERS WHERE SCHED_NAME = 'schedulerFactoryBean' AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC

小结

由于是定时轮询trigger,那么在采用jdbc store的话,则这里的轮询可能是个不小的开销,对数据库有潜在的压力,另外在数据库挂掉的时候,可能也有问题,虽然做了延迟等待。另外尽量配置连接池,还有如果需要的话,调整batch参数。

doc

转载地址:http://byycx.baihongyu.com/

你可能感兴趣的文章
《HTML5与CSS3实战指南》——导读
查看>>
《趣题学算法》—第1章1.4节图的性质
查看>>
《Unreal Engine 4蓝图可视化编程》一第1章 使用蓝图进行对象交互
查看>>
《Spring 5 官方文档》18. Web MVC 框架(七)
查看>>
创建文件,并格式化输出最后修改时间
查看>>
《MINECRAFT我的世界 新手完全攻略(第3版)》一第1章 入手指南
查看>>
Java视角理解系统结构
查看>>
《QTP自动化测试权威指南(第二版)》—第2章2.2节索引标签(Index Tab)
查看>>
Java中用字节数组表示整数和用整数表示字节数组
查看>>
《Axure原型蓝图》一1.2 “Adaptive Views”(自适应视图)
查看>>
基于PostgreSQL和地理位置信息打造的洞察平台 - CARTO
查看>>
navicat工具来将SQL Server数据迁移到MySQL
查看>>
java ME、java SE和java EE的区别
查看>>
《C语言及程序设计》实践参考——文件中的符号个数
查看>>
centos7 升级python 3.4
查看>>
【Spark Summit EU 2016】汽车研发中基于Spark的时间序列分析
查看>>
码栈开发手册(二)---编码方式开发(中级级课程④)
查看>>
《大话设计模式》Python版代码实现
查看>>
创建灵活的用户界面
查看>>
Powershell&TFS_Part 1
查看>>