@KafkaListener 注解

@KafkaListener 注解

一、简介

Kafka 目前主要作为一个分布式的发布订阅式的消息系统使用,也是目前最流行的消息队列系统之一。本文集成的框架 spring-kafka
源码解读:

@Target ({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention (RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable (KafkaListeners.class)
public @interface KafkaListener {
   /**
    * 消费者的 id,当 GroupId 没有被配置的时候,默认 id 为 GroupId
    */
   String id () default "";
   /**
    * 监听容器工厂,当监听时需要区分单数据还是多数据消费需要配置 containerFactory 属性
    */
   String containerFactory () default "";
   /**
    * 需要监听的 Topic,可监听多个,和 topicPattern 属性互斥
    */
   String [] topics () default {};
   /**
    * 需要监听的 Topic 的正则表达。和 topics,topicPartitions 属性互斥
    */
   String topicPattern () default "";
   /**
    * 可配置更加详细的监听信息,必须监听某个 Topic 中的指定分区,或者从 offset 为 200 的偏移量开始监听,可配置该参数,和 topicPattern 属性互斥
    */
   TopicPartition [] topicPartitions () default {};
   /**
    * 侦听器容器组 
    */
   String containerGroup () default "";
   /**
    * 监听异常处理器,配置 BeanName
    */
   String errorHandler () default "";
   /**
    * 消费组 ID 
    */
   String groupId () default "";
   /**
    * id 是否为 GroupId
    */
   boolean idIsGroup () default true;
   /**
    * 消费者 Id 前缀
    */
   String clientIdPrefix () default "";
   /**
    * 真实监听容器的 BeanName,需要在 BeanName 前加 "__"
    */
   String beanRef () default "__listener";
}

二、案例

2.1、ConsumerRecord 类消费

使用 ConsumerRecord 类接收有一定的好处,ConsumerRecord 类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用 ConsumerRecord 会是个不错的选择。如果使用具体的类型接收消息体则更加方便,比如说用 String 类型去接收消息体。
这里编写一个 Listener 方法,监听 topic1,并把 ConsumerRecord 里面所包含的内容打印到控制台中:

@Component
public class Listener {
    private static final Logger log = LoggerFactory.getLogger (Listener.class);

    @KafkaListener (id = "consumer", topics = "topic1")
    public void consumerListener (ConsumerRecord record) {
        log.info ("topic.quick.consumer receive :" + record.toString ());
    }
}

2.2、批量消费

批量消费在现实业务场景中是很有实用性的。因为批量消费可以增大 kafka 消费吞吐量,提高性能。
批量消费实现步骤:

  • 1、重新创建一份新的消费者配置,配置为一次拉取 10 条消息。
  • 2、创建一个监听容器工厂,命名为:batchContainerFactory,设置其为批量消费并设置并发量为 5,这个并发量根据分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。
  • 3、创建一个分区数为 8 的 Topic。
  • 4、创建监听方法,设置消费 id 为 batchConsumer,clientID 前缀为 batch,监听 batch,使用 batchContainerFactory 工厂创建该监听容器。
@Component
public class BatchListener {
    private static final Logger log = LoggerFactory.getLogger (BatchListener.class);

    private Map consumerProps () {
        Map props = new HashMap<>();
        props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put (ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put (ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // 一次拉取消息数量
        props.put (ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
        props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, NumberDeserializers.IntegerDeserializer.class);
        props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        return props;
    }

    @Bean ("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory listenerContainer () {
        ConcurrentKafkaListenerContainerFactory container
                = new ConcurrentKafkaListenerContainerFactory ();
        container.setConsumerFactory (new DefaultKafkaConsumerFactory (consumerProps ()));
        // 设置并发量,小于或等于 Topic 的分区数
        container.setConcurrency (5);
        // 必须 设置为批量监听
        container.setBatchListener (true);
        return container;
    }

    @Bean
    public NewTopic batchTopic () {
        return new NewTopic ("topic.batch", 8, (short) 1);
    }

    @KafkaListener (id = "batchConsumer", clientIdPrefix = "batch"
            , topics = {"topic.batch"}, containerFactory = "batchContainerFactory")
    public void batchListener (List data) {
        log.info ("topic.batch  receive :");
        for (String s : data) {
            log.info (s);
        }
    }
}

2.3、监听 Topic 中指定的分区

使用 @KafkaListener 注解的 topicPartitions 属性监听不同的 partition 分区。
@TopicPartition:topic 需要监听的 Topic 的名称,partitions 需要监听 Topic 的分区 id,partitionOffsets 可以设置从某个偏移量开始监听。
@PartitionOffset:partition 分区 Id,非数组,initialOffset 初始偏移量。

@Bean
public NewTopic batchWithPartitionTopic () {
    return new NewTopic ("topic.batch.partition", 8, (short) 1);
}

@KafkaListener (id = "batchWithPartition",clientIdPrefix = "bwp",containerFactory = "batchContainerFactory",
        topicPartitions = {
                @TopicPartition (topic = "topic1",partitions = {"1","3"}),
                @TopicPartition (topic = "topic2",partitions = {"0","4"},
                        partitionOffsets = @PartitionOffset (partition = "2",initialOffset = "100"))

        }
)
public void batchListenerWithPartition (List data) {
    log.info ("topic.batch.partition  receive :");
    for (String s : data) {
        log.info (s);
    }
}

2.4、注解方式获取消息头及消息体

当你接收的消息包含请求头,以及你监听方法需要获取该消息非常多的字段时可以通过这种方式。。这里使用的是默认的监听容器工厂创建的,如果你想使用批量消费,把对应的类型改为 List 即可,比如 List data , List key。

  • @Payload 获取的是消息的消息体,也就是发送内容。
  • @Header (KafkaHeaders.RECEIVED_MESSAGE_KEY) 获取发送消息的 key。
  • @Header (KafkaHeaders.RECEIVED_PARTITION_ID) 获取当前消息是从哪个分区中监听到的。
  • @Header (KafkaHeaders.RECEIVED_TOPIC) 获取监听的 TopicName。
  • @Header (KafkaHeaders.RECEIVED_TIMESTAMP) 获取时间戳。
@KafkaListener (id = "params", topics = "topic.params")
public void otherListener (@Payload String data,
                         @Header (KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
                         @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                         @Header (KafkaHeaders.RECEIVED_TOPIC) String topic,
                         @Header (KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info ("topic.params receive : \n"+
            "data :"+data+"\n"+
            "key :"+key+"\n"+
            "partitionId :"+partition+"\n"+
            "topic :"+topic+"\n"+
            "timestamp :"+ts+"\n"
    );
}

2.5、使用 Ack 机制确认消费

Kafka 是通过最新保存偏移量进行消息消费的,而且确认消费的消息并不会立刻删除,所以我们可以重复的消费未被删除的数据,当第一条消息未被确认,而第二条消息被确认的时候,Kafka 会保存第二条消息的偏移量,也就是说第一条消息再也不会被监听器所获取,除非是根据第一条消息的偏移量手动获取。Kafka 的 ack 机制可以有效的确保消费不被丢失。因为自动提交是在 kafka 拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。
使用 Kafka 的 Ack 机制比较简单,只需简单的三步即可:

  • 设置 ENABLE_AUTO_COMMIT_CONFIG=false,禁止自动提交。
  • 设置 AckMode=MANUAL_IMMEDIATE
  • 监听方法加入 Acknowledgment ack 参数。
  • 使用 Consumer.seek 方法,可以指定到某个偏移量的位置.
@Component
public class AckListener {
    private static final Logger log = LoggerFactory.getLogger (AckListener.class);
    
    private Map consumerProps () {
        Map props = new HashMap<>();
        props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put (ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put (ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
    @Bean ("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory ackContainerFactory () {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory ();
        factory.setConsumerFactory (new DefaultKafkaConsumerFactory (consumerProps ()));
factory.getContainerProperties ().setAckMode (AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory (new DefaultKafkaConsumerFactory (consumerProps ()));
        return factory;
    }
    
    @KafkaListener (id = "ack", topics = "topic.ack", containerFactory = "ackContainerFactory")
    public void ackListener (ConsumerRecord record, Acknowledgment ack) {
        log.info ("topic.quick.ack receive :" + record.value ());
        ack.acknowledge ();
    }
}

2.6、解决重复消费

上一节中使用 ack 手动提交偏移量时,假如 consumer 挂了重启,那它将从 committed offset 位置开始重新消费,而不是 consume offset 位置。这也就意味着有可能重复消费。
在 0.9 客户端中,有 3 种 ack 策略:

  • 策略 1: 自动的,周期性的 ack。
  • 策略 2:consumer.commitSync (),调用 commitSync,手动同步 ack。每处理完 1 条消息,commitSync 1 次。
  • 策略 3:consumer. commitASync (),手动异步 ack。
    那么使用策略 2,提交每处理完 1 条消息,就发送一次 commitSync。那这样是不是就可以解决 “重复消费” 了呢?如下代码:
while (true) {
    List buffer = new ArrayList<>();
    ConsumerRecords records = consumer.poll (100);
    for (ConsumerRecord record : records) {
        buffer.add (record);
    }
    insertIntoDb (buffer);    // 消除处理,存到 db
    consumer.commitSync ();   // 同步发送 ack
    buffer.clear ();
}

答案是否定的!因为上面的 insertIntoDb 和 commitSync 做不到原子操作:如果在数据处理完成,commitSync 的时候挂了,服务器再次重启,消息仍然会重复消费。
那么如何解决重复消费的问题呢?答案是自己保存 committed offset,而不是依赖 kafka 的集群保存 committed offset,把消息的处理和保存 offset 做成一个原子操作,并且对消息加入唯一 id, 进行判重。

依照官方文档,要自己保存偏移量,需要:
enable.auto.commit=false, 禁用自动 ack。
每次取到消息,把对应的 offset 存下来。
下次重启,通过 consumer.seek 函数,定位到自己保存的 offset,从那开始消费。
更进一步处理可以对消息加入唯一 id, 进行判重。

参考:
【spring-kafka】@KafkaListener详解与使用
Spring Boot与Kafka整合:实现百万级高吞吐消息处理的最佳实践

评论

暂无

添加新评论