92
producer:
retries: 0
# 每次批量发送消息的数量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
listener:
missing-topics-fatal: false
# MANUAL poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交
# MANUAL_IMMEDIATE 每处理完业务手动调用Acknowledgment.acknowledge()后立即提交
# RECORD 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# BATCH 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# TIME 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# COUNT 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT_TIME TIME或COUNT满足其中一个时提交
ack-mode: manual_immediate
consumer:
group-id: test
# 是否自动提交
enable-auto-commit: false
max-poll-records: 100
# 用于指定消费者在启动时、重置消费偏移量时的行为。
# earliest:消费者会将消费偏移量重置为最早的可用偏移量,也就是从最早的消息开始消费。
# latest:消费者会将消费偏移量重置为最新的可用偏移量,也就是只消费最新发送的消息。
# none:如果找不到已保存的消费偏移量,消费者会抛出一个异常
auto-offset-reset: earliest
auto-commit-interval: 100
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
max.poll.interval.ms: 3600000
server:
port: 8888spring:
kafka:
bootstrap-servers: 172.31.192.1:9092
producer:
retries: 0
# 每次批量发送消息的数量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
listener:
missing-topics-fatal: false
ack-mode: manual_immediate
consumer:
group-id: test
enable-auto-commit: false
max-poll-records: 100
auto-offset-reset: earliest
auto-commit-interval: 100
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
max.poll.interval.ms: 3600000
生产者代码示例
package io.github.vino42.publiser;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
* =====================================================================================
*
* @Created : 2023/8/30 21:29
* @Compiler : jdk 17
* @Author : VINO
* @Copyright : VINO
* @Decription : kafak 消息生产者
* =====================================================================================
*/
@Component
public class KafkaPublishService {
@Autowired
KafkaTemplate kafkaTemplate;
/**
* 这里为了简单 直接发送json字符串
*
* @param json
*/
public void send(String topic, String json) {
kafkaTemplate.send(topic, json);
}
}
@RequestMapping("/send")
public String send() {
IntStream.range(0, 10000).forEach(d -> {
kafkaPublishService.send("test", RandomUtil.randomString(16));
});
return "ok";
}
消费者
@Component
@Slf4j
public class CustomKafkaListener {
@org.springframework.kafka.annotation.KafkaListener(topics = "test")
pu