帧与帧栈

  • 每个线程有一块栈内存
  • 每个栈由多个栈帧组成,每个栈帧对应每次方法调用占用的内存
  • 每个线程只能有一个活动栈帧,对应当前正在执行的那个方法

线程上下文切换

  • 线程的CPU时间片用完
  • 垃圾回收
  • 有更高优先级的线程需要运行
  • 线程自己调用了sleep、yield、wait、join、park、synchronized、lock等方法

其中程序计数器的作用

操作系统保存当前线程的状态,记住下一条jvm指令的执行地址,用于恢复线程的状态

PS:状态:程序计数器、栈中每个帧栈的信息

线程状态之五种状态

操作系统层面划分

  1. 初始状态,仅仅是在语言层面上创建了线程对象,即Thead thread = new Thead();,还未与操作系统线程关联

  2. 可运行状态,也称就绪状态,指该线程已经被创建,与操作系统相关联,等待cpu给它分配时间片就可运行

  3. 运行状态,指线程获取了CPU时间片,正在运行;当CPU时间片用完,线程会转换至【可运行状态】,等待 CPU再次分配时间片,会导致我们前面讲到的上下文切换

  4. 阻塞状态

    如果调用了阻塞API,如BIO读写文件,那么线程实际上不会用到CPU,不会分配CPU时间片,会导致上下文切换,进入【阻塞状态】

等待BIO操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】 与【可运行状态】的区别是,只要操作系统一直不唤醒线程,调度器就一直不会考虑调度它们,CPU就一直不会分配时间片 5. 终止状态,表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态

线程状态之六种状态

Java API层面

  1. NEW 跟五种状态里的初始状态是一个意思
  2. RUNNABLE 是当调用了 start() 方法之后的状态,注意,Java API 层面的 RUNNABLE 状态涵盖了操作系统层面的【可运行状态】、【运行状态】和【io阻塞状态】(由于 BIO 导致的线程阻塞,在 Java 里无法区分,仍然认为是可运行)
  3. BLOCKED , WAITING , TIMED_WAITING 都是 Java API 层面对【阻塞状态】的细分

Monitor概念

Java对象头

以 32 位虚拟机为例,普通对象的对象头结构如下,其中的Klass Word为指针,指向对应的Class对象

其中Mark Word结构为:

hashcode:对象自己的哈希码 age:对象在垃圾回收中的分代年龄 biased_lock:是不是偏向锁,加锁状态 和monitor关联成功的话,状态由01变为10

Monitor(锁)

又叫做监视器管程 每个java对象都可以关联一个Monitor,如果使用synchronized给对象上锁(重量级),该对象头的Mark Word中就被设置为指向Monitor对象的指针

刚开始时Monitor中的Owner为null当Thread-2 执行synchronized(obj){}代码时就会将Monitor的所有者Owner 设置为 Thread-2,上锁成功,Monitor中同一时刻只能有一个Owner当Thread-2 占据锁时,如果线程Thread-3,Thread-4也来执行synchronized(obj){}代码,就会进入EntryList中变成BLOCKED状态。Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争时是非公平的。图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲wait-notify 时会分析

synchronized原理进阶

轻量级锁

轻量级锁的使用场景是:如果一个对象虽然有多个线程要对它进行加锁,但是加锁的时间是错开的(也就是没有人可以竞争的),那么可以使用轻量级锁来进行优化。

轻量级锁对使用者是透明的,即语法仍然是synchronized,假设有两个方法同步块,利用同一个对象加锁

static final Object obj = new Object();public static void method1() { synchronized( obj ) { // 同步块 A method2(); }}public static void method2() { synchronized( obj ) { // 同步块 B }}
  1. 每次指向到synchronized代码块时,都会创建锁记录(Lock Record)对象,每个线程都会包括一个锁记录的结构,锁记录内部可以储存对象的Mark Word和对象引用reference

  2. 让锁记录中的Object reference指向对象,并且尝试用cas(compare and sweep)替换Object对象的Mark Word ,将Mark Word 的值存入锁记录中

  3. 如果cas替换成功,那么对象的对象头储存的就是锁记录的地址和状态01,如下所示

  4. 如果cas失败,有两种情况

  • 如果是其它线程已经持有了该Object的轻量级锁,那么表示有竞争,将进入锁膨胀阶段

  • 如果是其它线程已经持有了该Object的轻量级锁,那么表示有竞争,将进入锁膨胀阶段

5.当线程退出synchronized代码块的时候,如果获取的是取值为 null 的锁记录 ,表示有重入,这时重置锁记录,表示重入计数减一

6.当线程退出synchronized代码块的时候,如果获取的锁记录取值不为 null,那么使用cas将Mark Word的值恢复给对象

  • 成功则解锁成功
  • 失败,则说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程

锁膨胀

如果在尝试加轻量级锁的过程中,cas操作无法成功,这是有一种情况就是其它线程已经为这个对象加上了轻量级锁,这是就要进行锁膨胀,将轻量级锁变成重量级锁。

1.当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁

2.这时 Thread-1 加轻量级锁失败,进入锁膨胀流程

  • 即为对象申请Monitor锁,让Object指向重量级锁地址,然后自己进入Monitor 的EntryList 变成BLOCKED状态

3.当Thread-0 推出synchronized同步块时,使用cas将Mark Word的值恢复给对象头,失败,那么会进入重量级锁的解锁过程,即按照Monitor的地址找到Monitor对象,将Owner设置为null,唤醒EntryList 中的Thread-1线程

自旋优化

重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即在自旋的时候持锁的线程释放了锁),那么当前线程就可以不用进行上下文切换就获得了锁

  • 其实就是线程发现有锁,然后在那边等一会,不发生上下文切换
  • 自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,总之,比较智能。Java 7 之后不能控制是否开启自旋功能

偏向锁

第一次使用CAS时将对象的Mark Word头设置为入锁线程ID,之后这个入锁线程再进行重入锁时,发现线程ID是自己的,那么就不用再进行CAS了

对象的创建过程

  1. 如果开启了偏向锁(默认是开启的),那么对象刚创建之后,Mark Word 最后三位的值101,并且这是它的Thread,epoch,age都是0,在加锁的时候进行设置这些的值
  2. 偏向锁默认是延迟的,不会在程序启动的时候立刻生效,如果想避免延迟,可以添加虚拟机参数来禁用延迟:-XX:BiasedLockingStartupDelay=0来禁用延迟
  3. 注意:处于偏向锁的对象解锁后,线程 id 仍存储于对象头中,从属于该线程
  4. 测试禁用:如果没有开启偏向锁,那么对象创建后最后三位的值为001,这时候它的hashcode,age都为0,hashcode是第一次用到hashcode时才赋值的。在上面测试代码运行时在添加 VM 参数-XX:-UseBiasedLocking禁用偏向锁(禁用偏向锁则优先使用轻量级锁),退出synchronized状态变回001

撤销偏向锁-hashcode方法

测试 hashCode:当调用对象的hashcode方法的时候就会撤销这个对象的偏向锁,因为使用偏向锁时没有位置存hashcode的值了

当一个对象已经计算过一致性哈希码后,它就再也无法进入偏向锁状态了;

而当一个对象当前正处于偏向锁状态,又收到需要计算其一致性哈希码请求时,它的偏向状态会被立即撤销,并且锁会膨胀为重量级锁。

在重量级锁的实现中,对象头指向了重量级锁的位置,代表重量级锁的ObjectMonitor类里有字段可以记录非加锁状态(标志位为“01”)下的Mark Word,其中自然可以存储原来的哈希码

撤销偏向锁-变为轻量锁

首先得满足轻量级锁的使用条件,就是没有线程对同一个对象进行锁竞争

最开始使用的是偏向锁,但是第二个线程尝试获取对象锁时,发现本来对象偏向的是线程一,那么偏向锁就会失效,加的就是轻量级锁

撤销偏向锁-调用wait/notify

会使对象的锁变成重量级锁,因为wait/notify方法之后重量级锁才支持

批量重偏向

如果对象被多个线程访问,但是没有竞争,这时候偏向了线程一的对象又有机会重新偏向线程二,即可以不用升级为轻量级锁

超过20对象对同一个线程如线程一撤销偏向时,那么第20个及以后的对象才可以将撤销对线程一的偏向这个动作变为将第20个及以后的对象偏向线程二

批量撤销

当撤销偏向锁阈值超过40次后,jvm会这样觉得,自己确实偏向错了,根本就不该偏向,于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的

锁消除

如果即时编译器(JIT)发现有的锁锁住的对象没有逃离方法,这锁锁了跟没锁一样,就会优化,去掉这个锁

park&unpark原理

每个线程都有自己的一个 Parker 对象,由三部分组成 _counter, _cond和 _mutex

先调用park再调用upark的过程

1:先调用park

  1. 当前线程调用 Unsafe.park() 方法
  2. 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁(mutex对象有个等待队列 _cond)
  3. 线程进入 _cond 条件变量阻塞
  4. 设置 _counter = 0

2:调用unpark

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 唤醒 _cond 条件变量中的 Thread_0
  3. Thread_0 恢复运行
  4. 设置 _counter 为 0

先调用unpark再调用park的过程

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 当前线程调用 Unsafe.park() 方法
  3. 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
  4. 设置 _counter 为 0

线程状态切换

1:NEW –> RUNNABLE

  • 当调用t.start()方法时,由NEW –> RUNNABLE

2:RUNNABLE WAITING

t 线程用synchronized(obj) 获取了对象锁后

  • 调用obj.wait()方法时,t线程从RUNNABLE –> WAITING

  • 调用obj.notify(),obj.notifyAll(),t.interrupt()时

    竞争锁成功,t 线程从WAITING –> RUNNABLE

竞争锁失败,t 线程从WAITING –> BLOCKED

3:RUNNABLE WAITING

  • 当前线程调用t.join()方法时,当前线程从RUNNABLE –> WAITING
  • 注意是当前线程在t 线程对象的监视器上等待
  • t 线程运行结束,或调用了当前线程的interrupt()时,当前线程从WAITING –> RUNNABLE

4:RUNNABLE WAITING

  • 当前线程调用LockSupport.park()方法会让当前线程从RUNNABLE –> WAITING
  • 调用LockSupport.unpark(目标线程)或调用了线程的interrupt(),会让目标线程从WAITING –>RUNNABLE

5:RUNNABLE TIMED_WAITING

t 线程用synchronized(obj)获取了对象锁后

  • 调用obj.wait(long n)方法时,t 线程从RUNNABLE –> TIMED_WAITING

  • t 线程等待时间超过了 n 毫秒,或调用obj.notify(),obj.notifyAll(),t.interrupt() 时

    竞争锁成功,t 线程从TIMED_WAITING –> RUNNABLE

    竞争锁失败,t 线程从TIMED_WAITING –> BLOCKED

6:RUNNABLE TIMED_WAITING

  • 当前线程调用t.join(long n) 方法时,当前线程从RUNNABLE –> TIMED_WAITING
  • 注意是当前线程在t 线程对象的监视器上等待
  • 当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的interrupt() 时,当前线程从TIMED_WAITING –> RUNNABLE

7:RUNNABLE TIMED_WAITING

  • 当前线程调用Thread.sleep(long n),当前线程从RUNNABLE –> TIMED_WAITING
  • 当前线程等待时间超过了 n 毫秒,当前线程从TIMED_WAITING –> RUNNABLE

8:RUNNABLE TIMED_WAITING

  • 当前线程调用LockSupport.parkNanos(long nanos)或LockSupport.parkUntil(long millis) 时 ,当前线程从RUNNABLE –> TIMED_WAITING

  • 调用LockSupport.unpark(目标线程) 或调用了线程的interrupt(),或是等待超时,会让目标线程从TIMED_WAITING–> RUNNABLE

9:RUNNABLE BLOCKED

  • t 线程用synchronized(obj) 获取了对象锁时如果竞争失败,从RUNNABLE –> BLOCKED
  • 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED的线程重新竞争,如果其中 t 线程竞争成功,从BLOCKED –> RUNNABLE,其它失败的线程仍然BLOCKED

10:RUNNABLE TERMINATED

  • 当前线程所有代码运行完毕,进入TERMINATED

活跃性

死锁

一个线程需要同时获取多把锁,这时就容易发生死锁t1 线程获得A对象锁,接下来想获取B对象的锁;t2 线程获得B对象锁,接下来想获取A对象的锁例。

活锁

两个线程互相改变对方的结束条件,最后谁也无法结束

饥饿

一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束

JMM(Java内存模型)

JMM 即 Java Memory Model,它从java层面定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、CPU 指令优化等。JMM 体现在以下几个方面

  • 原子性 – 保证指令不会受到线程上下文切换的影响
  • 可见性 – 保证指令不会受 cpu 缓存的影响
  • 有序性 – 保证指令不会受 cpu 指令并行优化的影响

可见性

先来看一个现象,main 线程对 run 变量的修改对于 t 线程不可见,导致了 t 线程无法停止:

public class Test1 {static boolean run = true;public static void main(String[] args) throws InterruptedException {Thread t = new Thread(()->{while(run){// ....//System.out.println(2323);如果加上这个代码就会停下来}});t.start();utils.sleep(1);System.out.println(3434); run = false; // 线程t不会如预想的停下来}}

分析:

1.初始状态, t 线程刚开始从主内存读取了 run 的值到工作内存

2.因为t1线程频繁地从主存中读取run的值,JIT即时编译器会将run的值缓存至自己工作内存中的高速缓存中,减少对主存中run的访问以提高效率

3.一秒之后,main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存中读取这个变量 的值,结果永远是旧值

解决方法:

  1. volatile(表示易变关键字的意思),它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存
  2. 使用synchronized关键字也有相同的效果!在Java内存模型中,synchronized规定,线程在加锁时, 先清空工作内存→在主内存中拷贝最新变量的副本到工作内存 →执行完代码→将更改后的共享变量的值刷新到主内存中→释放互斥锁

PS:synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。但缺点是 synchronized 是属于重量级操作,性能相对更低。

System.out.println() 会发现即使不加 volatile 修饰符,线程 t 也能正确看到 对 run 变量的修改了,想一想为什么?

因为println方法里面有synchronized修饰。

有序性

指令重排

int num = 0;// volatile 修饰的变量,可以禁用指令重排 volatile boolean ready = false; 可以防止变量之前的代码被重排序boolean ready = false; // 线程1 执行此方法public void actor1(I_Result r) { if(ready) { r.r1 = num + num; }else { r.r1 = 1; }}// 线程2 执行此方法public void actor2(I_Result r) { num = 2; ready = true;}

结果可能是 0 。这种情况下是:线程2 执行 ready = true,切换到线程1,进入 if 分支,相加为 0,再切回线程2 执行 num = 2。

这种现象叫做指令重排,是 JIT 编译器在运行时的一些优化,这个现象需要通过大量测试才能复现。

重排序也需要遵守一定规则:

  1. 重排序操作不会对存在数据依赖关系的操作进行重排序。比如:a=1;b=a; 这个指令序列,由于第二个操作依赖于第一个操作,所以在编译时和处理器运行时这两个操作不会被重排序。
  2. 重排序是为了优化性能,但是不管怎么重排序,单线程下程序的执行结果不能被改变。比如:a=1;b=2;c=a+b这三个操作,第一步(a=1)和第二步(b=2)由于不存在数据依赖关系,所以可能会发生重排序,但是c=a+b这个操作是不会被重排序的,因为需要保证最终的结果一定是c=a+b=3。

重排序在单线程模式下是一定会保证最终结果的正确性,但是在多线程环境下,问题就出来了。

解决方法:volatile 修饰的变量,可以禁用指令重排

volatile原理

volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence)

  • 对 volatile 变量的写指令后会加入写屏障
  • 对 volatile 变量的读指令前会加入读屏障

保证有序性

1.写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中

写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后

public void actor2(I_Result r) { num = 2; ready = true; // ready是被volatile修饰的 ,赋值带写屏障 // 写屏障}

2.而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据

读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前

public void actor1(I_Result r) { // 读屏障 //ready是被volatile修饰的 ,读取值带读屏障 if(ready) { r.r1 = num + num; } else { r.r1 = 1; }}

还是那句话,不能解决指令交错:

  • 写屏障仅仅是保证之后的读能够读到最新的结果,但不能保证其它线程的读跑到它前面去
  • 而有序性的保证也只是保证了本线程内相关代码不被重排序

CAS原理(无锁)

AtomicInteger的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?

@Overridepublic void withdraw(Integer amount) {// 核心代码// 需要不断尝试,直到成功为止while (true){// 比如拿到了旧值 1000int pre = getBalance();// 在这个基础上 1000-10 = 990int next = pre - amount;/* compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值 - 不一致了,next 作废,返回 false 表示失败 比如,别的线程已经做了减法,当前值已经被减成了 990 那么本线程的这次 990 就作废了,进入 while 下次循环重试 - 一致,以 next 设置为新值,返回 true 表示成功 */if (atomicInteger.compareAndSet(pre,next)){break;}}}

其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。

CAS与volatile

在上面代码中的AtomicInteger,保存值的value属性使用了volatile 。获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。

CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果

为什么无锁效率高

  1. 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻:线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大
  2. 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

CAS的特点

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  • CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗
  • synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
  • CAS 体现的是无锁并发、无阻塞并发
  1. 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
  2. 但如果竞争激烈(写操作多),可以想到重试必然频繁发生,反而效率会受影响

ABA问题

以为你是A,所以把你改成B,但其实他早就已经被别的线程从A改成B又改回A了

并发工具类

java.util.concurrent.atomic并发包提供了一些并发工具类,这里把它分成五类:

  1. 使用原子的方式更新基本类型

    AtomicInteger:整型原子类

AtomicLong:长整型原子类 AtomicBoolean :布尔型原子类 2. 原子引用 3. 原子数组 4. 字段更新器 5. 原子累加器

原子整数

public static void main(String[] args) {AtomicInteger i = new AtomicInteger(0);// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++System.out.println(i.getAndIncrement());// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++iSystem.out.println(i.incrementAndGet());// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --iSystem.out.println(i.decrementAndGet());// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--System.out.println(i.getAndDecrement());// 获取并加值(i = 0, 结果 i = 5, 返回 0)System.out.println(i.getAndAdd(5));// 加值并获取(i = 5, 结果 i = 0, 返回 0)System.out.println(i.addAndGet(-5));// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用System.out.println(i.getAndUpdate(p -> p - 2));// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用System.out.println(i.updateAndGet(p -> p + 2));// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 finalSystem.out.println(i.getAndAccumulate(10, (p, x) -> p + x));// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1值, 结果 i = 0, 返回 0)// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));}

原子引用

为什么需要原子引用类型?保证引用类型的共享变量是线程安全的(确保这个原子引用没有引用过别人)。

基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。

  • AtomicReference:引用类型原子类

  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。

    AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:A -> B -> A ->C通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
  • AtomicMarkableReference :原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来

    并不关心引用变量更改了几次,只是单纯的关心是否更改过

原子数组

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

字段更新器

  • AtomicReferenceFieldUpdater // 域 字段
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater

利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常

原子累加器

LongAdder累加器的使用,性能会提升

性能提升的原因很简单,就是在有竞争时,设置多个累加单元(但不会超过cpu的核心数),Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

源码之 LongAdder

LongAdder 类有几个关键域

// 累加单元数组, 懒惰初始化transient volatile Cell[] cells;// 基础值, 如果没有竞争, 则用 cas 累加这个域transient volatile long base;// 在 cells 创建或扩容时, 置为 1, 表示加锁transient volatile int cellsBusy;

cas锁

使用cas实现一个自旋锁

// 不要用于生产实践!!!public class LockCas {private AtomicInteger state = new AtomicInteger(0);public void lock() {while (true) {if (state.compareAndSet(0, 1)) {break;}}}public void unlock() {log.debug("unlock...");state.set(0);}}

原理之伪共享

其中 Cell 即为累加单元

// 防止缓存行伪共享@sun.misc.Contendedstatic final class Cell {volatile long value;Cell(long x) { value = x; }// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值final boolean cas(long prev, long next) {return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);}// 省略不重要代码}

下面讨论@sun.misc.Contended注解的重要意义

得从缓存说起,缓存与内存的速度比较

因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。缓存离cpu越近速度越快。 而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long),缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中,CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效。

因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因 此缓存行可以存下 2 个的 Cell 对象。这样问题来了: Core-0 要修改 Cell[0],Core-1 要修改 Cell[1]

无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效

问题解决:

@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效


add方法源码解读

public void add(long x) {// as 为累加单元数组// b 为基础值// x 为累加值Cell[] as; long b, v; int m; Cell a;// 进入 if 的两个条件// 1. as 有值, 表示已经发生过竞争, 进入 if// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 ifif ((as = cells) != null || !casBase(b = base, b + x)) {// uncontended 表示 cell 没有竞争boolean uncontended = true;if (// as 还没有创建as == null || (m = as.length - 1) < 0 ||// 当前线程对应的 cell 还没有被创建,a为当线程的cell(a = as[getProbe() & m]) == null || // 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )!(uncontended = a.cas(v = a.value, v + x))) {// 进入 cell 数组创建、cell 创建的流程longAccumulate(x, null, uncontended);}}}


LongAccumulate逻辑分析

第一个if的第一个条件加上else if 中代码的逻辑,这是cells未创建时的处理逻辑

若线程对应的cell还没创建好,则执行的是第一个红框里的代码,逻辑如下

若线程对应的cell已经创建好,则执行的是第二个红框里的代码,逻辑如下

sum方法分析

获取最终结果通过 sum 方法,将各个累加单元的值加起来就得到了总的结果。

public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}

Unsafe对象

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得。

Unsafe unsafe = UnsafeAccessor.getUnsafe();Field id = Student.class.getDeclaredField("id"); Field name = Student.class.getDeclaredField("name"); // 获得成员变量的偏移量long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id); long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name);Student student = new Student(); // 使用 cas 方法替换成员变量的值UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20);// 返回 true UnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 trueSystem.out.println(student);

线程池

线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

ThreadPoolExecutor

Executor 框架结构

1:任务(Runnable /Callable) 执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。

2:任务的执行(Executor) 包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。

这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。

3:异步计算的结果(Future) Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。

当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)

使用步骤:

1:主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。

2:把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable task))。

3:如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象)。

4:最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行

AQS原理

概述:全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架

特点:

  1. 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁

    getState – 获取 state 状态

    setState – 设置 state 状态

    compareAndSetState – cas 机制设置 state 状态

    独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源

  2. 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList

  3. 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

子类主要实现这样一些方法(默认抛出 UnsupportedOperationException):

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

ReentrantLock 原理

可以看到ReentrantLock提供了两个同步器,实现公平锁和非公平锁,默认是非公平锁

非公平锁实现原理

加锁解锁流程,先从构造器开始看,默认为非公平锁实现

public ReentrantLock() { sync = new NonfairSync();}

NonfairSync 继承自 AQS

没有竞争时

第一个竞争出现时,查看源码的NonfairSync的lock方法

Thread-1 执行了:

  1. lock方法中CAS 尝试将 state 由 0 改为 1,结果失败

  2. lock方法中进一步调用acquire方法,进入 tryAcquire 逻辑,这里我们认为这时 state 已经是1,结果仍然失败

  3. 接下来进入 acquire方法的addWaiter 逻辑,构造 Node 队列

    图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态

    Node 的创建是懒惰的

    其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

当前线程进入 acquire方法的 acquireQueued 逻辑

  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞

  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,我们这里设置这时 state 仍为 1,失败

  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false

  4. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败

  5. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true

  6. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示已经阻塞)

  7. 再次有多个线程经历上述过程竞争失败,变成这个样子

  8. Thread-0 调用unlock方法里的release方法释放锁,进入tryRelease(使用ctrl+alt+b查看tryRelease方法的具体ReentrantLock实现) 流程,如果成功,设置 exclusiveOwnerThread 为 null,state = 0

  9. unlock方法里的release方法方法中,如果当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程: unparkSuccessor中会找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1 回到 Thread-1 的 acquireQueued 流程

  10. 如果加锁成功(没有竞争),会设置 (acquireQueued 方法中)

    exclusiveOwnerThread 为 Thread-1,state = 1

    head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread

    原本的 head 因为从链表断开,而可被垃圾回收

  11. 如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

    如果不巧又被 Thread-4 占了先

    Thread-4 被设置为 exclusiveOwnerThread,state = 1

    Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

    可重入原理

    static final class NonfairSync extends Sync {// ...// Sync 继承过来的方法, 方便阅读, 放在此处final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入else if (current == getExclusiveOwnerThread()) {// state++int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}// Sync 继承过来的方法, 方便阅读, 放在此处protected final boolean tryRelease(int releases) {// state--int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 支持锁重入, 只有 state 减为 0, 才释放成功if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}}

    可打断原理

    不可打断模式:在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了

    // Sync 继承自 AQSstatic final class NonfairSync extends Sync {// ...private final boolean parkAndCheckInterrupt() {// 如果打断标记已经是 true, 则 park 会失效LockSupport.park(this);// interrupted 会清除打断标记return Thread.interrupted();}final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null;failed = false;// 还是需要获得锁后, 才能返回打断状态return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {// 如果是因为 interrupt 被唤醒, 返回打断状态为 trueinterrupted = true;}}} finally {if (failed)cancelAcquire(node);}}public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {// 如果打断状态为 trueselfInterrupt();}}static void selfInterrupt() {// 重新产生一次中断,这时候线程是如果正常运行的状态,那么不是出于sleep等状态,interrupt方法就不会报错Thread.currentThread().interrupt();}}}

    可打断模式(区别在于循环的出口多了一个抛异常的出口)

    static final class NonfairSync extends Sync {public final void acquireInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 如果没有获得到锁, 进入 ㈠if (!tryAcquire(arg))doAcquireInterruptibly(arg);}// ㈠ 可打断的获取锁流程private void doAcquireInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {// 在 park 过程中如果被 interrupt 会进入此// 这时候抛出异常, 而不会再次进入 for (;;)throw new InterruptedException();}}} finally {if (failed)cancelAcquire(node);}}}

    公平锁实现原理

    static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}// AQS 继承过来的方法, 方便阅读, 放在此处public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {selfInterrupt();}}// 与非公平锁主要区别在于 tryAcquire 方法的实现protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;// h != t 时表示队列中有 Nodereturn h != t &&(// (s = h.next) == null 表示队列中还有没有老二(s = h.next) == null || // 或者队列中老二线程不是此线程s.thread != Thread.currentThread());}}

    条件变量实现原理

    每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject

    await 流程

开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁

unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

park 阻塞 Thread-0

signal流程

假设 Thread-1 要来唤醒 Thread-0

进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node

执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的waitStatus 改为 -1

Thread-1 释放锁,进入 unlock 流程

读写锁

ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能。读-写,写-写都是相互互斥的!

提供一个数据容器类内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法

注意事项:

  • 读锁不支持条件变量
  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
     r.lock();try {// ...w.lock();try {// ...} finally{w.unlock();}} finally{r.unlock();}
  • 重入时降级支持:即持有写锁的情况下去获取读锁
     class CachedData {Object data;// 是否有效,如果失效,需要重新计算 datavolatile boolean cacheValid;final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();void processCachedData() {rwl.readLock().lock();if (!cacheValid) {// 获取写锁前必须释放读锁rwl.readLock().unlock();rwl.writeLock().lock();try {// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新if (!cacheValid) {data = ...cacheValid = true;}// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存,同时保证自己在读的时候不会被其他线程写rwl.readLock().lock();} finally {rwl.writeLock().unlock();}}// 自己用完数据, 释放读锁try {use(data);} finally {rwl.readLock().unlock();}}}

Semaphore

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一刚开始,permits(state)为 3,这时 5 个线程来获取资源

假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞

这时 Thread-4 释放了 permits,状态如下

接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

CountdownLatch

CountDownLatch允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。在 Java 并发中,countdownlatch 的概念是一个常见的面试题,所以一定要确保你很好的理解了它。

CountDownLatch是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用countDown方法时,其实使用了tryReleaseShared方法以CAS的操作来减少state,直至state为0就代表所有的线程都调用了countDown方法。当调用await方法的时候,如果state不为0,就代表仍然有线程没有调用countDown方法,那么就把已经调用过countDown的线程都放入阻塞队列Park,并自旋CAS判断state = = 0,直至最后一个线程调用了countDown,使得state == 0,于是阻塞的线程便判断成功,全部往下执行。

用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一

JDK7 HashMap 并发死链

死链复现

运行代码,程序在预料的断点位置停了下来,输出

长度为16时,桶下标为1的key1163550长度为32时,桶下标为1的key135扩容前大小[main]:12

这时可以在 Variables 面板观察到 e 和 next 变量,使用 view as -> Object 查看节点状态

e (1)->(35)->(16)->nullnext(35)->(16)->null

在 Threads 面板选中 Thread-1 恢复运行,可以看到控制台输出新的内容如下,Thread-1 扩容已完成

newTable[1](35)->(1)->null

这时 Thread-0 还停在 594 处, Variables 面板变量的状态已经变化为

e (1)->nullnext(35)->(1)->null

为什么呢,因为 Thread-1 扩容时链表也是后加入的元素放入链表头,因此链表就倒过来了,但 Thread-1 虽然结果正确,但它结束后 Thread-0 还要继续运行 接下来就可以单步调试(F8)观察死链的产生了 下一轮循环到 594,将 e 搬迁到 newTable 链表头

newTable[1] (1)->nulle(35)->(1)->nullnext (1)->nul

下一轮循环到 594,将 e 搬迁到 newTable 链表头

newTable[1] (35)->(1)->nulle(1)->nullnextnull

再看看源码

e.next = newTable[1]; // 这时 e(1,35)// 而 newTable[1] (35,1)->(1,35) 因为是同一个对象 newTable[1] = e;// 再尝试将 e 作为链表头, 死链已成 e = next;// 虽然 next 是 null, 会进入下一个链表的复制, 但死链已经形成了

假设map中初始元素是

原始链表,格式:[下标] (key,next) [1] (1,35)->(35,16)->(16,null)线程a执行到1 处,此时局部变量e 为 (1,35),而局部变量next 为 (35,16) 线程a 挂起 线程b开始执行第一次循环 [1] (1,null)第二次循环[1] (35,1)->(1,null) 第三次循环[1] (35,1)->(1,null) [17] (16,null)切换回线程a,此时局部变e 和next 被恢复,引用没变但内容变了:e 的内容被改为 (1,null),而next的内容被改为 (35,1) 并链向 (1,null)第一次循环 [1] (1,null)第二次循环,注意这时 e 是(35,1) 并链向 (1,null) 所以next 又是 (1,null) [1] (35,1)->(1,null)第三次循环,e 是 (1,null),而next 是null,但e被放入链表头,这样e.next 变成35(2 处) [1] (1,35)->(35,1)->(1,35)已经是死链了