disgare 的博客
首页
博客
分类
标签
首页
博客
分类
标签
  • 网络

    • 计算机网络学习笔记
    • 网络安全相关
    • 域名和子网掩码
    • CORS 跨域资源共享
    • DNS、HTTP 与 HTTPS
    • Server-Sent Events (SSE)
    • WebSocket 长连接
  • 计算机基础

    • 操作系统 IO 相关知识
    • 操作系统学习笔记
    • 程序的机器级表示
    • 音频文件基础
    • 正则表达式相关概念
    • ffmpeg 的安装以及实现音频切分功能
    • Hex 和 Base64 编码
    • XML 的使用
  • 数据结构与算法

    • 动态规划算法学习笔记
    • 基于比较的排序算法的最坏情况下的最优下界为什么是O(nlogn)
    • 集合与数据结构学习笔记
    • 面试常见算法总结
    • 算法导论第二部分排序学习笔记
    • 算法导论第一部分学习笔记
  • Java

    • 对象之间的映射与转换
    • 反射学习笔记
    • 泛型相关概念
    • 关于 boolean 类型的坑
    • 如何使用 lambda 表达式实现排序
    • CompletableFuture 相关用法
    • CompletableFuture 源码浅要阅读
    • FutureTask 源码阅读
    • Guava 常用 API
    • Guava 源码阅读:Multimap 相关
    • Jackson 的各种使用
    • Java 的 Excel 相关操作
    • java 的常见性能问题分析以及出现场景
    • java 基础知识
    • JAVA 枚举的基础和原理
    • Java 图片文件上传下载处理
    • Java 序列化
    • Java 异常
    • Java 语法糖
    • Java 中关于字符串处理的常用方法
    • Java 中强、软、弱、虚引用
    • JAVA 注解小结
    • Java Http 访问框架
    • Java Stream 的使用
    • Java8 新特性
    • netty 学习笔记
    • Scanner 的各种用法
    • Servlet 学习笔记
    • String、StringBuffer、StringBuilder 学习笔记
  • JVM

    • 虚拟机执行子系统
    • JVM 自动内存管理
    • Linux 中 JVM 常用工具以及常见问题解决思路
  • Linux

    • crontab 表达式
    • Linux 常见命令
    • Linux 文件系统
  • 中间件

    • 关于定时任务原理
    • 详解 kafka
      • 消息队列的功能
        • 解耦
        • 异步
        • 削峰
        • 模型
        • 问题
      • Kafka 核心概念
        • Topic(主题)
        • Partition(分区)
        • Replica(副本)和 Broker(服务器)
        • Consumer(消费者)
        • Consumer Group(消费者组)
        • AR、ISR、OSR 和 HW
      • 常见问题
        • kafka 是使用 pull 模式还是 push 模式
        • kafka 在什么时候删除 message 呢
        • 消息队列的高可用性
        • 崩溃恢复
        • 如何保证消息的可靠性传输
        • 生产者丢失数据
        • kafka 丢失数据
        • 消费者丢失数据
        • Kafka 为何可以实现高吞吐
        • Kafka 高效存储机制(索引)
      • 业务问题
        • 大量消息积压
        • 如何按顺序消费 kafka 中的数据
        • 消费者故障,出现活锁问题如何解决
        • 事务消息
        • 死信队列
    • ES 搜索引擎
    • flink 提交流程
    • Grape-RAG
    • Hadoop 基础原理
  • 多线程

    • 多线程基础学习笔记
    • 简单了解并发集合
    • 如何手写单例
    • 深入理解 java 多线程安全
    • 生产者消费者问题
    • 线程池作用、用法以及原理
    • AQS 组件
    • ThreadLocal 原理以及使用
  • 非关系型数据库

    • Redis 集群
    • Redis 数据结构、对象与数据库
    • Redis 学习笔记
  • 关系型数据库

    • B+ 树的插入、删除和数据页分裂机制
    • MySQL 的 binglog、redolog、undolog
    • MySQL 的记录存储结构、存储引擎与 Buffer Pool
    • MySQL 基本的特性
    • MySQL 开发规范
    • MySQL 事务与锁与 MVCC
    • MySQL 数据类型、字符集相关内容
    • MySQL 索引与索引优化
    • PostgreSQL 更新数据时 HOT优化
    • PostgreSQL 相关用法
  • Python

    • Python 基础语法
    • Python 学习
  • Spring 项目

    • Lombok 的常用注解
    • maven 小结
    • MyBatis 框架的使用
    • MyBatis 重要知识点总结
    • MybatisPlus 的使用
    • Spring 框架基础使用
    • Spring 事务相关
    • Spring IOC 的原理及源码
    • Spring AOP 的使用和原理
    • SpringBoot 的原理
    • SpringBoot 基础使用
    • SpringWeb 重要知识点
  • 分布式

    • 初步了解 docker
    • 从 ACID 到 BASE 事务处理的实现
    • 访问远程服务
    • 分布式 id
    • 分布式缓存相关问题
    • 分布式集群理论和分布式事务协议
    • 分布式架构的观测
    • 分布式一致性算法
    • 负载均衡 Load Balancing
    • 关于分布式系统 RPC 中高可用功能的实现
    • 集群间数据同步的目的
    • 三高问题下的系统优化
    • 数据库分库分表
    • 详解 Spring Cloud
    • Dubbo 基础概念
    • Gossip 协议
    • nginx 学习笔记
    • Protobuf 通信协议
    • Zookeeper 基础学习
  • 架构设计

    • 参数校验与异常处理
    • 抽象方法与设计模式
    • 代码整洁之道
    • 权限系统设计
    • 用低内存处理大量数据
    • 设计模式——策略模式
    • 设计模式——过滤器模式在 Spring 中的实践
    • 状态模式
    • 统一结果返回
    • 为什么要打日志?怎么打日志?打什么日志?
    • 运维监控常见指标含义
    • 资深研发进阶
    • DDD 架构学习笔记
    • Java 常用的规则引擎
    • MVC 架构学习笔记
  • AI

    • 如何编写 Prompt
    • Agent 工程架构
    • LLM 相关内容
    • NLP 相关知识
    • vibe coding 最佳实践
    • windows 下 ollama 迁移到 D 盘
  • 开发工具

    • 如何画时序图、流程图、状态流转图
    • excel 关于 =vlookup 的用法
    • git 的学习以及使用
    • IDEA 插件推荐
    • IDEA 常用快捷键以及调试
    • Shell 脚本
    • swagger 的使用
  • 前端

    • 简单了解前端页面开发
    • 伪静态是什么
    • GitHub Pages 部署教程
    • Vercel 部署教程
    • vue-admin-template 简单使用
    • VuePress 博客搭建指南
  • 项目

    • 面试刷题网——技术方案
    • 影视资源聚合站——技术方案
  • 问题记录

    • 定时任务单线程消费 redis 中数据导致消费能力不足
    • 提供可传递的易受攻击的依赖项
    • Liteflow 在 SpringBoot 启动时无法注入组件问题 couldn‘t find chain with the id[THEN(NodeComponent)]
  • 金融

    • 股票分析——关于电力
    • 股票技术面——量价关系
    • 股票技术面——盘口
    • 股票技术面——基础
    • 基础的金融知识
    • 基金与股票
    • 韭菜的自我总结
    • 聊聊价值投资
  • 其他

    • 程序员职场工作需要注意什么
    • 创业全链路SOP:从灵光一现到系统化增长的实战指南
    • 观罗翔讲刑法随笔
    • 价格和价值
    • 立直麻将牌效益理论
    • 梅花易数学习笔记
    • 压力管理
2023-03-15
中间件
目录

详解 kafka

消息队列就是我们常说的 MQ,英文叫 Message Queue,是作为一个单独的中间件产品存在的,独立部署

引入一个新的技术产品,肯定是要考虑为什么要用它呢?消息队列也不列外,说到为什么要用,还真是因为它能在某些场景下发挥奇效。例如:解耦,异步,削峰,这三个词你也听说过吧,那下面就就从这三个好处出发,讲讲到底什么是解耦,异步,削峰

# 消息队列的功能

消息队列的缺点,都在我们下午的常见问题里了。引入任何技术,都需要考虑可能引发的问题

# 解耦

解耦都不陌生吧,就是降低耦合度,我们都知道 Spring 的主要目的是降低耦合,那 MQ 又是如何解耦的呢?

系统 A 是一个关键性的系统,产生数据后需要通知到系统 B 和系统 C 做响应的反应,三个系统都写好了,稳定运行

某一天,系统 D 也需要在系统 A 产生数据后作出反应,那就得系统 A 改代码,去调系统 D 的接口,好,改完了,上线了。假设过了某段时间,系统 C 因为某些原因,不需要作出反应了,不要系统 A 调它接口了,就让系统 A 把调接口的代码删了,系统 A 的负责人可定会很烦,改来改去,不停的改,不同的测,还得看会不会影响系统 B,系统 D

而且这样还没考虑异常情况,假如系统 A 产生了数据,本来需要实时调系统 B 的,结果系统 B 宕机了或重启了,没调成功咋办,或者调用返回失败怎么办,系统 A 是不是要考虑要不要重试?还要开发一套重试机制,系统 A 要考虑的东西也太多了吧

那如果使用 MQ 会是什么样的效果呢?系统 A 产生数据之后,将该数据写到 MQ 中,系统 A 就不管了,不用关心谁消费,谁不消费,即使是再来一个系统 E,或者是系统 D 不需要数据了,系统 A 也不需要做任何改变,而系统 B、C、D 是否消费成功,也不用系统 A 去关心,通过这样一种机制,系统 A 和其他各系统之间的强耦合是不是一下子就解除了

总结一下解耦的意义:

  • 当有类似分布式(单个系统多态机器)、多个系统依赖某一系统产出的数据时,使用 MQ 可以完成该任务
  • 可以灵活配置,只需要订阅消息即可
  • MQ 揽下了所有的异常情况,消息传递之间的异常都可以在这一层处理

# 异步

使用了 MQ 可以将一些不影响业务主流程的问题异步处理,给用户立即返回结果,系统 A 就只需要把产生的数据放到 MQ 里就行了,就可以立马返回用户响应,用户体验提升了很多

系统 A 把数据写到 MQ 里,系统 B、C、D 就自己去拿,慢慢消费就行了。一般就是一些时效性要求不高的操作,比如下单成功系统 A 调系统 B 发下单成功的短信,短信晚几秒发都是 OK 的

# 削峰

削峰是什么意思?大家都知道对于大型互联网公司,典型的就是电商,时不时的搞一些大促,流量会高于平时几十倍几百倍...例如,平时下单也就每秒一二十单,对于现有的架构来说完全不是事儿,那大促的时候呢?每秒就有可能举个例子是5000单,如果说下单要实时操作数据库,假设数据库最大承受一秒2000,那大促的时候一秒5000的话数据库肯定会被打死的,数据库一挂导致系统直接不可用,那是多么严重的事情。

所以在这种场景下使用 MQ 完美的解决了这个问题,下游系统下单时只需要往 MQ 里发消息,我的订单系统可以设定消费的频率,比如每秒我就消费2000个消息(在数据库的可承受范围),不管你下游系统每秒下多少单,我都保持这个速率,既不会影响我订单系统的数据库,也不影响你下游系统的下单操作,很好的保护了系统,也提高了下单的吞吐量。

我们都知道,大促也就几分钟的事,往多了说是几个小时吧,咱就说4个小时吧,每秒5000,4小时 7200W 单往 MQ 里写,订单系统每秒消费2000单,大促过后,MQ 里会积压 4320W 个消息,剩下的就慢慢消费呗。当然了,大促的时候肯定会临时申请加机器的,每秒消费不止2000

这就是削峰,将某一段时间的超高流量分摊到更长的一段时间内去消化,避免了流量洪峰击垮系统

消息队列虽然有诸多好处,但是使用时也需要注意一些问题:消息队列因为会造成队列堆积,时间不长的数据才能放进去,跨天的数据尽量不放进去

# 模型

mq 一般有三种模型,每种模型都有自己的用途

1,点对点(Queue)模型:生产者 → 队列 → 消费者,这种模型下一条消息只能被一个消费者消费,消费后消息消失,核心应用为任务分发、订单处理

2,发布/订阅(Topic)模型:生产者 → 主题 → 多个消费者,每个消费者收到全量消息。特点是一条消息被所有订阅者消费、消息广播模式,在事件通知、日志广播等功能上使用广泛

3,多消费者组模型(Kafka 模型):每个消费者组接受到全部消息,每个消费者组中有多个消费者,进行负载均衡

此外还有延迟队列、优先级队列等模型

# 问题

引入一项新技术一定会带来一些问题,比如系统的复杂性增加、性能下降等。但是这些问题都是可以通过合理的设计和配置,或者一些业务考量来解决的。常见的 mq 引入的问题如下:

  • 1,消息丢失:消息的可靠性保证见下文,主要包含消息到 mq、mq 内部可靠性、消费者可靠性
  • 2,延迟问题:mq 消费消息必然会增加数据延迟,使用 MQ 异步写入,本身就是在用延迟换吞吐/解耦/削峰,因此我们在用这个技术的时候就需要考虑延迟对业务的影响,是否可以接受。同时还需要规避因为消费能力不足导致的消息积压问题

# Kafka 核心概念

kafka 是一个分布式、高吞吐量、高扩展性的消息队列系统。下面是一些基础的概念,也是 mq 的基础功能:

# Topic(主题)

每一个发送到 Kafka 的消息都有一个主题,也可叫做一个类别,类似我们传统数据库中的表名一样,比如说发送一个主题为 order 的消息,那么这个 order 下边就会有多条关于订单的消息,只不过 kafka 称之为主题,都是一样的道理。订阅了该主题的消费者就可以消费该主题里的消息了

# Partition(分区)

消息键(Key)是 Kafka 消息的一个可选属性,用于标识消息的逻辑关联关系。每条消息可以携带一个关键字作为其键,这个键可以是字符串、整数等数据类型。Kafka 中会对这个消息键进行 hash 实现分区

生产者发送的消息数据 Topic 会被存储在分区中,这个分区的概念和 ElasticSearch 中分片的概念是一致的,都是想把数据分成多个块,合理的把消息分布在不同的分区上,好达到我们的负载均衡

分区是被分在不同的 Broker 上也就是服务器上,这样我们大量的消息就实现了负载均衡。注意这里每个分区的内容是不相同的,分区最常见的用法是使用分区选择器 PartitionSelector 将消息根据键进行分区。分区函数会将相同键的消息分配到同一个分区中。这样,相同键的消息将被放置在同一个分区,从而保证了相同键的消息在分区中的顺序,比如按照用户的 id 进行分区,就会保证消息消费的顺序

消费者可以选择订阅一个或多个分区来消费消息。消费者在订阅时可以指定自己感兴趣的键。当消费者订阅特定主题的特定键时,只会接收到该键对应的分区中的消息。这样可以确保消费者只消费自己感兴趣的键的消息,并保证了消费者消费消息的顺序

每个 Topic 可以指定多个分区,但是至少指定一个分区。同时,为了保证一个分区中的数据不丢失,我们还可以设置多个机器拥有这一个分区的副本

# Replica(副本)和 Broker(服务器)

副本就是分区中数据的备份,是 Kafka 为了防止数据丢失或者服务器宕机采取的保护数据完整性的措施,一般的数据存储软件都应该会有这个功能

Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成。Broker 负责接收和处理客户端发送过来的请求,以及对消息进行持久化。多个 broker 组成一个 Kafka 集群,通常一台机器部署一个 Kafka 实例,一个实例挂了不影响其他实例

# Consumer(消费者)

Consumer 就是消费者,我们知道传统的消息队列有两种传播消息的方式,一种是单播,类似队列的方式,一个消息只被消费一次,消费过了,其他消费者就不能消费了;另一种是多播,类似发布-订阅的模式,一个消息可以被多个消费者同时消费

同时多个消费者也可以被分为一组 Group,一组中的所有机器只会收到一条消息 mq 消息,即在一个 Consumer Group 中,每一个 Topic 中的消息只能被这个组中的一个 Consumer 消费,这样保证了一个业务线集群部署的机器不会重复消费消息。发布-订阅下,所有消息都会被 kafka 会向每一组消费者发送一次,有 n 个消费组,就会发送 n 条消息

对于设置了多分区的 Topic,分区的个数和消费者组中的个数应该是一样的,一个消费者消费一个分区,这样每个消费者就成了单播形式,类似队列的消费形式。同样一个消费者组里边的消费者不能多于 Topic 的分区数,一旦多于,多余的消费者实例将不会被分配分区,因此不会收到任何消息。直到有其他消费者实例离开或者分区重新平衡发生,这时它们才有可能获得分区进行消费。但是如果消费者比分区数少,不会造成信息丢失,但是会有 consumer 处理多个分区的 message

同时 kafka 依赖 zk,在分布式系统中,消费者需要知道有哪些生产者是可用的,通过使用 ZooKeeper 协调服务,Kafka 就能将 Producer,Consumer,Broker 等结合在一起,kafka 的选举、负载均衡、partition 对应都是通过 zk 来的。比如 broker 向 Zookeeper 进行注册后,生产者根据 broker 节点来感知 broker 服务列表变化,这样可以实现动态负载均衡 在这里插入图片描述 在 kafka 中是无法过滤任何消息的,只有收到消息后在代码中做过滤

# Consumer Group(消费者组)

Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable 在这里插入图片描述 一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡

当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组,这个重新申请加入组的过程就是重平衡(事实上重平衡是为了均匀地分配某个 topic 下的所有 partition 到各个消费组中的消费者,从而使得消息的消费速度达到最快,消费者组和重平衡息息相关)。在以下两种情况下,会触发 rebalance:

  • Topic 的分区数发生变化
  • 消费组内成员个数发生变化。例如有新的 consumer 实例加入该消费组或者离开组(因此在消费者发布的时候会经常出现重平衡)

当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了

# AR、ISR、OSR 和 HW

  • AR:分区中所有副本称为 AR
  • ISR(In-Sync Replicas,同步副本集):所有与主副本保持一定程度同步的副本(包括主副本)称为 ISR
  • OSR:与主副本滞后过多的副本组成 OSR

Leader 会维护一个与自己基本保持同步的 Replica 列表,该列表称为 ISR,每个 Partition 都会有一个 ISR,而且是由 Leader 动态维护。所谓动态维护,就是说如果一个 Follower 比一个 Leader 落后太多,或者超过一定时间未发起数据复制请求,则 Leader 将其从 ISR 中移除

分区副本中的 Leader 如果宕机但 ISR 却为空,一般会等待旧 leader 恢复正常。但是可以设置参数允许 OSR 成为 Leader

HW(高水位值)定义了消息可见性,标识了一个特定的消息偏移量(offset)。作用有两个:消费者只能拉取到这个水位 offset 之前的消息,同时这个偏移量还可以帮助 Kafka 完成副本数据同步操作

接下来讲一下 Leader 的逻辑中的高水位处理:

1,写入消息到磁盘 2,更新 LEO 值。LEO 是自己机器下偏移量的最大值,全程日志末端位移(Log End Offset)。Leader 维护了集群中每台集群的 LEO 值 3,获取 leader 副本所在 Broker 保存的所有远程副本 LEO 值,如:LEO-1、LEO-2、LEO-3。注意这里没有请求 follower 机器,这些 LEO 值都是 follower 机器请求 leader 时 leader 保存的 4,更新高水位为:HW = Math.max(目前的 HW,Math.min(LEO-1,LEO-2,LEO-3....))

Follower 副本拉取消息的逻辑如下:

1,读取磁盘(页缓存)中的消息数据 2,使用 Follower 副本发送消息请求,拉取数据,同时会将自己 LEO 发送给 Leader

# 常见问题

# kafka 是使用 pull 模式还是 push 模式

Kafka 主要采用的是 pull 模式,也就是 consumer 端主动的去获取数据,而 push 模式并不是 Kafka 的标准消费模式。集群间同步也是从机器主动去拉主机器的

这么做的思想是为了 kafka 的高吞吐量设计的,从机可以根据自生情况,选择一次性拉多少数据

pull 模式工作流程

1,消费者发送请求:消费者向 broker 发送拉取请求,指定要拉取的分区和偏移量 2,broker 返回数据:broker 根据请求返回指定的数据 3,消费者处理数据:消费者处理返回的数据,并更新偏移量

这里有点小问题,consumer 侧如何维护这个 offset 偏移量,如果机器重启了,如何保证该机器得到之前的偏移量,当重平衡发生的时候,这个 offset 是如何在消费者组中传递的?

kafka 使用了 pull 模式,但是每个消费组的偏移量维护在 kafka 集群中

当消费者机器重启时,Kafka 会根据消费者的组 ID(group.id)和主题(topic)从 Kafka 的内部主题 __consumer_offsets 中恢复之前的偏移量。重平衡时偏移量管理也是类似的,首先让所有的消费者停止消费,然后进行重新分区,最后将分区对应的 __consumer_offsets 下发到对应的机器中

有趣的是,kafka 会对每一个 consumer 设置一个线程池,线程池中会分 Consumer 线程和 Worker 线程。Consumer 线程是 Kafka 消费者客户端中的主要线程,负责从 Kafka 代理(broker)拉取数据、处理数据、自动或手动提交偏移量、以及在消费者组中参与分区的重新分配(重平衡)。而 Worker 线程主要用于并行处理消费者线程拉取的数据,这里注意 Worder 是无法直接从 kafka 拉数据的,必须从 Consumer 线程中拉数据

Push 模式是一种由生产者或中间件主动推送数据给消费者的模式。虽然 Kafka 本身不直接支持 push 模式,但可以通过一些间接的方法实现类似的效果。不过要是用这些间接的方式,那还不如使用 RabbitMQ 呢(RabbitMQ 本身支持 push)

# kafka 在什么时候删除 message 呢

kakfa 只有在以下两种情况才会删除数据:

1、数据保留时间 (retention.ms):这是 Kafka 配置中设置的时间阈值,表示消息至少应在 broker 上保留的时间。无论消息是否被消费,只要消息没有超过这个时间阈值,就会被保留 2、数据保留大小(retention.bytes):这是一个可选的配置,用于限制每个 topic 分区的最大存储大小。当分区的数据量超过这个限制时,最旧的消息将被删除,即使它们还没有达到保留时间

就算所有的消费者消费了这条消息,这条消息也是不会被删除的。删除消息有低水位机制,思想类似高水位机制

假设你用的是 rabbitmq 等可以设置过期时间的 mq,如果消息在 queue 中积压超过一定的时间就会被 rabbitmq 给清理掉,这个数据就没了,但是这些数据非常重要,我们想要恢复一下

这个情况下,只能批量重导了,写脚本将丢失的那批数据,一点一点地查出来

# 消息队列的高可用性

任何一个中间件提高可用性的最简单的做法就是集群部署。同时,不同的消息队列有不同的集群方式,我们以 Kafka 举例

Kafka 天生就是一个分布式的消息队列,它可以由多个 broker 组成(每个 broker 是一个节点)。创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 只保存一部分数据

Kafka 提供了 replica 副本机制来保证每个节点上的数据不丢失。Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,来提高容错性。每个 partition 的数据都会同步到其他机器上,形成多个副本。然后所有副本会选举一个 leader 出来,那么生产和消费都找 leader,其他副本只做同步操作。当 leader 挂了集群会自动去找 replica,然后会再选举一个 leader 出来,这样就具有高可用性了

写数据的时候,生产者就写 leader,然后 leader 将数据写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。只有同步好所有数据才会给 leader 发生 ack,leader 收到所有 ISR 的 ack 之后,才会返回写成功的消息给生产者(当然,这只是其中一种模式,还可以适当调整这个行为) 在这里插入图片描述 消费的时候,只会从 leader 去读,但是只有一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到

# 崩溃恢复

说到了高可用就不得不说崩溃恢复,崩溃恢复是实现高可用的核心设计之一,上面只说明了 kafka 的同步方式,漏了 kafka 的崩溃恢复机制

kafka 的选主机制主要参考 redis 的哨兵选主机制,先选出一个控制器,控制器再根据一定的规则选出一个副本作为主副本,选主核心流程如下:

1,Controller 选举(集群级):每个 Broker 启动时尝试抢占创建 /controller ZNode(旧版)或竞选 KRaft Controller 角色(新版),先创建成功者成为 Controller,负责所有分区 Leader 选举。旧版依赖 zk,需要在 zk 中创建节点,新版使用 raft 协议,集群内部投票选举

2,Controller 检测副本状态:选出来的机器会监控 Broker 的 存活状态(通过 ZK 心跳或 KRaft RPC),如果确认挂掉 Controller 会按照规则选出新 Leader

规则如下:

  • ISR 非空:选 ISR 中第一个机器
  • 如果开启 unclean.leader.election 并且 ISE 为空:从非 ISR 但追上 HW(High Watermark)的副本中选择
  • 分区不可用,直到 ISR 有副本恢复

新版 kafka 利用 kraft 去实现崩溃恢复和故障转移,其实还是为了减少对 zk 的依赖,简化架构复杂性

# 如何保证消息的可靠性传输

# 生产者丢失数据

生产者将数据发送到 MQ 的时候,可能数据就在半路给搞丢了,因为网络啥的问题。Kafka 为了处理这个问题,可以这么设置参数:

  • 在 producer 端设置 acks=all:这个是要求每条数据,必须是写入所有 ISR replica 之后,才能认为是写成功了(可类比于 mysql 的全同步)
  • 在 producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了

即只有当所有的 ISR follower 都同步到了消息之后才会给生产者返回 ack 应答,如果没满足这个条件,生产者会自动不断地重试,重试无限次

这里补充一下 Kafka 的 Producer 的 ack 机制,参数值有0、1 和 -1

0: 相当于异步操作,Producer 不需要 Leader 给予回复,此机制具有最低延迟,但是持久性可靠性也最差,当服务器发生故障时,很可能发生数据丢失 1: Kafka 默认的设置。表示 Producer 要 Leader 确认已成功接收数据才发送下一条(批)Message。不过 Leader 宕机,Follower 尚未复制的情况下,数据就会丢失。此机制提供了较好的持久性和较低的延迟性 -1: Leader 接收到消息之后,还必须要求 ISR 列表里跟 Leader 保持同步的那些 Follower 都确认消息已同步,Producer 才发送下一条(批)Message。此机制持久性可靠性最好,但延时性最差

同时,每个生产者初始化时会从 Broker 获取的唯一 ID,还会维护序列号(Sequence Number)对每个消息分区单调递增的数字,kafka 使用该机制做了幂等去重和可靠性校验

if (接收到的序列号 == 最后持久化的序列号 + 1) {
    接受消息
} else if (接收到的序列号 <= 最后持久化的序列号) {
    丢弃消息(幂等去重)
} else {
    抛出异常(序列号不连续)
}
1
2
3
4
5
6
7

# kafka 丢失数据

kafka 挂了,导致数据丢失。这种情况是是 kafka 某个节点宕机,然后重新选举 partiton 的 leader 时,要是此时其他的 follower 刚好还有些数据没有同步,同时 leader 挂了,然后选举某个 follower 成 leader 之后,他就少了一些数据

在这个场景里,我们只要保证每个 partition 都有副本,就可以保证 kafka 不会丢失数据了

追求的说是高水位的设计保证了消费者不会丢失数据,只有该数据同步到所有的 ISR 机器后,消费者才可以读到这个数据

# 消费者丢失数据

消费者消费到了这个消息,然后消费者那边自动提交了 offset,让 kafka 以为你已经消费好了这个消息,其实你刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢了(异步消费)

大家都知道 kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢(同步消费)。但是此时确实还是会重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,幂等性问题自己保证就好了

同时,消费者和消息队列一般都有心跳检测机制,可以让 mq 精准的感受到哪个消费者挂了以及时进行重平衡

Kafka 消费者可以采用两种方式来提交偏移量:自动提交和手动提交

1,自动提交:配置参数为 enable.auto.commit 设置为 true。工作原理是消费者会定期自动提交当前的偏移量到 Kafka 内部的特殊 Topic _consumer_offsets 中,默认时间间隔为 auto.commit.interval.ms(默认值为 5000 毫秒)。可能会导致数据重复消费或丢失,因为提交的时间点不是完全可控的

2,手动提交:配置参数 enable.auto.commit 设置为 false。工作原理是消费者在处理完数据后,显式调用 commitSync 或 commitAsync 方法提交偏移量。优点是精确控制偏移量的提交时机,避免数据重复消费或丢失,但是需要手动调用方法

# Kafka 为何可以实现高吞吐

kafka 非常适合处理巨量请求,那他为什么会有这种能力呢,他对比 rabbitMQ,rocketMQ 优秀在哪里呢。kafka 非常适合处理巨量请求是因为他支持高吞吐,而 Kafka 之所以可以实现高吞吐,主要依赖于以下5点:

  • Zero Copy 零拷贝技术:当有 Consumer 订阅了相应的 Topic 消息,数据需要从磁盘中读取然后将数据写回到套接字中,正常来说需要经历多次内核态用户态切换和数据拷贝的流程,在 kafka 中直接将数据从文件通道直接传输到了给定的可写字节通道
  • Page Cache 页缓存:操作系统本身有一层缓存,叫做 page cache,是在内存里的缓存,我们也可以称之为 os cache,意思就是操作系统自己管理的缓存。你在写入磁盘文件的时候,可以直接写入这个 os cache 里,接下来由操作系统自己决定什么时候把 os cache 里的数据真的刷入磁盘文件中。类似与 mysql 中的 buffer pool
  • 分区分段:partition 机制支持了高吞吐
  • 批量读写:Kafka 数据读写也是批量的而不是单条的。consumer 可以一次拉去多条数据然后起多个 worker 处理请求,Kafka 生产者也会维护一个缓冲区(RecordAccumulator),消息先写入缓冲区而非直接发送,这也是 kafka 的消息封装功能
  • 批量压缩:Kafka 使用了批量压缩,多个消息一起压缩。降低网络带宽。Kafka 允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩
  • NIO:kakfa 在网络通信时使用了 nio 技术,使用少量线程可以处理多个连接

# Kafka 高效存储机制(索引)

Kafka 的高效存储设计是其高性能的核心,主要依赖于分段存储和索引优化的架构,也就是 log 文件和 index 文件。下面详细解释各个组件及其协作原理

Segment(分段文件)的物理表现为每个分区被拆分为多个固定大小的 Segment 文件(默认1GB)。文件命名规则为起始偏移量.log(如000.log)

这样做避免单个大文件导致的性能问题,老数据删除只需删除整个 Segment 文件,而且方便基于偏移量的二分查找

Kafka 使用稀疏索引,维护一个 .index 文件,有什么查询下都先访问这个里面的数据,包含两种索引:

  • 偏移量索引,比如 offset: 499 position: 102400
  • 时间戳索引,比如 timestamp: 1631234567890 offset: 499 position: 102400

# 业务问题

# 大量消息积压

比如几千万条数据在 MQ 里积压了七八个小时,从下午4点多,积压到了晚上很晚。这种情况怎么解决呢?首先我们需要分析问题原因,可能原因如下:特定时间节点流量大,mq 消费不过来

注意这里的原因只能是消费者消费不过来,因为在 kafka 里,就算所有的消费者消费了这条消息,这条消息也是不会被删除的,kafka 的消息收到后都是在内存中有缓存,然后落库到磁盘中

因此我们只能在 mq 本身或者生产者方面去处理问题,如果在 mq 本身处理问题的话,我们可能有以下思路:

1,对于不重要的数据,丢掉也没有关系的数据,可以直接修改偏移量 2,在保证下游消费者可以申请到更多资源时,可以解耦消费和接受的逻辑。紧急上一版程序,去让原先接受 mq 的程序,先不消费,转而将数据存放在 db 中,不管是存 mysql 还是 redis,只要有一个数据存放位置即可,然后用两倍的机器消费 db 中的数据 3,如果消费者组中机器数少于分区数,可以扩容消费者

# 如何按顺序消费 kafka 中的数据

Kafka 在分区内是有序的,但不同分区之间的消息顺序无法保证。因此我们可以将业务键(如用户 ID、订单 ID)的消息发送到同一个分区中,保证局部有序

方案二是单分区配合单消费者模式,就是将 topic 设置为 1 个分区,并使用 1 个消费者实例消费。当然这是不满足线上环境的

方案三:我们如果不是 kafka 的提供者,只是一个接受者,只能起一个中间层,比如接收到 kakfa 消息后,我们塞入 redis 中,这里可以使用 hash 或者 zset 这些键来存放数据,然后起一个任务去处理 redis 中的数据。事实上大多数情况都需要这么处理,因为我们作为接收者不可能去找上游改 kafka 配置

# 消费者故障,出现活锁问题如何解决

活锁指消费者持续的维持心跳,但没有进行消息处理

为了预防消费者在这种情况一直持有分区,通常会利用 max.poll.interval.ms 活跃检测机制,如果调用 Poll 的频率大于最大间隔,那么消费者将会主动离开消费组,以便其他消费者接管该分区

# 事务消息

Kafka 事务,是为确保在一个事务中发送的多条消息,要么都成功,要么都失败。这里的多条消息不一定在同一个 topic 和 partition,可以是发往多个 topic 和 partition 的消息

kafka 有个事务协调者,在服务端协调整个事务。非独立进程,而是 Broker 进程的一部分,协调者和分区一样通过选举保证

Kafka 集群也有一个特殊的用于记录事务日志的 topic,该事务日志 topic 的实现和普通 topic 一样,里面记录数据类似开启事务、提交事务这样的事务日志。日志 topic 同样也包含很多分区

Kafka 集群中,可存在多个协调者,每个协调者负责管理和使用事务日志中的几个分区。就是为能并行执行多个事务,提升性能

Kafka 事务实现流程:

1,开启事务时,生产者给协调者发请求开启事务,协调者在事务日志中记录下事务 ID,并且告知事务中的消息属于哪个主题和分区,这个信息也会被协调者记录在事务日志

2,接下来,生产者就可像发送普通消息一样发事务消息,Kafka 在处理未提交的事务消息时,和普通消息一样,直接发给 Broker,保存在这些消息对应的分区中。这时候消费者根本读取不了这些消息,会暂时过滤掉未提交的事务消息,不进行消费

3,消息发送完成后,生产者给协调者发送提交或回滚事务的请求,由协调者来开始两阶段提交,完成事务:

第一阶段,协调者把事务的状态设置为预提交(向所有涉及的分区 Broker 发送开始提交标记),并写入事务日志

第二阶段,如果事务正常结束(所有分区正常返回),协调者在事务相关的所有分区中,都会写一条事务结束的特殊消息,这时候会更新水位信息,此时 Kafka 的消费者就可消费之前读不到的那些事务消息了。如果有分区没返回确认信息,则事务需要回滚,此时写入 ABORT 标记,消费者永远也读到这些消息了

# 死信队列

死信队列是指,当消息在队列中无法被正常消费时(可能是消费失败、消息过期删除、队列满载以及消息被拒绝等原因),将该消息发送到一个特殊的队列中,这个队列就是死信队列。死信队列可以用来重试处理失败的消息,或者记录失败的消息以便后续分析。例如,当订单在支付超时后,可以将订单信息发送到死信队列中,以便进行后续的订单取消或退款处理

RabbitMQ 中死信队列可以直接配置,这包括设置消息的TTL(Time To Live,存活时间)、队列的最大长度、以及消息被拒绝时的处理方式

而 Kafka 本身并不直接支持死信队列,但可以通过配置消费者组和重试机制来实现类似的功能。当消息无法被消费者处理时,可以将其发送到特定的主题(如"dead-letter-topic",这个功能需要负责 kafka 的同学来封装),由另一个消费者组来处理这些消息

#kafka
最后更新: 3/5/2026, 8:10:14 AM
关于定时任务原理
ES 搜索引擎

← 关于定时任务原理 ES 搜索引擎→

最近更新
01
vibe coding 最佳实践
02-24
02
立直麻将牌效益理论
02-23
03
伪静态是什么
02-08
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式