Kafka 高级&低级 API、同异步发送及拦截器原理
|字数总计:3.8k|阅读时长:16分钟|阅读量:
一、高级API
优点:
- 不需要执行去管理 offset,直接通过 ZK 管理;也不需要管理分区、副本,由 Kafka 统一管理
- 消费者会自动根据上一次在 ZK 中保存的 offset 去接着获取数据
- 在 ZK 中,不同的消费者组(group)同一个 topic 记录不同的 offset,这样不同程序读取同一个 topic,不会受 offset 的影响
缺点:
- 不能控制offset,例如:想从指定的位置读取
- 不能细化控制分区、副本、ZK 等
1. 自动提交offset
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class _2ConsumerTest { public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.88.100:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
|
二、低级API
通过使用低级 API,我们可以自己来控制 offset,想从哪儿读,就可以从哪儿读。而且,可以自己控制连接分区,对分区自定义负载均衡。而且,之前 offset 是自动保存在 ZK 中,使用低级 API,我们可以将 offset 不一定要使用 ZK 存储,我们可以自己来存储 offset。例如:存储在文件、MySQL、或者内存中。但是低级 API,比较复杂,需要执行控制 offset,连接到哪个分区,并找到分区的 leader。
优点:
- 能够开发者自己控制 offset,想从哪里读取就从哪里读取。
- 自行控制连接分区,对分区自定义进行负载均衡
- 对 zookeeper 的依赖性降低(如:offset 不一定非要靠 zk 存储,自行存储 offset 即可,比如存在文件或者内存中)
缺点:
- 太过复杂,需要自行控制 offset,连接哪个分区,找到分区 leader 等
1. 手动提交offset
虽然高级 API 自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。
手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是, commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。
1.1. 同步提交offset
同步提交 offset 有失败重试机制,故更加可靠
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class CustomComsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "hadoop102:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("first")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } consumer.commitSync(); } } }
|
1.2. 异步提交offset
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class CustomConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "hadoop102:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("first")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { System.err.println("Commit failed for" + offsets); } } }); } } }
|
1.3. 数据漏消费和重复消费
无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。
2. 自定义存储offset
Kafka 0.9 版本之前,offset 存储在 zookeeper,0.9 版本及之后,默认将 offset 存储在 Kafka 的一个内置的topic 中。除此之外,Kafka 还可以选择自定义存储 offset。offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace(当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance)。
消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。要实现自定义存储 offset,需要借助 ConsumerRebalanceListener,以下为示例代码,其
中提交和获取offset 的方法,需要根据所选的 offset 存储系统自行实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| public class CustomConsumer {
private static Map<TopicPartition, Long> currentOffset = new HashMap<>(); public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "hadoop102:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { commitOffset(currentOffset); } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { currentOffset.clear(); for (TopicPartition partition : partitions) { consumer.seek(partition, getOffset(partition)); } } }); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset()); } commitOffset(currentOffset); } } private static long getOffset(TopicPartition partition) { return 0; } private static void commitOffset(Map<TopicPartition, Long> currentOffset) { } }
|
三、kafka 同步、异步发送
- Kafka 的 Producer 发送消息采用的是异步发送的方式
- 在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator
- main 线程将消息发送给 RecordAccumulator,消息根据 topic-partition 分类缓存
- 消息累积到
batch.size
或者时间达到了linger.ms
,sender 线程将该批量的消息发送到 topic-partition 所在的 broker
1. 异步发送
1.1. 不带回调函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class KafkaProducerTest { public static void main(String[] args) throws ExecutionException,InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "hadoop102:9092"); props.put("acks", "all"); props.put("retries", 1); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<String, String>("test", null, i + ""); } producer.close(); } }
|
1.2. 带回调函数
如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class KafkaProducerTest { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "node1.itcast.cn:9092"); props.put("acks", "all"); props.put("retries", 1); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < 100; ++i) {
producer.send(new ProducerRecord<String, String>("test", null, i + ""), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null) { System.out.println("发送消息出现异常"); } else { String topic = metadata.topic(); int partition = metadata.partition(); long offset = metadata.offset(); System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!"); } } }); }
producer.close(); } }
|
2. 同步发送
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class KafkaProducerTest { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.88.100:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < 100; ++i) { try { Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + "")); future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
producer.close(); } }
|
四、自定义Interceptor
1. 拦截器原理
Producer 拦截器(interceptor) 是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。
对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息
等。同时,producer 允许用户指定多个 interceptor 按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor
,其定义的方法包括:
- configure(configs)
- onSend(ProducerRecord)
- 该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。
用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区
,否则会影响目标分区的计算。
- onAcknowledgement(RecordMetadata, Exception)
该方法会在消息从RecordAccumulator 成功发送到Kafka Broker之后,或者在发送过程中失败时调用。
并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。
- close
- 关闭 interceptor,主要用于执行一些资源清理工作
如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们
,并仅仅是捕获每个interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
2. 拦截器案例
需求: 实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。
实操:
- 增加时间戳拦截器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class TimeInterceptor implements ProducerInterceptor<String, String> { @Override public void configure(Map<String, ?> configs) { } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value().toString()); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } }
|
- 统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class CounterInterceptor implements ProducerInterceptor<String, String>{
private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String, ?> configs) { }
@Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); } }
|
- producer 主程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class InterceptorProducer { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "hadoop102:9092"); props.put("acks", "all"); props.put("retries", 3); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); List<String> interceptors = new ArrayList<>(); interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor"); interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); String topic = "first"; Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i); producer.send(record); } producer.close(); } }
|