一、高级API

优点:

  • 不需要执行去管理 offset,直接通过 ZK 管理;也不需要管理分区、副本,由 Kafka 统一管理
  • 消费者会自动根据上一次在 ZK 中保存的 offset 去接着获取数据
  • 在 ZK 中,不同的消费者组(group)同一个 topic 记录不同的 offset,这样不同程序读取同一个 topic,不会受 offset 的影响

缺点:

  • 不能控制offset,例如:想从指定的位置读取
  • 不能细化控制分区、副本、ZK 等

1. 自动提交offset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class _2ConsumerTest {
public static void main(String[] args) {
// 创建Kafka消费者配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.88.100:9092");
props.setProperty("group.id", "test");
// 是否开启自动提交 offset 功能
props.setProperty("enable.auto.commit", "true");
// 自动提交 offset 的时间间隔
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 创建Kafka消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅要消费的主题
consumer.subscribe(Arrays.asList("test"));

// 使用一个while循环,不断从Kafka的topic中拉取消息
while (true) {
// 定义100毫秒超时
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}

二、低级API

通过使用低级 API,我们可以自己来控制 offset,想从哪儿读,就可以从哪儿读。而且,可以自己控制连接分区,对分区自定义负载均衡。而且,之前 offset 是自动保存在 ZK 中,使用低级 API,我们可以将 offset 不一定要使用 ZK 存储,我们可以自己来存储 offset。例如:存储在文件、MySQL、或者内存中。但是低级 API,比较复杂,需要执行控制 offset,连接到哪个分区,并找到分区的 leader。
优点:

  • 能够开发者自己控制 offset,想从哪里读取就从哪里读取。
  • 自行控制连接分区,对分区自定义进行负载均衡
  • 对 zookeeper 的依赖性降低(如:offset 不一定非要靠 zk 存储,自行存储 offset 即可,比如存在文件或者内存中)

缺点:

  • 太过复杂,需要自行控制 offset,连接哪个分区,找到分区 leader 等

1. 手动提交offset

虽然高级 API 自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。
手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是, commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。

1.1. 同步提交offset

同步提交 offset 有失败重试机制,故更加可靠

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class CustomComsumer {
public static void main(String[] args) {
Properties props = new Properties();
//Kafka 集群
props.put("bootstrap.servers", "hadoop102:9092");
//消费者组,只要 group.id 相同,就属于同一个消费者组
props.put("group.id", "test");
//关闭自动提交offse
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 消费者订阅主题
consumer.subscribe(Arrays.asList("first"));

while (true) {
// 消费者拉取数据
ConsumerRecords<String, String> records = consumer.poll(100);

for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 同步提交,当前线程会阻塞直到 offset 提交成功
consumer.commitSync();
}
}
}

1.2. 异步提交offset

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class CustomConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka 集群
props.put("bootstrap.servers", "hadoop102:9092");
// 消费者组,只要 group.id 相同,就属于同一个消费者组
props.put("group.id", "test");
// 关闭自动提交 offset
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 消费者订阅主题
consumer.subscribe(Arrays.asList("first"));

while (true) {
// 消费者拉取数据
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 异步提交
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition,
OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for" + offsets);
}
}
});
}
}
}

1.3. 数据漏消费和重复消费

无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。

2. 自定义存储offset

Kafka 0.9 版本之前,offset 存储在 zookeeper,0.9 版本及之后,默认将 offset 存储在 Kafka 的一个内置的topic 中。除此之外,Kafka 还可以选择自定义存储 offset。offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace(当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance)。

消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。要实现自定义存储 offset,需要借助 ConsumerRebalanceListener,以下为示例代码,其
中提交和获取offset 的方法,需要根据所选的 offset 存储系统自行实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class CustomConsumer {

private static Map<TopicPartition, Long> currentOffset = new HashMap<>();

public static void main(String[] args) {

// 创建配置信息
Properties props = new Properties();
// Kafka 集群
props.put("bootstrap.servers", "hadoop102:9092");
// 消费者组,只要 group.id 相同,就属于同一个消费者组
props.put("group.id", "test");
// 关闭自动提交offset
props.put("enable.auto.commit", "false");
// Key 和 Value 的反序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 创建一个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 消费者订阅主题
consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {

// 该方法会在 Rebalance 之前调用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffset(currentOffset);
}

//该方法会在 Rebalance 之后调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
currentOffset.clear();
for (TopicPartition partition : partitions) {
// 定位到最近提交的 offset 位置继续消费
consumer.seek(partition, getOffset(partition));
}
}
});

while (true) {
// 消费者拉取数据
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
}
// 异步提交
commitOffset(currentOffset);
}
}

// 获取某分区的最新 offset
private static long getOffset(TopicPartition partition) {
return 0;
}

// 提交该消费者所有分区的 offset
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {

}
}

三、kafka 同步、异步发送

  • Kafka 的 Producer 发送消息采用的是异步发送的方式
  • 在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator
  • main 线程将消息发送给 RecordAccumulator,消息根据 topic-partition 分类缓存
  • 消息累积到batch.size或者时间达到了linger.ms,sender 线程将该批量的消息发送到 topic-partition 所在的 broker

1. 异步发送

1.1. 不带回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class KafkaProducerTest {
public static void main(String[] args) throws ExecutionException,InterruptedException {
Properties props = new Properties();
//kafka 集 群 ,broker-list
props.put("bootstrap.servers", "hadoop102:9092");
props.put("acks", "all");
//重试次数
props.put("retries", 1);
//批次大小
props.put("batch.size", 16384);
//等待时间
props.put("linger.ms", 1);
//RecordAccumulator 缓冲区大小
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

for (int i = 0; i < 100; i++) {
// 每条数据都要封装成一个ProducerRecord 对象
producer.send(new ProducerRecord<String, String>("test", null, i + "");
}

producer.close();
}
}

1.2. 带回调函数

如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class KafkaProducerTest {
public static void main(String[] args) {
// 创建用于连接Kafka的Properties配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1.itcast.cn:9092");
props.put("acks", "all");
//重试次数
props.put("retries", 1);
//批次大小
props.put("batch.size", 16384);
//等待时间
props.put("linger.ms", 1);
//RecordAccumulator 缓冲区大小
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

// 调用send发送1-100消息到指定Topic test
for(int i = 0; i < 100; ++i) {

// 带回调函数异步方式
producer.send(new ProducerRecord<String, String>("test", null, i + ""), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null) {
System.out.println("发送消息出现异常");
}
else {
String topic = metadata.topic();
int partition = metadata.partition();
long offset = metadata.offset();
// 发送消息成功,打印Kafka的topic名字、分区id、offset
System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!");
}
}
});
}

// 关闭生产者
producer.close();
}
}

2. 同步发送

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class KafkaProducerTest {
public static void main(String[] args) {
// 创建用于连接Kafka的Properties配置
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.88.100:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

// 调用send发送1-100消息到指定Topic test
for(int i = 0; i < 100; ++i) {
try {
// 获取返回值Future,该对象封装了返回值
Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
// 调用一个Future.get()方法等待响应
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

// 关闭生产者
producer.close();
}
}

四、自定义Interceptor

1. 拦截器原理

Producer 拦截器(interceptor) 是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。
对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor 按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • configure(configs)
    • 获取配置信息和初始化数据时调用
  • onSend(ProducerRecord)
    • 该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception)
    • 该方法会在消息从RecordAccumulator 成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。
  • close
    • 关闭 interceptor,主要用于执行一些资源清理工作

如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

2. 拦截器案例

需求: 实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。

实操:

  1. 增加时间戳拦截器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TimeInterceptor implements ProducerInterceptor<String, String> {

@Override
public void configure(Map<String, ?> configs) {

}

@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 创建一个新的 record,把时间戳写入消息体的最前部
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value().toString());
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

}

@Override
public void close() {

}
}
  1. 统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CounterInterceptor implements ProducerInterceptor<String, String>{

private int errorCounter = 0;
private int successCounter = 0;

@Override
public void configure(Map<String, ?> configs) {

}

@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 统计成功和失败的次数
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}

@Override
public void close() {
// 保存结果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
  1. producer 主程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class InterceptorProducer {
public static void main(String[] args) throws Exception {
// 1 设置配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop102:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2 构建拦截链
List<String> interceptors = new ArrayList<>();

interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor");
interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor");

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

String topic = "first";
Producer<String, String> producer = new KafkaProducer<>(props);

// 3 发送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
producer.send(record);
}
// 4 一定要关闭 producer,这样才会调用 interceptor 的 close 方法
producer.close();
}
}