package com.demo

import java.util
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaSource {

  def main(args: Array[String]): Unit = {

    // Create the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.7.2.20:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    // Subscribe to multiple topics by passing a java.util.List of topic names
    val topics = new util.LinkedList[String]
    topics.add("test1")
    topics.add("test2")

    // Single-topic variant:
    // val stream = env.addSource(new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties))
    val stream = env.addSource(new FlinkKafkaConsumer[String](topics, new SimpleStringSchema(), properties))

    stream.print()

    // Start the job
    env.execute("Kafka multi-topic source")
  }

}
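Besides an explicit topic list, FlinkKafkaConsumer also has a constructor that takes a java.util.regex.Pattern, so every topic whose name matches the pattern is consumed. The sketch below is a minimal variant of the job above; the pattern test\d+ is only an illustration chosen to match test1 and test2.

package com.demo

import java.util.Properties
import java.util.regex.Pattern

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaPatternSource {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.7.2.20:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("auto.offset.reset", "latest")

    // Subscribe to every topic whose name matches the regex (here: test1, test2, ...)
    val stream = env.addSource(
      new FlinkKafkaConsumer[String](Pattern.compile("test\\d+"), new SimpleStringSchema(), properties))

    stream.print()

    env.execute("Kafka pattern source")
  }

}

Note that topics created after the job starts are only picked up if partition discovery is enabled, for example by setting the flink.partition-discovery.interval-millis property on the consumer.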

Start two Kafka producers, one for the test1 topic and one for the test2 topic, and enter some test data.
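If you prefer to send the test data from code instead of from the console producer, a small Scala program using the plain Kafka producer client works as well. The sketch below is only an example: it reuses the broker address from the job above, and the message payloads are made-up placeholders.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestDataProducer {

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Same broker address as the consumer job above (assumption)
    props.setProperty("bootstrap.servers", "10.7.2.20:9092")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send one sample message to each of the topics the Flink job subscribes to
    producer.send(new ProducerRecord[String, String]("test1", "hello from test1"))
    producer.send(new ProducerRecord[String, String]("test2", "hello from test2"))

    producer.flush()
    producer.close()
  }

}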

You can see that the data from both topics is printed to the console.