Apache Flink 漫谈系列(15) – DataStream Connectors之Kafka

作者:孙金城 2019-01-15 08:50:12开发开发工具Kafka 为了满足本系列读者的需求,我先介绍一下Kafka在Apache Flink中的使用。所以本篇以一个简单的示例,向大家介绍在Apache Flink中如何使用Kafka。

一、聊什么

为了满足本系列读者的需求,我先介绍一下Kafka在Apache Flink中的使用。所以本篇以一个简单的示例,向大家介绍在Apache Flink中如何使用Kafka。

二、Kafka 简介

Apache Kafka是一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,LinkedIn于2010年贡献给了Apache基金会并成为***开源项目。Kafka用于构建实时数据管道和流式应用程序。它具有水平扩展性、容错性、极快的速度,目前也得到了广泛的应用。

Kafka不但是分布式消息系统而且也支持流式计算,所以在介绍Kafka在Apache Flink中的应用之前,先以一个Kafka的简单示例直观了解什么是Kafka。

1. 安装

本篇不是系统的,详尽的介绍Kafka,而是想让大家直观认识Kafka,以便在Apahe Flink中进行很好的应用,所以我们以最简单的方式安装Kafka。

(1) 下载二进制包:

    curl-L-Ohttp://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz

(2) 解压安装

Kafka安装只需要将下载的tgz解压即可,如下:

    jincheng:kafkajincheng.sunjc$tar-zxfkafka_2.11-2.1.0.tgzjincheng:kafkajincheng.sunjc$cdkafka_2.11-2.1.0jincheng:kafka_2.11-2.1.0jincheng.sunjc$lsLICENSENOTICEbinconfiglibssite-docs

其中bin包含了所有Kafka的管理命令,如接下来我们要启动的Kafka的Server。

(3) 启动Kafka Server

Kafka是一个发布订阅系统,消息订阅首先要有个服务存在。我们启动一个Kafka Server 实例。 Kafka需要使用ZooKeeper,要进行投产部署我们需要安装ZooKeeper集群,这不在本篇的介绍范围内,所以我们利用Kafka提供的脚本,安装一个只有一个节点的ZooKeeper实例。如下:

    jincheng:kafka_2.11-2.1.0jincheng.sunjc$bin/zookeeper-server-start.shconfig/zookeeper.properties&[2019-01-1309:06:19,985]INFOReadingconfigurationfrom:config/zookeeper.properties(org.apache.zookeeper.server.quorum.QuorumPeerConfig)........[2019-01-1309:06:20,061]INFObindingtoport0.0.0.0/0.0.0.0:2181(org.apache.zookeeper.server.NIOServerCnxnFactory)

启动之后,ZooKeeper会绑定2181端口(默认)。接下来我们启动Kafka Server,如下:

    jincheng:kafka_2.11-2.1.0jincheng.sunjc$bin/kafka-server-start.shconfig/server.properties[2019-01-1309:09:16,937]INFORegisteredkafkakafka:type=kafka.Log4jControllerMBean(kafka.utils.Log4jControllerRegistration$)[2019-01-1309:09:17,267]INFOstarting(kafka.server.KafkaServer)[2019-01-1309:09:17,267]INFOConnectingtozookeeperonlocalhost:2181(kafka.server.KafkaServer)[2019-01-1309:09:17,284]INFO[ZooKeeperClient]Initializinganewsessiontolocalhost:2181.(kafka.zookeeper.ZooKeeperClient)......[2019-01-1309:09:18,253]INFO[KafkaServerid=0]started(kafka.server.KafkaServer)

如果上面一切顺利,Kafka的安装就完成了。

2. 创建Topic

Kafka是消息订阅系统,首先创建可以被订阅的Topic,我们创建一个名为flink-tipic的Topic,在一个新的terminal中,执行如下命令:

    jincheng:kafka_2.11-2.1.0jincheng.sunjc$bin/kafka-topics.sh--create--zookeeperlocalhost:2181--replication-factor1--partitions1--topicflink-tipicCreatedtopic"flink-tipic".

在Kafka Server的terminal中也会输出如下成功创建信息:

    ...[2019-01-1309:13:31,156]INFOCreatedlogforpartitionflink-tipic-0in/tmp/kafka-logswithproperties{compression.type->producer,message.format.version->2.1-IV2,file.delete.delay.ms->60000,max.message.bytes->1000012,min.compaction.lag.ms->0,message.timestamp.type->CreateTime,message.downconversion.enable->true,min.insync.replicas->1,segment.jitter.ms->0,preallocate->false,min.cleanable.dirty.ratio->0.5,index.interval.bytes->4096,unclean.leader.election.enable->false,retention.bytes->-1,delete.retention.ms->86400000,cleanup.policy->[delete],flush.ms->9223372036854775807,segment.ms->604800000,segment.bytes->1073741824,retention.ms->604800000,message.timestamp.difference.max.ms->9223372036854775807,segment.index.bytes->10485760,flush.messages->9223372036854775807}.(kafka.log.LogManager)...

上面显示了flink-topic的基本属性配置,如消息压缩方式,消息格式,备份数量等等。

除了看日志,我们可以用命令显示的查询我们是否成功的创建了flink-topic,如下:

    jincheng:kafka_2.11-2.1.0jincheng.sunjc$bin/kafka-topics.sh--list--zookeeperlocalhost:2181flink-tipic

如果输出flink-tipic那么说明我们的Topic成功创建了。

那么Topic是保存在哪里?Kafka是怎样进行消息的发布和订阅的呢?为了直观,我们看如下Kafka架构示意图简单理解一下:

简单介绍一下,Kafka利用ZooKeeper来存储集群信息,也就是上面我们启动的Kafka Server 实例,一个集群中可以有多个Kafka Server 实例,Kafka Server叫做Broker,我们创建的Topic可以在一个或多个Broker中。Kafka利用Push模式发送消息,利用Pull方式拉取消息。

3. 发送消息

如何向已经存在的Topic中发送消息呢,当然我们可以API的方式编写代码发送消息。同时,还可以利用命令方式来便捷的发送消息,如下:

    jincheng:kafka_2.11-2.1.0jincheng.sunjc$bin/kafka-console-producer.sh--broker-listlocalhost:9092--topicflink-topic>Kafkatestmsg>Kafkaconnector

上面我们发送了两条消息Kafka test msg 和 Kafka connector 到 flink-topic Topic中。

4. 读取消息

如果读取指定Topic的消息呢?同样可以API和命令两种方式都可以完成,我们以命令方式读取flink-topic的消息,如下:

    jincheng:kafka_2.11-2.1.0jincheng.sunjc$bin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topicflink-topic--from-beginningKafkatestmsgKafkaconnector

其中–from-beginning 描述了我们从Topic开始位置读取消息。

三、Flink Kafka Connector

前面我们以最简单的方式安装了Kafka环境,那么我们以上面的环境介绍Flink Kafka Connector的使用。Flink Connector相关的基础知识会在《Apache Flink 漫谈系列(14) – Connectors》中介绍,这里我们直接介绍与Kafka Connector相关的内容。

Apache Flink 中提供了多个版本的Kafka Connector,本篇以flink-1.7.0版本为例进行介绍。

1. mvn 依赖

要使用Kakfa Connector需要在我们的pom中增加对Kafka Connector的依赖,如下:

    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.7.0</version></dependency>

Flink Kafka Consumer需要知道如何将Kafka中的二进制数据转换为Java / Scala对象。 DeserializationSchema允许用户指定这样的模式。 为每个Kafka消息调用 T deserialize(byte [] message)方法,从Kafka传递值。

2. Examples

我们示例读取Kafka的数据,再将数据做简单处理之后写入到Kafka中。我们需要再创建一个用于写入的Topic,如下:

    bin/kafka-topics.sh--create--zookeeperlocalhost:2181--replication-factor1--partitions1--topicflink-tipic-output

所以示例中我们Source利用flink-topic, Sink用slink-topic-output。

(1) Simple ETL

我们假设Kafka中存储的就是一个简单的字符串,所以我们需要一个用于对字符串进行serialize和deserialize的实现,也就是我们要定义一个实现DeserializationSchema和SerializationSchema 的序列化和反序列化的类。因为我们示例中是字符串,所以我们自定义一个KafkaMsgSchema实现类,然后在编写Flink主程序。

KafkaMsgSchema – 完整代码
    importorg.apache.flink.api.common.serialization.DeserializationSchema;importorg.apache.flink.api.common.serialization.SerializationSchema;importorg.apache.flink.api.common.typeinfo.BasicTypeInfo;importorg.apache.flink.api.common.typeinfo.TypeInformation;importorg.apache.flink.util.Preconditions;importjava.io.IOException;importjava.io.ObjectInputStream;importjava.io.ObjectOutputStream;importjava.nio.charset.Charset;publicclassKafkaMsgSchemaimplementsDeserializationSchema<String>,SerializationSchema<String>{privatestaticfinallongserialVersionUID=1L;privatetransientCharsetcharset;publicKafkaMsgSchema(){//默认UTF-8编码this(Charset.forName("UTF-8"));}publicKafkaMsgSchema(Charsetcharset){this.charset=Preconditions.checkNotNull(charset);}publicCharsetgetCharset(){returnthis.charset;}publicStringdeserialize(byte[]message){//将Kafka的消息反序列化为java对象returnnewString(message,charset);}publicbooleanisEndOfStream(StringnextElement){//流永远不结束returnfalse;}publicbyte[]serialize(Stringelement){//将java对象序列化为Kafka的消息returnelement.getBytes(this.charset);}publicTypeInformation<String>getProducedType(){//定义产生的数据TypeinforeturnBasicTypeInfo.STRING_TYPE_INFO;}privatevoidwriteObject(ObjectOutputStreamout)throwsIOException{out.defaultWriteObject();out.writeUTF(this.charset.name());}privatevoidreadObject(ObjectInputStreamin)throwsIOException,ClassNotFoundException{in.defaultReadObject();StringcharsetName=in.readUTF();this.charset=Charset.forName(charsetName);}}
主程序 – 完整代码
    importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.utils.ParameterTool;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;importorg.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;importjava.util.Properties;publicclassKafkaExample{publicstaticvoidmain(String[]args)throwsException{//用户参数获取finalParameterToolparameterTool=ParameterTool.fromArgs(args);//Stream环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();//Source的topicStringsourceTopic="flink-topic";//Sink的topicStringsinkTopic="flink-topic-output";//broker地址Stringbroker="localhost:9092";//属性参数-实际投产可以在命令行传入Propertiesp=parameterTool.getProperties();p.putAll(parameterTool.getProperties());p.put("bootstrap.servers",broker);env.getConfig().setGlobalJobParameters(parameterTool);//创建消费者FlinkKafkaConsumerconsumer=newFlinkKafkaConsumer<String>(sourceTopic,newKafkaMsgSchema(),p);//设置读取最早的数据//consumer.setStartFromEarliest();//读取Kafka消息DataStream<String>input=env.addSource(consumer);//数据处理DataStream<String>result=input.map(newMapFunction<String,String>(){publicStringmap(Strings)throwsException{Stringmsg="Flinkstudy".concat(s);System.out.println(msg);returnmsg;}});//创建生产者FlinkKafkaProducerproducer=newFlinkKafkaProducer<String>(sinkTopic,newKeyedSerializationSchemaWrapper<String>(newKafkaMsgSchema()),p,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);//将数据写入Kafka指定Topic中result.addSink(producer);//执行jobenv.execute("KafkaExample");}}

运行主程序如下:

我测试操作的过程如下:

启动flink-topic和flink-topic-output的消费拉取;通过命令向flink-topic中添加测试消息only for test;通过命令打印验证添加的测试消息 only for test;最简单的FlinkJob source->map->sink 对测试消息进行map处理:”Flink study “.concat(s);通过命令打印sink的数据;

(2) 内置Schemas

Apache Flink 内部提供了如下3种内置的常用消息格式的Schemas:

TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) 它基于Flink的TypeInformation创建模式。 如果数据由Flink写入和读取,这将非常有用。JsonDeserializationSchema (and JSONKeyValueDeserializationSchema) 它将序列化的JSON转换为ObjectNode对象,可以使用objectNode.get(“field”)作为(Int / String / …)()从中访问字段。 KeyValue objectNode包含“key”和“value”字段,其中包含所有字段以及可选的”metadata”字段,该字段公开此消息的偏移量/分区/主题。AvroDeserializationSchema 它使用静态提供的模式读取使用Avro格式序列化的数据。 它可以从Avro生成的类(AvroDeserializationSchema.forSpecific(…))推断出模式,或者它可以与GenericRecords一起使用手动提供的模式(使用AvroDeserializationSchema.forGeneric(…))

要使用内置的Schemas需要添加如下依赖:

    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>1.7.0</version></dependency>

(3) 读取位置配置

我们在消费Kafka数据时候,可能需要指定消费的位置,Apache Flink 的FlinkKafkaConsumer提供很多便利的位置设置,如下:

consumer.setStartFromEarliest() – 从最早的记录开始;consumer.setStartFromLatest() – 从***记录开始;consumer.setStartFromTimestamp(…); // 从指定的epoch时间戳(毫秒)开始;consumer.setStartFromGroupOffsets(); // 默认行为,从上次消费的偏移量进行继续消费。

上面的位置指定可以精确到每个分区,比如如下代码:

    Map<KafkaTopicPartition,Long>specificStartOffsets=newHashMap<>();specificStartOffsets.put(newKafkaTopicPartition("myTopic",0),23L);//***个分区从23L开始specificStartOffsets.put(newKafkaTopicPartition("myTopic",1),31L);//第二个分区从31L开始specificStartOffsets.put(newKafkaTopicPartition("myTopic",2),43L);//第三个分区从43L开始consumer.setStartFromSpecificOffsets(specificStartOffsets);

对于没有指定的分区还是默认的setStartFromGroupOffsets方式。

(4) Topic发现

Kafka支持Topic自动发现,也就是用正则的方式创建FlinkKafkaConsumer,比如:

    //创建消费者FlinkKafkaConsumerconsumer=newFlinkKafkaConsumer<String>(java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),newKafkaMsgSchema(),p);

在上面的示例中,当作业开始运行时,消费者将订阅名称与指定正则表达式匹配的所有Topic(以sourceTopic的值开头并以单个数字结尾)。

3. 定义Watermark(Window)

对Kafka Connector的应用不仅限于上面的简单数据提取,我们更多时候是期望对Kafka数据进行Event-time的窗口操作,那么就需要在Flink Kafka Source中定义Watermark。

要定义Event-time,首先是Kafka数据里面携带时间属性,假设我们数据是String#Long的格式,如only for test#1000。那么我们将Long作为时间列。

KafkaWithTsMsgSchema – 完整代码

要想解析上面的Kafka的数据格式,我们需要开发一个自定义的Schema,比如叫KafkaWithTsMsgSchema,将String#Long解析为一个Java的Tuple2

    importorg.apache.flink.api.common.serialization.DeserializationSchema;importorg.apache.flink.api.common.serialization.SerializationSchema;importorg.apache.flink.api.common.typeinfo.BasicTypeInfo;importorg.apache.flink.api.common.typeinfo.TypeInformation;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.typeutils.TupleTypeInfo;importorg.apache.flink.util.Preconditions;importjava.io.IOException;importjava.io.ObjectInputStream;importjava.io.ObjectOutputStream;importjava.nio.charset.Charset;publicclassKafkaWithTsMsgSchemaimplementsDeserializationSchema<Tuple2<String,Long>>,SerializationSchema<Tuple2<String,Long>>{privatestaticfinallongserialVersionUID=1L;privatetransientCharsetcharset;publicKafkaWithTsMsgSchema(){this(Charset.forName("UTF-8"));}publicKafkaWithTsMsgSchema(Charsetcharset){this.charset=Preconditions.checkNotNull(charset);}publicCharsetgetCharset(){returnthis.charset;}publicTuple2<String,Long>deserialize(byte[]message){Stringmsg=newString(message,charset);String[]dataAndTs=msg.split("#");if(dataAndTs.length==2){returnnewTuple2<String,Long>(dataAndTs[0],Long.parseLong(dataAndTs[1].trim()));}else{//实际生产上需要抛出runtime异常System.out.println("Failduetoinvalidmsgformat..["+msg+"]");returnnewTuple2<String,Long>(msg,0L);}}@OverridepublicbooleanisEndOfStream(Tuple2<String,Long>stringLongTuple2){returnfalse;}publicbyte[]serialize(Tuple2<String,Long>element){return"MAX-".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);}privatevoidwriteObject(ObjectOutputStreamout)throwsIOException{out.defaultWriteObject();out.writeUTF(this.charset.name());}privatevoidreadObject(ObjectInputStreamin)throwsIOException,ClassNotFoundException{in.defaultReadObject();StringcharsetName=in.readUTF();this.charset=Charset.forName(charsetName);}@OverridepublicTypeInformation<Tuple2<String,Long>>getProducedType(){returnnewTupleTypeInfo<Tuple2<String,Long>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO);}}
Watermark生成

提取时间戳和创建Watermark,需要实现一个自定义的时间提取和Watermark生成器。在Apache Flink 内部有2种方式如下:

AssignerWithPunctuatedWatermarks – 每条记录都产生Watermark。AssignerWithPeriodicWatermarks – 周期性的生成Watermark。

我们以AssignerWithPunctuatedWatermarks为例写一个自定义的时间提取和Watermark生成器。代码如下:

    importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;importorg.apache.flink.streaming.api.watermark.Watermark;importjavax.annotation.Nullable;publicclassKafkaAssignerWithPunctuatedWatermarksimplementsAssignerWithPunctuatedWatermarks<Tuple2<String,Long>>{@Nullable@OverridepublicWatermarkcheckAndGetNextWatermark(Tuple2<String,Long>o,longl){//利用提取的时间戳创建WatermarkreturnnewWatermark(l);}@OverridepubliclongextractTimestamp(Tuple2<String,Long>o,longl){//提取时间戳returno.f1;}}

主程序 – 完整程序

我们计算一个大小为1秒的Tumble窗口,计算窗口内***的值。完整的程序如下:

    importorg.apache.flink.api.common.typeinfo.BasicTypeInfo;importorg.apache.flink.api.common.typeinfo.TypeInformation;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.typeutils.TupleTypeInfo;importorg.apache.flink.api.java.utils.ParameterTool;importorg.apache.flink.streaming.api.TimeCharacteristic;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;importorg.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;importjava.util.Properties;publicclassKafkaWithEventTimeExample{publicstaticvoidmain(String[]args)throwsException{//用户参数获取finalParameterToolparameterTool=ParameterTool.fromArgs(args);//Stream环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();//设置Event-timeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//Source的topicStringsourceTopic="flink-topic";//Sink的topicStringsinkTopic="flink-topic-output";//broker地址Stringbroker="localhost:9092";//属性参数-实际投产可以在命令行传入Propertiesp=parameterTool.getProperties();p.putAll(parameterTool.getProperties());p.put("bootstrap.servers",broker);env.getConfig().setGlobalJobParameters(parameterTool);//创建消费者FlinkKafkaConsumerconsumer=newFlinkKafkaConsumer<Tuple2<String,Long>>(sourceTopic,newKafkaWithTsMsgSchema(),p);//读取Kafka消息TypeInformation<Tuple2<String,Long>>typeInformation=newTupleTypeInfo<Tuple2<String,Long>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.LONG_TYPE_INFO);DataStream<Tuple2<String,Long>>input=env.addSource(consumer).returns(typeInformation)//提取时间戳,并生产Watermark.assignTimestampsAndWatermarks(newKafkaAssignerWithPunctuatedWatermarks());//数据处理DataStream<Tuple2<String,Long>>result=input.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).max(0);//创建生产者FlinkKafkaProducerproducer=newFlinkKafkaProducer<Tuple2<String,Long>>(sinkTopic,newKeyedSerializationSchemaWrapper<Tuple2<String,Long>>(newKafkaWithTsMsgSchema()),p,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);//将数据写入Kafka指定Topic中result.addSink(producer);//执行jobenv.execute("KafkaWithEvent-timeExample");}}

测试运行如下:

简单解释一下,我们输入数如下:

我们看的5000000~7000000之间的数据,其中B#5000000, C#5000100和E#5000120是同一个窗口的内容。计算MAX值,按字符串比较,***的消息就是输出的E#5000120。

4. Kafka携带Timestamps

在Kafka-0.10+ 消息可以携带timestamps,也就是说不用单独的在msg中显示添加一个数据列作为timestamps。只有在写入和读取都用Flink时候简单一些。一般情况用上面的示例方式已经足够了。

四、小结

本篇重点是向大家介绍Kafka如何在Flink中进行应用,开篇介绍了Kafka的简单安装和收发消息的命令演示,然后以一个简单的数据提取和一个Event-time的窗口示例让大家直观的感受如何在Apache Flink中使用Kafka。愿介绍的内容对您有所帮助!

关于点赞和评论

本系列文章难免有很多缺陷和不足,真诚希望读者对有收获的篇章给予点赞鼓励,对有不足的篇章给予反馈和建议,先行感谢大家!

作者:孙金城,花名 金竹,目前就职于阿里巴巴,自2015年以来一直投入于基于Apache Flink的阿里巴巴计算平台Blink的设计研发工作。

【本文为51CTO专栏作者“金竹”原创稿件,转载请联系原作者】

戳这里,看该作者更多好文