disgare 的博客
首页
博客
分类
标签
首页
博客
分类
标签
  • 网络

    • 计算机网络学习笔记
    • 网络安全相关
    • 域名和子网掩码
    • CORS 跨域资源共享
    • DNS、HTTP 与 HTTPS
    • Server-Sent Events (SSE)
    • WebSocket 长连接
  • 计算机基础

    • 操作系统 IO 相关知识
    • 操作系统学习笔记
    • 程序的机器级表示
    • 音频文件基础
    • 正则表达式相关概念
    • ffmpeg 的安装以及实现音频切分功能
    • Hex 和 Base64 编码
    • XML 的使用
  • 数据结构与算法

    • 动态规划算法学习笔记
    • 基于比较的排序算法的最坏情况下的最优下界为什么是O(nlogn)
    • 集合与数据结构学习笔记
    • 面试常见算法总结
    • 算法导论第二部分排序学习笔记
    • 算法导论第一部分学习笔记
  • Java

    • 对象之间的映射与转换
    • 反射学习笔记
    • 泛型相关概念
    • 关于 boolean 类型的坑
    • 如何使用 lambda 表达式实现排序
    • CompletableFuture 相关用法
    • CompletableFuture 源码浅要阅读
      • 注释
      • 创建一个 CompletableFuture
      • AsyncSupply类
        • 奇怪的地方
        • d.completeValue(f.get()) 语句
      • CompletionStage
      • UniAccept
      • Completion
      • 总结
    • FutureTask 源码阅读
    • Guava 常用 API
    • Guava 源码阅读:Multimap 相关
    • Jackson 的各种使用
    • Java 的 Excel 相关操作
    • java 的常见性能问题分析以及出现场景
    • java 基础知识
    • JAVA 枚举的基础和原理
    • Java 图片文件上传下载处理
    • Java 序列化
    • Java 异常
    • Java 语法糖
    • Java 中关于字符串处理的常用方法
    • Java 中强、软、弱、虚引用
    • JAVA 注解小结
    • Java Http 访问框架
    • Java Stream 的使用
    • Java8 新特性
    • netty 学习笔记
    • Scanner 的各种用法
    • Servlet 学习笔记
    • String、StringBuffer、StringBuilder 学习笔记
  • JVM

    • 虚拟机执行子系统
    • JVM 自动内存管理
    • Linux 中 JVM 常用工具以及常见问题解决思路
  • Linux

    • crontab 表达式
    • Linux 常见命令
    • Linux 文件系统
  • 中间件

    • 关于定时任务原理
    • 详解 kafka
    • ES 搜索引擎
    • flink 提交流程
    • Grape-RAG
    • Hadoop 基础原理
  • 多线程

    • 多线程基础学习笔记
    • 简单了解并发集合
    • 如何手写单例
    • 深入理解 java 多线程安全
    • 生产者消费者问题
    • 线程池作用、用法以及原理
    • AQS 组件
    • ThreadLocal 原理以及使用
  • 非关系型数据库

    • Redis 集群
    • Redis 数据结构、对象与数据库
    • Redis 学习笔记
  • 关系型数据库

    • B+ 树的插入、删除和数据页分裂机制
    • MySQL 的 binglog、redolog、undolog
    • MySQL 的记录存储结构、存储引擎与 Buffer Pool
    • MySQL 基本的特性
    • MySQL 开发规范
    • MySQL 事务与锁与 MVCC
    • MySQL 数据类型、字符集相关内容
    • MySQL 索引与索引优化
    • PostgreSQL 更新数据时 HOT优化
    • PostgreSQL 相关用法
  • Python

    • Python 基础语法
    • Python 学习
  • Spring 项目

    • Lombok 的常用注解
    • maven 小结
    • MyBatis 框架的使用
    • MyBatis 重要知识点总结
    • MybatisPlus 的使用
    • Spring 框架基础使用
    • Spring 事务相关
    • Spring IOC 的原理及源码
    • Spring AOP 的使用和原理
    • SpringBoot 的原理
    • SpringBoot 基础使用
    • SpringWeb 重要知识点
  • 分布式

    • 初步了解 docker
    • 从 ACID 到 BASE 事务处理的实现
    • 访问远程服务
    • 分布式 id
    • 分布式缓存相关问题
    • 分布式集群理论和分布式事务协议
    • 分布式架构的观测
    • 分布式一致性算法
    • 负载均衡 Load Balancing
    • 关于分布式系统 RPC 中高可用功能的实现
    • 集群间数据同步的目的
    • 三高问题下的系统优化
    • 数据库分库分表
    • 详解 Spring Cloud
    • Dubbo 基础概念
    • Gossip 协议
    • nginx 学习笔记
    • Protobuf 通信协议
    • Zookeeper 基础学习
  • 架构设计

    • 参数校验与异常处理
    • 抽象方法与设计模式
    • 代码整洁之道
    • 权限系统设计
    • 用低内存处理大量数据
    • 设计模式——策略模式
    • 设计模式——过滤器模式在 Spring 中的实践
    • 状态模式
    • 统一结果返回
    • 为什么要打日志?怎么打日志?打什么日志?
    • 运维监控常见指标含义
    • 资深研发进阶
    • DDD 架构学习笔记
    • Java 常用的规则引擎
    • MVC 架构学习笔记
  • AI

    • 如何编写 Prompt
    • Agent 工程架构
    • LLM 相关内容
    • NLP 相关知识
    • vibe coding 最佳实践
    • windows 下 ollama 迁移到 D 盘
  • 开发工具

    • 如何画时序图、流程图、状态流转图
    • excel 关于 =vlookup 的用法
    • git 的学习以及使用
    • IDEA 插件推荐
    • IDEA 常用快捷键以及调试
    • Shell 脚本
    • swagger 的使用
  • 前端

    • 简单了解前端页面开发
    • 伪静态是什么
    • GitHub Pages 部署教程
    • Vercel 部署教程
    • vue-admin-template 简单使用
    • VuePress 博客搭建指南
  • 项目

    • 面试刷题网——技术方案
    • 影视资源聚合站——技术方案
  • 问题记录

    • 定时任务单线程消费 redis 中数据导致消费能力不足
    • 提供可传递的易受攻击的依赖项
    • Liteflow 在 SpringBoot 启动时无法注入组件问题 couldn‘t find chain with the id[THEN(NodeComponent)]
  • 金融

    • 股票分析——关于电力
    • 股票技术面——量价关系
    • 股票技术面——盘口
    • 股票技术面——基础
    • 基础的金融知识
    • 基金与股票
    • 韭菜的自我总结
    • 聊聊价值投资
  • 其他

    • 程序员职场工作需要注意什么
    • 创业全链路SOP:从灵光一现到系统化增长的实战指南
    • 观罗翔讲刑法随笔
    • 价格和价值
    • 立直麻将牌效益理论
    • 梅花易数学习笔记
    • 压力管理
2023-03-03
Java
目录

CompletableFuture 源码浅要阅读

completablefuture 的使用相当便捷,不过它的方法初次学习起来也相当困难,简单的阅读一下它的实现原理可能会让我们更好的掌握这个类的使用。不过它的源码读起来也和它的使用一样,相当抽象。。。异常抽象

# 注释

进入代码发现它的注释就有老长一段,以下是比较有用的部分:

  • CompletableFuture 是一个在完成时可以触发相关方法和操作的 Future,并且它可以视作为 CompletableStage
  • CompletableFuture 的取消会被视为异常完成。调用 cancel 方法会和调用 completeExceptionally 方法具有同样的效果
  • 如果没有显示指定的 Executor 的参数,则会调用默认的 ForkJoinPool.commonPool(),最好使用指定的线程池,由于守护线程的原因使用默认线程池的话会出现一些奇妙的 bug

它实现了 Future 以及 CompletionStage

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
1

# 创建一个 CompletableFuture

众所周知,创建一个 CompletableFuture 可以使用 run 组和 supply 组的方法,那么这两者创建的 CompletableFuture 有什么不同呢

supplyAsync 的源码:

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

	// 逻辑运行的主要方法
    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                     Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        // new 一个新的 CompletableFuture
        CompletableFuture<U> d = new CompletableFuture<U>();
        // 进入线程池,此时业务逻辑已经在执行了
        e.execute(new AsyncSupply<U>(d, f));
        // 返回 CompletableFuture
        return d;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

我们看到进入线程池的是一个 AsyncSupply 对象,里面包含了这个新创建的 CompletableFuture 以及我们重写的 supplier

同时看到 CompletableFuture 直接返回,标志了这是一个异步任务,可以猜测与同步实现的区别就在这

runAsync 的源码:

    static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        e.execute(new AsyncRun(d, f));
        return d;
    }
1
2
3
4
5
6

看到两者没什么不同,都是条件判断加丢进线程池。区别在于这次丢进去的是 AsyncRun

事实上其他的几个创建 CompletableFuture 的方法都类似这样,也可以猜测 AsyncRun 的实现与 AsyncSupply 大差不差

# AsyncSupply类

AsyncSupply 是 CompletableFuture 的内部类,这是它的所有源码:

    static final class AsyncSupply<T> extends ForkJoinTask<Void>
            implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<T> dep; Supplier<T> fn;
        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
            this.dep = dep; this.fn = fn;
        }

        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) {}
        public final boolean exec() { run(); return true; }

        public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                // 传入的是一个 new CompletableFuture,它所包含的值为 null 才正常
                if (d.result == null) {
                    try {
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 奇怪的地方

等等等等,你们发现了一个奇怪的地方吗

        CompletableFuture<T> dep; Supplier<T> fn;
        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
            this.dep = dep; this.fn = fn;
        }
        ...
        public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null)
            ...
1
2
3
4
5
6
7
8
9

为什么要定义两次 CompletableFuture 以及 Supplier 呢,这么做有什么好处吗?

run 方法中的判断 (d = dep) != null && (f = fn) != null 是为了确保在执行任务之前,依赖的 CompletableFuture 和 Supplier 都没有被意外地修改或清空。主要是为了避免并发问题,多个线程可能同时访问和修改 dep 和 fn 字段。通过将 dep 和 fn 赋值给局部变量 d 和 f,可以确保在执行任务时使用的是这些字段的当前值(因为保存到栈帧中了),而不是可能被其他线程修改后的值

这里还可以减少不必要的检查,在 run 方法中,直接使用局部变量 d 和 f 而不是多次访问 dep 和 fn,可以减少不必要的内存访问,提高性能

它还可以防止空指针异常,如果 dep 或 fn 在任务执行前被其他线程设置为 null,那么在调用 f.get() 时会抛出 NullPointerException。通过这个判断,可以提前检查并避免这种情况

# d.completeValue(f.get()) 语句

该方法使用 UNSAFE 类的 CAS 操作,将 supplier 结果设置给 CompletableFuture 的 RESULT

    final boolean completeValue(T t) {
        return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                           (t == null) ? NIL : t);
    }
1
2
3
4

那 CompleteThrowable 一定就是异常的处理了

    final boolean completeThrowable(Throwable x) {
        return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                           encodeThrowable(x));
    }
1
2
3
4

链式调用的代码,实现在 postComplete 中

    final void postComplete() {
    	// 初始化当前 CompletableFuture
        CompletableFuture<?> f = this; Completion h;
        // 循环直到 f 的 stack 为空,如果 f 不是当前实例但其 stack 不为空也好进行循环
        while ((h = f.stack) != null ||
               (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture<?> d; Completion t;
            // 使用 compareAndSet 方法原子地更新 f 的 stack,确保线程安全
            if (STACK.compareAndSet(f, h, t = h.next)) {
            	// 如果 t 不为空,表示还有更多的 Completion 需要处理
                if (t != null) {
                    if (f != this) {
                    	// 压栈
                        pushStack(h);
                        continue;
                    }
                    NEXT.compareAndSet(h, t, null); // try to detach
                }
                // 调用 h.tryFire(NESTED) 尝试触发当前的 Completion 任务
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

postComplete 方法的主要目的是在 CompletableFuture 完成后,递归地触发所有依赖于它的 Completion 任务,确保整个依赖链上的所有任务都能正确地完成。通过使用 CAS 操作和栈来管理依赖关系,确保了线程安全和高效的处理

压进栈中的是 CompletionStage,那 CompletionStage 是什么

# CompletionStage

官方定义中,一个 Function,Comsumer 或者 Runnable 都可以被描述为一个 CompletionStage

CompletionStage 是一个可能执行异步计算的阶段,这个阶段会在另一个 CompletionStage 完成时调用去执行动作或者计算,一个 CompletionStage 会以正常完成或者中断的形式完成,并且它的完成会触发其他依赖的CompletionStage。CompletionStage 接口的方法一般都返回新的 CompletionStage,因此构成了链式的调用

public interface CompletionStage<T> {

    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

    public <U> CompletionStage<U> thenApplyAsync
        (Function<? super T,? extends U> fn);
	...
1
2
3
4
5
6
7

可以看到,CompletableFuture 的所有后续操作都在 CompletionStage 中被定义

选择一个简单的后续操作,看看在 CompletableFuture 中的实现

    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    private <V> CompletableFuture<V> uniApplyStage(
        Executor e, Function<? super T,? extends V> f) {
        // 异常判断
        if (f == null) throw new NullPointerException();
        CompletableFuture<V> d =  new CompletableFuture<V>();
        // 条件判断,线程池是否为null以及CompletableFuture是否已经运行
        if (e != null || !d.uniApply(this, f, null)) {
            UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
            //放进栈中
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

此方法就是判断当前 CompletableFuture 是否已经运行,如果没运行,将新创建的 CompletableFuture、执行该方法的 CompletableFuture、线程池、我们重写的 Function 打包成一个 UniApply,并且放入这个 CompletableFuture 的栈中

那么这个栈是个什么东西,这个 UniAccept 又是什么?

# UniAccept

该类的构造方法就是简单的赋值

    static final class UniApply<T,V> extends UniCompletion<T,V> {
        Function<? super T,? extends V> fn;
        UniApply(Executor executor, CompletableFuture<V> dep,
                 CompletableFuture<T> src,
                 Function<? super T,? extends V> fn) {
            // dep: 新创建的CompletableFuture
  			// src: 驱动thenAccept的CompletableFuture
            super(executor, dep, src); this.fn = fn;
        }
        
        final CompletableFuture<V> tryFire(int mode) {
            CompletableFuture<V> d; CompletableFuture<T> a;
            if ((d = dep) == null ||
                !d.uniApply(a = src, fn, mode > 0 ? null : this))
                return null;
            dep = null; src = null; fn = null;
            return d.postFire(a, mode);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

可以看到它的构造方法调用了它的父类方法,那它的父类是什么?

# Completion

abstract static class Completion extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
        volatile Completion next;      
        abstract CompletableFuture<?> tryFire(int mode);
        abstract boolean isLive();

        public final void run()                { tryFire(ASYNC); }
        public final boolean exec()            { tryFire(ASYNC); return true; }
        public final Void getRawResult()       { return null; }
        public final void setRawResult(Void v) {}
    }
1
2
3
4
5
6
7
8
9
10
11

Completion 是一个抽象类,分别实现了 Runnable、AsynchronousCompletionTask 接口,继承了 ForkJoinPoolTask 类,而 ForJoinPoolTask 抽象类又实现了 Future 接口,因此 Completion 可以简单的看成一个 Future

我们看到 Completion 类中有一个 next,说明它是一个链表结构

而之前那个问题,栈是什么,栈就是 CompletableFuture 中的一个属性 stack,而这个 stack 就是 Completion 类的

这里面的一个方法 tryFire,就是尝试启动下一个 Completion 的意思

# 总结

以上,我们简单过了一遍 CompletableFuture 的创建以及后续操作的实现

  • CompletableFuture 的创建是使用 CAS 操作将我们的传入的方法以及最后的实现参数赋值给 CompletableFuture 中的属性
  • CompletableFuture 中对于各个组的实现大同小异
  • 后续操作是从 postComplete 方法中引出来的,后续操作定义在 CompletionStage 接口中,后续操作的实现是通过 Compition 类的链表结构实现的
  • 每次调用后续操作方法都会生成一个新的 CompletableFuture
#CompletableFuture
最后更新: 1/17/2026, 2:51:21 AM
CompletableFuture 相关用法
FutureTask 源码阅读

← CompletableFuture 相关用法 FutureTask 源码阅读→

最近更新
01
vibe coding 最佳实践
02-24
02
立直麻将牌效益理论
02-23
03
伪静态是什么
02-08
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式