1. 使用 SSL 设置 Kafka 实例


  Kafka 可以通过 SSL 加密与消息消费者和生产者的连接。可以在不同的地方找到有关如何设置的说明。比如 Confluence 平台文档(Confluence 平台可以理解为围绕 Kafka 的复杂包装器/生态系统)或 Apache Kafka 文档。这些说明基于keytooljava 实用程序来生成和签署 SSL 证书。如果我们想在 Python 中使用这些证书,我们必须提取凭证。
  以下 Python 包允许连接到 Kafka:

  但是,如果想从 Python 建立 SSL 连接,首先必须确保在 JKS 容器中拥有所需的证书和私钥,如上面的说明中所述。有两种方法可以从 Python 访问证书:

  • 将 JKS 容器中的证书和密钥导出为 PEM 格式以在内部使用它们kafka-python
  • 使用例如pyjks包直接在 Python 中导入证书和密钥

  下面使用的就是第一种方法。

2. 提取密钥

  配置 Apache Kafka 实例后,将有两个 JKS 容器:kafka.client.keystore.jkskafka.client.truststore.jks。第一个包含已签名的客户端证书、其私钥和用于对其进行签名的 “CARoot” 证书。第二个包含用于签署客户端证书和密钥的证书。因此,我们需要的一切都包含在kafka.client.keystore.jks文件中。要了解其内容的概述,可以调用:

1
keytool -list -rfc -keystore kafka.client.keystore.jks

2.1 提取客户端证书

  首先,我们将提取客户端证书:

1
keytool -exportcert -alias caroot -keystore kafka.client.keystore.jks -rfc -file certificate.pem

  需要注意的是,上面命令的参数别名-alias可以通过下面的命令来查看:

1
keytool -list -rfc -keystore client.keystore.jks

2.2 提取客户端密钥

  接下来我们将提取客户端密钥。但是keytool不直接支持这一点,所以我们必须先将密钥库转换为pkcs12格式,然后从中提取私钥:

1
keytool -v -importkeystore -srckeystore kafka.client.keystore.jks -srcalias caroot -destkeystore cert_and_key.p12 -deststoretype PKCS12

  生成 p12 文件后,使用下面的命令将密钥打印到 STDOUT,从那里可以将其复制并粘贴到key.pem中(确保复制到 --BEGIN PRIVATE KEY-- 和 --END PRIVATE KEY-- 之间的行)。

1
openssl pkcs12 -in cert_and_key.p12 -nocerts -nodes

  但是,我在执行上面的命令后,怎么都找不到打印的信息,最后没办法,在https://www.openssl.net.cn/docs/249.html中查阅了下 OpenSSL 的命令,将结果打印在终端,从终端复制到key.pem文件中:

1
openssl pkcs12 -in cert_and_key.p12 -nodes

  然而事实上,在最后的使用中,我根本没用到key.pem文件,直接添加了对应的 password 即可。。。。

2.3 提取 CARoot 证书

  最后我们将提取 CARoot 证书:

1
keytool -exportcert -alias CARoot -keystore kafka.client.keystore.jks -rfc -file CARoot.pem

3. kafka-python创建连接

  现在我们有了三个文件certificate.pemkey.pemCARoot.pemkafka-python它们可以作为消费者和生产者的构造函数的参数传递:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(bootstrap_servers='my.server.com',
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile='CARoot.pem',
ssl_certfile='certificate.pem',
ssl_keyfile='key.pem')

producer = KafkaProducer(bootstrap_servers='my.server.com',
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile='CARoot.pem',
ssl_certfile='certificate.pem',
ssl_keyfile='key.pem')

# Write hello world to test topic
producer.send("test", bytes("Hello World"))
producer.flush()

# Read and print all messages from test topic
consumer.assign([TopicPartition(TOPIC, 0)])
consumer.seek_to_beginning(TopicPartition(TOPIC, 0))
for msg in consumer:
print(msg)

4. pykafka创建连接

  以类似的方式,还可以将这些文件作为参数传递给pykafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from pykafka import KafkaClient, SslConfig

config = SslConfig(cafile='CARoot.pem',
certfile='certificate.pem',
keyfile='key.pem')

client = KafkaClient(hosts='my.server.com',
ssl_config=config)

topic = client.topics["test"]

# Write hello world to test topic
with topic.get_sync_producer() as producer:
producer.produce('Hello World')

# Print all messages from test topic
consumer = topic.get_simple_consumer()
for message in consumer:
if message is not None:
print('{} {}'.format(message.offset, message.value))

  现在我们可以建立 SSL 连接并使用kafka-python or pykafka包编写自己的消费者和生产者了。

  部分内容源自于:http://maximilianchrist.com/python/databases/2016/08/13/connect-to-apache-kafka-from-python-using-ssl.html