The business context here is messy: the data is unstructured and comes in many types, and, fatally, there is no unique ID field! So deduplication has to partition and dedupe on the entire record. I originally planned to use a Bloom filter, but its false positives are a real headache and the business can't tolerate them, so I went with MapState instead. I've stripped out the Flink tuning code to keep this easy to read and learn from; later I'll write a post on Flink-to-Kafka deduplication with the fine-grained optimizations. Give me a follow if you're interested.
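To make the false-positive problem concrete: a Bloom filter only answers "possibly seen" or "definitely not seen", and with no exact store to double-check against, every false positive is a good record silently dropped as a duplicate. Here is a quick sketch (my illustration, assuming Guava on the classpath; not part of the pipeline below) showing the configured error rate turning up in practice:

```scala
import java.nio.charset.StandardCharsets

import com.google.common.hash.{BloomFilter, Funnels}

object BloomFpDemo {
  def main(args: Array[String]): Unit = {
    // 1M expected insertions, 1% target false-positive rate.
    val bf = BloomFilter.create(
      Funnels.stringFunnel(StandardCharsets.UTF_8), 1000000, 0.01)

    (0 until 1000000).foreach(i => bf.put(s"record-$i"))

    // Probe 1M strings that were never inserted and count the false positives.
    val falsePositives = (0 until 1000000).count(i => bf.mightContain(s"other-$i"))
    println(s"false positives: $falsePositives / 1000000") // roughly 10000, i.e. ~1%
  }
}
```

Even at a 1% configured rate, that is on the order of 10,000 fresh records per million wrongly discarded as duplicates, which this business cannot accept. MapState trades memory for an exact membership check.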
```scala
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, StateTtlConfig}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object flinkMapState {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.socketTextStream("10.7.2.20", 9999)

    val streamdata = stream
      .map(x => (x, "1")) // no unique ID field, so key on the whole record
      .keyBy(_._1)
      .process(new Deduplicator())

    streamdata.print()
    env.execute("Socket stream dedup")
  }
}

class Deduplicator extends KeyedProcessFunction[String, (String, String), String] {
  private var state: MapState[String, String] = _
  private val dec = new MapStateDescriptor[String, String]("state", classOf[String], classOf[String])

  override def open(parameters: Configuration): Unit = {
    // Expire entries 3 s after the last read/write so state does not grow without bound.
    val ttlConfig = StateTtlConfig
      .newBuilder(Time.seconds(3))
      .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build
    dec.enableTimeToLive(ttlConfig)
    // Register the TTL-enabled descriptor; registering a second, fresh descriptor
    // here (as an earlier draft did) would silently drop the TTL configuration.
    state = getRuntimeContext.getMapState(dec)
  }

  override def processElement(i: (String, String),
                              context: KeyedProcessFunction[String, (String, String), String]#Context,
                              collector: Collector[String]): Unit = {
    if (state.contains(i._1)) {
      state.put(i._1, "1") // seen within the TTL window: refresh it, emit nothing
    } else {
      state.put(i._1, "1") // first occurrence: record it and emit downstream
      collector.collect(i._1)
    }
  }
}
```
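To try it out, run `nc -lk 9999` on the source host and type a few lines: the first occurrence of each line is printed, and repeats arriving within the 3-second TTL window are swallowed.

One thing to watch when keying on the whole record: long records mean long keys and a fat MapState. A variant worth considering (my sketch, not part of the original code; `md5` and `DigestDeduplicator` are names I made up for illustration) hashes each record to a fixed-length digest first, so keys and state entries stay small, at the cost of a vanishingly small chance of an accidental hash collision:

```scala
import java.security.MessageDigest

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, StateTtlConfig}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object flinkMapStateDigest {
  // Collapse an arbitrarily long record into a 32-character hex digest.
  def md5(record: String): String =
    MessageDigest.getInstance("MD5")
      .digest(record.getBytes("UTF-8"))
      .map("%02x".format(_))
      .mkString

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("10.7.2.20", 9999)
      .map(x => (md5(x), x)) // (digest, original record)
      .keyBy(_._1)           // partition on the digest, not the raw record
      .process(new DigestDeduplicator())
      .print()
    env.execute("digest-keyed dedup")
  }
}

// Same MapState + TTL pattern as Deduplicator above, but the state holds
// digests and the original record (i._2) is what gets emitted.
class DigestDeduplicator extends KeyedProcessFunction[String, (String, String), String] {
  private var seen: MapState[String, String] = _

  override def open(parameters: Configuration): Unit = {
    val dec = new MapStateDescriptor[String, String]("seen", classOf[String], classOf[String])
    val ttl = StateTtlConfig.newBuilder(Time.seconds(3))
      .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build
    dec.enableTimeToLive(ttl)
    seen = getRuntimeContext.getMapState(dec)
  }

  override def processElement(i: (String, String),
                              ctx: KeyedProcessFunction[String, (String, String), String]#Context,
                              out: Collector[String]): Unit = {
    if (!seen.contains(i._1)) out.collect(i._2) // first sighting: emit the record
    seen.put(i._1, "1")                         // record/refresh within the TTL window
  }
}
```

Unlike a Bloom filter's ever-present, tunable false-positive rate, an accidental MD5 collision between two distinct records is astronomically unlikely, which is a very different risk profile.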
I won't bother posting a screenshot of the output; I've run this code N+1 times already. If it won't run on your machine... well, that's a you problem, haha.