摘要:線程池任務(wù)隊(duì)列構(gòu)造方法,實(shí)例化時(shí)啟動(dòng)線程設(shè)置任務(wù)隊(duì)列,用于任務(wù)重新入隊(duì)任務(wù)入隊(duì)從延遲隊(duì)列中獲取任務(wù)利用線程池執(zhí)行任務(wù)實(shí)現(xiàn)了接口,執(zhí)行實(shí)際的業(yè)務(wù)并決定任務(wù)是否重新進(jìn)入延遲隊(duì)列。
前言
接入微信支付的時(shí)候,看到微信支付的回調(diào)是按照某種頻率去回調(diào)的,
像15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h這樣,其中有一次成功就不會(huì)再回調(diào)。
于是在想怎么用Java做這個(gè)事情。
有定時(shí)任務(wù)這類功能的框架像Spring和Quartz貌似都沒(méi)有直接提供以上的功能。
也是出于想練手的目的,決定自己寫(xiě)一寫(xiě)。
// 具體的業(yè)務(wù) BaseJob task = new BaseJob() { // 任務(wù)執(zhí)行的次數(shù)(模擬真實(shí)業(yè)務(wù)上的退出) int runTime = 1; @Override public void run() { // 業(yè)務(wù)邏輯 System.out.println("hello world"); // 這里模擬了微信回調(diào)成功,任務(wù)完成 if (runTime++ > 3) { this.setExit(true); } } };
/** * 測(cè)試按照指定時(shí)間隔執(zhí)行某個(gè)任務(wù) * @throws IOException */ @Test public void test1() throws IOException { // 新建一個(gè)產(chǎn)生指定時(shí)間的延遲時(shí)間生成器,內(nèi)部就是個(gè)隊(duì)列 DesignatDTGenerator designatDTGenerator = new DesignatDTGenerator(); // 設(shè)置時(shí)間間隔 designatDTGenerator.addDelayTime(1_000) // 1秒后執(zhí)行 .addDelayTime(4_000) // 距離上次執(zhí)行4秒后執(zhí)行 .addDelayTime(15_000) // 距離上次執(zhí)行15秒后執(zhí)行 .addDelayTime(180_000) // 距離上次執(zhí)行3分鐘后執(zhí)行 .addDelayTime(180_000) // 距離上次執(zhí)行3分鐘后執(zhí)行 .addDelayTime(360_000) // 距離上次執(zhí)行6分鐘后執(zhí)行 .addDelayTime(3_600_000); // 距離上次執(zhí)行1小時(shí)后執(zhí)行 // 構(gòu)造一個(gè)提交的任務(wù),傳入具體的業(yè)務(wù)對(duì)象task,傳入延遲時(shí)間生成器designatDTGenerator DelayTimeJob delayTimeJob = new DelayTimeJob(task, designatDTGenerator); // 新建一個(gè)執(zhí)行器,執(zhí)行器可以重復(fù)使用,每次提交新的任務(wù)即可 JobActuator actuator = new JobActuator(); // 提交任務(wù),開(kāi)始執(zhí)行任務(wù) actuator.addJob(delayTimeJob); // 阻塞主線程,方便查看運(yùn)行結(jié)果 System.in.read(); }
/** * 測(cè)試按照固定時(shí)間間隔執(zhí)行某個(gè)任務(wù) * 只是延遲時(shí)間生成器不同而已,可以達(dá)到不同的調(diào)用效果 * @throws IOException */ @Test public void test2() throws IOException { // 新建一個(gè)執(zhí)行器 JobActuator actuator = new JobActuator(); // 新建一個(gè)產(chǎn)生固定時(shí)間的延遲時(shí)間生成器,每3s執(zhí)行一次 FixedRateDTGenerator fixedRateDTGenerator = new FixedRateDTGenerator(3000); // 新建一個(gè)任務(wù) DelayTimeJob delayTimeJob = new DelayTimeJob(task, fixedRateDTGenerator); // 提交任務(wù),開(kāi)始執(zhí)行任務(wù) actuator.addJob(delayTimeJob); // 阻塞主線程,方便查看運(yùn)行結(jié)果 System.in.read(); }類圖 各個(gè)類的作用
項(xiàng)目地址
JobActuator
任務(wù)執(zhí)行器,本身繼承了Thread,職責(zé)是在run方法中不斷從延遲任務(wù)隊(duì)列DelayQueue中獲取延遲到期的任務(wù),
再交由線程池ExecutorService執(zhí)行。延遲效果的都是依靠DelayQueue實(shí)現(xiàn)。
public class JobActuator extends Thread { /** 線程池 */ ExecutorService es = Executors.newFixedThreadPool(2); /** 任務(wù)隊(duì)列 */ DelayQueuejobs = new DelayQueue<>(); /** 構(gòu)造方法,實(shí)例化時(shí)啟動(dòng)線程 */ public JobActuator() { this.start(); } public void addJob(DelayTimeJob job) { // 設(shè)置任務(wù)隊(duì)列,用于任務(wù)重新入隊(duì) job.setJobs(jobs); // 任務(wù)入隊(duì) jobs.offer(job); } @Override public void run() { while (true) { try { // 從延遲隊(duì)列中獲取任務(wù) DelayTimeJob job = jobs.take(); // 利用線程池執(zhí)行任務(wù) es.submit(job); } catch (InterruptedException e) { e.printStackTrace(); } } } }
DelayTimeJob
實(shí)現(xiàn)了Delayed接口,執(zhí)行實(shí)際的業(yè)務(wù)并決定任務(wù)是否重新進(jìn)入延遲隊(duì)列。
public class DelayTimeJob implements Runnable, Delayed { /** 執(zhí)行器的任務(wù)隊(duì)列,用于任務(wù)重新入隊(duì) */ @Setter private DelayQueuejobs; /** 延遲時(shí)間生成器 */ IDelayTimeGenerator delayTimeGenerator; /** 具體要執(zhí)行的任務(wù) */ private BaseJob realJob; private long time = 0L; public DelayTimeJob(BaseJob baseJob, IDelayTimeGenerator delayTimeGenerator) { this.realJob = baseJob; this.delayTimeGenerator = delayTimeGenerator; Integer delayTime = delayTimeGenerator.getDelayTime(); if (delayTime == null) { return ; } this.time = delayTime + System.currentTimeMillis(); } @Override public void run() { // 執(zhí)行業(yè)務(wù) realJob.run(); // 任務(wù)不再需要執(zhí)行,主動(dòng)退出 if (realJob.isExit) { return ; } // 獲取延遲 Integer delayTime = delayTimeGenerator.getDelayTime(); // 無(wú)延遲時(shí)間,則任務(wù)不再執(zhí)行 if (delayTime == null) { return ; } // 重新入隊(duì) time += delayTime; jobs.offer(this); return ; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.time - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { DelayTimeJob other = (DelayTimeJob) o; long diff = time - other.time; if (diff > 0) { return 1; } if (diff < 0) { return -1; } return 0; } }
BaseJob
用戶繼承此抽象類,在run方法中編寫(xiě)業(yè)務(wù)代碼,通過(guò)控制isExit變量控制任務(wù)是否執(zhí)行。
public abstract class BaseJob implements Runnable { /** 用于控制任務(wù)是否退出 */ @Setter boolean isExit = false; }
IDelayTimeGenerator
延遲時(shí)間生成器接口,返回一個(gè)延遲時(shí)間??梢詫?shí)現(xiàn)不同的策略,達(dá)到不同的延遲效果。
如DesignatDTGenerator是定義每一次執(zhí)行的時(shí)間間隔,FixedRateDTGenerator是按照某一個(gè)固定頻率執(zhí)行。
public interface IDelayTimeGenerator { /** 返回延遲的時(shí)間,單位:毫秒 */ Integer getDelayTime(); }
/** * 指定時(shí)間的時(shí)間生成器 * @author cck */ public class DesignatDTGenerator implements IDelayTimeGenerator { private final DequedelayTimeQueue = new ArrayDeque<>(); /** * 添加延遲時(shí)間 * @param delayTime */ public DesignatDTGenerator addDelayTime(Integer delayTime) { delayTimeQueue.offer(delayTime); return this; } @Override public Integer getDelayTime() { return delayTimeQueue.poll(); } }
/** * 固定間隔的時(shí)間生成器 * @author cck */ public class FixedRateDTGenerator implements IDelayTimeGenerator { private Integer delayTime; public FixedRateDTGenerator(Integer delayTime) { this.delayTime = delayTime; } @Override public Integer getDelayTime() { return delayTime; } }關(guān)鍵類DelayQueue和Delayed
DelayQueue是Java提供的延遲隊(duì)列,該隊(duì)列只允許實(shí)現(xiàn)了Delayed接口的對(duì)象入隊(duì)。
調(diào)用隊(duì)列的take方法時(shí),隊(duì)列會(huì)阻塞,直到有延遲到期的元素才會(huì)返回。
這個(gè)方式是可以實(shí)現(xiàn)一開(kāi)始想要的按照15s/15s/30s/3m/10m/..指定的間隔執(zhí)行任務(wù)的效果的。
定制延遲的效果只需要給出不同的IDelayTimeGenerator接口實(shí)現(xiàn)即可。
在和spring一起使用時(shí),任務(wù)執(zhí)行器JobActuator應(yīng)該是單例的,
不過(guò)提交任務(wù)的整個(gè)操作相比于spring的一個(gè)注解,還是顯得麻煩囧,使用時(shí)再封裝一層會(huì)更好。
現(xiàn)在的實(shí)現(xiàn)方式是和Java的延遲隊(duì)列綁定了的,但是延遲隊(duì)列有多種實(shí)現(xiàn)方式,
例如redis,rabbitMQ等,如果能夠做出更高級(jí)的抽象,合入不同的延遲隊(duì)列那會(huì)更好。
此外這種實(shí)現(xiàn)方式性能方面也有待驗(yàn)證。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/75399.html
摘要:報(bào)文類型對(duì)于框架來(lái)說(shuō),報(bào)文可能有多種類型心跳類型報(bào)文認(rèn)證類型報(bào)文請(qǐng)求類型報(bào)文響應(yīng)類型報(bào)文等。接口調(diào)用請(qǐng)求的發(fā)送,在多條連接之間進(jìn)行負(fù)載均衡。 1 需求分析 RPC 全稱 Remote Procedure Call ,簡(jiǎn)單地來(lái)說(shuō),它能讓使用者像調(diào)用本地方法一樣,調(diào)用遠(yuǎn)程的接口,而不需要關(guān)注底層的具體細(xì)節(jié)。 例如車輛違章代辦功能,如果車輛因?yàn)槟撤N原因違章,只需要通過(guò)這個(gè)違章代辦功能(它也許...
摘要:前言用組件實(shí)現(xiàn)簡(jiǎn)易的定時(shí)任務(wù)功能。步驟創(chuàng)建一個(gè)啟動(dòng)類注意,是關(guān)鍵,加了這個(gè)注解才能啟動(dòng)定時(shí)任務(wù)。編寫(xiě)定時(shí)任務(wù)方法可以實(shí)現(xiàn)兩種定時(shí),一種是每個(gè)一段時(shí)間執(zhí)行一次方法,另一種是執(zhí)行一次方法之后間隔若干時(shí)間后再執(zhí)行下一次。 前言 用Spring-Context組件實(shí)現(xiàn)簡(jiǎn)易的定時(shí)任務(wù)功能。只可以支持較簡(jiǎn)單的業(yè)務(wù)場(chǎng)景,實(shí)用價(jià)值不高。如果想要投放到生產(chǎn)環(huán)境,需要進(jìn)行一些改造。 步驟 1. pom.x...
閱讀 617·2023-04-25 18:37
閱讀 2780·2021-10-12 10:12
閱讀 8315·2021-09-22 15:07
閱讀 564·2019-08-30 15:55
閱讀 3174·2019-08-30 15:44
閱讀 2194·2019-08-30 15:44
閱讀 1625·2019-08-30 13:03
閱讀 1560·2019-08-30 12:55