置顶URL: https://blog.csdn.net/weixin_46244703?type=blog

消息队列

什么是消息队列

消息队列MQ(Message Queue)用于实现两个系统之间或者两个模块之间传递消息数据时,实现数据缓存

消息队列的作用

消息队列可以实现两个模块之间的异步通信,并降低模块之间的耦合性,并可以限流削峰

由于消息队列是连接两个模块的桥梁,至关重要,因此消息队列都是采用分布式架构来保证数据的安全性和可靠性

常见的消息队列:

ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ、Pulsar

发布和订阅

介绍

  • 1、发布者又称生成者,多个生成者都可以向消息队列生成数据,为了区分不同的消息,每个消息都有主题
  • 2、订阅者又称消费者,每个消费者可以消费或者订阅多个主题的消息
  • 3、消费者消费完消息之后,队列中的消息并不会立刻消失

Kafka

什么是Kafka

Kafka是基于分布式的流式处理框架,主要用于实时分析

Kafka是基于发布订阅模式的消息队列,有生产者 消费者 主题 队列等

Kafka的核心组件

  • Broker

    1
    2
    3
    1、搭建Kafka集群的每台主机被称为Broker
    2、Kafka是主从架构,分为主Broker和从Broker,主Broker是由ZK选举得到,被称为Controller
    3、如果主Broker挂掉,则重新选举,主Broker负责管理和对外读写数据,从Broker只负责对外读写数据
  • Producer:生产者

    1
    负责向Kafka生成消息
  • Consumer:消费者

    1
    负责从Kafka消费消息
  • 消费者组

    1
    2
    3
    4
    5
    1、任何一个消费者必须属于某一个消费者组
    2、同一个组的消费者只能消费同一个Topic中不同分区的数据,也就是同一个组的消费者不能消费相同的数据,不能出现抢食现象(每人各自一盘菜)
    3、同一个消费者组 ,一个消费者可以消费多个分区的数据,尽量让分区数等于消费者数,这样效率最高
    3、消费者组中多个消费者可以实现并发消费,也就是同时消费,提高数据消费的速度和效率
    4、整个消费者组中所有消费者消费的数据加在一起是一份完整的数据
  • Toic

    1
    2
    1、为了对数据进行分类,不同类型的数据被称为不同的Topic(主题)
    2、Kafka中的Topic类似HDFS中的一个文件
  • Partition

    1
    2
    3
    4
    5
    6
    7
    1、每个Topic可以被分成多个分区
    2、Kafka中的分区类似HDFS中的一个Block
    3、每个分区都有多个副本
    4、Kafka的分区副本有主副本(Leader副本)和从副本(Follower副本),主副本负责对外读写数据,从副本负责和主副本保持数据同步

    #主副本和从副本由Controller的Broker主机来决定
    #Controller的Broker主机由ZK来决定

pCqdKcn.png

  • Segment

    1
    2
    3
    4
    5
    6
    1、为了更新的管理数据,Kafka每个分区的数据被切分成一个个Segment文件,一般一个Segment文件最大是一个G
    2、为了更方便查询每一个Segment文件内容,Kafka会生成对应的索引文件
    3、每个Segment对应两种【三个】文件
    xxxxxxxxx.log: 存储数据
    xxxxxxxxx.index 根据数据查找索引
    xxxxxxxx.timeindex:根据时间查找索引
  • Offset

    1
    2
    3
    4
    1、Offset是每条数据在自己分区中的偏移量,也可以理解为每条数据的编号,从0开始
    2、Offset是每个分区独立关系,比如每个分区都有offset为0的数据,分区之间的offset彼此独立
    3、比如一个分区的数据上次被消费到offset=2的位置,下一次从offset+1=3的位置开始消费
    4、如果没有offset,就有可能出现重复消费或遗漏消费的情况

    pCqdJNF.png

  • 小结

    pCqdYh4.png

Kafka的环境搭建

搭建过程

  • 下载解压安装

    • 下载:http://archive.apache.org/dist/kafka/

    • 上传到第一台机器

      1
      2
      cd /export/software/
      rz
    • 解压

      1
      2
      3
      4
      5
      # 解压安装
      tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/
      cd /export/server/kafka_2.12-2.4.1/
      # Kafka数据存储的位置
      mkdir datas
  • 修改配置

    • 切换到配置文件目录

      1
      cd /export/server/kafka_2.12-2.4.1/config
    • 修改server.properties:

      vim server.properties

      1
      2
      3
      4
      5
      6
      7
      8
      9
      #21行:唯一的 服务端id
      broker.id=1
      #60行:指定kafka的日志及数据【segment【.log,.index】】存储的位置
      log.dirs=/export/server/kafka_2.12-2.4.1/datas
      #123行:指定zookeeper的地址
      zookeeper.connect=node1:2181,node2:2181,node3:2181
      #在最后添加两个配置,允许删除topic,当前kafkaServer的主机名
      delete.topic.enable=true
      host.name=node1
    • 分发

      1
      2
      3
      cd /export/server/
      scp -r kafka_2.12-2.4.1 node2:$PWD
      scp -r kafka_2.12-2.4.1 node3:$PWD
    • 第二台:vim /export/server/kafka_2.12-2.4.1/config/server.properties

      1
      2
      3
      4
      #21行:唯一的 服务端id
      broker.id=2
      #最后
      host.name=node2
    • 第三台:vim /export/server/kafka_2.12-2.4.1/config/server.properties

      1
      2
      3
      4
      #21行:唯一的 服务端id
      broker.id=3
      #最后
      host.name=node3
  • 每台机器添加环境变量

    1
    vim /etc/profile
    1
    2
    3
    #KAFKA_HOME
    export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
    export PATH=:$PATH:$KAFKA_HOME/bin
    1
    source /etc/profile

启动Kafka

  • 启动Zookeeper
1
2
#在node1,node2,node3分别执行以下命令
/export/server/zookeeper/bin/zkServer.sh start
  • 启动Kafka
1
2
3
#在node1,node2,node3分别执行以下命令
cd /export/server/kafka_2.12-2.4.1/
bin/kafka-server-start.sh config/server.properties >>/dev/null 2>&1 &

验证Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#1、创建主题:指定分区数和副本数
kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

"""
--topic bigdata01 主题的名字
--partitions 3 分区的个数
--replication-factor 2 副本个数
--bootstrap-server 服务器地址
"""
#创建主题:不指定分区数和副本数,默认是1个分区,1个副本
kafka-topics.sh --create --topic test1 --bootstrap-server node1:9092,node2:9092,node3:9092


#2、查看所有主题
kafka-topics.sh --list -bootstrap-server node1:9092,node2:9092,node3:9092


#3、查看某一个主题的详情
kafka-topics.sh --describe --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092

kafka-topics.sh --describe --topic test1 --bootstrap-server node1:9092,node2:9092,node3:9092


#4、删除topic
kafka-topics.sh --delete --topic test1 --bootstrap-server node1:9092,node2:9092,node3:9092


#5、模拟生成者和消费者

#生成者
kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092

#消费者
kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning

pCqdN9J.png