disgare 的博客
首页
博客
分类
标签
首页
博客
分类
标签
  • 网络

    • 计算机网络学习笔记
    • 网络安全相关
    • 域名和子网掩码
    • CORS 跨域资源共享
    • DNS、HTTP 与 HTTPS
    • Server-Sent Events (SSE)
    • WebSocket 长连接
  • 计算机基础

    • 操作系统 IO 相关知识
    • 操作系统学习笔记
    • 程序的机器级表示
    • 音频文件基础
    • 正则表达式相关概念
    • ffmpeg 的安装以及实现音频切分功能
    • Hex 和 Base64 编码
    • XML 的使用
  • 数据结构与算法

    • 动态规划算法学习笔记
    • 基于比较的排序算法的最坏情况下的最优下界为什么是O(nlogn)
    • 集合与数据结构学习笔记
    • 面试常见算法总结
    • 算法导论第二部分排序学习笔记
    • 算法导论第一部分学习笔记
  • Java

    • 对象之间的映射与转换
    • 反射学习笔记
    • 泛型相关概念
    • 关于 boolean 类型的坑
    • 如何使用 lambda 表达式实现排序
    • CompletableFuture 相关用法
    • CompletableFuture 源码浅要阅读
    • FutureTask 源码阅读
    • Guava 常用 API
    • Guava 源码阅读:Multimap 相关
    • Jackson 的各种使用
    • Java 的 Excel 相关操作
    • java 的常见性能问题分析以及出现场景
    • java 基础知识
    • JAVA 枚举的基础和原理
    • Java 图片文件上传下载处理
    • Java 序列化
    • Java 异常
    • Java 语法糖
    • Java 中关于字符串处理的常用方法
    • Java 中强、软、弱、虚引用
    • JAVA 注解小结
    • Java Http 访问框架
    • Java Stream 的使用
    • Java8 新特性
    • netty 学习笔记
    • Scanner 的各种用法
    • Servlet 学习笔记
    • String、StringBuffer、StringBuilder 学习笔记
  • JVM

    • 虚拟机执行子系统
    • JVM 自动内存管理
    • Linux 中 JVM 常用工具以及常见问题解决思路
  • Linux

    • crontab 表达式
    • Linux 常见命令
    • Linux 文件系统
  • 中间件

    • 关于定时任务原理
    • 详解 kafka
    • ES 搜索引擎
    • flink 提交流程
    • Grape-RAG
    • Hadoop 基础原理
  • 多线程

    • 多线程基础学习笔记
    • 简单了解并发集合
    • 如何手写单例
    • 深入理解 java 多线程安全
    • 生产者消费者问题
    • 线程池作用、用法以及原理
    • AQS 组件
      • 使用方法(模板模式)
      • 源码分析以及原理
      • 公平锁和非公平锁
      • 对资源共享的方式
        • 独占
        • ReentrantLock
        • ReadWriteLock
        • 共享
        • CountDownLatch(倒计时器)
        • CyclicBarrier(循环栅栏)
        • Semphore(信号量)
      • 自定义示例
    • ThreadLocal 原理以及使用
  • 非关系型数据库

    • Redis 集群
    • Redis 数据结构、对象与数据库
    • Redis 学习笔记
  • 关系型数据库

    • B+ 树的插入、删除和数据页分裂机制
    • MySQL 的 binglog、redolog、undolog
    • MySQL 的记录存储结构、存储引擎与 Buffer Pool
    • MySQL 基本的特性
    • MySQL 开发规范
    • MySQL 事务与锁与 MVCC
    • MySQL 数据类型、字符集相关内容
    • MySQL 索引与索引优化
    • PostgreSQL 更新数据时 HOT优化
    • PostgreSQL 相关用法
  • Python

    • Python 基础语法
    • Python 学习
  • Spring 项目

    • Lombok 的常用注解
    • maven 小结
    • MyBatis 框架的使用
    • MyBatis 重要知识点总结
    • MybatisPlus 的使用
    • Spring 框架基础使用
    • Spring 事务相关
    • Spring IOC 的原理及源码
    • Spring AOP 的使用和原理
    • SpringBoot 的原理
    • SpringBoot 基础使用
    • SpringWeb 重要知识点
  • 分布式

    • 初步了解 docker
    • 从 ACID 到 BASE 事务处理的实现
    • 访问远程服务
    • 分布式 id
    • 分布式缓存相关问题
    • 分布式集群理论和分布式事务协议
    • 分布式架构的观测
    • 分布式一致性算法
    • 负载均衡 Load Balancing
    • 关于分布式系统 RPC 中高可用功能的实现
    • 集群间数据同步的目的
    • 三高问题下的系统优化
    • 数据库分库分表
    • 详解 Spring Cloud
    • Dubbo 基础概念
    • Gossip 协议
    • nginx 学习笔记
    • Protobuf 通信协议
    • Zookeeper 基础学习
  • 架构设计

    • 参数校验与异常处理
    • 抽象方法与设计模式
    • 代码整洁之道
    • 权限系统设计
    • 用低内存处理大量数据
    • 设计模式——策略模式
    • 设计模式——过滤器模式在 Spring 中的实践
    • 状态模式
    • 统一结果返回
    • 为什么要打日志?怎么打日志?打什么日志?
    • 运维监控常见指标含义
    • 资深研发进阶
    • DDD 架构学习笔记
    • Java 常用的规则引擎
    • MVC 架构学习笔记
  • AI

    • 如何编写 Prompt
    • Agent 工程架构
    • LLM 相关内容
    • NLP 相关知识
    • vibe coding 最佳实践
    • windows 下 ollama 迁移到 D 盘
  • 开发工具

    • 如何画时序图、流程图、状态流转图
    • excel 关于 =vlookup 的用法
    • git 的学习以及使用
    • IDEA 插件推荐
    • IDEA 常用快捷键以及调试
    • Shell 脚本
    • swagger 的使用
  • 前端

    • 简单了解前端页面开发
    • 伪静态是什么
    • GitHub Pages 部署教程
    • Vercel 部署教程
    • vue-admin-template 简单使用
    • VuePress 博客搭建指南
  • 项目

    • 面试刷题网——技术方案
    • 影视资源聚合站——技术方案
  • 问题记录

    • 定时任务单线程消费 redis 中数据导致消费能力不足
    • 提供可传递的易受攻击的依赖项
    • Liteflow 在 SpringBoot 启动时无法注入组件问题 couldn‘t find chain with the id[THEN(NodeComponent)]
  • 金融

    • 股票分析——关于电力
    • 股票技术面——量价关系
    • 股票技术面——盘口
    • 股票技术面——基础
    • 基础的金融知识
    • 基金与股票
    • 韭菜的自我总结
    • 聊聊价值投资
  • 其他

    • 程序员职场工作需要注意什么
    • 创业全链路SOP:从灵光一现到系统化增长的实战指南
    • 观罗翔讲刑法随笔
    • 价格和价值
    • 立直麻将牌效益理论
    • 梅花易数学习笔记
    • 压力管理
2022-08-09
多线程
目录

AQS 组件

AQS 是一个 JUC 下构造同步器的框架,用来构造同步器,如 ReentrantLock、倒计时器、以及自定义同步器,他封装了线程间沟通方式

JUC 是指 Java.util.concurrent 包,它是 Java 平台提供的并发编程工具集。提供如线程池、并发容器、原子变量、锁等工具,旨在简化多线程编程并提高程序的并发性能

# 使用方法(模板模式)

继承 AbstractQueuedSynchronizer 并重写指定的方法,定义获取与释放 state 的流程

有关等待队列的步骤不能也不用重写(因为被 final 定义,而且进队列出队列的方式已经被 AQS 写好了)

# 源码分析以及原理

AQS 的核心原理可以简化为三个部分:

1,状态变量 State:一个 volatile int 类型的变量,用于表示同步状态(如锁是否被持有)

    private volatile int state;
1

如果某个线程可以修改 state,标记该线程为可用线程,如果不可获取,则加入等待队列,使用 CAS 实现对 state 的修改(导入了 Unsafe 类),获取资源的流程则是由用户继承后写入的

	protected final int getState() {
	        return state;
	}
	protected final void setState(int newState) {
	        state = newState;
	}
    protected final boolean compareAndSetState(int expect, int update) {
        return U.compareAndSetInt(this, STATE, expect, update);
    }
1
2
3
4
5
6
7
8
9

2,CLH 队列:一个双向链表,用于管理获取锁失败的线程。CLH 完成获取资源线程的排队工作,这个队列中装着内部类 Node,AQS 有一套完整的线程等待与唤醒机制

    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
        ...
    }
1
2
3
4
5
6
7

3,CAS 操作:通过 Unsafe 类提供的 CAS 功能来原子性地修改 State

核心工作流程:

  • 尝试获取锁:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态
  • 入队等待:如果被请求的共享资源被占用(tryAcquire 失败),线程会被封装成 Node 节点加入 CLH 队列尾部
  • 阻塞线程:我们需要一套线程阻塞等待以及被唤醒时锁分配的机制,以独占锁为例,节点入队后,线程会被 LockSupport.park 阻塞
  • 释放锁:持有锁的线程释放锁时会调用 release 方法,修改 State 并唤醒队列中的后继节点

# 公平锁和非公平锁

ReentrantLock 默认采用非公平锁,因为考虑获得更好的性能,通过 boolean 来决定是否用公平锁(传入 true 则使用公平锁)

对公平锁而言,首先判断 state 是否为0,如果为0,直接判断 CLH 队列中有没有在等待的线程,如果有,它会在后面排队;如果没有则 CAS 拿锁;如果 state 不为0,后面排队

	// 这是公平锁的acquire方法
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    // 公平锁特有的tryAcquire方法
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 和非公平锁相比,这里多了一个判断:是否有线程在等待
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 这里和非公平锁一样,都是去排队
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

对非公平锁而言,在调用 lock 函数的时候它会直接 CAS 试试能不能拿锁,然后进入和公平锁差不多的 acquire 方法,如果发现锁这个时候被释放了(state==0),非公平锁会直接 CAS 抢锁,其他的步骤与公平锁相似

static final class NonfairSync extends Sync {
    final void lock() {
        // 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    // AbstractQueuedSynchronizer.acquire(int arg)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    // 非公平锁的tryAcquire,主要执行nonfairTryAcquire方法
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 这里没有对阻塞队列进行判断,直接尝试去抢锁
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果抢不到,那就算了
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

如果一个非公平线程进入 CLH 队列了,那它还是会乖乖排队的,但是在排队之前会进行抢锁流程

# 对资源共享的方式

# 独占

只有一个线程可以获取状态,如 ReentrantLock

在独占状态下可以实现两种模式,公平锁(排队获取锁)与非公平锁(抢锁)

# ReentrantLock

锁的使用非常简单,但是需要注意一点,lock 方法下必须使用 try 环绕

// 方式一:Oracle 官方推荐的写法
private val look = ReentrantLock()
fun printNumber() {
    look.lock()
    try {
        // TODO
    } finally {
        look.unlock()
    }
}

// 方式二:错误的写法
private val look = ReentrantLock()
fun printNumber() {
    try {
        look.lock()
    } finally {
        look.unlock()
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

方法一是 Oracle 推荐的方式, 并且在 「阿里巴巴JAVA开发手册」 明确规定了不建议使用 方式二, 即不建议将 lock.lock() 写在 try...finally 代码块内部。这么做是为了避免线程还未加锁就抛出异常,解锁时对没有没有被上锁的对象解锁,此时会 unlock 方法会抛出异常,覆盖之前的异常信息

有 tryLock 尝试加锁操作,如果失败了,则会立即返回 false。lockInterruptibly 打断其他线程操作

private val look = ReentrantLock()
fun printNumber() {
    val isLocked = look.tryLock()
    if (isLocked) {
        try {
            // TODO
        } finally {
            look.unlock()
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11

在构造方法中加入 true 定义此锁为公平锁

可重入锁支持 condition 功能,去唤醒特定的线程,下面是一个 condition 实现消费者生产者的例子:

public class ConditionExample {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // 队列不满条件
    private final Condition notEmpty = lock.newCondition(); // 队列不空条件

    private final Queue<String> queue = new LinkedList<>();
    private final int MAX_SIZE = 5;

    public void produce(String data) throws InterruptedException {
        lock.lock();
        try {
            // 队列已满,等待不满条件
            while (queue.size() == MAX_SIZE) {
                System.out.println("队列已满,生产者等待...");
                notFull.await(); // 生产者在notFull条件上等待
            }

            queue.add(data);
            System.out.println("生产数据: " + data + ", 当前队列大小: " + queue.size());

            // 通知消费者队列不为空 - 精确通知,只唤醒消费者线程
            notEmpty.signal();
        } finally {
            // 必须在finally中释放锁,确保锁一定被释放
            lock.unlock();
        }
    }

    public String consume() throws InterruptedException {
        lock.lock();
        try {
            // 队列为空,等待不空条件
            while (queue.isEmpty()) {
                System.out.println("队列为空,消费者等待...");
                notEmpty.await(); // 消费者在notEmpty条件上等待
            }

            String data = queue.poll();
            System.out.println("消费数据: " + data + ", 当前队列大小: " + queue.size());

            // 通知生产者队列不满 - 精确通知,只唤醒生产者线程
            notFull.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }

    // 使用可中断锁尝试获取数据,带超时控制
    public String consumeWithTimeout(long timeout, TimeUnit unit) throws InterruptedException {
        // 尝试获取锁,可设置超时
        if (!lock.tryLock(timeout, unit)) {
            System.out.println("获取锁超时,放弃消费");
            return null;
        }

        try {
            // 使用超时等待
            if (queue.isEmpty() && !notEmpty.await(timeout, unit)) {
                System.out.println("等待数据超时,放弃消费");
                return null;
            }

            if (!queue.isEmpty()) {
                String data = queue.poll();
                System.out.println("消费数据: " + data + ", 当前队列大小: " + queue.size());
                notFull.signal();
                return data;
            }
            return null;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ConditionExample example = new ConditionExample();

        // 创建生产者线程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    example.produce("数据-" + i);
                    Thread.sleep(new Random().nextInt(1000));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 创建消费者线程
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    example.consume();
                    Thread.sleep(new Random().nextInt(1000));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106

# ReadWriteLock

读写锁

和数据库的读写锁定义一样

        ReadWriteLock lock = new ReentrantReadWriteLock();
        Lock lock1 = lock.readLock();
        Lock lock2 = lock.writeLock();
1
2
3

# 共享

多个资源都可以访问状态,如信号量、倒计时器、循环栏列等,一般来说有一个上限

# CountDownLatch(倒计时器)

计算减少锁,中文翻译为倒计时器,基于 AQS

作用是调用 await 方法让一个或者多个线程阻塞,直至一些线程调用 countDown 方法将减少计数内部的 state 减少为0,被阻塞的方法才会继续执行

以下是一些常用方法

CountDownLatch(int count); //构造方法,创建一个值为count 的计数器。
​
await();//阻塞当前线程,将当前线程加入阻塞队列。
​
await(long timeout, TimeUnit unit);//在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行,
​
countDown();//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。
1
2
3
4
5
6
7

比如如果多个线程执行完毕之后,才可以打印日志,此时可以使用这个类,注意,这个倒计时器只能使用一次。以下是一些使用示例

        CountDownLatch latch = new CountDownLatch(priceKeyList.size());
        List<String> hotel = priceKeyList.stream().map(t -> {
        			try {
                        return intToList.apply(hotelPriceQueryParam);
                    } finally {
                        latch.countDown();
                    }
                })
        ).collect(Collectors.toList());

        try {
            latch.await(timeout, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
        	e.sout;
        }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# CyclicBarrier(循环栅栏)

它的作用是调用 await 方法让线程等待并且将栅栏中的 state 加1,直到屏障满了才会让它们继续执行并且调用栅栏中的方法,和人满发车一个道理,以下是它的使用方法:

//循环屏障的定义
CyclicBarrier cyclicBarrier = new CyclicBarrier(20, () -> {System.out.println("ready");});
//在线程中使用这个方法让线程等待
cyclicBarrier.await();
1
2
3
4

它与 CountDownLatch 的最大区别在倒计时器计数到0就打开,无法重置。可重复栅栏凑够人就放行,然后重置继续用

# Semphore(信号量)

信号量在 Linux 中也是一个比较重要的进程间通信方式

它定义最多有几个线程同时执行,只有抢到 state 的线程才会运行,抢到 state 的线程可能有多个,其他的线程会在 CLH 队列中等待,如果其中某一个线程运行完毕调用 release 方法,信号量会自动唤醒等待队列中的线程

信号量最大的作用就是限制一定数目的线程同时访问某个资源,因此,在某些业务资源需要被更改的情况需要特别注意

//定义最多有两个线程同时运行
Semaphore semaphore = new Semaphore(2);
//得到运行许可
semaphore.acquire();
//释放运行许可
semaphore.release();
1
2
3
4
5
6

除了以上这些之外,还有 BlockingQueue 族(用来解决生产者消费者问题)等一些其他 API,都在 JUC 包中

# 自定义示例

我们可以继承 AQS 来实现自己的锁,下面是一个 lock 的示例

注意在实现中,tryAcquire 和 acquire 是两个核心方法,它们的关系可以概括为:acquire 是框架提供的模板方法,实现完整的锁获取逻辑(包括尝试、入队、阻塞等),而 tryAcquire 是需要子类实现的钩子方法,即定义是否可以获取锁的具体判断逻辑

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// 自定义互斥锁(独占锁)
public class MutexLock {
    // 内部继承AQS实现同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 尝试获取锁(State=0表示锁未被占用)
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread()); // 标记当前线程持有锁
                return true;
            }
            return false;
        }

        // 尝试释放锁
        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // 释放锁,State设为0
            return true;
        }

        // 判断当前是否是独占状态
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    // 加锁方法
    public void lock() {
        sync.acquire(1);
    }

    // 解锁方法
    public void unlock() {
        sync.release(1);
    }

    // 测试示例
    public static void main(String[] args) {
        MutexLock lock = new MutexLock();
        
        Runnable task = () -> {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " 获取到锁");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println(Thread.currentThread().getName() + " 释放锁");
            }
        };

        new Thread(task, "Thread-1").start();
        new Thread(task, "Thread-2").start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#AQS
最后更新: 1/17/2026, 2:51:21 AM
线程池作用、用法以及原理
ThreadLocal 原理以及使用

← 线程池作用、用法以及原理 ThreadLocal 原理以及使用→

最近更新
01
vibe coding 最佳实践
02-24
02
立直麻将牌效益理论
02-23
03
伪静态是什么
02-08
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式