Kafka-Python 环境搭建与实战

小七学习网,助您升职加薪,遇问题可联系:客服微信【1601371900】 备注:来自网站

Kafka 是一个分布式、分区的、多副本的、多订阅者的流处理平台。文章将演示如何通过 Python 与 Kafka 交互以及 Kafka 相关的知识体系。 文章内容: Kafka 环境安装 命令行操作…

Kafka 是一个分布式、分区的、多副本的、多订阅者的流处理平台。文章将演示如何通过 Python 与 Kafka 交互以及 Kafka 相关的知识体系。

文章内容:

  1. Kafka 环境安装
  2. 命令行操作 Kafka
  3. Python 操作 Kafka
  • Python Kafka Admin 代码演示
  • Python Kafka Producer 代码演示
  • Python Kafka Consumer 代码演示
  1. Kafka 架构设计以及常见面试题
  • Kafka 架构是如何设计的?
  • 使用 Kafka 的好处?
  • Kafka 如何保证数据不丢失?
  • 简述 Leader 选举机制
  • Kafka 数据分区和消费者的关系?
  • Kafka 重启是否会导致数据丢失?


Kafka 环境安装

学习 Kafka 之前,搭建环境是必须的,我这里利用 docker-compose,1 分钟搭建 Kafka 环境,如果对 docker-compose 不擅长的筒子们,可以参考我的另一篇 文章:《Docker-Compose 入门与实战》。

我这里就默认筒子们都会 docker-compose 了,直接开整!首先咱们来编写 docker-compose.yaml,结构如下:

version: \'3.2\'services:  zookeeper:    image: wurstmeister/zookeeper    ports:      - \"2181:2181\"    container_name: \"zookeeper\"    restart: always  kafka:    image: wurstmeister/kafka:2.12-2.3.0    container_name: \"kafka\"    ports:      - \"9092:9092\"    environment:      - TZ=CST-8      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181      # 非必须,设置自动创建 topic      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true      - KAFKA_ADVERTISED_HOST_NAME=<your ip>      - KAFKA_ADVERTISED_PORT=9092      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<your ip>:9092      - KAFKA_LISTENERS=PLAINTEXT://:9092      # 非必须,设置堆内存      - KAFKA_HEAP_OPTS=-Xmx1G -Xms1G      # 非必须,设置保存 1h 内的数据      - KAFKA_LOG_RETENTION_HOURS=1    volumes:      # 将 kafka 的数据文件映射出来,便于服务迁移      - /home/kafka:/kafka      - /var/run/docker.sock:/var/run/docker.sock    restart: always

Kafka 的运行依赖于 ZooKeeper,所以需要运行两个容器,注意上述中的 <your ip> 意思是替换为你本地的 IP。

启动容器:

docker-compose up -d

输出日志:

Creating network \"kafka_default\" with the default driverPulling zookeeper (wurstmeister/zookeeper:latest)...latest: Pulling from wurstmeister/zookeepera3ed95caeb02: Pull completeef38b711a50f: Pull completee057c74597c7: Pull complete666c214f6385: Pull completec3d6a96f1ffc: Pull complete3fe26a83e0ca: Pull complete3d3a7dd3a3b1: Pull completef8cc938abe5f: Pull complete9978b75f7a58: Pull complete4d4dbcc8f8cc: Pull complete8b130a9baa49: Pull complete6b9611650a73: Pull complete5df5aac51927: Pull complete76eea4448d9b: Pull complete8b66990876c6: Pull completef0dd38204b6f: Pull completeDigest: sha256:7a7fd44a72104bfbd24a77844bad5fabc86485b036f988ea927d1780782a6680Status: Downloaded newer image for wurstmeister/zookeeper:latestPulling kafka (wurstmeister/kafka:2.12-2.3.0)...2.12-2.3.0: Pulling from wurstmeister/kafkae7c96db7181b: Pull completef910a506b6cb: Pull completeb6abafe80f63: Pull completeae88d9cb15e8: Pull complete663fffaf4171: Pull complete9a84fa74bbc3: Pull completeDigest: sha256:a1b4deaa037fd110d87676fc315608c556fcef98f8f1f9302e9cf85757f9d1f3Status: Downloaded newer image for wurstmeister/kafka:2.12-2.3.0Creating zookeeper ... Creating kafka ... Creating zookeeperCreating zookeeper ... done

查看启动状态:

docker-compose ps

输出日志:

Name                 Command               State                         Ports                       -------------------------------------------------------------------------------------------------------kafka       start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp                            zookeeper   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

两个容器都启动表示 Kafka 环境搭建成功。

命令行操作 Kafka

环境搭建好之后,下面咱们先来学习一些通过命令行操作 Kafka 的操作,测试环境是否搭建成功的同时,也便于后续开发的时候调试,可谓一举多得。

下面将演示来包括 Topic 的创建、删除、查看、数据推送和订阅五大常用 API。由于咱们 Kafka 是 Docker 容器的方式运行的,所有有个前置条件,就是得进入到 Kafka 容器内部,命令:

#进入容器内部docker exec -it kafka /bin/bash

当然如果不想进入容器内部,也可。如果不进入 Docker 容器内部,则执行命令时将下面的命令通过参数的方式传入到容器内部。

示例:

#查看 topicdocker exec -it kafka bash -c \'kafka-topics.sh --list --bootstrap-server localhost:9092\'

不进入 docker 内部输入命令不会自动 tab 补全,这个小伙们自行取舍,来看下面的示例。

创建 Topic:

kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

删除 Topic:

kafka-topics.sh --delete --topic www --bootstrap-server localhost:9092

查看 topic 列表:

kafka-topics.sh --list --bootstrap-server localhost:9092

查看指定的某个 topic:

kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

消息推送:

kafka-console-producer.sh --topic quickstart-events --broker-list localhost:9092

消息订阅:

kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

Python 操作 Kafka

命令行操作 Kafka 毕竟是用于调试,要集成到业务系统当中还是得用代码去调用,下面将演示通过 Python 操作 Kafka。

首先安装 Python Kafka 依赖包:

pip install kafka-python==2.0.2

kafka-python 是一个开源项目,更多内容小伙伴们可以参考:

  • 源码地址:https://github.com/dpkp/kafka-python
  • 官方文档地址:https://kafka-python.readthedocs.io/en/master/

Python Kafka Admin 代码演示

Python Kafka Admin 顾名思义,是负责管理 Kafka,下面来演示 Kafka Admin 相关的操作。

创建 Topic:

from kafka.admin import KafkaAdminClient, NewTopicimport tracebackimport jsonKAFKA_BOOTSTRAP_SERVERS = \"<your ip>:9092\"def create_topic(topic, partition, replication):    result = {        \"code\": 0,        \'status\': \'success\',        \"msg\": \'\'    }    admin = KafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)    topic = NewTopic(        name=topic,        num_partitions=partition,#分片数        replication_factor=replication#副本数    )    try:        future = admin.create_topics([topic])        result[\'msg\'] = future.topic_errors    except Exception:        error_info = traceback.format_exc()        result[\'code\'] = 999        result[\'status\'] = \'fail\'        result[\'msg\'] = error_info    return result

Topic 有两个重要的参数需要理解。

1. num_partitions

一个 Topic 可以配置多个 partition,Producer 发送的消息分发到不同的 partition 中,consumer 接受数据的时候是按照 group 来接收,Kafka 确保每个 partition 只能同一个 group 中的同一个 consumer 消费,如果想要重复消费,则需要其他的组来消费。如图所示:

在这里插入图片描述

2. replication_factor

用来设置 Topic 的副本数。每个主题可以有多个副本,副本位于集群中不同的 broker 上,也就是说副本的数量不能超过 broker 的数量,否则创建主题时会失败。

查看所有的 Topic:

from kafka.admin import KafkaAdminClientKAFKA_BOOTSTRAP_SERVERS = \"<your ip>:9092\"def list_topic():    admin = KafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)    list_topic = admin.list_topics()    print(list_topic)

查看 Topic 详情:

from kafka.admin import KafkaAdminClientKAFKA_BOOTSTRAP_SERVERS = \"<your ip>:9092\"def describe_topic(topic):    admin = KafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)    future = admin.describe_topics(topics=[topic])

删除 Topic:

from kafka.admin import KafkaAdminClientKAFKA_BOOTSTRAP_SERVERS = \"<your ip>:9092\"def delete_topic(topic):    admin = KafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)    admin.delete_topics(topics=[topic])

Python Kafka Producer 代码演示

from kafka import KafkaProducerfrom settings import KAFKA_BOOTSTRAP_SERVERS, KAFKA_TEST_TOPICimport jsonfrom kafka.errors import kafka_errorsimport traceback# 创建生产者\"\"\"bootstrap_servers:指定 Kafka 的地址key_serializer:key 序列化value_serializer:value 序列化\"\"\"producer = KafkaProducer(    bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],    key_serializer=lambda v: json.dumps(v).encode(\"utf-8\"),#指定 lambda 表达式进行序列化    value_serializer=lambda v: json.dumps(v).encode(\"utf-8\")  # 指定 Json 类型)json_message = {\'foo\': \'bar\'}future = producer.send(KAFKA_TEST_TOPIC,                       value=json_message, key=json_message, headers=[(\'content-encoding\', b\'base64\')])#heasers 的作用是传递一些元数据,它的类型是元组列表的形式,例如=[(\'content-encoding\', b\'base64\'),(\'content-encoding\', b\'base64\')]:,并且每个元组的后一个必须为字节类型try:    future.get(timeout=10)  # 监控是否发送成功except kafka_errors:  # 发送失败抛出 kafka_errors    traceback.format_exc()

KafkaProducer 几个参数的作用如下。

1. key_serializer

指定数据的 key 及其序列方式,通过 key 可以将数据路由到不同的 partition 当中,源码如下:

在这里插入图片描述

从源码可以看到,没有指定 key 也是可以的,如果 key 为空,则直接用 value 的 hashcode 和总的 分片数区取余决定发送到哪个 partition。

2. value_serializer

传输值序列化,因为 Kafka 只能发送字节类型数据,在这里进行序列化了之后,在传输值时候,我们就不用关心序列化为字节问题了。

Python Kafka Consumer 代码演示

from kafka import KafkaConsumerfrom settings import KAFKA_TEST_TOPIC, KAFKA_BOOTSTRAP_SERVERSimport json#创建消费者\"\"\"指定 Topicbootstrap_servers:Kafka 地址key_serializer:key 反序列化value_serializer:value 反序列化\"\"\"consumer = KafkaConsumer(    KAFKA_TEST_TOPIC,    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,    group_id=\'mytest\',    key_deserializer=lambda v: json.loads(v, encoding=\"utf-8\"),    value_deserializer=lambda v: json.loads(v, encoding=\"utf-8\")  # 指定 Json 类型)for msg in consumer:    print(msg.headers)    print(msg.key)    print(msg.value)

消费者接收数据需要对数据进行反序列化,和前面的序列化是相对立的。有一个参数需要注意。

1. group_id

如果我们希望能够重复消费一批数据,则需要定义不同的 group_id。如果用多个 group_id 消费同一个 topic,则会将该 topic 的数据分摊到各个 group_id 消费。道理和前面所讲解的类似,如图:

在这里插入图片描述

Kafka 架构设计以及常见面试题

前面已经讲解了命令和 Python 操作 Kafka,看一下围绕 Kafka 架构设计展开的相关面试题。

Kafka 架构是如何设计的?

在这里插入图片描述

一个经典的 Kafka 集群中包含了若干个 Producer,若干个 Consumer,以及一个 ZooKeeper 集群,Kafka 通过 ZooKeeper 管理集群配置,选举 leader,以及在 Consumer Group 发生变化时进行 Rebalance(负载均衡);Producer 使用 Push 模式将消息发布到 Broker,Consumer 使用 Pull 模式从 Broker 中订阅并消费消息。

使用 Kafka 的好处?

  • 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,Kafka 在中间可以起到一个缓冲的作用,把消息暂存在 Kafka 中,下游服务就可以按照自己的节奏进行慢慢处理。
  • 解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。 冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅 Topic 的服务消费到,供多个毫无关联的业务使用。
  • 健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
  • 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

Kafka 如何保证数据不丢失?

这是一个必问题,要回答好这个问题,需要从三个角度进行阐述:

  • 从生产者角度
  • 从消费者角度
  • 从 Broker 角度

从生产者角度

生产者是数据产生的源头,如果这里产生数据丢失肯定是由于数据没有发送到 Broker,那如何才能保证数据肯定到达了,答案就是设置 ack 确认机制。

代码示例:

from kafka import KafkaProducerfrom settings import KAFKA_BOOTSTRAP_SERVERS, KAFKA_TEST_TOPICimport jsonfrom kafka.errors import kafka_errorsimport traceback# 创建生产者\"\"\"bootstrap_servers:指定 Kafka 的地址key_serializer:key 序列化value_serializer:value 序列化\"\"\"producer = KafkaProducer(    acks=-1,#指定 ack 确认机制,默认为 1,还有 0,-1 三种选项    bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],    key_serializer=lambda v: json.dumps(v).encode(\"utf-8\"),#指定 lambda 表达式进行序列化    value_serializer=lambda v: json.dumps(v).encode(\"utf-8\")  # 指定 Json 类型)

下面来看一下 ack 三者之间的差别:

ack 参数 作用
0 生产者发送消息之后 不需要等待服务端的任何响应,它不管消息有没有发送成功,如果发送过程中遇到了异常,导致 broker 端没有收到消息,消息也就丢失了。
1(默认值) 生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应。
-1(或 all) 生产者在发送消息之后,需要等待 ISR 中所有的副本都成功写入消息之后才能够收到来自服务端的成功响应。

从消费者角度

通过 offset commit 来保证数据的不丢失,Kafka 自己记录了每次消费的 offset 数值,下次继续消费的时候,会接着上次的 offset 进行消费。即使消费者在运行过程中挂掉了,再次启动的时候会找到 offset 的值,找到之前消费消息的位置,接着消费,由于 offset 的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。

代码演示:

from kafka import KafkaConsumerfrom settings import KAFKA_TEST_TOPIC, KAFKA_BOOTSTRAP_SERVERSimport jsonconsumer = KafkaConsumer(    KAFKA_TEST_TOPIC,    enable_auto_commit=False,#设置为手动提交 offset    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,    group_id=\'mytest\',    key_deserializer=lambda v: v.decode(\"utf-8\"),    value_deserializer=lambda v: json.loads(v, encoding=\"utf-8\")  # 指定 Json 类型)print(\"Kafka Consumer 启动完毕!\")while True:    msg = consumer.poll()    if msg:        print(msg)        consumer.commit()#消息消费后提交 offset commit,如果不提交 offset commit,下一次会消费同样的内容

上述的操作也回答了如果消费者想重复消费数据的该如何解决。

从 Broker 角度

每个 Broker 中的 partition 我们一般都会设置有 replication(副本)的个数,生产者写入的时候首先根据分发策略写入到 leader 中,follower(副本)再跟 leader 同步数据,这样有了备份,也可以保证消息数据的不丢失。

简述 Leader 选举机制

Leader 在 ZooKeeper 上创建一个临时节点,所有 Follower 对此节点注册监听,当 Leader 宕机时,此时 ISR 里的所有 Follower 都尝试创建该节点,而创建成功者(ZooKeeper 保证只有一个能创建成功)即是新的 Leader,其它 Replica 即为 Follower。

Kafka 数据分区和消费者的关系?

每个分区(Partition)只能由同一个消费组内的一个消费者(Consumer)来消费,可以由不同的消费组的消费者来消费,同组的消费者则起到并发的效果。如图:

在这里插入图片描述

Kafka 重启是否会导致数据丢失?

  • Kafka 是将数据写到磁盘的,一般数据不会丢失。
  • 但是在重启 Kafka 过程中,如果有消费者消费消息,那么 Kafka 如果来不及提交 offset,可能会造成数据的不准确(丢失或者重复消费)。

小七学习网,助您升职加薪,遇问题可联系:客服微信【1601371900】 备注:来自网站

免责声明: 1、本站信息来自网络,版权争议与本站无关 2、本站所有主题由该帖子作者发表,该帖子作者与本站享有帖子相关版权 3、其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和本站的同意 4、本帖部分内容转载自其它媒体,但并不代表本站赞同其观点和对其真实性负责 5、用户所发布的一切软件的解密分析文章仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。 6、您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容。 7、请支持正版软件、得到更好的正版服务。 8、如有侵权请立即告知本站(邮箱:1099252741@qq.com,备用微信:1099252741),本站将及时予与删除 9、本站所发布的一切破解补丁、注册机和注册信息及软件的解密分析文章和视频仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。如有侵权请邮件与我们联系处理。