First, have a Kafka environment ready. I'm on Flink 1.12.0, for which the officially recommended Kafka version is 2.4.1, with Scala 2.12.
- Start ZooKeeper from the Kafka directory (I'm using the one bundled with Kafka)
```bash
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
```
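Since `-daemon` swallows console output, it's worth confirming ZooKeeper actually came up before moving on. One quick way is `jps` (in the standard distribution the ZooKeeper process shows up as `QuorumPeerMain`):

```bash
# QuorumPeerMain should appear in the list once ZooKeeper is running
jps
```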
- Start Kafka
```bash
./bin/kafka-server-start.sh -daemon config/server.properties
```
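The console-producer test at the end writes to a topic named `test`; if it doesn't exist yet, create it first. A minimal single-broker sketch (one partition, replication factor 1; `--bootstrap-server` works on Kafka 2.2+, so it's fine on 2.4.1):

```bash
# Create the test topic on the local broker; replace IP with your broker address
./bin/kafka-topics.sh --create --bootstrap-server IP:9092 \
  --topic test --partitions 1 --replication-factor 1
```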
- Don't forget to add the Kafka connector dependency
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
```
- Now the code
```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Consumer configuration: replace IP with your broker address
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "IP:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Only takes effect when the consumer group has no committed offsets
    properties.setProperty("auto.offset.reset", "latest")

    // Consume the topic as a stream of raw strings
    // (replace "topic-name" with your topic, e.g. the test topic created above)
    val stream = env.addSource(
      new FlinkKafkaConsumer[String]("topic-name", new SimpleStringSchema(), properties))

    stream.print()

    env.execute("Kafka Source")
  }
}
```
- Start a console producer, send some data, and check that the job prints it
```bash
./bin/kafka-console-producer.sh --broker-list IP:9092 --topic test
```
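Anything typed at the producer prompt should show up in the Flink job's stdout. If it doesn't, it can help to rule Kafka out first by reading the topic back with the bundled console consumer (standard Kafka tooling, independent of Flink):

```bash
# Read the topic back directly; if messages appear here but not in Flink,
# the problem is on the Flink side (topic name, bootstrap.servers, etc.)
./bin/kafka-console-consumer.sh --bootstrap-server IP:9092 --topic test --from-beginning
```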