Kafka幂等和事务

Kafka幂等和事务

一、幂等性

1.1、Exactly-Once

保证在消息重发的时候,消费者不会重复处理。即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的一致性。
所谓幂等性,数学概念就是:f(f(x)) = f(x) 。f函数表示对消息的处理。
比如,银行转账,如果失败,需要重试。不管重试多少次,都要保证最终结果一定是一致的。
Exactly-Once
即精准一次,来保证幂等性。Exactly once是Kafka从版本0.11之后提供的高级特性。

1.2、为什么需要幂等性

在使用Kafka时,需要确保Exactly-Once语义。分布式系统中,一些不可控因素有很多,比如网络、OOM、FullGC等。在Kafka Broker确认Ack前,有可能出现网络异常、FullGC、OOM等问题时导致Ack超时,Producer会进行重复发送。注,在未达到最大重试次数前,会自动重试(非应用程序代码写的重试)。
Kafka在引入幂等性之前,Producer向Broker发送消息,然后Broker将消息追加到消息流中后给Producer返回Ack信号值。实现流程如下:

生产中,会出现各种不确定的因素,比如在Producer在发送给Broker的时候出现网络异常。比如以下这种异常情况的出现:

上图这种情况,当Producer第一次发送消息给Broker时,Broker将消息(x2,y2)追加到了消息流中,但是在返回Ack信号给Producer时失败了(比如网络异常) 。此时,Producer端触发重试机制,将消息(x2,y2)重新发送给Broker,Broker接收到消息后,再次将该消息追加到消息流中,然后成功返回Ack信号给Producer。这样下来,消息流中就被重复追加了两条相同的(x2,y2)的消息。
因此需要保证幂等性保证即使多次发送也要让最终的结果一样。

1.3、如何实现幂等性

Kafka在0.11.0.0之后加入的幂等性。他是通过添加唯一ID,类似于数据库的主键,用于唯一标记一个消息。它在底层设计架构中引入了ProducerIDSequenceNumber。那这两个概念的用途是什么呢?
PID:ProducerID,每个生产者启动时,Kafka 都会给它分配一个 ID,ProducerID 是生产者的唯一标识,需要注意的是,Kafka 重启也会重新分配 PID 。
SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。
同样,下图一种理想状态下的发送流程。实际情况下,会有很多不确定的因素,比如Broker在发送Ack信号给Producer时出现网络异常,导致发送失败。异常情况如下图所示:

当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。

1.4、使用幂等

通过如下配置使用幂:

  • enable.idempotence=true。enable.idempotence配置项表示是否使用幂等性。当enable.idempotence配置为true时,acks必须配置为all。并且建议max.in.flight .requests.per.connection的值小于5。
  • acks=all

1.5、幂等性的限制条件

单独只使用Producer的幂等性是存在一些限制条件的:

  • 只能保证 Producer 在单个会话内不丟不重 ,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
  • 幂等性不能跨多个Topic-Partition,只能保证单个 partition 内的幂等性 ,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。

如果需要跨会话、跨多个 topic-partition 的情况,需要使用 Kafka 的事务性来实现。这种幂等性只是保证了再生产端实现了幂等性,在实际场景中往往需要在消息者端实现幂等性,可以最大程度避免重复消费。

1.6、幂等性的实现原理

每个新的Producer在初始化的时候会被分配一个唯一的PID(凡是开启幂等性都是需要生成PID,只不过未开启事务的PID可以在任意broker生成,而开启事务只能在TransactionCoordinator节点生成),该PID对用户完全透明而不会暴露给用户。Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每次Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其消息序号比Broker维护的序号(即最后一次Commit的消息的序号)大一,则Broker会接受它,否则将其丢弃:

  • 如果消息序号比Broker维护的序号大于1以上,说明中间有数据尚未写入,也即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber
  • 如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber

上述设计解决了 0.11.0 之前版本中的两个问题:

  • Broker 保存消息后,发送 ACK 前宕机,Producer 认为消息未发送成功并重试,造成数据重复。
  • 前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成数据乱序。

producer_id是从Kafka服务端请求获取的(通过 ProducerIdManager 的 generateProducerId() 方法产生,维护在zk中的 /latest_producer_id_block 节点),消息序列号是Producer端生成的,初始值为0,之后自增加1,每个分区都有独立的序列号。。这里需要说明下,Kafka发送消息都是以batch的格式发送,batch包含了多条消息。所以Producer发送消息batch的时候,只会设置该batch的第一个消息的序列号,后面消息的序列号可以根据第一个消息的序列号计算出来。

二、事务

2.1、为什么需要事务

Kafka 的 Exactly Once 幂等性只能保证单次会话内的精准一次性,不能解决跨会话和跨分区的问题。
假如有如下问题:

  • 1、producer发的多条消息组成一个事务,这些消息需要对consumer同时可见或者同时不可见 。
  • 2、producer可能会给多个topic,多个partition发消息,这些消息也需要能放在一个事务里面,这就形成了一个典型的分布式事务。
  • 3、kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费偏移量也不能提交。
  • 4、producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务 。

在一个原子操作中,根据包含的操作类型,可以分为三种情况:

  • 1、只有Producer生产消息.
  • 2、消费消息和生产消息并存,这个是事务场景中最常用的情况,就是我们常说的consume-transform-produce 模式。
  • 3、只有consumer消费消息。

前两种情况需要引入事务,第3种情况不需要引入,这种操作其实没有什么意义,跟使用手动提交效果一样,而且也不是事务属性引入的目的。

2.2、开启事务

对于Producer,需要设置transactional.id属性,这个属性的作用下文会提到。设置了transactional.id属性后,enable.idempotence属性会自动设置为true。
对于Consumer,需要设置isolation.level = read_committed,这样Consumer只会读取已经提交了事务的消息。另外,需要设置enable.auto.commit = false来关闭自动提交Offset功能。

2.3、事务保证

2.3.1、事务恢复的保证

为了实现有状态的应用也可以保证重启后从断点处继续处理,也即事务恢复。应用程序必须提供一个稳定的(重启后不变)唯一的 ID,也即Transaction ID。Transactin ID与PID可能一一对应。区别在于Transaction ID由用户提供,而PID是内部的实现对用户透明。
另外,为了保证新的 Producer 启动后,旧的具有相同Transaction ID的 Producer 即失效,每次 Producer 通过Transaction ID拿到 PID 的同时,还会获取一个单调递增的 epoch。由于旧的 Producer 的 epoch 比新 Producer 的 epoch 小,Kafka 可以很容易识别出该 Producer 是老的 Producer 并拒绝其请求。
有了Transaction ID和epoch后,Kafka 可保证:

  • 跨 Session 的数据幂等发送。当具有相同Transaction ID的新的 Producer 实例被创建且工作时,旧的且拥有相同Transaction ID的 Producer 将不再工作。
  • 跨 Session 的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么 Commit 要么 Abort,使得新实例从一个正常状态开始工作。

2.3.2、事务原子性的保证

事务原子性是指 Producer 将多条消息作为一个事务批量发送,要么全部成功要么全部失败。 引入了一个服务器端的模块,名为Transaction Coordinator,用于管理 Producer 发送的消息的事务性。
该Transaction Coordinator维护Transaction Log,该 log 存于一个内部的 Topic 内。由于 Topic 数据具有持久性,因此事务的状态也具有持久性。
Producer 并不直接读写Transaction Log,它与Transaction Coordinator通信,然后由Transaction Coordinator将该事务的状态插入相应的Transaction Log。
Transaction Log的设计与Offset Log用于保存 Consumer 的 Offset 类似。

2.3.3、事务中 Offset 的提交保证

在Kafka Stream 应用中同时包含 Consumer 和 Producer(即Consumer-Transform-Producer),前者负责从 Kafka 中获取消息,后者负责将处理完的数据写回 Kafka 的其它 Topic 中。
为了实现该场景下的事务的原子性,Kafka 需要保证对 Consumer Offset 的 Commit 与 Producer 对发送消息的 Commit 包含在同一个事务中。否则,如果在二者 Commit 中间发生异常,根据二者 Commit 的顺序可能会造成数据丢失和数据重复:

  • 如果先 Commit Producer 发送数据的事务再 Commit Consumer 的 Offset,即At Least Once语义,可能造成数据重复。
  • 如果先 Commit Consumer 的 Offset,再 Commit Producer 数据发送事务,即At Most Once语义,可能造成数据丢失。

2.3.4、用于事务特性的控制型消息

为了区分写入 Partition 的消息被 Commit 还是 Abort,Kafka 引入了一种特殊类型的消息,即Control Message。该类消息的 Value 内不包含任何应用相关的数据,并且不会暴露给应用程序。它只用于 Broker 与 Client 间的内部通信。
对于 Producer 端事务,Kafka 以 Control Message 的形式引入一系列的Transaction Marker。Consumer 即可通过该标记判定对应的消息被 Commit 了还是 Abort 了,然后结合该 Consumer 配置的隔离级别决定是否应该将该消息返回给应用程序。
Kafka事务的回滚,并不是删除已写入的数据,而是将写入数据的事务标记为 Rollback/Abort 从而在读数据时过滤该数据。

2.4、事务流程

事务原理流程如下:

上图中的 Transaction Coordinator 运行在 Kafka 服务端,下面简称 TC 服务。
__transaction_state 是 TC 服务持久化事务信息的 topic 名称,下面简称事务 topic。
Producer 向 TC 服务发送的 commit 消息,下面简称事务提交消息。
TC 服务向分区发送的消息,下面简称事务结果消息。

1、寻找 TC 服务地址
Producer 会首先从 Kafka 集群中选择任意一台机器,然后向其发送请求,获取 TC 服务的地址。Kafka 有个特殊的事务 topic,名称为__transaction_state ,负责持久化事务消息。这个 topic 有多个分区,默认有50个,每个分区负责一部分事务。事务划分是根据 transaction id, 计算出该事务属于哪个分区。这个分区的 leader 所在的机器,负责这个事务的TC 服务地址。

2、事务初始化
Producer 在使用事务功能,必须先自定义一个唯一的 transaction id。有了 transaction id,即使客户端挂掉了,它重启后也能继续处理未完成的事务。
Kafka 实现事务需要依靠幂等性,而幂等性需要指定 producer id 。所以Producer在启动事务之前,需要向 TC 服务申请 producer id。TC 服务在分配 producer id 后,会将它持久化到事务 topic。

3、发送消息
Producer 在接收到 producer id 后,就可以正常的发送消息了。不过发送消息之前,需要先将这些消息的分区地址,上传到 TC 服务。TC 服务会将这些分区地址持久化到事务 topic。然后 Producer 才会真正的发送消息,这些消息与普通消息不同,它们会有一个字段,表示自身是事务消息。
这里需要注意下一种特殊的请求,提交消费位置请求,用于原子性的从某个 topic 读取消息,并且发送消息到另外一个 topic。我们知道一般是消费者使用消费组订阅 topic,才会发送提交消费位置的请求,而这里是由 Producer 发送的。Producer 首先会发送一条请求,里面会包含这个消费组对应的分区(每个消费组的消费位置都保存在 __consumer_offset topic 的一个分区里),TC 服务会将分区持久化之后,发送响应。Producer 收到响应后,就会直接发送消费位置请求给 GroupCoordinator。

4、发送提交请求
Producer 发送完消息后,如果认为该事务可以提交了,就会发送提交请求到 TC 服务。Producer 的工作至此就完成了,接下来它只需要等待响应。这里需要强调下,Producer 会在发送事务提交请求之前,会等待之前所有的请求都已经发送并且响应成功。

5、提交请求持久化
TC 服务收到事务提交请求后,会先将提交信息先持久化到事务 topic 。持久化成功后,服务端就立即发送成功响应给 Producer。然后找到该事务涉及到的所有分区,为每 个分区生成提交请求,存到队列里等待发送。
在一般的二阶段提交中,协调者需要收到所有参与者的响应后,才能判断此事务是否成功,最后才将结果返回给客户。那如果 TC 服务在发送响应给 Producer 后,还没来及向分区发送请求就挂掉了,那么 Kafka 是如何保证事务完成。因为每次事务的信息都会持久化,所以 TC 服务挂掉重新启动后,会先从 事务 topic 加载事务信息,如果发现只有事务提交信息,却没有后来的事务完成信息,说明存在事务结果信息没有提交到分区。

6、发送事务结果信息给分区
后台线程会不停的从队列里,拉取请求并且发送到分区。当一个分区收到事务结果消息后,会将结果保存到分区里,并且返回成功响应到 TC服务。当 TC 服务收到所有分区的成功响应后,会持久化一条事务完成的消息到事务 topic。至此,一个完整的事务流程就完成了。

2.5、事务原理

2.5.1、FindCoordinatorRequest

对应图中的 1。
由于Transaction Coordinator是分配 PID 和管理事务的核心,因此 Producer 要做的第一件事情就是通过向任意一个 Broker 发送FindCoordinator请求找到Transaction Coordinator的位置。
注意:只有应用程序为 Producer 配置了Transaction ID时才可使用事务特性,也才需要这一步。另外,由于事务性要求 Producer 开启幂等特性,因此通过将transactional.id设置为非空从而开启事务特性的同时也需要通过将enable.idempotence设置为 true 来开启幂等特性。
通过请求附带的事务 ID,计算出 __transaction_state 的分区 ID,而其对应的Leader副本 Broker 即是负责当前事务的 Transaction Coordinator。

2.5.2、InitProducerIdRequest

对应图中的 2 。
找到Transaction Coordinator后,具有幂等特性的 Producer 必须发起InitPidRequest请求以获取 PID。
注意:只要开启了幂等特性即必须执行该操作,而无须考虑该 Producer 是否开启了事务特性。

2.5.3、如果事务特性被开启

InitPidRequest会发送给Transaction Coordinator。如果Transaction Coordinator是第一次收到包含有该Transaction ID的 InitPidRequest 请求,它将会把该<TransactionID, PID>存入Transaction Log,如上图中步骤 2.1 所示。这样可保证该对应关系被持久化,从而保证即使Transaction Coordinator宕机该对应关系也不会丢失。
除了返回 PID 外,InitPidRequest还会执行如下任务:
增加该 PID 对应的 epoch。具有相同 PID 但 epoch 小于该 epoch 的其它 Producer(如果有)新开启的事务将被拒绝。
恢复(Commit 或 Abort)之前的 Producer 未完成的事务(如果有)。
注意:InitPidRequest的处理过程是同步阻塞的。一旦该调用正确返回,Producer 即可开始新的事务。
另外,如果事务特性未开启,InitPidRequest可发送至任意 Broker,并且会得到一个全新的唯一的 PID。该 Producer 将只能使用幂等特性以及单一 Session 内的事务特性,而不能使用跨 Session 的事务特性。

2.5.4、开启事务回话

Kafka 从 0.11.0.0 版本开始,提供beginTransaction()方法用于开启一个事务。调用该方法后,Producer 本地会记录已经开启了事务(标记生产者状态机处于 IN_TRANSACTION 状态),但Transaction Coordinator只有在 Producer 发送第一条消息后才认为事务已经开启。

2.5.5、流的处理与转发阶段

Consume-Transform-Produce模式。这一阶段,包含了整个事务的数据处理过程,并且包含了多种请求。
1、AddPartitionsToTxnRequest
一个 Producer 可能会给多个<Topic, Partition>发送数据,给一个新的<Topic, Partition>发送数据前,它需要先向Transaction Coordinator发送AddPartitionsToTxnRequest。
Transaction Coordinator会将该<Transaction, Topic, Partition>存于Transaction Log内,并将其状态置为BEGIN,如上图中步骤 4.1 所示。有了该信息后,我们才可以在后续步骤中为每个Topic, Partition>设置 COMMIT 或者 ABORT 标记(如上图中步骤 5.2 所示)。
另外,如果该<Topic, Partition>为该事务中第一个<Topic, Partition>,Transaction Coordinator还会启动对该事务的计时(每个事务都有自己的超时时间)。

2、ProduceRequest
Producer 通过一个或多个ProduceRequest发送一系列消息。除了应用数据外,该请求还包含了 PID,epoch,和Sequence Number。该过程如上图中步骤 4.2 所示。

3、AddOffsetsToTxnRequest
为了提供事务性,Producer 新增了sendOffsetsToTransaction方法,该方法将多组消息的发送和消费放入同一批处理内。
该方法先判断在当前事务中该方法是否已经被调用并传入了相同的 Group ID。若是,直接跳到下一步;若不是,则向Transaction Coordinator发送AddOffsetsToTxnRequests请求,Transaction Coordinator将对应的所有<Topic, Partition>存于Transaction Log中,并将其状态记为BEGIN,如上图中步骤 4.3 所示。该方法会阻塞直到收到响应。

4、TxnOffsetCommitRequest
作为sendOffsetsToTransaction方法的一部分,在处理完AddOffsetsToTxnRequest后,Producer 也会发送TxnOffsetCommit请求给Consumer Coordinator从而将本事务包含的与读操作相关的各<Topic, Partition>的 Offset 持久化到内部的__consumer_offsets中,如上图步骤 4.4 所示。
在此过程中,Consumer Coordinator会通过 PID 和对应的 epoch 来验证是否应该允许该 Producer 的该请求。
这里需要注意:
(1)写入__consumer_offsets的 Offset 信息在当前事务 Commit 前对外是不可见的。也即在当前事务被 Commit 前,可认为该 Offset 尚未 Commit,也即对应的消息尚未被完成处理。
(2)Consumer Coordinator并不会立即更新缓存中相应<Topic, Partition>的 Offset,因为此时这些更新操作尚未被 COMMIT 或 ABORT。

2.5.6、提交或回滚事务

1、EndTxnRequest(对应 5.1)
当用户调用 KafkaProducer#commitTransaction 或者 abortTransaction 方法,生产者会往 Transaction Coordinator 发送附带提交或中止的事务结果的 EndTxnRequest 请求。
当 Transaction Coordinator 在收到请求后,
把 PREPARE 消息写到 __transaction_state。(对应 5.1a)
通过 WriteTxnMarkersRequest 请求,向事务关联的所有 TopicPartitions 主副本写入 EndTransactionMarker 标记。(详细见下文)
最终,把封装了 COMMITTED 或 ABORTED 状态的 EndTransactionMarker 标记写到 __transaction_state。(对应 5.3a)

2、WriteTxnMarkersRequest(对应 5.2a)
Transaction Coordinator 向关联的 TopicPartitions 主副本提交 WriteTxnMarkersRequest 请求,请求中将附带生产者 ID,以用于过滤掉交叉不相关联生产者的日志。
在日后,当消费者读取某生产者的 Aborted 段日志时,可通过上文提及的 .txnindex 索引文件提前过滤,而读取 Committed 段的则无需格外处理。(具体参考上文的设计分解)
另外,如果 __consumer_offsets 也作为事务的一部分,同样写入 EndTransactionMarker 标记并更新 Offsets 可见性。

2.5.7、超时事务中止

默认情况,根据事务的起始时间戳,Transaction Coordinator 每 10s 轮询进行中的事务是否已超时,若发现超时事务,将推进 Epoch、中止当前事务(相当于 Transaction Coordinator 作为新的生产者);而在未来,老的生产者将收到 ProducerFencedException 异常。

2.6、拒绝僵尸实例

在分布式系统中,一个instance的宕机或失联,集群往往会自动启动一个新的实例来代替它的工作。此时若原实例恢复了,那么集群中就产生了两个具有相同职责的实例,此时前一个instance就被称为“僵尸实例(Zombie Instance)”。在Kafka中,两个相同的producer同时处理消息并生产出重复的消息(read-process-write模式),这样就严重违反了Exactly Once Processing的语义。这就是僵尸实例问题。
Kafka事务特性通过transaction-id属性来解决僵尸实例问题。所有具有相同transaction-id的Producer都会被分配相同的pid,同时每一个Producer还会被分配一个递增的epoch。Kafka收到事务提交请求时,如果检查当前事务提交者的epoch不是最新的,那么就会拒绝该Producer的请求。从而达成拒绝僵尸实例的目标。

2.7、与分布式事务机制对比

2.7.1、两阶段提交原理

二阶段提交的算法思路可以概括为:协调者询问参与者是否准备好了提交,并根据所有参与者的反馈情况决定向所有参与者发送commit或者rollback指令(协调者向所有参与者发送相同的指令)。
所谓的两个阶段是指
准备阶段 又称投票阶段。在这一阶段,协调者询问所有参与者是否准备好提交,参与者如果已经准备好提交则回复Prepared,否则回复Non-Prepared。
提交阶段 又称执行阶段。协调者如果在上一阶段收到所有参与者回复的Prepared,则在此阶段向所有参与者发送commit指令,所有参与者立即执行commit操作;否则协调者向所有参与者发送rollback指令,参与者立即执行rollback操作。
两阶段提交中,协调者和参与方的交互过程如下图所示:

2.7.2、Kafka两阶段提交对比

Kafka的事务机制与上述所介绍的两阶段提交机制看似相似,都分PREPARE阶段和最终COMMIT阶段,但又有很大不同。

  • Kafka事务机制中,PREPARE时即要指明是PREPARE_COMMIT还是PREPARE_ABORT,并且 只须在Transaction Log中标记即可 ,无须其它组件参与。而两阶段提交的PREPARE需要发送给所有的分布式事务参与方,并且事务参与方需要尽可能准备好,并根据准备情况返回Prepared或Non-Prepared状态给事务管理器。
  • Kafka事务中,一但发起PREPARE_COMMIT或PREPARE_ABORT,则确定该事务最终的结果应该是被COMMIT或ABORT 。而分布式事务中,PREPARE后由各事务参与方返回状态,只有所有参与方均返回Prepared状态才会真正执行COMMIT,否则执行ROLLBACK
  • Kafka事务机制中,某几个Partition在COMMIT或ABORT过程中变为不可用,只影响该Partition不影响其它Partition。两阶段提交中,若唯一收到COMMIT命令参与者Crash,其它事务参与方无法判断事务状态从而使得整个事务阻塞。
  • Kafka事务机制引入事务超时机制,有效避免了挂起的事务影响其它事务的问题。
  • Kafka事务机制中存在多个Transaction Coordinator实例,而分布式事务中只有一个事务管理器。

2.8、事务操作

Kafka事务操作常用的API如下:

// 初始化事务,需要注意确保transation.id属性被分配 
void initTransactions();
// 开启事务
void beginTransaction() throws ProducerFencedException;
// 为Consumer提供的在事务内Commit Offsets的操作 
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException; 
// 提交事务 
void commitTransaction() throws ProducerFencedException;
// 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;

案例1:
单个Producer,使用事务保证消息的仅一次发送:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;


public class MyTransactionalProducer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 提供客户端ID
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");
        // 事务ID
        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id");
        // 要求ISR都确认
        configs.put(ProducerConfig.ACKS_CONFIG, "all");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
        // 初始化事务 
        producer.initTransactions();
        // 开启事务 
        producer.beginTransaction();
        try {
            // producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_01"));
            producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_02"));
            // int i = 1 / 0;
            // 提交事务
            producer.commitTransaction();
        } catch (Exception ex) {
            // 中止事务
            producer.abortTransaction();
        } finally {
            // 关闭生产者
            producer.close();
        }
    }
}

案例2:
在 消费-转换-生产 模式,使用事务保证仅一次发送:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


public class MyTransactional {
    public static KafkaProducer<String, String> getProducer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 设置client.id
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");
        // 设置事务id
        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");
        // 需要所有的ISR副本确认
        configs.put(ProducerConfig.ACKS_CONFIG, "all");
        // 启用幂等性
        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

        return producer;
    }

    public static KafkaConsumer<String, String> getConsumer(String consumerGroupId) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 设置消费组ID
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");
        // 不启用消费者偏移量的自动确认,也不要手动确认
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 只读取已提交的消息
        // configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
        return consumer;
    }

    public static void main(String[] args) {
        String consumerGroupId = "consumer_grp_id_101";
        KafkaProducer<String, String> producer = getProducer();
        KafkaConsumer<String, String> consumer = getConsumer(consumerGroupId);
        // 事务的初始化
        producer.initTransactions();
        // 订阅主题
        consumer.subscribe(Collections.singleton("tp_tx_01"));
        final ConsumerRecords<String, String> records = consumer.poll(1_000);
        // 开启事务
        producer.beginTransaction();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
                producer.send(new ProducerRecord<String, String>("tp_tx_out_01", record.key(), record.value()));
                offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
                // 偏 移量表示下一条要消费的消息
            }
            // 将该消息的偏移量提交作为事务的一部分,随事务提交和回滚(不提交消费偏移 量)
            producer.sendOffsetsToTransaction(offsets, consumerGroupId);

            // 提交事务
            producer.commitTransaction();
        } catch (Exception e) {
            e.printStackTrace();
            // 回滚事务
            producer.abortTransaction();
        } finally {
            // 关闭资源 
            producer.close();
            consumer.close();
        }
    }
}

三、相关配置

3.1、Broker configs

  • transactional.id.timeout.ms
    在ms中,事务协调器在生产者TransactionalId提前过期之前等待的最长时间,并且没有从该生产者TransactionalId接收到任何事务状态更新。默认是604800000(7天)。这允许每周一次的生产者作业维护它们的id
  • max.transaction.timeout.ms
    事务允许的最大超时。如果客户端请求的事务时间超过此时间,broke将在InitPidRequest中返回InvalidTransactionTimeout错误。这可以
    防止客户机超时过大,从而导致用户无法从事务中包含的主题读取内容。默认值为900000(15分钟)。这是消息事务需要发送的时间的保守上限。
  • transaction.state.log.replication.factor
    事务状态topic的副本数量。默认值:3
  • transaction.state.log.num.partitions
    事务状态主题的分区数。默认值:50
  • transaction.state.log.min.isr
    事务状态主题的每个分区ISR最小数量。默认值:2
  • transaction.state.log.segment.bytes
    事务状态主题的segment大小。默认值:104857600字节

3.2、Producer configs

  • enable.idempotence
    开启幂等
  • transaction.timeout.ms
    事务超时时间
    事务协调器在主动中止正在进行的事务之前等待生产者更新事务状态的最长时间。这个配置值将与InitPidRequest一起发送到事务
    协调器。如果该值大于max.transaction.timeout:在broke中设置ms时,请求将失败,并出现InvalidTransactionTimeout错误。默认是60000。这使得交易不会阻塞下游消费超过一分钟,这在实时应用程序中通常是允许的。
  • transactional.id
    用于事务性交付的TransactionalId。这支持跨多个生产者会话的可靠性语义,因为它允许客户端确保使用相同TransactionalId的事务在启动任何新事务之前已经完成。如果没有提供TransactionalId,则生产者仅限于幂等交付。

3.3、Consumer configs

isolation.level隔离级别。

  • read_uncommitted:以偏移顺序使用已提交和未提交的消息。
  • read_committed:仅以偏移量顺序使用非事务性消息或已提交事务性消息。为了维护偏移排序,这个设置意味着我们必须在使用者中缓冲消息,直到看到给定事务中的所有消息。

四、总结

  • KAFKA的事务机制,在底层依赖于幂等生产者,幂等生产者是 kafka 事务的必要不充分条件;
  • 开启 kafka事务时,kafka 会自动开启幂等生产者。
  • Transaction Marker与PID提供了识别消息是否应该被读取的能力,从而实现了事务的隔离性。
  • 通过事务机制,KAFKA 实现了对多个 topic 的多个 partition 的原子性的写入(Atomic multi-partition writes);
  • KAFKA的事务机制,在底层依赖于幂等生产者,幂等生产者是 kafka 事务的必要不充分条件:用户可以根据需要,配置使用幂等生产者但不开启事务;也可以根据需要开启 kafka事务,此时kafka 会使用幂等生产者;
  • 为支持事务机制,KAFKA 引入了两个新的组件:Transaction Coordinator 和 Transaction Log,其中 transaction coordinator 是运行在每个 kafka broker 上的一个模块,是 kafka broker 进程承载的新功能之一(不是一个独立的新的进程);而 transaction log 是 kakafa 的一个内部 topic;
  • 为支持事务机制,kafka 将日志文件格式进行了扩展:日志中除了普通的消息,还有一种消息专门用来标志一个事务的结束,它就是控制消息 controlBatch,它有两种类型:commit和abort,分别用来表征事务已经成功提交或已经被成功终止。
  • 开启了事务的生产者,生产的消息最终还是正常写到目标 topic 中,但同时也会通过 transaction coordinator 使用两阶段提交协议,将事务状态标记 transaction marker,也就是控制消息 controlBatch,写到目标 topic 中,控制消息共有两种类型 commit 和 abort,分别用来表征事务已经成功提交或已经被成功终止;
  • 开启了事务的消费者,如果配置读隔离级别为 read-committed, 在内部会使用存储在目标 topic-partition 中的事务控制消息,来过滤掉没有提交的消息,包括回滚的消息和尚未提交的消息,从而确保只读到已提交的事务的 message;
  • 开启了事务的消费者,过滤消息时,KAFKA consumer 不需要跟 transactional coordinator 进行 rpc 交互,因为 topic 中存储的消息,包括正常的数据消息和控制消息,包含了足够的元数据信息来支持消息过滤;
  • 当然 kakfa 的 producer 和 consumer 是解耦的,你也可以使用非 transactional consumer 来消费 transactional producer 生产的消息,此时目标 topic-partition 中的所有消息都会被返回,不会进行过滤,此时也就丢失了事务 ACID 的支持;

评论

暂无

添加新评论