Kafka入门介绍+集群部署+简单使用

news/2024/5/6 5:38:15

Kafka入门介绍+集群部署+简单使用

  • 简介
    • 核心概念
      • Broker(服务节点/实例)
      • Producer(生产者)
      • Topic(主题)
      • Partition(分区)
      • Consumer(消费者)和Consumer Group(消费者组)
  • 安装部署
    • 下载安装
    • 集群部署
    • 启动
  • 简单使用
    • 创建Topic主题
    • 发送消息
    • 消费消息

简介

官网:https://kafka.apache.org/
中文文档:https://kafka1x.apachecn.org/intro.html
Kafka是一个开源的分布式流处理平台
主要有三个关键功能

  1. 发布订阅事件流(可以用作消息队列)
  2. 分布式持久化存储事件流(可以用作数据处理系统)
  3. 可以在事件发生时处理或回顾性的处理

整体架构图如下:
在这里插入图片描述

核心概念

Broker(服务节点/实例)

一个Broker 可以看作一个独立的Kafka服务节点。
多个Broker组成一个Kafka集群。

Producer(生产者)

消息的生产者,将数据发送到Topic中。

Topic(主题)

Topic是Kafka实现发布订阅的核心。类比其他MQ,可以把Topic看作 交换机和队列 的组合。
相同类型的消息发到同一个Topic。
生产者将消息发送给 Topic,Topic接收消息并持久化。
Topic 内部持久化存储了所有消息。所以Kafka也常被当做一个存储系统。
在这里插入图片描述

思考一个问题:Kafka为什么持久化存储所有消息?

Kafka作为消息队列,一般要提供给多个消费者消费,即广播。
而传统MQ,在消费者消费完一个消息后,会将消息删除。传统MQ想实现广播,需要复制一份给新的消费者消费。
在这里插入图片描述
这个复制的过程无疑加大了性能开销,这与Kafka高性能处理海量数据的设计理念相违背。

所以Kafka在设计时,在Topic下持久化存储所有消息。将消费选择权交给消费者,由消费者提供offset偏移量 来实 现同一消息不同消费者进行消费,进而实现广播。

Partition(分区)

partition 是 消息实际存储的位置,属于Topic的一部分。
生产者向Topic丢数据,最终会落到Partition中。
消费者消费Topic中的数据,也是消费的Partition中的数据。
在这里插入图片描述
Kafka是为大数据而生,需要经常处理海量数据。单机的存储容量和读写性能肯定不能满足需求。需要对数据进行分片存储,Partition就是Kafka分片的数据子集。
也就是说,Partition是Kafka分布式的核心组件。
并且 Partition 是Kafka高性能,高可用,高并发的关键所在。

Consumer(消费者)和Consumer Group(消费者组)

消费者:即消费消息的。
Kakfa设计了消费者组的概念。
同一个消费者组中的消费者共同消费一个Topic中的消息
同时做了如下限制:
一个分区只可以被消费组中的一个消费者所消费
在这里插入图片描述
消费者组的特性:

  1. 一个消费组中的一个消费者可以消费多个分区
  2. 一个消费组中的不同消费者消费的分区一定不会重复
  3. 一个消费组中的所有消费者共同完成整个Topic中所有Partition的消费

思考一下,消费者组这种设计的好处是什么?

  1. 分摊消费压力,多个消费者并行无冲突的消费一组消息
  2. 消费模式更灵活,不同组合可以实现不同消费
    例如:所有消费者一个组,实现单播
    一个消费者一个组,实现广播
  3. 高可用,提高容错率,多个消费者一个组,有一个消费者挂了,自己的分区会分配给其他消费者。

安装部署

下载安装

https://kafka.apache.org/downloads
在这里插入图片描述
官网下载并传到服务器进行解压安装

tar -zxvf kafka_2.13-3.1.2.tgz

安装完成后,查看目录结构
在这里插入图片描述
config目录
在这里插入图片描述
bin目录在这里插入图片描述
分别使用ZK和Kafka的启动命令,即完成了Kafka单机模式的启动。(Kafka默认端口9092)

集群部署

接下来介绍下集群模式如何部署:
下面使用单机进行伪集群搭建,多台机器搭建方式类似。

# 创建一个集群配置目录
mkdir -p cluster/config
# 将zk 和 Broker的配置文件复制过去,三台Broker搭建集群,所以部署三份
cp config/zookeeper.properties cluster/config/
cp config/server.properties  cluster/config/server-0.properties
cp config/server.properties  cluster/config/server-1.properties
cp config/server.properties  cluster/config/server-2.properties

修改 zookeeper配置
在这里插入图片描述
修改 三个Broker 配置
注意:broker.id 必须保证每个serve不同

如果是单机搭建的伪集群,注意listeners 端口也要不同,
同时还要修改log.dirs 日志目录,保证日志目录也不相同,
因为Broker在启动时会检查日志目录下的meta.properties中的broker.id,相同日志目录会导致冲突

在这里插入图片描述

启动

进入bin目录下启动zk和broker
启动Zookeeper

./zookeeper-server-start.sh ../cluster/config/zookeeper.properties
# 后台启动nohup ./zookeeper-server-start.sh ../cluster/config/zookeeper.properties > /dev/null 2>&1 &

启动broker集群

./kafka-server-start.sh ../cluster/config/server-0.properties
./kafka-server-start.sh ../cluster/config/server-1.properties
./kafka-server-start.sh ../cluster/config/server-2.properties# 后台启动nohup ./kafka-server-start.sh ../cluster/config/server-0.properties > /dev/null 2>&1 &nohup ./kafka-server-start.sh ../cluster/config/server-1.properties > /dev/null 2>&1 &nohup ./kafka-server-start.sh ../cluster/config/server-2.properties > /dev/null 2>&1 &

简单使用

进入bin目录下
bin下面的kafka相关命令,都可以使用 --help 查看帮助文档,介绍的很全面

例如:./kafka-topics.sh --help

下面是我根据帮助文档做的简单使用

创建Topic主题

# 直接使用./kafka-topics.sh命令 会给出help文档
./kafka-topics.sh
# 创建一个名为 topci_test 的主题,Partition为3个,副本为2个
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-test --partitions 3 --replication-factor 2
# 查看topic列表
./kafka-topics.sh  --bootstrap-server localhost:9092 --list
# 查看 topic-test 的详细信息
./kafka-topics.sh  --bootstrap-server localhost:9092 --describe --topic topic-test

在这里插入图片描述
ISR:副本同步正常的BrokerId
Replicas:副本所在的BrokerId
Leader:leader所在的BrokerId
Partition:partition的编号

发送消息

./kafka-console-producer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic topic-test

在这里插入图片描述

消费消息

消费者发送的offset在 0.9版本之前 存放在zookeeper,因为zookeeper 不适合大量频繁的读写,0.9版本之后,放在kafka默认的Topic(__consumer_offsets)里保存
低于0.9的老版本可能需指定Zookeeper地址
我这里使用3.1.2,所以不需要指定 zookeeper

# 如果需要从头消费 可以加上--from-beginning 或者 指定 --offset进行消费,默认是消费最新的./kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic topic-test

在这里插入图片描述


http://www.mrgr.cn/p/82001871

相关文章

(七)小案例银行家应用程序-申请贷款-some方法和every方法

some方法 ● 我们先回顾一下includes方法 console.log(movements.includes(-130));只要数组中存在-130这个值,就会返回true,否则就会返回flase ● 而some方法只要达成某一个条件就会返回true,否则就返回flase const someMethod movement…

集团公司管控的三种模式

集团管控是集团公司通过对子公司采用层级的管理控制、资源的协调分配等策略和方式,使集团的组织架构和业务流程达到最佳运作效率的管理体系。 不同的集团管控模式决定了不同的财务管控方式。但不论采取何种管控模式,集团对财务的管理与控制都是其最为核心的内容。 在多元化集…

4.23日总结(项目总结)

1.项目: 今日项目通过一个在登录界面的一个静态变量,完成了区分老师和学生,能够分开老师和学生,并且不同身份的人进去会有不同的显示,以及登录链接主界面,还有学生和老师的不同的表,其次就是创…

Go语言并发赋值的安全性

struct并发赋值 type Test struct {X intY int }func main() {var g Testfor i : 0; i < 1000000; i {var wg sync.WaitGroup// 协程 1wg.Add(1)go func() {defer wg.Done()g Test{1, 2}}()// 协程 2wg.Add(1)go func() {defer wg.Done()g Test{3, 4}}()wg.Wait()// 赋值…

AWS Key disabler:AWS IAM用户访问密钥安全保护工具

关于AWS Key disabler AWS Key disabler是一款功能强大的AWS IAM用户访问密钥安全保护工具&#xff0c;该工具可以通过设置一个时间定量来禁用AWS IAM用户访问密钥&#xff0c;以此来降低旧访问密钥所带来的安全风险。 工具运行流程 AWS Key disabler本质上是一个Lambda函数&…

2024-04-23 闲话

2024-04-23 闲话郭军凯脱单之后,朋友圈频率大大提升。从一年一条提升到一个月一条了。前两天他把qq头像给换了(似乎现在微信头像也换了),因为我们三个人的qq群头像是三个人各自头像拼起来的,车昱辉点开qq给他雷击了一下。那天我正复习物理还是高代还是数分还是赶组会ddl忘…

@ComponentScan注解的实现,Spring扫描包的过程

相信接触过spring做开发的小伙伴们一定使用过@ComponentScan注解 @ComponentScan("com.wangm.lifecycle") public class AppConfig {}@ComponentScan指定basePackage,将包下的类按照一定规则注册成Bean。 但是这个注解的其他参数,比如excludeFilters、includeFilte…

C语言指针+-整数、指针-指针、指针关系运算、指针和数组、二级指针、指针数组

文章目录 前言一、指针 - 整数二、指针 - 指针三、指针的关系运算四、指针和数组五、二级指针六、指针数组指针数组可以将几个一维数组模拟成二维数组 总结 前言 C语言指针整数、指针-指针、指针关系运算、指针和数组、二级指针、指针数组等介绍&#xff0c;还包括指针数组将几…

【vue】el-tree的新增/编辑/删除节点

1、概述 关于树形结构的新增同级节点&#xff0c;新增子级节点&#xff0c;修改节点名称&#xff0c;删除节点等四种操作&#xff0c;各种参数配置完全继承el-tree&#xff0c;本篇使用vue2 element-ui 2、效果图展示 3、调用方式 <template><Tree:data"tree…

Java常见输入输出练习

1.AB(1) 计算ab 数据范围&#xff1a; 数据组数 1≤ t ≤100 , 数据大小满足 1≤ n ≤1000 输入描述&#xff1a; 输入包括两个正整数a,b(1 < a, b < 1000),输入数据包括多组。 输出描述&#xff1a; 输出ab的结果 输入例子&#xff1a; 1 5 10 20 输出例子&#xff…

IDEA安装插件【Database navigation】并连接数据库

IDEA安装插件Database navigation 快捷键 Ctrl+alt+s打开搜索插件-安装并重启如果安装过程中下载较慢 解决措施:win+R --> CMD打开执行 ping plugins.jetbrains.com再次打开idea就可以飞速下载了 IDEA使用Database连接数据库--mysql 1. 下载驱动 https://downloads.mysql.c…

Abaqus2024 安装教程(附免费安装包资源)

鼠标右击软件压缩包&#xff0c;选择“解压到Abaqus2024”。 鼠标右击“此电脑”&#xff0c;选择“属性”。 点击“高级系统设置”。 点击“环境变量”。 点击“新建”。 变量名输入&#xff1a;NOLICENSECHECK 变量值输入&#xff1a;true 然后点击“确定”。 点击“确定”。…

利用ollama和open-webui本地部署通义千问Qwen1.5-7B-Chat模型

目录 1 安装ollama 2 安装open-webui 2.1 镜像下载 3 配置ollama的模型转换工具环境 3.1 下载ollama源码 3.2 下载ollama子模块 3.3 创建ollama虚拟环境 3.4 安装依赖 3.5 编译量化工具 7 创建ollama模型 8 运行模型 参考文献&#xff1a; 1 安装ollama curl -fsSL …

力扣HOT100 - 2. 两数相加

解题思路&#xff1a; 缺位的节点进行补零处理&#xff0c;如97323补充为973023 注意相加的进位问题 class Solution {public ListNode addTwoNumbers(ListNode l1, ListNode l2) {ListNode head null, tail null;int carry 0;while (l1 ! null || l2 ! null) {int n1 l…

Google Chrome驱动程序 124.0.6367.62(正式版本)去哪下载?

大家好,我是Python进阶者。 一、前言 前几天在Python白银交流群【Jethro Shen】问了一个Python谷歌驱动下载的问题。二、实现过程 这里【Kim】和【Crazy】给了一个指导,如上图所示。说来奇怪,在链接中看了没有124 版本的,以前找114、96版本的,每次屡试不爽,现在最新的浏览…

关于开设RT-DETR专栏及更新内容的一些说明

​ 专栏介绍&#xff1a;YOLOv9改进系列 | 包含深度学习最新创新&#xff0c;助力高效涨点&#xff01;&#xff01;&#xff01; 专栏介绍 YOLOv9作为最新的YOLO系列模型&#xff0c;对于做目标检测的同学是必不可少的。本专栏将针对2024年最新推出的YOLOv9检测模型&#xff0…

[题解]ARC176 A~B

赛时心态崩了,0pts遗憾离场……今天在学校冷静思考了下。发现B题思路其实很简单,不过A题怎么也没有想到,回来看了题解,其实思路也很简单,不过是自己思考方向错了。看来打比赛心态很重要,如果能冷静下来思考结果会好很多。 果然算法竞赛不能被常理所束缚(笑) A - 01 Mat…

Day08React——第八天

useEffect 概念&#xff1a;useEffect 是一个 React Hook 函数&#xff0c;用于在React组件中创建不是由事件引起而是由渲染本身引起的操作&#xff0c;比如发送AJAx请求&#xff0c;更改daom等等 需求&#xff1a;在组件渲染完毕后&#xff0c;立刻从服务器获取频道列表数据…

软件行业中的蓝海领域有哪些?

一、什么是蓝海&#xff1f; 蓝海&#xff0c;指的是未知的市场空间。这个概念相对于“红海”而言&#xff0c;红海则是指已知的市场空间。 企业要启动和保持获利性增长&#xff0c;就必须超越产业竞争&#xff0c;开创全新市场&#xff0c;这其中包括两块&#xff1a;一块是…

裤裤带你一起学C语言内存函数啦!

目录 1.memcpy的使用和模拟实现 2.memmove使用和模拟实现 3.memset函数的使用 4.memcmp函数的使用 内存函数在<string.h>库中&#xff0c;我们使用内存函数必须先引入<string.h>头文件 1.memcpy的使用和模拟实现 memcpy的函数原型如下&#xff1a; void * m…