前期需要先准备好 Kafka 环境。我这里使用的是 Flink 1.12.0,官方推荐与之对应的 Kafka 版本为 2.4.1,Scala 版本为 2.12。

  1. 在 Kafka 目录下启动 ZooKeeper(我这里用的是 Kafka 自带的):
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  2. 启动 Kafka:
./bin/kafka-server-start.sh -daemon config/server.properties
  3. 别忘了添加 Kafka 连接器依赖(artifactId 的后缀 _2.12 需要与所用的 Scala 版本一致):
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.12.0</version>
</dependency>
  4. 上代码:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * Minimal Flink streaming job that reads string records from a Kafka topic
 * and prints each record to standard output.
 *
 * Optional positional arguments (the built-in placeholders are used when absent):
 *   - args(0): Kafka bootstrap servers, e.g. `host:9092`
 *   - args(1): name of the topic to consume
 */
object KafkaSource {

  // Placeholder values; replace them or pass the real ones as program arguments.
  private val DefaultBootstrapServers = "IP:9092"
  private val DefaultTopic = "topic名称"
  private val ConsumerGroupId = "consumer-group"

  /**
   * Builds the job graph (Kafka source -> print sink) and submits it.
   *
   * @param args optional overrides: bootstrap servers first, topic second
   */
  def main(args: Array[String]): Unit = {
    val bootstrapServers = args.lift(0).getOrElse(DefaultBootstrapServers)
    val topic = args.lift(1).getOrElse(DefaultTopic)

    // Create the stream execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // SimpleStringSchema turns each record's bytes into a String.
    val kafkaConsumer =
      new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), consumerProperties(bootstrapServers))

    val stream = env.addSource(kafkaConsumer)
    stream.print()

    // Nothing runs until execute() is called; this starts the job.
    env.execute("Kafka Source")
  }

  /**
   * Kafka consumer settings for the source.
   *
   * `key.deserializer` / `value.deserializer` are intentionally not set:
   * FlinkKafkaConsumer replaces both with ByteArrayDeserializer (and logs a
   * warning when they were configured), because decoding is done by the
   * DeserializationSchema handed to its constructor.
   *
   * @param bootstrapServers comma-separated `host:port` list of Kafka brokers
   * @return a fresh Properties instance holding the consumer configuration
   */
  private def consumerProperties(bootstrapServers: String): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", bootstrapServers)
    properties.setProperty("group.id", ConsumerGroupId)
    // Kafka's offset-reset policy: start from the newest records when the
    // consumer group has no usable committed offset.
    properties.setProperty("auto.offset.reset", "latest")
    properties
  }

}
  5. 开启生产者控制台,输入数据进行测试(这里的 topic 需要与代码中消费的 topic 保持一致):
./bin/kafka-console-producer.sh --broker-list IP:9092 --topic test