The business background here is fairly messy: the data is unstructured, comes in many different types, and, worst of all, has no unique ID field! So the only option is to partition and deduplicate on the whole record. I originally planned to use a Bloom filter, but its false positives were a deal-breaker for the business, so MapState it is. I have stripped out the Flink tuning code to keep the example easy to read; later I will write a post on Flink-to-Kafka deduplication with the fine-grained optimizations, so feel free to follow if you are interested.
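For context, here is a rough sketch of the false-positive problem that ruled the Bloom filter out, assuming Guava's BloomFilter is on the classpath (the class name and record values are made up for illustration). Any record the filter wrongly reports as already seen would be silently dropped by a deduplicator, which the business cannot tolerate.

import java.nio.charset.StandardCharsets
import com.google.common.hash.{BloomFilter, Funnels}

object BloomFilterFalsePositiveDemo {
  def main(args: Array[String]): Unit = {
    // 1,000,000 expected insertions at a 1% target false-positive rate
    val bloom = BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1000000, 0.01)
    (1 to 1000000).foreach(i => bloom.put(s"record-$i"))
    // Records that were never inserted can still be reported as "seen";
    // a Bloom-filter-based deduplicator would drop them without a trace
    val falsePositives = (1000001 to 1010000).count(i => bloom.mightContain(s"record-$i"))
    println(s"false positives among 10,000 unseen records: $falsePositives")
  }
}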

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, StateTtlConfig}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object flinkMapState {

  def main(args: Array[String]): Unit = {

    // Create the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read raw lines from a socket source
    val stream: DataStream[String] = env.socketTextStream("10.7.2.20", 9999)

    // No unique ID field exists, so key by the whole record and deduplicate per key
    val streamdata = stream
      .map(x => (x, "1"))
      .keyBy(_._1)
      .process(new Deduplicator())

    streamdata.print()

    // Submit and run the job
    env.execute("Socket stream deduplication")
  }
}
class Deduplicator extends KeyedProcessFunction[String, (String, String), String] {
  var state: MapState[String, String] = _

  override def open(parameters: Configuration): Unit = {
    val ttlConfig = StateTtlConfig
      .newBuilder(Time.seconds(3)) // state entries live for 3 seconds
      .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // refresh the TTL on both reads and writes
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never return expired state
      // .cleanupInRocksdbCompactFilter(1000) // with RocksDB, let the compaction filter re-query the current timestamp after every 1000 processed entries
      .build

    // Register the state with the TTL-enabled descriptor
    val dec = new MapStateDescriptor[String, String]("state", classOf[String], classOf[String])
    dec.enableTimeToLive(ttlConfig)
    state = getRuntimeContext.getMapState(dec)
  }

  override def processElement(i: (String, String), context: KeyedProcessFunction[String, (String, String), String]#Context, collector: Collector[String]): Unit = {
    if (state.contains(i._1)) {
      // Already seen within the TTL window: refresh the entry, emit nothing
      state.put(i._1, "1")
    } else {
      // First occurrence: remember it and emit it downstream
      state.put(i._1, "1")
      collector.collect(i._1)
    }
  }
}
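A quick way to try it, assuming netcat is available on the source host: run nc -lk 9999 on 10.7.2.20, submit the job, and type a few lines into the socket. Each distinct line is printed once; repeating it within the 3-second TTL window produces no further output, and because the update type is OnReadAndWrite, every repeat also pushes the expiry forward.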

I will skip the screenshots; I have run this code N+1 times already, so if it does not run for you... that is a you problem, haha.