解决方案

Java - 一分钟掌握定时任务 - 6 - Quartz定时任务

seo靠我 2023-09-25 09:41:49

作者:Mars酱

声明:本文章由Mars酱原创,部分内容来源于网络,如有疑问请联系本人。

转载:欢迎转载,转载前先请联系我!

前言

前几篇介绍了单体架构的定时任务解决方式,但是现代软件架构由于业务复杂度高,业SEO靠我务的耦合性太强,已经由单体架构拆分成了分布式架构。因此,定时任务的架构也随之修改。而Quartz是分布式定时任务解决方案中使用简单,结构清晰,且不依赖第三方分布式调度中间件的。上车,mars酱带你车里SEO靠我细说~

角色介绍

Quartz入门使用的角色不多,三个角色足够,分别是:

Scheduler:调度器。用来负责任务的调度;

Job:任务。这是一个接口,业务代码继承Job接口并实现它的execute方法,是业SEO靠我务执行的主体部分;

Trigger: 触发器。也是个接口,有两个触发器比较关键,一个是SimpleTrigger,另一个是CronTrigger。前者支持简单的定时,比如:按时、按秒等;后者直接支持crSEO靠我on表达式。下面我们从官方的源代码入手,看看Quartz如何做到分布式的。

官方例子

官方源代码down下来之后,有个examples文件夹:

example1是入门级中最简单的。就两个java文件,一个HSEO靠我elloJob:

package org.quartz.examples.example1;import java.util.Date;import org.slf4j.Logger; SEO靠我import org.slf4j.LoggerFactory; import org.quartz.Job; import org.quartz.JobExecutioSEO靠我nContext; import org.quartz.JobExecutionException;/*** <p>* This is just a simple job that sSEO靠我ays "Hello" to the world.* </p>* * @author Bill Kratzer*/ public class HelloJob implements JSEO靠我ob {private static Logger _log = LoggerFactory.getLogger(HelloJob.class);/*** <p>* Empty constructorSEO靠我 for job initilization* </p>* <p>* Quartz requires a public empty constructor so that the* schedulerSEO靠我 can instantiate the class whenever it needs.* </p>*/public HelloJob() {}/*** <p>* Called by the <coSEO靠我de>{@link org.quartz.Scheduler}</code> when a* <code>{@link org.quartz.Trigger}</code> fires that isSEO靠我 associated with* the <code>Job</code>.* </p>* * @throws JobExecutionException* if there is an excepSEO靠我tion while executing the job.*/public void execute(JobExecutionContext context)throws JobExecutionExSEO靠我ception {// Say Hello to the World and display the date/time_log.info("Hello World! - " + new Date()SEO靠我);}}

另一个SimpleExample:

package org.quartz.examples.example1;import org.quartz.JobDetail;import org.quaSEO靠我rtz.Scheduler;import org.quartz.SchedulerFactory;import org.quartz.Trigger;import org.quartz.impl.StSEO靠我dSchedulerFactory;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Date;imporSEO靠我t static org.quartz.DateBuilder.evenMinuteDate;import static org.quartz.JobBuilder.newJob;import staSEO靠我tic org.quartz.TriggerBuilder.newTrigger;/*** This Example will demonstrate how to start and shutdowSEO靠我n the Quartz scheduler and how to schedule a job to run in* Quartz.** @author Bill Kratzer*/public cSEO靠我lass SimpleExample {public void run() throws Exception {Logger log = LoggerFactory.getLogger(SimpleESEO靠我xample.class);log.info("------- Initializing ----------------------");// 1. 创建一个schedulerSchedulerFaSEO靠我ctory sf = new StdSchedulerFactory();Scheduler sched = sf.getScheduler();log.info("------- InitializSEO靠我ation Complete -----------");// computer a time that is on the next round minuteDate runTime = evenMSEO靠我inuteDate(new Date());log.info("------- Scheduling Job -------------------");// 2. 指定一个jobJobDetail SEO靠我job = newJob(HelloJob.class).withIdentity("job1", "group1").build();// 3. 指定一个triggerTrigger triggerSEO靠我 = newTrigger().withIdentity("trigger1", "group1").startAt(runTime).build();// 4. 绑定job和triggerschedSEO靠我.scheduleJob(job, trigger);log.info(job.getKey() + " will run at: " + runTime);// 5. 执行sched.start()SEO靠我;log.info("------- Started Scheduler -----------------");// wait long enough so that the scheduler aSEO靠我s an opportunity to// run the job!log.info("------- Waiting 65 seconds... -------------");try {// waSEO靠我it 65 seconds to show jobThread.sleep(65L * 1000L);// executing...} catch (Exception e) {//}// shut SEO靠我down the schedulerlog.info("------- Shutting Down ---------------------");sched.shutdown(true);log.iSEO靠我nfo("------- Shutdown Complete -----------------");}public static void main(String[] args) throws ExSEO靠我ception {SimpleExample example = new SimpleExample();example.run();}}

整个SimpleExample只有五个步骤:

创建SchedulSEO靠我er,这是一个调度器,例子中使用调度器工厂来创建一个调度器;创建一个Job。实际上Job就是那个HelloJob,但是这里把HelloJob丢给了JobDetail对象,Job接口本身只有一个execSEO靠我ute函数,没有其他的属性了,如果需要附加其他属性,JobDetail就支持,比如我们需要往Job中传递参数,JobDetail中提供了一个JobDataMap。当Job在运行的时候,execute函SEO靠我数里面的就能获取到JobDetail对象,并将设置的数据传递给Job接口的实现;创建一个Trigger。Trigger对象主责是任务的执行时间,比如官方例子中的startAt函数,就指定了具体的运行时SEO靠我间,还有startNow(立即执行);用scheduler绑定Job和Trigger;执行scheduler。

Quartz的使用是不是简单又清晰?Job是任务,单一职责,不做任何其他事情。TriggeSEO靠我r负责执行的频率等等属性。Scheduler负责按照Trigger的规则去执行Job的内容。各自部分的功能符合单一原则。

但是,到这里都不是分布式的方式,依然是单体架构的。那么,Quartz如何做到分布SEO靠我式的呢?

Quartz如何分布式?

Quartz的分布式实现方式并不依赖其他分布式协调管理中间件完成,而是使用数据锁来实现。使用数据做协调管理中间件的唯一的前提是:需要把集群的每台机器时间校对一致。

QuaSEO靠我rtz数据库核心表如下:

表名功能描述QRTZ_CALENDARS存储Quartz的Calendar信息QRTZ_CRON_TRIGGERS存储CronTrigger,包括Cron表达式和时区信息QRTSEO靠我Z_FIRED_TRIGGERS存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息QRTZ_PAUSED_TRIGGER_GRPS存储已暂停的Trigger组的信息QRTZ_SCHESEO靠我DULER_STATE存储少量的有关Scheduler的状态信息,和别的Scheduler实例QRTZ_LOCKS存储程序的悲观锁的信息QRTZ_JOB_DETAILS存储每一个已配置的Job的详细信SEO靠我息QRTZ_JOB_LISTENERS存储有关已配置的JobListener的信息QRTZ_SIMPLE_TRIGGERS存储简单的Trigger,包括重复次数、间隔、以及已触的次数QRTZ_BLOGSEO靠我_TRIGGERSTrigger作为Blob类型存储QRTZ_TRIGGER_LISTENERS存储已配置的TriggerListener的信息QRTZ_TRIGGERS存储已配置的Trigger的信SEO靠我

字体加粗的QRTZ_LOCKS表是一个悲观锁的存储实现,Quartz认为每条记录都可能会产生并发冲突。以上表的SQL可以在quartz目录中找到:

找到自己喜欢的数据库品牌,并创建好表即可。

跟着官方例SEO靠我子看源码

我们从Hello的execute方法开始,反着找,继续看看分布式的方式如何实现。为什么反着找呢?因为这里是我们业务实现的主体内容,Quartz框架最终必须要调用到这个execute的具体实现的SEO靠我。我们找到调用execute的地方有很多处:

从类名来分析,DirectoryScanJob看着是目录扫描任务,FileScanJob直译是文件扫描任务,SendMailJob是发送邮件任务,最后只剩那SEO靠我个JobRunShell,毕竟翻译过来叫“任务运行の核心”啊。进入JobRunShell,找到调用execute函数的部分,execute函数被包裹在一个一百三十多行长又长的run函数中:

publicSEO靠我 void run() {qs.addInternalSchedulerListener(this);try {// ...省略很多源代码do {// ...省略很多源代码try {begin();}SEO靠我 catch (SchedulerException se) {// ... 省略源代码}// ... 省略源代码try {log.debug("Calling execute on job " + SEO靠我jobDetail.getKey());// 这里负责执行job的execute函数job.execute(jec);endTime = System.currentTimeMillis();} caSEO靠我tch (JobExecutionException jee) {// ... 省略源代码} catch (Throwable e) {// ... 省略源代码}// ...省略很多源代码try {cSEO靠我omplete(true);} catch (SchedulerException se) {// ... 省略源代码}// ...省略很多源代码} while (true);} finally {qSEO靠我s.removeInternalSchedulerListener(this);} }

可以看到run中间的execute被夹在一个begin函数和comlete函数中,而begin和cSEO靠我omplete的实现是一个基于JTA事务的JTAJobRunShell的实现来完成的。JobRunShell是一个Runnable接口的实现,那么刚刚的run方法,必定在某处启用了线程(池)的starSEO靠我t方法。

mars酱继续跟踪查找源代码,在QuartzSchedulerThread中的run函数中,找到JobRunShell的调用部分:

@Override public void rSEO靠我un() {int acquiresFailed = 0;while (!halted.get()) {// ...省略很多源代码// 源代码279行int availThreadCount = qsSEO靠我Rsrcs.getThreadPool().blockForAvailableThreads();// ...省略很多源代码if(availThreadCount > 0) { // ...省略很多源SEO靠我代码// 取下一个trigger,周期是30秒取一次triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, MSEO靠我ath.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());// ...省略很多源代码// SEO靠我触发triggerList<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);// ...省略很多源代码/SEO靠我/ 释放trigger,当bndle的结果是null就释放triggerif (bndle == null) {qsRsrcs.getJobStore().releaseAcquiredTriggerSEO靠我(triggers.get(i));continue;}// ...省略很多源代码JobRunShell shell = null;try {shell = qsRsrcs.getJobRunShelSEO靠我lFactory().createJobRunShell(bndle);shell.initialize(qs);} catch (SchedulerException se) {qsRsrcs.geSEO靠我tJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstructioSEO靠我n.SET_ALL_JOB_TRIGGERS_ERROR);continue;}// 这里调用JobRunShellif (qsRsrcs.getThreadPool().runInThread(shSEO靠我ell) == false) {// ...省略很多源代码}} } }

QuartzSchedulerThread的run函数就是核心处理流程了,qsRsrcs.getThreadPooSEO靠我l().runInThread(shell)内部就根据具体的SimpleThreadPool或者ZeroSizeThreadPool来执行run函数,while循环基本就是不停的在轮询不断的去拿triSEO靠我gger,然后判断trigger的时间是不是到了,再按照时间trigger的时间规则执行Job,最后再标记为完成、释放trigger。

Trigger的处理

上面的逻辑都是通过qsRsrcs.getJobSEO靠我Store()得到的对象去处理Trigger的,返回对象是JobStore。任意查看qsRsrcs.getJobStore()调用的函数,比如:releaseAcquiredTriggerJobStoSEO靠我re,它的实现有两个是比较重要的:一个是RAMJobStore,一个是JobStoreSupport。前者是RAM作为存储介质,作者还特意写上了这样一段注释:

This class implementsSEO靠我 a JobStore that utilizes RAM as its storage device.

As you should know, the ramification of this is SEO靠我that access is extrememly fast, but the data is completely volatile - therefore this JobStore shouldSEO靠我 not be used if true persistence between program shutdowns is required.

这段英文的央视翻译:

这个类实现了一个使用RAM作为存储设备SEO靠我的JobStore。

您应该知道,这样做的后果是访问速度非常快,但是数据是完全不稳定的——因此,如果需要在程序关闭之间实现真正的持久性,则不应该使用这个JobStore。

而且内存存储也无法分布式处理吧?SEO靠我所以,mars酱选择了观看JobStoreSupport:

从import可以知道,这个玩意是连接了数据库的,所以呢,acquireNextTriggers、triggersFired、releaseASEO靠我cquiredTrigger这些方法负责具体trigger的相关操作,都最终会执行到JobStoreSupport的第3844行的executeInNonManagedTXLock函数:

/*** ExSEO靠我ecute the given callback having optionally acquired the given lock.* This uses the non-managed transSEO靠我action connection.* * @param lockName The name of the lock to acquire, for example* "TRIGGER_ACCESS"SEO靠我. If null, then no lock is acquired, but the* lockCallback is still executed in a non-managed transaSEO靠我ction. */protected <T> T executeInNonManagedTXLock(String lockName, TransactionCallback<T> txCallbacSEO靠我k, final TransactionValidator<T> txValidator) throws JobPersistenceException {boolean transOwner = fSEO靠我alse;Connection conn = null;try {if (lockName != null) {// If we arent using db locks, then delay geSEO靠我tting DB connection // until after acquiring the lock since it isnt needed.if (getLockHandler().requSEO靠我iresConnection()) {conn = getNonManagedTXConnection();}transOwner = getLockHandler().obtainLock(connSEO靠我, lockName);}if (conn == null) {conn = getNonManagedTXConnection();}final T result = txCallback.execSEO靠我ute(conn);try {commitConnection(conn);} catch (JobPersistenceException e) {rollbackConnection(conn);SEO靠我if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<BooleanSEO靠我>() {@Overridepublic Boolean execute(Connection conn) throws JobPersistenceException {return txValidSEO靠我ator.validate(conn, result);}})) {throw e;}}Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompSEO靠我letion();if(sigTime != null && sigTime >= 0) {signalSchedulingChangeImmediately(sigTime);}return resSEO靠我ult;} catch (JobPersistenceException e) {rollbackConnection(conn);throw e;} catch (RuntimeException SEO靠我e) {rollbackConnection(conn);throw new JobPersistenceException("Unexpected runtime exception: "+ e.gSEO靠我etMessage(), e);} finally {try {releaseLock(lockName, transOwner);} finally {cleanupConnection(conn)SEO靠我;}}}

整体的过程简要说明就是:获取数据库连接,给需要执行的trigger加锁,处理完之后再释放锁。

结合起来

结合前面的流程来看,一个调度器在执行前如果涉及到分布式的情况,流程如下:

首先要获取QUARTSEO靠我Z_LOCKS表中对应的锁(在executeInNonManagedTXLock函数的getLockHandler().obtainLock(conn, lockName)中);获取锁后执行QuartSEO靠我zSchedulerThread中的JobRunShell,完成任务的执行;最后QuartzSchedulerThread中调用triggeredJobComplete函数,锁被释放,在executeSEO靠我InNonManagedTXLock函数的releaseLock(lockName, transOwner)中执行;

集群中的每一个调度器实例都遵循这样的操作流程。

总结

Quartz 是一款用于分布式系统SEO靠我的高性能调度框架,它采用了数据库作为分布式锁机制来保证同一时刻只有一个 Scheduler 实例访问数据库中的 Trigger。

在 Quartz 中,调度器线程会争抢访问数据库中的 Trigger,以SEO靠我确保在同一时刻只有一个调度器线程执行某个 Trigger 的操作。如果有多个调度器线程同时尝试访问同一个 Trigger,它们会相互等待对方释放锁。当一个调度器线程获得了锁,它就可以访问数据库并执行相SEO靠我应的操作。

另外,Quartz 还采用了悲观锁的策略来避免死锁的发生。当一个调度器线程尝试获取锁时,如果锁已经被其他线程占用,那么这个线程会等待,直到有线程释放了锁。如果在等待过程中没有其他线程释放锁,SEO靠我那么这个线程就会一直等待下去,直到调度器重新分配了锁。

总之,Quartz 的分布式调度原理是通过数据库锁和悲观锁来实现的,以保证同一时刻只有一个调度器线程访问数据库中的 Trigger,从而提高系统的SEO靠我性能和可靠性。

“SEO靠我”的新闻页面文章、图片、音频、视频等稿件均为自媒体人、第三方机构发布或转载。如稿件涉及版权等问题,请与 我们联系删除或处理,客服邮箱:html5sh@163.com,稿件内容仅为传递更多信息之目的,不代表本网观点,亦不代表本网站赞同 其观点或证实其内容的真实性。

网站备案号:浙ICP备17034767号-2