disgare 的博客
首页
博客
分类
标签
首页
博客
分类
标签
  • 网络

    • 计算机网络学习笔记
    • 网络安全相关
    • 域名和子网掩码
    • CORS 跨域资源共享
    • DNS、HTTP 与 HTTPS
    • Server-Sent Events (SSE)
    • WebSocket 长连接
  • 计算机基础

    • 操作系统 IO 相关知识
    • 操作系统学习笔记
    • 程序的机器级表示
    • 音频文件基础
    • 正则表达式相关概念
    • ffmpeg 的安装以及实现音频切分功能
    • Hex 和 Base64 编码
    • XML 的使用
  • 数据结构与算法

    • 动态规划算法学习笔记
    • 基于比较的排序算法的最坏情况下的最优下界为什么是O(nlogn)
    • 集合与数据结构学习笔记
    • 面试常见算法总结
    • 算法导论第二部分排序学习笔记
    • 算法导论第一部分学习笔记
  • Java

    • 对象之间的映射与转换
    • 反射学习笔记
    • 泛型相关概念
    • 关于 boolean 类型的坑
    • 如何使用 lambda 表达式实现排序
    • CompletableFuture 相关用法
    • CompletableFuture 源码浅要阅读
    • FutureTask 源码阅读
    • Guava 常用 API
    • Guava 源码阅读:Multimap 相关
    • Jackson 的各种使用
    • Java 的 Excel 相关操作
    • java 的常见性能问题分析以及出现场景
    • java 基础知识
    • JAVA 枚举的基础和原理
    • Java 图片文件上传下载处理
    • Java 序列化
    • Java 异常
    • Java 语法糖
    • Java 中关于字符串处理的常用方法
    • Java 中强、软、弱、虚引用
    • JAVA 注解小结
    • Java Http 访问框架
    • Java Stream 的使用
    • Java8 新特性
    • netty 学习笔记
    • Scanner 的各种用法
    • Servlet 学习笔记
    • String、StringBuffer、StringBuilder 学习笔记
  • JVM

    • 虚拟机执行子系统
    • JVM 自动内存管理
    • Linux 中 JVM 常用工具以及常见问题解决思路
  • Linux

    • crontab 表达式
    • Linux 常见命令
    • Linux 文件系统
  • 中间件

    • 关于定时任务原理
    • 详解 kafka
    • ES 搜索引擎
    • flink 提交流程
    • Grape-RAG
    • Hadoop 基础原理
  • 多线程

    • 多线程基础学习笔记
    • 简单了解并发集合
    • 如何手写单例
    • 深入理解 java 多线程安全
    • 生产者消费者问题
    • 线程池作用、用法以及原理
      • 作用
      • 用法
        • 建议设定大小
        • 快捷构造线程池
        • submit 与 execute
        • shutdown 与 shutdownNow
        • Future 与 FutureTast
        • 代码
      • 状态(生命周期)
      • 底层原理
        • 继承关系
        • 主要参数
        • 工作原理
        • 饱和策略
        • 连接复用
      • 常见问题
        • 死锁
        • ForkJoinPool
    • AQS 组件
    • ThreadLocal 原理以及使用
  • 非关系型数据库

    • Redis 集群
    • Redis 数据结构、对象与数据库
    • Redis 学习笔记
  • 关系型数据库

    • B+ 树的插入、删除和数据页分裂机制
    • MySQL 的 binglog、redolog、undolog
    • MySQL 的记录存储结构、存储引擎与 Buffer Pool
    • MySQL 基本的特性
    • MySQL 开发规范
    • MySQL 事务与锁与 MVCC
    • MySQL 数据类型、字符集相关内容
    • MySQL 索引与索引优化
    • PostgreSQL 更新数据时 HOT优化
    • PostgreSQL 相关用法
  • Python

    • Python 基础语法
    • Python 学习
  • Spring 项目

    • Lombok 的常用注解
    • maven 小结
    • MyBatis 框架的使用
    • MyBatis 重要知识点总结
    • MybatisPlus 的使用
    • Spring 框架基础使用
    • Spring 事务相关
    • Spring IOC 的原理及源码
    • Spring AOP 的使用和原理
    • SpringBoot 的原理
    • SpringBoot 基础使用
    • SpringWeb 重要知识点
  • 分布式

    • 初步了解 docker
    • 从 ACID 到 BASE 事务处理的实现
    • 访问远程服务
    • 分布式 id
    • 分布式缓存相关问题
    • 分布式集群理论和分布式事务协议
    • 分布式架构的观测
    • 分布式一致性算法
    • 负载均衡 Load Balancing
    • 关于分布式系统 RPC 中高可用功能的实现
    • 集群间数据同步的目的
    • 三高问题下的系统优化
    • 数据库分库分表
    • 详解 Spring Cloud
    • Dubbo 基础概念
    • Gossip 协议
    • nginx 学习笔记
    • Protobuf 通信协议
    • Zookeeper 基础学习
  • 架构设计

    • 参数校验与异常处理
    • 抽象方法与设计模式
    • 代码整洁之道
    • 权限系统设计
    • 用低内存处理大量数据
    • 设计模式——策略模式
    • 设计模式——过滤器模式在 Spring 中的实践
    • 状态模式
    • 统一结果返回
    • 为什么要打日志?怎么打日志?打什么日志?
    • 运维监控常见指标含义
    • 资深研发进阶
    • DDD 架构学习笔记
    • Java 常用的规则引擎
    • MVC 架构学习笔记
  • AI

    • 如何编写 Prompt
    • Agent 工程架构
    • LLM 相关内容
    • NLP 相关知识
    • vibe coding 最佳实践
    • windows 下 ollama 迁移到 D 盘
  • 开发工具

    • 如何画时序图、流程图、状态流转图
    • excel 关于 =vlookup 的用法
    • git 的学习以及使用
    • IDEA 插件推荐
    • IDEA 常用快捷键以及调试
    • Shell 脚本
    • swagger 的使用
  • 前端

    • 简单了解前端页面开发
    • 伪静态是什么
    • GitHub Pages 部署教程
    • Vercel 部署教程
    • vue-admin-template 简单使用
    • VuePress 博客搭建指南
  • 项目

    • 面试刷题网——技术方案
    • 影视资源聚合站——技术方案
  • 问题记录

    • 定时任务单线程消费 redis 中数据导致消费能力不足
    • 提供可传递的易受攻击的依赖项
    • Liteflow 在 SpringBoot 启动时无法注入组件问题 couldn‘t find chain with the id[THEN(NodeComponent)]
  • 金融

    • 股票分析——关于电力
    • 股票技术面——量价关系
    • 股票技术面——盘口
    • 股票技术面——基础
    • 基础的金融知识
    • 基金与股票
    • 韭菜的自我总结
    • 聊聊价值投资
  • 其他

    • 程序员职场工作需要注意什么
    • 创业全链路SOP:从灵光一现到系统化增长的实战指南
    • 观罗翔讲刑法随笔
    • 价格和价值
    • 立直麻将牌效益理论
    • 梅花易数学习笔记
    • 压力管理
2021-06-05
多线程
目录

线程池作用、用法以及原理

# 作用

1,降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗 2,提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行 3,提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

尽量通过 Executor 来启动线程,这种方法比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题

# 用法

使用 ThreadPoolExecutor 构造线程池,将线程(任务)传入

创建线程池时需要设定特殊参数,如核心线程池大小、最大线程池大小、缓冲队列大小等

获取结果也是线程池使用的一大难点,普通的 Future 接口以及 FutureTask、ListenableFuture 等都可以实现接受结果 在这里插入图片描述

# 建议设定大小

1、CPU 计算较多的时候,被叫做 CPU 密集型应用,核心线程数设置为 N+1,N 为 CPU 个数 2、IO 操作较多时,被叫做 IO 密集型应用,设置为 2*N(IO 所需要的 CPU 资源非常少。大部分工作是分派给 DMA 完成的)

我们线上很多场景都不需要 CPU 一直参与,因此设置为 2*N 比较合适

那么这个大小到底是如何确定下来的呢?

通过 Little's Law(利特尔法则)确定的,利特尔法则(Little’s Law)是运营管理中的一个基本定理,由约翰·利特尔(John Little)于1961年提出。 该法则描述了在稳定系统中,系统中的平均库存量(L)、平均到达率(λ)和平均等待时间(W)之间的关系

知道三点即可确定线程数:请求 CPU 处理时间、一个请求所消耗的时间、CPU 数目,表示如下:

线程数=[(线程等待时间+线程 CPU 时间)/线程 CPU 时间]×CPU 数量 在这里插入图片描述 如何获取线程 CPU 时间和线程等待时间呢,最好的办法是通过性能测试,做好监控、打点、压测来发现最佳的线程数

同时也给了我们一个提示,要提升性能我们就要减少 CPU 的执行时间,另外就是要设置一个合理的并发线程数,通过这两方面来显著提升服务器的性能

当然以上过程比较理论化,在实战中会有很多特殊情况发生,比如下午3,4点的流量,和 12 点左右午饭时的流量不一样,而美团给出了动态化配置的解决方案

在源码中有一些诡异的方法,我们一般不会注意到,比如:

    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

该方法让我们可以在线程池运行时修改核心线程数,JDK 不止提供了这个方法,最大线程数等都可以修改

同时,我们除了将大小动态化保存,还可以关注最大线程数和核心线程数的大小,如果将核心线程数设置为最佳线程数了,最大的线程数可能需要和核心保持一致,如果将最大线程数设置为最佳线程数,核心线程数可能需要设置为平时的请求 QPS 数来节省资源

# 快捷构造线程池

只建议用 ThreadPoolExecutor 来创建线程池,不建议使用 Executors 中的以下四种方法创建,前两种队列大小可达 INTEGER_VALUE,后两种线程多少可达 INTEGER_VALUE,而这两种都会消耗系统资源,在源码中以下方法也只是返回参数固定的 ThreadPoolExecutor 对象

1,FixedThreadPool:固定线程数的线程池 2,SingleThreadExecutor:只有一个线程的线程池 3,CachedThreadPool:主线程提交任务的速度高于线程处理任务的速度时,会不断创建新的线程 4,ScheduledThreadPoolExecutor:定时执行任务

# submit 与 execute

submit 可以提交 Callable 子类对象并获得一个 Future 类型的对象,比如 FutrueTask 作为返回值。可以通过 Future 的 get 方法来获取返回值,不过 get 会阻塞调用该方法的线程,因此是同步的,get 方法可以带时间,时间一过就会抛出异常。这种方式在调用 future 的 get 方法时才会抛出异常

execute 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否,同时由于 Runnable 的 run 方法没有抛出异常,因此 Runnable 实现的线程也不会抛出异常,我们需要在内部捕获异常

# shutdown 与 shutdownNow

shutdown:关闭线程池,线程池的状态变为 SHUTDOWN,不再接受新任务,并执行所有在队列中的任务

shutdownNow:立即关闭线程池,线程池的状态变为 STOP,停止执行所有任务,返回在队列中的任务链表

同时,还有 isShutdown() 方法与 isTerminated() 方法来判断它是否执行 shutdown 方法以及是否抛出了所有队列进入了 terminated 状态

# Future 与 FutureTast

Future 是一个接口,里面定义了一些方法:

boolean cancel(boolean mayInterruptIfRunning);如果任务已经启动,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务
boolean isCancelled();
boolean isDone();
V get();
V get(long timeout, TimeUnit unit);
1
2
3
4
5

FutureTask 除了实现了 Future 接口外还实现了 Runnable 接口(即可以通过 Runnable 接口实现线程,也可以通过 Future 取得线程执行完后的结果),因此 FutureTask 也可以直接提交给 Executor 执行

# 代码

public class test4 {
	
    private static final int CORE_POOL_SIZE = 6;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;
    
    public static void main(String[] args) throws InterruptedException {
    	TreadTest t = new TreadTest();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());
        
        for(int i = 0; i < 30; i++) {
//        	TreadTest t = new TreadTest();
        	executor.submit(t);
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}


class TreadTest implements Callable<Integer> {
	volatile int j = 0;
    @Override
    public Integer call() {
	    for(int i = 0; i < 10; i++) {
	        System.out.println("Thread Name= "+Thread.currentThread().getName() + "number = "+ j++);
	        try {
	            Thread.sleep(100);
	        } catch (InterruptedException e) {
	            e.printStackTrace();
	        }
    	}
        //formatter pattern is changed here by thread, but it won't reflect to other threads
        return j;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

# 状态(生命周期)

线程池有5种状态

1,RUNNING:线程池一旦被创建,就处于 RUNNING 状态,任务数为0,能够接收新任务,对已排队的任务进行处理

2,SHUTDOWN:不接收新任务,但能处理已排队的任务。调用线程池的 shutdown 方法,线程池由 RUNNING 转变为 SHUTDOWN 状态,只有在所有任务执行完毕后,空闲的线程才会被逐渐回收

3,STOP:不接收新任务,不处理已排队的任务,如果还有未完成的任务,这些任务将会被丢弃,并且会中断正在处理的任务。调用线程池的 shutdownNow 方法,线程池由 RUNNING 或 SHUTDOWN 转变为 STOP 状态,被中断的线程在完成任务后会被回收,因为线程池正在关闭

4,TIDYING(整理):

SHUTDOWN 状态下,任务数为 0,其他所有任务已终止,线程池会变为 TIDYING 状态,会执行 terminated 方法。线程池中的 terminated 方法是空实现,可以重写该方法进行相应的处理,用于在线程池停止后执行一些清理工作

线程池在 STOP 状态,线程池中执行中任务为空时,就会由 STOP 转变为 TIDYING 状态

5,TERMINATED(结束):线程池彻底终止。线程池在 TIDYING 状态执行完 terminated 方法就会由 TIDYING 转变为 TERMINATED 状态

在这里插入图片描述

# 底层原理

# 继承关系

如果加入已计划的线程池,就成了如下结果: 在这里插入图片描述 ScheduledExecutorService 主要是用来做定时任务的

以下是 ExecutorService 接口的所有方法(Execute 接口只有一个 execute 方法) 在这里插入图片描述 可以看到该接口就是表示了线程池的生命周期,这个线程池服务的 invokeAll 方法是将集合中所有的Callable都执行,invokeAny提交所有但是只返回一个最先完成的结果,其他的主要方法下面会说

以下源码主要来自ThreadPoolExecutor类

# 主要参数

corePoolSize:核心线程数,线程池有多少线程同时运行

maximumPoolSize:最大线程数,当提交的任务超过队列大小时,当前可以同时运行的线程数量变为最大线程数

workQueue:缓冲队列,指最大可以存放的任务数,注意,线程执行任务时任务还在队列中,可以把缓冲队列设置为0看看效果

handler:饱和策略,如果缓冲队列慢了会怎么处理提交的任务

keepAliveTime:如果线程没有任务,多少时间后会消亡

注意,缓冲队列和饱和策略都是可以继承后重写的,你可以在饱和策略里发mq,Tomcat 中也重写了队列,为了优先创建线程

// Tomcat的TaskQueue核心逻辑
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
    
    @Override
    public boolean offer(Runnable o) {
        // 如果线程数未达到最大线程数,直接拒绝入队,让线程池创建新线程
        if (parent.getPoolSize() < parent.getMaximumPoolSize()) {
            return false; // 触发创建新线程
        }
        return super.offer(o);
    }
    
    @Override
    public boolean offer(Runnable o, long timeout, TimeUnit unit) {
        return offer(o); // 同样逻辑
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

现在我们来构建一个线程池:

import java.util.concurrent.*;

public class ThreadPoolExample {

    public static void main(String[] args) {
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
         	// 核心线程数
            5,
            // 最大线程数
            10,
            // 空闲线程的存活时间
            60L,
            // 时间单位,分钟
            TimeUnit.SECONDS,
            // 有界任务队列,如果需要无界任务队列,使用 new ArrayBlockingQueue<Runnable>() 即可
            new ArrayBlockingQueue<Runnable>(10),
            // 拒绝策略,如果队列满了,使用主线程来处理这个任务
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 提交任务
        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Task ID: " + taskId + " is running by " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(1000); // 模拟任务执行时间
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        System.out.println("Task ID: " + taskId + " was interrupted");
                    }
                }
            });
        }

        // 关闭线程池
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

# 工作原理

1,核心线程数未满时,提交任务,无可用线程时线程数加一 2,核心线程数已满时,提交任务,到达缓冲队列 3,缓冲队列已满时,提交任务,创建新线程直到到达最大线程数 4,继续提交任务,线程池使用饱和策略,饱和策略可以在构造线程池时设定

以上四步也是 ThreadPoolExecutor 的 execute 方法的过程

但是在开始之前需要来一点预备知识

	// ctl 表示了线程的状态以及当前激活的线程数,用一个值表示了两种东西,很离谱对不对
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Integer.SIZE 就是32,COUNT_BITS 就是29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 这里就是线程池运行时状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 提供读取当前线程数、当前运行状态的方法
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

来解释一下,ctl 的低29位用于存放当前的线程数,因此一个线程池在理论上最大的线程数是(2^29)-1;高3位是用于表示当前线程池的状态,其中高三位的值和状态对于如下:

  • 111: RUNNING
  • 000: SHUTDOWN
  • 001: STOP
  • 010: TIDYING
  • 011: TERMINATED

在以后的处理需要多个数据的问题的时候,也可以模仿这种优雅的写法。接下来来看看 execute 方法

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // ctl 是个原子类,拿到ctl的值
        int c = ctl.get();
        // 如果核心线程数大于现在正在执行的线程数,workerCountOf方法用于获取当前正在执行的线程数
        if (workerCountOf(c) < corePoolSize) {
        	// 调用addWorker创建一个线程并返回,如果创建失败再获取一个ctl
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果当前线程池在跑并且将command成功加入了队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 如果当前线程池没在跑并且将command删除成功了,则执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果发生什么事情删除失败了或者当前线程池在运行中,则会判断工作线程是否为0 ,如果过为0 就创建一个非核心线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 尝试创建一个工作线程,如果线程池挂了或者大于最大线程数,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
        // 其他情况不做处理
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

其中 addWorker 方法用于创建新线程,它使用 HashSet 来存放线程,里面放的是 Worker,只有持有全局锁 mainLock 的前提下才能访问此集合。同时这个方法返回 true 则表示线程创建成功,false 表示失败

    private final ReentrantLock mainLock = new ReentrantLock();
    // 线程池的最大大小
    private int largestPoolSize;
    // 工作线程放在这里
    private final HashSet<Worker> workers = new HashSet<>();

   private boolean addWorker(Runnable firstTask, boolean core) {
   		//这个retry用于跳出两层循环,两次循环创建失败后再次进行资格判断
        retry:
        for (;;) {
        	//条件判断
            int c = ctl.get();
            int rs = runStateOf(c);

            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

			//以下for循环主要为了将workcount的数量加1
            for (;;) {
               //获取线程池中工作的线程的数量
                int wc = workerCountOf(c);
                // core参数为true的话表明队列也满了,线程池大小变为 maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               //原子操作将workcount的数量加1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果线程的状态改变了就再次执行上述操作
                c = ctl.get();
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        // 标记工作线程是否启动成功
        boolean workerStarted = false;
        // 标记工作线程是否创建成功
        boolean workerAdded = false;
        Worker w = null;
        //以下过程尝试去创建工作线程
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //获取线程池状态
                    int rs = runStateOf(ctl.get());
                    //rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中
                    //(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
                    // firstTask == null证明只新建线程而不执行任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        //更新当前工作线程的最大容量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 工作线程是否启动成功
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
                if (workerAdded) {
                    t.start();
                    // 标记线程启动成功
                    workerStarted = true;
                }
            }
        } finally {
            // 线程启动失败,需要从工作线程中移除对应的Worker
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

# 饱和策略

1,拒绝执行任务并抛出异常 2,使用调用线程池的线程执行任务 3,丢弃此任务 4,丢弃第一个等待的任务

还有更多处理方法,可以自行百度

# 连接复用

private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        int wc = workerCountOf(c);

        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if (timed && timedOut) {
            // 如果需要进行超时控制,且上次从缓存队列中获取任务时发生了超时,那么尝试将workerCount减1,即当前活动线程数减1,
            // 如果减1成功,则返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();

            // 注意workQueue中的poll()方法与take()方法的区别
            // poll方式取任务的特点是从缓存队列中取任务,最长等待keepAliveTime的时长,取不到返回null
            // take方式取任务的特点是从缓存队列中取任务,若队列为空,则进入阻塞状态,直到能取出对象为止

            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

上面的代码实现了连接池的线程复用,以及超过核心线程数的线程如何被销毁,核心线程如何被保存,思路就是把 Runnable 放进 BlockingQueue 里,用一定方式调用线程去拿任务

如果当前活动线程数大于核心线程数,当去缓存队列中取任务的时候,如果缓存队列中没任务了,则等待 keepAliveTime 的时长,此时还没任务就返回 null,这就意味着 runWorker() 方法中的 while 循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了。因此只要线程池中的线程数大于核心线程数就会这样一个一个地销毁这些多余的线程

如果当前活动线程数小于等于核心线程数,同样也是去缓存队列中取任务,但当缓存队列中没任务了,就会进入阻塞状态,直到能取出任务为止,因此这个线程是处于阻塞状态的,并不会因为缓存队列中没有任务了而被销毁。这样就保证了线程池有 N 个线程是活的,可以随时处理任务,从而达到重复利用的目的

所以最大线程与核心线程的不同,就是调用获取任务的方法不同,一个 take 一个是 poll,这两个方法都由 BlockingQueue 友情提供

# 常见问题

# 死锁

虽然死锁可能发生在任何多线程程序中,但线程池引入了另一种死锁情况。他发生死锁的原因是线程池中执行 A 函数,A 函数阻塞的调用 B 函数,而 B 函数也在该线程池中,其中所有正在执行的线程都在等待等待队列阻塞线程的结果,因为执行的线程不可用

业务线程在占用了线程池内所有的资源后又向线程池提交了新的任务,并且要等这些任务完成后才释放资源,而这些新提交的任务根本就没机会被完成

如果我们使用无界线程池,死锁的问题几乎无法解决,因为我们不断提交任务,但是线程池却不扩容,最后导致所有的线程都无法执行

# ForkJoinPool

ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。ForkJoinPool 主要用于实现分而治之的场景,需要程序员去将任务拆分,它最适合的是计算密集型的任务

其性能好坏取决于编码方式,举个例子


public class ForkJoinCalculator implements Calculator {

    private ForkJoinPool pool;

    //执行任务RecursiveTask:有返回值  RecursiveAction:无返回值
    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        //此方法为ForkJoin的核心方法:对任务进行拆分  拆分的好坏决定了效率的高低
        @Override
        protected Long compute() {

            // 当需要计算的数字个数小于6时,直接采用for loop方式计算结果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            } else { // 否则,把任务一分为二,递归拆分(注意此处有递归)到底拆分成多少分 需要根据具体情况而定
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle + 1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public ForkJoinCalculator() {
        // 也可以使用公用的线程池 ForkJoinPool.commonPool():
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
        pool.shutdown();
        return result;
    }
}
输出:
耗时:390ms
结果为:50000005000000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#线程池#并发
最后更新: 2/23/2026, 9:23:04 AM
生产者消费者问题
AQS 组件

← 生产者消费者问题 AQS 组件→

最近更新
01
vibe coding 最佳实践
02-24
02
立直麻将牌效益理论
02-23
03
伪静态是什么
02-08
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式