一 ScheduledThreadPoolExecutor

定时线程池类的类结构图


它接收SchduledFutureTask类型的任务,是线程池调度任务的最小单位,有三种提交任务的方式:

  1. schedule
  2. scheduledAtFixedRate
  3. scheduledWithFixedDelay

它采用DelayQueue存储等待的任务

  1. DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
  2. DelayQueue也是一个无界队列;

1.1 ScheduledThreadPoolExecutor

延迟执行,只能执行一次

import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ScheduledThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * @author :sgw * @date :Created in 2022/7/17 * @version: V1.0 * @slogan:  * @description: 场景:项目完全启动后需要执行一些任务(防止项目没有完全启动就去执行的话,可能有些过滤器、拦截器还没有加载完,spring里的bean等还没有初始化完而报错....) *  **/@Slf4jpublic class ScheduleThreadPoolRunner {    public static void main(String[] args) {        //参数:线程数        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);        scheduledThreadPoolExecutor.schedule(() -> {            System.out.println("我要延迟5s执行");        }, 5000, TimeUnit.MILLISECONDS);    }}

当定时任务执行完后,要继续执行下一个业务逻辑,可以这样做:

 ScheduledFuture<Integer> future = scheduledThreadPoolExecutor.schedule(() -> {            System.out.println("我要延迟3s执行");            //这里可以返回任意类型数据            return 1; }, 3000, TimeUnit.MILLISECONDS);//这里主线程可以接着干其他活//XXXXXXXXXXXXXXXXXXXXXX   //提交任务的线程-接着干活   try {   //得到的是1后,说明定时任务执行完了,接着做下一个业务       System.out.println(future.get());   } catch (InterruptedException e) {       e.printStackTrace();   } catch (ExecutionException e) {       e.printStackTrace();   }

周期定时任务

上边的代码,只能执行一次定时任务,那么想要周期性的执行定时任务

//参数:线程数(这里设置只有一个线程)ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);//发心跳,service1->service2,每次过2s,发送一个心跳,证明s2可用scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {     log.info("send heart beat");     //线程启动1秒后执行上边的任务逻辑,并且每2秒执行一次上边的任务 }, 1000, 2000, TimeUnit.MILLISECONDS);


上边代码可能有问题,假如任务的执行时间比较长(10秒),但是间隔时间是2秒执行一次(只有一个线程)

 //参数:线程数ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {log.info("send heart beat");long starttime = System.currentTimeMillis(), nowtime = starttime;while ((nowtime - starttime) < 5000) { nowtime = System.currentTimeMillis(); try {     Thread.sleep(100); } catch (InterruptedException e) {     e.printStackTrace(); } }log.info("task over....");//如果这里抛一个异常的话,就卡在这里了,不会向下执行了;所以有一次的话一定要捕获   //throw new RuntimeException("unexpected error , stop working");}, 1000, 2000, TimeUnit.MILLISECONDS);


虽然周期时间到了,但是需要等上一个任务执行完后再执行下一个任务:

 //参数:线程数ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {  log.info("send heart beat");   long starttime = System.currentTimeMillis(), nowtime = starttime;   while ((nowtime - starttime) < 5000) {       nowtime = System.currentTimeMillis();       try {           Thread.sleep(100);       } catch (InterruptedException e) {           e.printStackTrace();       }   }   log.info("task over....");   //如果这里抛一个异常的话,   //throw new RuntimeException("unexpected error , stop working");}, 1000, 2000, TimeUnit.MILLISECONDS);


多个线程数的话,要想起作用,得需要开启多个scheduledThreadPoolExecutor 任务,如果只有一个scheduledThreadPoolExecutor任务的话,这里参数写多个线程的话没有用

 //参数:线程数如果是2ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);

任务周期执行的其他类——定时类:Timer

//定时类 Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() {     @Override     public void run() {         log.info("send heart beat");         //由于是单线程的,这里抛异常后,线程就没了,所以下边的timer任务就无法执行了         throw new RuntimeException("unexpected error , stop working");     } }, 1000, 2000); try {     Thread.sleep(5000); } catch (InterruptedException e) {     e.printStackTrace(); }//由于上边抛异常了,所以这里不能继续执行 timer.scheduleAtFixedRate(new TimerTask() {     @Override     public void run() {         log.info("send heart beat");         throw new RuntimeException("unexpected error , stop working");     } }, 1000, 2000);

阿里规范不建议使用,因为有异常后容易挂掉整个线程,推荐使用上边的ScheduledThreadPoolExecutor

面试:Time是单线程的吗?抛异常后会发生什么现象?
答:是单线程的,跑异常后会把整个线程停掉,导致后续其他任务不能继续执行;

场景一:分布式锁-redis
通过setNx(“”,锁过期时间),如果拿到锁,就去执行业务逻辑;没拿到的话sleep,循环一直尝试拿锁;

//获取分布式锁if(setNx("",锁过期时间)){执行业务逻辑......释放锁}else{sleep循环获取锁}

分布式锁可以使用zookeeper、redis、DB等来实现,zookeeper可靠性比redis好,但是高并发、高吞吐量场景下性能上redis要好;

redis可靠性不好的原因:锁已经过期了,但是业务逻辑还没有执行完——锁的过期时间我们不确定;
解决方式:定时任务,每2秒续约(续命)一次,有框架已经实现了这个续命(redisson)

伪代码如下:

//获取分布式锁if(setNx("",锁过期时间)){定时线程池:2秒续命一次,看看redis锁还在不在,延长时间(相当于redission框架里的看门狗概念)执行业务逻辑......释放锁}else{sleep循环获取锁}

1.2 SchduledFutureTask

SchduledFutureTask接收的参数(成员变量):

private long time:任务开始的时间private final long sequenceNumber;:任务的序号private final long period:任务执行的时间间隔

工作线程的执行过程:

  • 工作线程会从DelayQueue取已经到期的任务去执行;
  • 执行结束后重新设置任务的到期时间,再次放回DelayQueue

ScheduledThreadPoolExecutor会把待执行的任务放到工作队列DelayQueue中,DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的ScheduledFutureTask进行排序,具体的排序算法实现如下:

public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<" />
可见,DelayedWorkQueue是一个基于最小堆结构的队列。堆结构可以使用数组表示,可以转换成如下的数组:

在这种结构中,可以发现有如下特性:
假设,索引值从0开始,子节点的索引值为k,父节点的索引值为p,则:

  1. 一个节点的左子节点的索引为:k = p * 2 + 1;
  2. 一个节点的右子节点的索引为:k = (p + 1) * 2;
  3. 一个节点的父节点的索引为:p = (k - 1) / 2。

为什么要使用DelayedWorkQueue呢?
定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时一定要是当前队列中执行时间最靠前的,所以自然要使用优先级队列。
DelayedWorkQueue是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是 O(logN)。

DelayedWorkQueue属性

// 队列初始容量private static final int INITIAL_CAPACITY = 16;// 根据初始容量创建RunnableScheduledFuture类型的数组private RunnableScheduledFuture<" />
  • 计算新父节点的索引:parent = (k - 1) >>> 1,parent = 3,那么queue[3]的时间间隔值为8,因为 5 < 8 ,将执行queue[7] = queue[3]:
  • 这时将k设置为3,继续循环,再次计算parent为1,queue[1]的时间间隔为3,因为 5 > 3 ,这时退出循环,最终k为3:

    可见,每次新增节点时,只是根据父节点来判断,而不会影响兄弟节点。
  • take方法

    public RunnableScheduledFuture<" />
    假设 k = 3 ,那么 k = half ,没有子节点,在执行siftDown方法时直接把索引为3的节点设置为数组的最后一个节点:

    有子节点的情况:
    假设 k = 0 ,那么执行以下步骤:
    1、获取左子节点,child = 1 ,获取右子节点, right = 2 :

    2、由于 right < size ,这时比较左子节点和右子节点时间间隔的大小,这里 3 < 7 ,所以 c = queue[child] ;
    3、比较key的时间间隔是否小于c的时间间隔,这里不满足,继续执行,把索引为k的节点设置为c,然后将k设置为child;

    4、因为 half = 3 ,k = 1 ,继续执行循环,这时的索引变为:

    5、这时再经过如上判断后,将k的值为3,最终的结果如下:

    6、最后,如果在finishPoll方法中调用的话,会把索引为0的节点的索引设置为-1,表示已经删除了该节点,并且size也减了1,最后的结果如下:

    可见,siftdown方法在执行完并不是有序的,但可以发现,子节点的下次执行时间一定比父节点的下次执行时间要大,由于每次都会取左子节点和右子节点中下次执行时间最小的节点,所以还是可以保证在take和poll时出队是有序的。

    remove方法

    public boolean remove(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { int i = indexOf(x); if (i < 0) return false; setIndex(queue[i], -1); int s = --size; RunnableScheduledFuture<" />
    这时要删除8的节点,那么这时 k = 1,key为最后一个节点:

    这时通过上文对siftDown方法的分析,siftDown方法执行后的结果如下:

    这时会发现,最后一个节点的值比父节点还要小,所以这里要执行一次siftUp方法来保证子节点的下次执行时间要比父节点的大,所以最终结果如下:

    总结
    主要总结为以下几个方面:

    • 与Timer执行定时任务的比较,相比Timer,ScheduedThreadPoolExecutor有什么优点;
    • ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以它也是一个线程池,也有coorPoolSize和workQueue,ScheduledThreadPoolExecutor特殊的地方在于,自己实现了优先工作队列DelayedWorkQueue;
    • ScheduedThreadPoolExecutor实现了ScheduledExecutorService,所以就有了任务调度的方法,如schedule,scheduleAtFixedRate和scheduleWithFixedDelay,同时注意他们之间的区别;
    • 内部类ScheduledFutureTask继承自FutureTask,实现了任务的异步执行并且可以获取返回结果。同时也实现了Delayed接口,可以通过getDelay方法获取将要执行的时间间隔;
    • 周期任务的执行其实是调用了FutureTask类中的runAndReset方法,每次执行完不设置结果和状态。
    • 详细分析了DelayedWorkQueue的数据结构,它是一个基于最小堆结构的优先队列,并且每次出队时能够保证取出的任务是当前队列中下次执行时间最小的任务。同时注意一下优先队列中堆的顺序,堆中的顺序并不是绝对的,但要保证子节点的值要比父节点的值要大,这样就不会影响出队的顺序。
      总体来说,ScheduedThreadPoolExecutor的重点是要理解下次执行时间的计算,以及优先队列的出队、入队和删除的过程,这两个是理解ScheduedThreadPoolExecutor的关键。