首页 >> 大全

进击消息中间件系列(二):Kafka 单机与集群部署实践

2023-08-28 大全 39 作者:考证青年

点击下方名片,设为星标!

回复“1024”获取2TB学习资源!

前面介绍了 Kafka 的基础概念与架构相关的知识点,今天我将详细的为大家介绍 Kafka 单机与集群部署相关知识,希望大家能够从中收获多多!如有帮助,请点在看、转发支持一波!!!

系统 官方网站下载安装包

因为Kafka的运行依赖于 ,因此,还需要下并安装,当然Kafka也内置了服务,因此,也可以不额外安装,直接使用内置的服务。

这里为简单起见,直接使用Kafka内置的服务。

启动

在操作系统中找到解压的.12-2.4.0

进入CMD命令行窗口

输入.\bin\\--start.bat .\\. 然后回车

出现下图表示启动成功

接着进入CMD命令行窗口

输入 .\bin\\kafka--start.bat .\\.然后回车

出现下图表示Kafka启动成功

更多关于消息中间件 Kafka 系列的学习文章,请参阅:消息中间件 Kafka,本系列持续更新中。

Linux 系统

下载地址:

解压安装包

下载后的安装包存放在/opt//中,再将安装包解压到/opt//中。(根据个人习惯选择目录存放)。

[root@hadoop102 software]# tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

解压完进入kafka目录,结构如下:

查看bin目录结构,存放了所有脚本,每个模块都有一个脚本,例如:、、对应的脚本 kafka--.sh、kafka--.sh、kafka-.sh

修改配置文件

查看目录结构:

修改.配置文件:

[hadoop102 config]# vim server.properties

主要修改以下圈出部分的内容:

主要配置项说明

broker.id=0            #broker的id号,集群模式下id号要配置成唯一的
listeners=PLAINTEXT://192.168.81.210:9092   #kafka监听地址
num.network.threads=3             #处理网络请求的线程数量,线程会将接收到的消息放到内存中然后再写入磁盘
num.io.threads=8          #消息从内存写入磁盘时使用的线程数量,主要是用来处理磁盘IO的线程数量
socket.send.buffer.bytes=102400        #发送套接字的缓冲区大小
socket.receive.buffer.bytes=102400       #接收套接字的缓冲区大小
socket.request.max.bytes=104857600       #请求套接字的缓冲区大小
log.dirs=/data/kafka/data/kafka-logs      #消息数据存储路径
num.partitions=1         #topic在当前broker上的分区个数
num.recovery.threads.per.data.dir=1       #用来设置恢复和清理data下数据的线程数量,segment文件默认会被保留7天
offsets.topic.replication.factor=1       #
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168         #segment数据文件保留的期限,单位小时,也就是7天
log.segment.bytes=1073741824       #日志文件中每个segment的大小,默认为1G
log.retention.check.interval.ms=300000      #定期检查segment文件的大小是否超出闲置,单位是毫秒
zookeeper.connect=                         #zookeeper地址
zookeeper.connection.timeout.ms=6000      #zookeeper链接超时时间
group.initial.rebalance.delay.ms=0       #延迟初始消费者重新平衡的时间

更多关于消息中间件 Kafka 系列的学习文章,请参阅:消息中间件 Kafka,本系列持续更新中。

编写集群分发脚本 ① scp( copy)安全拷贝

1.scp 定义:scp 可以实现服务器与服务器之间的数据拷贝。(from to )

2.基本语法:

scp  -r   $pdir/$fname         $user@$host:$pdir/$fname
命令 递归  要拷贝的文件路径/名称   目的地用户@主机:目的地路径/名称

3.案例实操

前提:在 、、 都已经创建好的/opt/、 /opt/ 两个目录。

#在 hadoop102 上,将 hadoop102 中/opt/module/kafka_2.12-3.0.0 目录拷贝到hadoop103上。
[root@hadoop102 ~]# scp -r /opt/module/kafka_2.12-3.0.0/  root@hadoop103:/opt/module/

#也可以在 hadoop104 上,将 hadoop102 中/opt/module/kafka_2.12-3.0.0 目录拷贝到hadoop104上。
[root@hadoop104 ~]# scp -r  root@hadoop102:/opt/module/kafka_2.12-3.0.0   ./

#还可以在 hadoop103 上操作,将 hadoop102 中/opt/modulekafka_2.12-3.0.0 目录拷贝到hadoop104上。
[root@hadoop103 ~]# scp -r  root@hadoop102:/opt/module/kafka_2.12-3.0.0   root@hadoop104:/opt/module

② rsync 远程同步工具

rsync 主要用于备份和镜像。具有速度快、避免复制相同内容和支持符号链接的优点。rsync 和 scp 区别:用 rsync 做文件的复制要比 scp 的速度快,rsync 只对差异文件做更新。scp 是把所有文件都复制过去。

1.基本语法

rsync  -av      $pdir/$fname         $user@$host:$pdir/$fname
命令  选项参数  要拷贝的文件路径/名称    目的地用户@主机:目的地路径/名称
-a 归档拷贝
-v 显示复制过程

2.案例实操

#删除 hadoop103 中/opt/module/kafka_2.12-3.0.0 
[root@hadoop103 module]# rm -rf kafka_2.12-3.0.0/#同步 hadoop102 中的/opt/module/kafka_2.12-3.0.0  到 hadoop103
[root@hadoop102 module]# rsync -av /opt/module/kafka_2.12-3.0.0/ root@hadoop103:/opt/module/kafka_2.12-3.0.0/

③ xsync 集群分发脚本

由于每次同步时都要输入很多命令,还要输入密码,所以可以通过 xsync 集群分发脚本,一步到位。下面演示xsync集群分发脚本的编写与使用

#rsync 命令原始拷贝:之前hadoop102 同步到hadoop103、hadoop104需要输入以下命令
[root@hadoop102 module]# rsync -av /opt/module/kafka_2.12-3.0.0/ root@hadoop103:/opt/module/kafka_2.12-3.0.0/
[root@hadoop102 module]# rsync -av /opt/module/kafka_2.12-3.0.0/ root@hadoop104:/opt/module/kafka_2.12-3.0.0/#现在期望只输入以下命令就能同步到所有服务器上
[root@hadoop102 module]# xsync kafka_2.12-3.0.0/#并且期望脚本xsync kafka_2.12-3.0.0/在任何路径都能使用(所以脚本需要放在声明了全局环境变量的路径下)1.首先查看哪些目录是全局的
[root@hadoop102 module]# echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin2.如果不想在提供的目录下存放脚本, 可以添加一个自己的目录到全局环境中(前提是要创建好/home/hadoop/bin目录)
#在/etc/profile.d/my_env.sh 文件中增加 export PATH="$PATH:/home/hadoop/bin" 环境变量配置
[root@hadoop102 ~]# vim /etc/profile.d/my_env.sh
export PATH="$PATH:/home/hadoop/bin"
#配置完后,刷新一下环境变量
[root@hadoop102 ~]# source /etc/profile3.再次查看时,多了一个全局目录/home/hadoop/bin
[root@hadoop102 ~]# echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/home/hadoop/bin

更多关于消息中间件 Kafka 系列的学习文章,请参阅:消息中间件 Kafka,本系列持续更新中。

编写脚本

#在/home/hadoop/bin 目录下创建 xsync 文件,在该文件中编写如下代码
[root@hadoop102 bin]$ vim xsync#!/bin/bash
#$#表示参数个数,判断参数个数是否小于1
if [ $# -lt 1 ]
thenecho Not Enough Arguement!exit;
fi
#遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do#打印输入的主机名echo ==================== $host ====================#遍历所有目录,逐个发送# 例如执行命令:xsync kafka zookeeper,表示先同步kafka文件,再同步zookeeper文件for file in $@do#判断文件是否存在if [ -e $file ]then#获取父目录# 先执行cd -P /opt/module,再执行pwd,可以获取当前的父目录,-P表示软链接(P大写)pdir=$(cd -P $(dirname $file); pwd)#获取当前文件的名称# basename /opt/module/kafka, 则当前文件名为kafkafname=$(basename $file)# ssh hadoop103可以访问hadoop103主机,并且创建目录mkdir -p /opt/module/kafka,-p表示不管目录存不存在都会创建ssh $host "mkdir -p $pdir"# rsync -av /opt/module/kafka/ root@hadoop103:/opt/module/kafka/rsync -av $pdir/$fname $host:$pdirelseecho $file does not exists!fidone
done

#修改脚本 xsync 具有执行权限
[root@hadoop102 bin]# chmod 777 xsync

#执行脚本
[root@hadoop102 ~]# xsync /opt/module/kafka_2.12-3.0.0/
执行完后,hadoop102、hadoop103、hadoop104上都会有kafka_2.12-3.0.0文件

SSH 无密登录配置

由于每向一个服务器进行同步时,都需要输入密码。为了不用每次输密码,所以可以进行SSH无密登录配置。

① 配置 ssh

基本语法

#ssh 另一台电脑的 IP 地址,例如:hadoop102登录到hadoop103
[root@hadoop102 ~]# ssh hadoop103#hadoop103 退回到 hadoop102
[atguigu@hadoop103 ~]$ exit

② 无密钥配置

首先在A服务器生成密钥对,只将公钥拷贝给B服务器,B服务器会将公钥放在授权key的文件中

#进入.ssh目录,该目录下有个known_hosts文件,再查看known_hosts文件
[root@hadoop102 ~]# cd .ssh/
[root@hadoop102 .ssh]# ll
total 4
-rw-r--r--. 1 root root 558 Nov 28 00:11 known_hosts
[root@hadoop102 .ssh]# cat known_hosts
hadoop103,192.168.10.103 ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBIW8j1y/nhu5Q4K+3VGjTnChzq9cbIjzozabQQcjUCU9PdVFBIdD8PleMmDBEK6NCHzF7EW1m6n6iA1S4ihC3GM=
hadoop104,192.168.10.104 ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBIW8j1y/nhu5Q4K+3VGjTnChzq9cbIjzozabQQcjUCU9PdVFBIdD8PleMmDBEK6NCHzF7EW1m6n6iA1S4ihC3GM=
hadoop102,192.168.10.102 ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBIW8j1y/nhu5Q4K+3VGjTnChzq9cbIjzozabQQcjUCU9PdVFBIdD8PleMmDBEK6NCHzF7EW1m6n6iA1S4ihC3GM=说明hadoop102访问过hadoop102、hadoop103、hadoop104

#输入ssh-keygen -t rsa后,一直点击回车键,直到出现以下结果
[root@hadoop102 .ssh]# ssh-keygen -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:NIVyD44Gsv0U5nVOiN5KA79KFCl4sQvlVO7hlQdhFZA root@hadoop102
The key's randomart image is:
+---[RSA 2048]----+
| .+o..=*o+.      |
|.++o=.Eo*.o      |
|..o=oOoO+*       |
| ..+ooXoo.o      |
|  ..o= +S        |
|    . +          |
|   . .           |
|    .            |
|                 |
+----[SHA256]-----+

# 再次查看.ssh目录,此时生成了公钥和私钥id_rsa、id_rsa.pub
[root@hadoop102 .ssh]# ll
total 12
-rw-------. 1 root root 1675 Nov 28 00:54 id_rsa
-rw-r--r--. 1 root root  396 Nov 28 00:54 id_rsa.pub
-rw-r--r--. 1 root root  558 Nov 28 00:11 known_hosts

# 查看公钥
[root@hadoop102 .ssh]# cat id_rsa.pub
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDC6TzfM8ZEZQ8X0u9zFyWQ65cMuBk8YB3yqEjyEjVYtAHijSg0EpTh3NVrEmWaPaSv+6tCziPYjsdWdIQ1k+h8CgyRnV1vc4UTH+YETcl78ScoLWfDxiEBVmKS4aPMYt21yvExGQ/3wZYGFpGVoiUdpHU/QVbpXPjQZ5KjIkOZxzuqsDx5pUbuLT/TLxB5d0ZGYl5mMmVpFIyPlr+btozGNe23h1KAI/rniIrAqU/jYaeoWcrpFLgmiuoG4DPpPqev+GjBe3CS6ow+2i0UwX9czul+cZMWF58EJrhXX/zD6lC+nr6qX5mUTzuhoQpquYy9YetjhpD4iHOAjwT6sbiX root@hadoop102# 将hadoop102的公钥复制给hadoop102、hadoop103、hadoop104
[root@hadoop102 .ssh]# ssh-copy-id hadoop102
[root@hadoop102 .ssh]# ssh-copy-id hadoop103
[root@hadoop102 .ssh]# ssh-copy-id hadoop104# hadoop102再次访问hadoop103时,将不再输入hadoop103的密码
[root@hadoop102 .ssh]# ssh hadoop103
Last login: Mon Nov 28 00:24:06 2022 from hadoop102
[root@hadoop103 ~]## 查看hadoop103中的.ssh目录,多了一个authorized_keys文件,该文件尾部有一行root@hadoop102表示允许hadoop102免密登录
[root@hadoop103 .ssh]# ll
total 8
-rw-------. 1 root root 396 Nov 28 00:59 authorized_keys
-rw-r--r--. 1 root root 372 Nov 28 00:51 known_hosts
[root@hadoop103 .ssh]# cat authorized_keys
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDC6TzfM8ZEZQ8X0u9zFyWQ65cMuBk8YB3yqEjyEjVYtAHijSg0EpTh3NVrEmWaPaSv+6tCziPYjsdWdIQ1k+h8CgyRnV1vc4UTH+YETcl78ScoLWfDxiEBVmKS4aPMYt21yvExGQ/3wZYGFpGVoiUdpHU/QVbpXPjQZ5KjIkOZxzuqsDx5pUbuLT/TLxB5d0ZGYl5mMmVpFIyPlr+btozGNe23h1KAI/rniIrAqU/jYaeoWcrpFLgmiuoG4DPpPqev+GjBe3CS6ow+2i0UwX9czul+cZMWF58EJrhXX/zD6lC+nr6qX5mUTzuhoQpquYy9YetjhpD4iHOAjwT6sbiX root@hadoop102

# 再次进行同步分发给其他服务器,不用再输入密码了
[root@hadoop102 bin]# xsync /opt/module/kafka_2.12-3.0.0/

修改集群其他服务器的配置

分别在 和 上修改配置文件/opt//kafka\_2.12-3.0.0//.中的 .id=1、.id=2

注:.id 不得重复,整个集群中唯一。更多关于消息中间件 Kafka 系列的学习文章,请参阅:消息中间件 Kafka,本系列持续更新中。

[root@hadoop103 config]# vim server.properties
修改:broker.id=1
[root@hadoop104 config]# vim server.properties
修改:broker.id=2

配置环境变量

#在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置
[root@hadoop102 ~]# vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka_2.12-3.0.0
export PATH=$PATH:$KAFKA_HOME/bin#刷新一下环境变量
[root@hadoop102 ~]# source /etc/profile#分发环境变量文件到其他服务器,并source /etc/profile
[root@hadoop102 ~]# xsync /etc/profile.d/my_env.sh
[root@hadoop103 ~]$ source /etc/profile
[root@hadoop104 ~]$ source /etc/profile

kafka启动集群

先启动 集群,然后启动 Kafka。

#先启动 Zookeeper 集群,然后启动 Kafka。
[root@hadoop102 kafka]$ zk.sh start#依次在 hadoop102、hadoop103、hadoop104 节点上启动 Kafka
[root@hadoop102 kafka_2.12-3.0.0]# bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop103 kafka_2.12-3.0.0]# bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop104 kafka_2.12-3.0.0]# bin/kafka-server-start.sh -daemon config/server.properties

kafka关闭集群

[root@hadoop102 kafka_2.12-3.0.0]$ bin/kafka-server-stop.sh 
[root@hadoop103 kafka_2.12-3.0.0]$ bin/kafka-server-stop.sh 
[root@hadoop104 kafka_2.12-3.0.0]$ bin/kafka-server-stop.sh

kafka集群启停脚本

#在/home/hadoop/bin 目录下创建文件 kf.sh 脚本文件
[root@hadoop102 bin]$ vim kf.sh脚本如下:#! /bin/bash
case $1 in
"start"){for i in hadoop102 hadoop103 hadoop104doecho " --------启动 $i Kafka-------"ssh $i "/opt/module/kafka_2.12-3.0.0/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.12-3.0.0/config/server.properties"done
};;
"stop"){for i in hadoop102 hadoop103 hadoop104doecho " --------停止 $i Kafka-------"ssh $i "/opt/module/kafka_2.12-3.0.0/bin/kafka-server-stop.sh "done
};;
esac

添加执行权限

[root@hadoop102 bin]$ chmod +x kf.sh

启动集群命令

[root@hadoop102 ~]$ kf.sh start

停止集群命令

[root@hadoop102 ~]$ kf.sh stop

注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后,再停止集群。因为集群当中记录着Kafka集群相关信息,集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。更多关于消息中间件 Kafka 系列的学习文章,请参阅:消息中间件 Kafka,本系列持续更新中。

Kafka 集群中 ,,HW

集群中谁来充当?

每个启动会向zk创建一个临时序号节点,获得的序号最小的那个将会作为集群的

作用

1.负责管理整个集群的分区和副本,当挂了之后,它可以从ISR中找到下一个中的分区副本作为新(选举新的)。

2.当宕机后会将消息传递给其他分区副本(本质就是zk,感受ISR集合的变化并且传递变化消息——>利用watch机制观察ISR集合的节点)。

3.当ISR中新增分区或者减少,也会同步消息给其他的。

重平衡机制 消费策略

range指定,轮询,,range:sum(分区总数)/消费者总数+1。

:粘合策略,如果需要,会在之前已经分配的基础上做出调整,不会改变之前的分配情况,如果没有开启该策略就会重新全部分配,效率较低,建议开启。

HW(高水位) 作用

用HW防止数据丢失,可能副本还没有同步好的消息就被消费者消费,那么当宕机之后消息就丢失了,因为会产生新的

概念

消费者最多只能消费到HW的位置,对于新写入的消息,不能立即消费而是等待该消息被ISR集合进行同步,之后再更新HW,更新完就可以消费。

HW取决于ISR中最小的LEO(某个副本最后消息的消息位置)——>每个完成这个消息的同步之后,HW才会变化。

更多关于消息中间件 Kafka 系列的学习文章,请参阅:消息中间件 Kafk,本系列持续更新中。

参考文章:

/

///

读者专属技术群

构建高质量的技术交流社群,欢迎从事后端开发、运维技术进群(备注岗位,已在技术交流群的请勿重复添加)。主要以技术交流、内推、行业探讨为主,请文明发言。广告人士勿入,切勿轻信私聊,防止被骗。

扫码加我好友,拉你进群

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了