设为首页 加入收藏

TOP

Flink1.9整合Kafka(二)
2019-09-23 11:17:55 】 浏览:112
Tags:Flink1.9 整合 Kafka
t;, "localhost:2181") properties.setProperty("group.id", "test") stream = env .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)) .print()

必须有的:

1.topic名称

2.用于反序列化Kafka数据的DeserializationSchema / KafkaDeserializationSchema

3.配置参数:“bootstrap.servers” “group.id” (kafka0.8还需要 “zookeeper.connect”)

配置消费起始位置

java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour

//指定位置
//Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

DataStream<String> stream = env.addSource(myConsumer);

scala:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour

//指定位置
//val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

val stream = env.addSource(myConsumer)
检查点

启用Flink的检查点后,Flink Kafka Consumer将使用主题中的记录,并以一致的方式定期检查其所有Kafka偏移以及其他操作的状态。如果作业失败,Flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用Kafka的记录。

如果禁用了检查点,则Flink Kafka Consumer依赖于内部使用的Kafka客户端的自动定期偏移提交功能。

如果启用了检查点,则Flink Kafka Consumer将在检查点完成时提交存储在检查点状态中的偏移量。

java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs

scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
分区发现

Flink Kafka Consumer支持发现动态创建的Kafka分区,并使用一次性保证消费它们。

还可以使用正则:

java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);

DataStream<String> stream = env.addSource(myConsumer);
...

scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val properties = new Properties()
properties.setProperty("
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Robo 3T SQL 下一篇海盗分金问题SQL求解(贪心算法)

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目