1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
| package com.distinct
import java.util.Properties
import com.utils.SocketUDPClient import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, StateTtlConfig} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.api.common.time.Time import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction} import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.util.Collector
/** Flink job that reads string records from the Kafka topic "test", suppresses records whose
  * value was seen recently (see [[Deduplicator]]), and writes the surviving values,
  * newline-terminated, to a TCP socket.
  */
object Kafka2flink {

  /** Host that deduplicated records are written to (also the default target of [[udpSink]]). */
  val SinkHost: String = "10.7.2.21"

  /** Port that deduplicated records are written to (also the default target of [[udpSink]]). */
  val SinkPort: Int = 6666

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint every 100 s with exactly-once semantics. At most one checkpoint may be in flight,
    // consecutive checkpoints are at least 1 s apart, and a checkpoint is aborted after 120 s.
    env.enableCheckpointing(100000, CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
    env.getCheckpointConfig.setCheckpointTimeout(120000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Keep externalized checkpoints when the job is cancelled so it can be resumed from them.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.setStateBackend(new FsStateBackend("file:///ssd/flink2kafka/flink-1.12.4/data"))

    // Restart a failed job at most twice, waiting 2 s before each attempt.
    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(2)))

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.7.2.20:9092")
    properties.setProperty("group.id", "consumer-group")
    // Consulted only for partitions without a committed offset for this group
    // (the consumer starts from group offsets, see below).
    properties.setProperty("auto.offset.reset", "latest")
    // key.deserializer / value.deserializer are deliberately not set: FlinkKafkaConsumer replaces
    // them with its own byte-array deserializers and merely logs a warning if they are configured.

    val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties)
    consumer.setStartFromGroupOffsets()

    // Pair each record with a dummy value, key the stream by the record itself, and let
    // Deduplicator emit only values not seen recently for that key.
    val deduplicated = env
      .addSource(consumer)
      .map(record => (record, "1"))
      .keyBy(_._1)
      .process(new Deduplicator())

    deduplicated
      .map(_ + "\n")
      .writeToSocket(SinkHost, SinkPort, new SimpleStringSchema())

    // NOTE(review): the job name does not describe this pipeline; kept unchanged so the job's
    // identity in the cluster UI/metrics does not change.
    env.execute("Socket stream word count")
  }
}

/** Sink that sends each record unchanged to a UDP endpoint through [[SocketUDPClient]].
  *
  * Note: [[Kafka2flink.main]] does not use this sink; it writes to a TCP socket instead.
  *
  * @param host destination host, defaults to [[Kafka2flink.SinkHost]]
  * @param port destination port, defaults to [[Kafka2flink.SinkPort]]
  */
class udpSink(host: String = Kafka2flink.SinkHost, port: Int = Kafka2flink.SinkPort)
    extends RichSinkFunction[String]() {

  // Created in open(); remains null if open() was never called or threw before assignment.
  var socketUDPClient: SocketUDPClient = _

  override def open(parameters: Configuration): Unit = {
    socketUDPClient = new SocketUDPClient(host, port)
  }

  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    socketUDPClient.send(value)
  }

  // close() may be invoked during failure cleanup even when open() did not complete, so the
  // client can still be null here; guard to avoid masking the original failure with an NPE.
  override def close(): Unit = {
    if (socketUDPClient != null) {
      socketUDPClient.close()
      socketUDPClient = null
    }
  }
}
/** Emits the first element (`i._1`) of each incoming pair unless that same string is already
  * present in keyed state that has not yet expired.
  *
  * The state entry has a 3-second TTL that is refreshed on every read and write
  * (`UpdateType.OnReadAndWrite`), and expired entries are never returned
  * (`StateVisibility.NeverReturnExpired`). As a result, a value is suppressed for as long as it
  * keeps arriving with gaps shorter than the TTL, and is emitted again after a longer gap.
  *
  * The second element of the input pair is ignored.
  */
class Deduplicator extends KeyedProcessFunction[String, (String, String), String]() {

  // Keyed map state holding the values seen recently; only membership matters, the stored
  // value is always "1". Assigned in open().
  var state: MapState[String, String] = _

  // Descriptor for the state above; TTL is attached to it in open().
  val dec = new MapStateDescriptor[String, String]("state", classOf[String], classOf[String])

  override def open(parameters: Configuration): Unit = {
    val ttlConfig = StateTtlConfig
      .newBuilder(Time.seconds(3))
      .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build
    dec.enableTimeToLive(ttlConfig)
    // Fetch the state through the TTL-enabled descriptor itself. The previous code registered
    // `dec`, discarded the result, and then fetched the state via a second descriptor with the
    // same name but no TTL, so expiry worked only through backend caching by name.
    state = getRuntimeContext.getMapState(dec)
  }

  override def processElement(
      i: (String, String),
      context: KeyedProcessFunction[String, (String, String), String]#Context,
      collector: Collector[String]): Unit = {
    val value = i._1
    // With NeverReturnExpired, contains() is false once the entry's TTL has lapsed.
    val seenRecently = state.contains(value)
    // Always (re)write so the entry's TTL restarts on every occurrence.
    state.put(value, "1")
    if (!seenRecently) {
      collector.collect(value)
    }
  }
}
|