Right, this builds on the functionality covered in the following posts:
Flink real-time deduplication with MapState
Consuming a Kafka source in Flink
Sending data over a UDP port in Java

I wrote this post while still developing the feature. It is the "Flink + Kafka deduplication with finer details" post I mentioned earlier: it adds the optimization part and fixes the Kafka data-loss problem. UDP will still drop a small amount of data, roughly less than one record in a thousand; if your use case cannot tolerate that, send over TCP instead, and that code is included below as well.
You will see from the detailed comments how much care went into this; a follow would make the effort worthwhile.

Enough talk, here is the code:

package com.distinct

import java.util.Properties

import com.utils.SocketUDPClient
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, StateTtlConfig}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

object Kafka2flink {

  def main(args: Array[String]): Unit = {

    // Create the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Enable checkpointing (every 100 s)
    env.enableCheckpointing(100000)
    // Restart strategy: on failure, restart up to 2 times with a 2-second delay
    // (the default is fixed-delay with unlimited restarts)
    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(2)))
    // Minimum pause between checkpoints (keep at least 1000 ms between them)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
    // Checkpoint timeout (a checkpoint must finish within 2 minutes or it is discarded)
    env.getCheckpointConfig.setCheckpointTimeout(120000)
    // Allow only one checkpoint at a time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Configure the state backend
    val stateBackend = new FsStateBackend("file:///ssd/flink2kafka/flink-1.12.4/data")
    env.setStateBackend(stateBackend)
    // Retain checkpoint data after the job is cancelled, so it can later be restored from a chosen checkpoint
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Checkpointing mode (exactly-once, for the Kafka integration)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Kafka props
    val properties = new Properties()
    // Kafka broker address
    properties.setProperty("bootstrap.servers", "10.7.2.20:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    // After a successful checkpoint, offsets are also committed back to Kafka's internal offsets topic
    // (leaving this enabled is recommended); calling this on the FlinkKafkaConsumer with false disables that commit:
    // KafkaSource.setCommitOffsetsOnCheckpoints(false)

    // Create the Kafka DataStream; setStartFromGroupOffsets(): resume from the last committed offsets
    val stream = env.addSource(new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties).setStartFromGroupOffsets())

    // val stream: DataStream[String] = env.socketTextStream("10.7.2.20", 9999)
    val streamdata = stream
      .map(x => (x, "1"))
      .keyBy(_._1)
      .process(new Deduplicator())

    // streamdata
    //   .print()

    // Send to the third party over a TCP port
    streamdata.map(data => data + "\n").writeToSocket("10.7.2.21", 6666, new SimpleStringSchema())
    // Custom sink: send to the third party over a UDP port
    // streamdata.addSink(new udpSink())

    // Execute the job
    env.execute("Socket stream word count")
  }

}
class udpSink() extends RichSinkFunction[String] {

  var socketUDPClient: SocketUDPClient = _

  override def open(parameters: Configuration): Unit = {
    socketUDPClient = new SocketUDPClient("10.7.2.21", 6666)
  }

  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    socketUDPClient.send(value)
  }

  override def close(): Unit = {
    socketUDPClient.close()
  }
}

class Deduplicator extends KeyedProcessFunction[String, (String, String), String] {

  var state: MapState[String, String] = _
  val dec = new MapStateDescriptor[String, String]("state", classOf[String], classOf[String])

  override def open(parameters: Configuration): Unit = {

    val ttlConfig = StateTtlConfig
      .newBuilder(Time.seconds(3)) // state time-to-live: 3 s
      .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // refresh the TTL on both reads and writes
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never return expired state
      // .cleanupInRocksdbCompactFilter(5) // re-query the current timestamp after every 5 processed state entries during compaction
      .build
    dec.enableTimeToLive(ttlConfig)

    // Register the state with the TTL-enabled descriptor
    // (creating a fresh descriptor here would silently drop the TTL configuration)
    state = getRuntimeContext.getMapState(dec)
  }

  override def processElement(i: (String, String), context: KeyedProcessFunction[String, (String, String), String]#Context, collector: Collector[String]): Unit = {

    if (state.contains(i._1)) {
      // Already seen within the TTL window: refresh the entry, emit nothing
      state.put(i._1, "1")
    } else {
      // First occurrence: remember it and emit downstream
      state.put(i._1, "1")
      collector.collect(i._1)
    }
  }
}
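
If you want to sanity-check the deduplication logic without Kafka, here is a minimal local sketch (my own test scaffold, not part of the job above): it pushes a few duplicated elements through the same Deduplicator and should print each key only once within the TTL window.

object DeduplicatorLocalTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // "a" and "b" each appear twice within the 3 s TTL window,
    // so only one "a", one "b" and one "c" should be printed
    env.fromElements("a", "a", "b", "b", "c")
      .map(x => (x, "1"))
      .keyBy(_._1)
      .process(new Deduplicator())
      .print()

    env.execute("Deduplicator local test")
  }
}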

Here is the UDP sender utility class:

package com.utils;

import java.io.IOException;
import java.net.*;

public class SocketUDPClient {

    private InetAddress ip;
    private int port;
    private DatagramSocket socket;

    public SocketUDPClient(String ip, int port) throws UnknownHostException, SocketException {
        this.ip = InetAddress.getByName(ip);
        this.port = port;
        // Create a UDP socket bound to an arbitrary unused local UDP port
        socket = new DatagramSocket();
        // Or bind to a fixed local UDP port:
        // socket = new DatagramSocket(9000);
    }

    public void send(String data) {
        try {
            // Build the payload to send
            byte[] outputData = data.getBytes();
            // Wrap it in a datagram addressed to the target ip/port
            DatagramPacket outputPacket = new DatagramPacket(outputData, outputData.length, ip, port);
            // Send the datagram to the receiver
            socket.send(outputPacket);
        } catch (IOException ex) {
            // Don't swallow send failures silently
            ex.printStackTrace();
        }
    }

    public void close() {
        if (socket != null) {
            socket.close(); // release the local port
        }
    }

}
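
If UDP loss is not acceptable and you need more control over failures than writeToSocket gives you, a custom TCP sink along the lines of udpSink is an option. The sketch below is only illustrative (TcpSink and its error handling are my own, not part of the job above):

import java.io.PrintWriter
import java.net.Socket

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

// Illustrative TCP counterpart to udpSink
class TcpSink(host: String, port: Int) extends RichSinkFunction[String] {

  @transient private var socket: Socket = _
  @transient private var writer: PrintWriter = _

  override def open(parameters: Configuration): Unit = {
    socket = new Socket(host, port)                        // connect once per subtask
    writer = new PrintWriter(socket.getOutputStream, true) // autoflush on each println
  }

  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    writer.println(value)      // one record per line
    if (writer.checkError()) { // PrintWriter swallows IOExceptions, so surface failures here
      throw new RuntimeException("TCP send failed; let Flink restart the task")
    }
  }

  override def close(): Unit = {
    if (writer != null) writer.close()
    if (socket != null) socket.close()
  }
}

It would be wired in the same way as the UDP sink, e.g. streamdata.addSink(new TcpSink("10.7.2.21", 6666)).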

Oh right, here is the pom file as well:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink2kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <flink.version>1.12.4</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiles the Scala sources to class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <!-- Bind to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

If you found this useful, a follow is much appreciated~