
Flink 1.9 Kafka Integration (Part 4)
2019-09-23
Tags: Flink 1.9, Kafka
A consumer can also be constructed with a regular expression, subscribing to all topics whose names match the pattern:

scala

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String](
  java.util.regex.Pattern.compile("test-topic-[0-9]"),
  new SimpleStringSchema,
  properties)

val stream = env.addSource(myConsumer)
Timestamps and Watermarks

In many cases, the timestamp of a record is embedded, explicitly or implicitly, in the record itself. In addition, users may want to emit watermarks either periodically or in an irregular fashion.

You can define a timestamp extractor / watermark emitter and pass it to the consumer as follows:

java

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

DataStream<String> stream = env.addSource(myConsumer);
stream.print();

scala

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
val stream = env.addSource(myConsumer)
stream.print()
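
The CustomWatermarkEmitter used above is user-defined code that the article does not show. A minimal sketch, assuming punctuated watermarks and records that carry a leading millisecond timestamp (the class body and record format are illustrative assumptions, not from the article):

java

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        // Assumption: each record starts with a millisecond epoch timestamp,
        // e.g. "1569208675000,payload".
        return Long.parseLong(element.split(",")[0]);
    }

    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        // Emit a watermark after every record, trailing its timestamp by 1 ms.
        return new Watermark(extractedTimestamp - 1);
    }
}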

Kafka Producer

Flink's Kafka producer is likewise named by version: FlinkKafkaProducer011, FlinkKafkaProducer010, and so on.
For Kafka >= 1.0.0 it is simply called FlinkKafkaProducer.

Building a FlinkKafkaProducer

java

DataStream<String> stream = ...;

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "localhost:9092",            // broker list
        "my-topic",                  // target topic
        new SimpleStringSchema());   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true);

stream.addSink(myProducer);

scala

val stream: DataStream[String] = ...

val myProducer = new FlinkKafkaProducer011[String](
        "localhost:9092",         // broker list
        "my-topic",               // target topic
        new SimpleStringSchema)   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true)

stream.addSink(myProducer)

You need to specify the broker list, the target topic, and a serialization schema.

Custom partitioning: by default, a FlinkFixedPartitioner is used, which maps each parallel subtask of the Flink Kafka producer to a single Kafka partition.

You can extend the FlinkKafkaPartitioner class to implement custom partitioning, as sketched below.
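
A minimal sketch of such a partitioner, assuming routing by the record's hash code (the class name and the routing rule are illustrative, not from the article):

java

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Hypothetical example: route each record to a partition derived from its hash code.
public class MyPartitioner extends FlinkKafkaPartitioner<String> {
    @Override
    public int partition(String record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Mask the sign bit so the index is always non-negative.
        return partitions[(record.hashCode() & Integer.MAX_VALUE) % partitions.length];
    }
}

The partitioner is then passed through one of the producer constructors that accepts a custom FlinkKafkaPartitioner argument.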

Complete Flink 1.9 Kafka consumer code:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class KafkaConsumer {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        // Topic name is illustrative; the original listing is cut off here by pagination.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties));
        stream.print();
        env.execute("Flink 1.9 Kafka consumer");
    }
}