Retrospect

Kafka常用命令

  • 创建主题:指定分区数和副本数

    • topic bigdata01 主题的名字
    • partitions 3 分区的个数
    • replication-factor 2 副本个数
    • bootstrap-server kafka内部服务器通信地址,端口默认是9092
  • 创建主题:不指定分区数和副本数,默认是1个分区,1个副本

  • 查看所有主题

  • 查看某一个主题的详情(关键字describe)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 创建主题:指定分区数和副本数
kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

# 创建主题:不指定分区数和副本数,默认是1个分区,1个副本
kafka-topics.sh --create --topic test1 --bootstrap-server node1:9092,node2:9092,node3:9092

# 查看所有主题
kafka-topics.sh --list -bootstrap-server node1:9092,node2:9092,node3:9092

# 查看某一个主题的详情
kafka-topics.sh --describe --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092
kafka-topics.sh --describe --topic test1 --bootstrap-server node1:9092,node2:9092,node3:9092

# 删除topic
kafka-topics.sh --delete --topic test1 --bootstrap-server node1:9092,node2:9092,node3:9092

#5、模拟生成者和消费者
#生成者
kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092

#消费者
kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning

查看Topic信息

1
kafka-topics.sh --describe --topic bigdata01  --bootstrap-server node1:9092,node2:9092,node3:9092
  • 结果

    1
    2
    3
    4
    5
    6
    # Topic名称              分区个数					副本个数
    Topic: bigdata01 PartitionCount: 3 ReplicationFactor: 2
    # 分区:Topic名称 + 分区编号
    Topic: bigdata01 Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
    Topic: bigdata01 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
    Topic: bigdata01 Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
  • Topic:Topic名称

  • Partition:分区的编号,从0开始

  • Replicas:当前这个分区所有的副本所在的Broker的id

  • Isr:当前这个分区所有可用的副本所在的Broker的id

  • Leader:当前这个分区的Leader副本所在的Broker的id

Kafka的Python生产者消费者API

环境准备(安装Kafka的API包)

1
2
3
4
5
6
#1、在node1,node2,node3分别安装python和kakfa连接包
pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

#注意:如果安装完kafka-python报.sync问题就把之前的卸载,然后重新安装
pip uninstall kafka-python
python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from kafka import KafkaProducer

#1、创建一个生产者对象
"""
bootstrap_servers:kafka服务器地址列表
acks:应答机制
0:生产者生产一条数据写入Kafka,不用等待Kafka的回复,直接生产下一条【快,数据容易丢失】
1:生产者生产一条数据写入Kafka,等待Kafka确保这个分区的Leader已经写入,就返回一个ack,生产者收到ack就发送下一条【性能和安全做了权衡】
all/-1:生产者生产一条数据写入Kafka,等待Kafka确保这个分区的所有副本都已经写入成功,再返回ack,生产者收到ack就发送下一条【安全,性能最差】
retries:重试机制,如果生产者长时间没有收到ack,就认为数据丢失,会重新发送一份数据写入,直到收到ack,默认是0
retries=3表示最多重试3次

"""
producer = KafkaProducer(bootstrap_servers=['node1:9092', 'node2:9092', 'node3:9092'], acks=1,retries=3)


for i in range(0, 10):
# 2、生产消息
"""
topic:生产到哪个主题
key: 生产消息的key,也就是键,可以不要
value: 生产消息的value,是消息最核心的数据,必须要
"""
rs = producer.send(topic="bigdata01", key=f"{i}".encode("UTF-8"), value=f"itcast{i}".encode("UTF-8"))

# 3、获取返回值
"""
timeout:超时时间,如果超过这个时间就认为没有接受到返回信息
"""
metadata = rs.get(timeout=10)

#4、打印返回返回信息详情
print(f"数据:itcast{i} topic:{metadata.topic} partition:{metadata.partition} offset:{metadata.offset}")

#5、强制刷新
"""
生产者不是直接将数据写入Kafka,而是先放到本地一个缓存中,当缓存达到一定大小或者超过一定的时间才会真正的写入Kafka
flush:强制将本地缓存的数据提交到Kafka
"""
producer.flush()

消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from kafka import KafkaConsumer

#1、创建消费者对象
consumer = KafkaConsumer(
"bigdata01", #要消费的主题
group_id="cg1", #消费者组
bootstrap_servers=['node1:9092', 'node2:9092', 'node3:9092'], #kafka服务器地址列表
auto_offset_reset='earliest', #用于指定第一次消费从什么位置消费, earliest-最开始位置,latest-最新位置
enable_auto_commit=True #表示消费之后,自动提交offset
)

#2、开始消费消息,获取的消息会被存储在message对象中
for message in consumer:
# 从每条数据中获取所有信息
topic = message.topic #获取消费的主题
partition = message.partition #获取消费的分区
offset = message.offset #获取消费消息的offset
key = message.key #获取消费消息的key
value = message.value.decode("UTF-8") #获取消费消息的value

# 打印每条数据的内容
print(f"Topic={topic} Partition={partition} Offset={offset} Key={key} Value={value}")

Kafka的消费原理(API解析)

消费者的auto_offset_reset参数

  • auto_offset_reset该参数只对第一次消费有意义,第一次消费指的是消费组id在kafka中没有记录
  • auto_offset_reset有两个值, earliest是从头消费, latest是从最新位置开始消费
  • 如果你是第二次消费则永远都是从最新位置开始消费,不会再从头消费
1
2
3
4
5
6
1、auto_offset_reset有两个值:
auto.offset.reset = latest | earliest
#earliest 从头消费
#latest 从最新位置开始消费
2、auto_offset_reset该参数只对第一次消费有意义,第一次消费指的是消费组id在kafka中没有记录
3、如果你是第二次消费则永远都是从最新位置开始消费,不会再从头消费,比如上一次消费到offset=5的位置,下一次一定会从offset+1的位置开始消费

Offset数据保存位置

1
2
3
4
1、在Kafka中,系统会自动的创建一个Topic,被命名为:__consumer_offsets,该主题用来报错所有分区的offset信息
2、提交方式:
enable_auto_commit=True #消费者自动提交(有安全隐患)
enable_auto_commit=False #消费者手动提交(无安全隐患)

自动提交offset

1
2
3
4
5
6
7
0、自动提交offset的方式
enable_auto_commit=True
1、Kafka的自动提交有安全隐患
2、安全隐患解析
1)会造成数据的丢失:我虽然开始消费了,但是没有完全消费成功,结果offset被自动提交了,下次就会跳过本条数据
2)会造成重复消费:我虽然消费成功了,但是offset自动提交失败了,结果下次又从之前的位置开始消费
3、解决问题的方案:手动来提交offset

手动提交Offset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#1、关闭kafka自动提交
enable_auto_commit=False

#2、手动提交Commit Offset
consumer.commit()


#应用场景
def func1(value):
print('spark已经进行了分析')
return 1

def func2(value):
print('spark已经进行了结果保存')
return 1

rs1 = func1(value)
rs2 = func2(value)
if rs1 == 1 and rs2 == 1:
#手动提交offset,要保证所有的处理都成功,则提交
consumer.commit()

Kafka的负载均衡(※)

  • 负载均衡分为生产者, 消费者两方的负载均衡
  • 负载均衡这个概念出现在NameNode管理DataNode中(指的是NameNode为了保证DataNode存储的信息大致相同, 会在分配存储任务的时候, 会优先存储在余量比较大的DataNode上.)

生产者负载均衡

三种策略

  • 随机分区(不会出现数据倾斜)
  • Hash分区(可能会导致数据倾斜)
  • 指定分区
1
2
3
4
5
6
7
8
9
#2.1 随机分区-----写数据时,只指定了value,没有指定key
rs=producer.send(topic="bigdata01", value=f"itcast888-{i}".encode("UTF-8"))

#2.2 Hash分区-----写数据时,既指定了value,又有指定key: key.hash值 % 分区数\
#如果key出现大量的重复,则会造成大量的消息跑到了同一个分区,造成数据倾斜
rs = producer.send(topic="bigdata01", key=f"itcast999-x".encode("UTF-8"), value=f"itcast0000-{i}".encode("UTF-8"))

#2.3 指定分区-----写数据时,手动指定分区编号
rs = producer.send(topic="bigdata01", key=f"itcast999-{i}".encode("UTF-8"), value=f"itcastxxxy-{i}".encode("UTF-8"),partition=2)

消费者负载均衡

两个原则, 三种策略

两个原则

  • 一个分区的数据只能由这个消费者组中的某一个消费者消费
  • 一个消费者可以消费多个分区的数据

三种分配策略

  • 范围分配
    • 原则: 分区按照范围划分给不同的消费者,一般是按照范围平均划分,如果不够平均,优先分配给编号小的消费者
    • 缺点:
      • (编号小的消费者处理的数据更多)如果topic过多,对于编号小的消费者,可以能需要承担更多的数据量
      • (某个消费者挂掉,需要重新洗牌)如果某一个消费者挂掉,则所有的消费者和分区全部断开,推倒重来,重新进行按照范围划分
  • 轮询分配
    • 原则: 该分配策略是对所有Topic的分区编号进行排序,然后开始轮询(你一个,我一个)
    • 优点: 该方案基本上可以显示每个消费者均衡消费
    • 缺点: 如果某一个挂掉,则推导重新轮询,缺点就是由于是重新推导,效率相对低一点
  • 粘性策略
    • 原理: 该策略底层和轮询类似
    • 优点: 有点和轮询不同,如果某一个消费者挂掉,则原来正在消费的消费者继续消费,多出来的分区继续轮询,效率相对高一点

两个原理介绍:

1
2
3
4
5
6
7
8
9
10
11
1、有多个分区,有多个消费者组,有多个消费者,这些分区的数据如何合理的给消费者消费
2、有多盘菜,有多个家庭,有多个家庭成员,这些菜该如何分配给这些家庭成员

3、如果某个消费者正在消费时挂掉,多出来的分区如何重新分给其他的消费者

#前置原则:
= 一个分区的数据只能由这个消费者组中的某一个消费者消费
一盘菜,一个家庭中只能有一个人吃,如果有多个家庭,每个家庭都可以派一个人来吃一盘菜

=一个消费者可以消费多个分区的数据
一个家庭中一个人可以同时吃多盘菜

Kafka读写流程及数据清理

Kafka写入流程

1
2
3
4
5
1、生成者生产消息时,先去Zookeeper获取对应分区Leader所在的主机位置
2、获取Leader所在位置之后,将消息发送给leader副本所在主机
3、leader主机将消息先写入缓存,然后在写入Segment中的.log文件
4、leader主机写入成功之后,给生产者发送一个ack应答
5、每个follower副本主机从Leader主机拉取数据写入自己的Segment文件

pCLm5Os.png

Kafka读取流程

1
2
3
4
1、消费者从Zookeeper获取所在分区Leader主机位置,从__consumer_offsets主题当前分区的offset信息
2、消费者从Leader主机拉取数据
3、leader主机从Segment日志文件读取数据给消费者
4、消费者提交offset到__consumer_offsets主题

pCLm4yj.png

Kafka的清理设置

  • 实施

    • 属性配置

      1
      2
      3
      4
      5
      #开启清理
      log.cleaner.enable = true

      #清理规则
      log.cleanup.policy = delete
    • 基于存活时间规则:最常用的方式,单位越小,优先级越高

      1
      2
      3
      4
      5
      6
      7
      8
      # 清理周期
      log.retention.ms=1000 #每隔1000毫秒就删除kafka数据
      log.retention.minutes=10 #每隔10分钟就删除kafka数据
      log.retention.hours=168 #每隔168小时就删除kafka数据

      # 检查周期,要搭配清理周期来修改
      log.retention.check.interval.ms=300000 #每隔5分钟就检测删除时候时间是否到来
      # Segment文件最后修改时间如果满足条件将被删除
    • 基于文件大小规则

      1
      2
      #删除文件阈值,如果整个数据文件大小,超过阈值的一个segment大小,将会删除最老的segment,-1表示不使用这种规则
      log.retention.bytes = -1
  • 小结:理解Kafka中数据清理的规则