设为首页 加入收藏

TOP

SpringBoot-Learning系列之Kafka整合(四)
2023-09-23 15:44:41 】 浏览:314
Tags:SpringBoot-Learning Kafka 整合
92 producer: retries: 0 # 每次批量发送消息的数量 batch-size: 16384 buffer-memory: 33554432 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer listener: missing-topics-fatal: false # MANUAL poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交 # MANUAL_IMMEDIATE 每处理完业务手动调用Acknowledgment.acknowledge()后立即提交 # RECORD 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交 # BATCH 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交 # TIME 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交 # COUNT 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交 # COUNT_TIME TIME或COUNT满足其中一个时提交 ack-mode: manual_immediate consumer: group-id: test # 是否自动提交 enable-auto-commit: false max-poll-records: 100 # 用于指定消费者在启动时、重置消费偏移量时的行为。 # earliest:消费者会将消费偏移量重置为最早的可用偏移量,也就是从最早的消息开始消费。 # latest:消费者会将消费偏移量重置为最新的可用偏移量,也就是只消费最新发送的消息。 # none:如果找不到已保存的消费偏移量,消费者会抛出一个异常 auto-offset-reset: earliest auto-commit-interval: 100 # 指定消息key和消息体的编解码方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: max.poll.interval.ms: 3600000 server: port: 8888spring: kafka: bootstrap-servers: 172.31.192.1:9092 producer: retries: 0 # 每次批量发送消息的数量 batch-size: 16384 buffer-memory: 33554432 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer listener: missing-topics-fatal: false ack-mode: manual_immediate consumer: group-id: test enable-auto-commit: false max-poll-records: 100 auto-offset-reset: earliest auto-commit-interval: 100 # 指定消息key和消息体的编解码方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: max.poll.interval.ms: 3600000
  • 生产者代码示例

    package io.github.vino42.publiser;
    
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * =====================================================================================
     *
     * @Created :   2023/8/30 21:29
     * @Compiler :  jdk 17
     * @Author :    VINO
     * @Copyright : VINO
     * @Decription : kafak 消息生产者
     * =====================================================================================
     */
    @Component
    public class KafkaPublishService {
        @Autowired
        KafkaTemplate kafkaTemplate;
    
        /**
         * 这里为了简单 直接发送json字符串
         *
         * @param json
         */
        public void send(String topic, String json) {
            kafkaTemplate.send(topic, json);
        }
    }
    
    
        @RequestMapping("/send")
        public String send() {
            IntStream.range(0, 10000).forEach(d -> {
                kafkaPublishService.send("test", RandomUtil.randomString(16));
            });
            return "ok";
        }
    
    
  • 消费者

    @Component
    @Slf4j
    public class CustomKafkaListener {
    
        @org.springframework.kafka.annotation.KafkaListener(topics = "test")
        pu
  • 首页 上一页 1 2 3 4 下一页 尾页 4/4/4
    】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
    上一篇SpringBoot集成微信支付JSAPIV3保.. 下一篇Spring 注入集合

    最新文章

    热门文章

    Hot 文章

    Python

    C 语言

    C++基础

    大数据基础

    linux编程基础

    C/C++面试题目