• -------------------------------------------------------------
  • ====================================

zookeeper 和 kafka 集群搭建

kafka dewbay 5年前 (2019-04-12) 1693次浏览 已收录 0个评论 扫描二维码

1、Kafka 使用背景

在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样的一些问题:
我们想分析下用户行为(pageviews),以便我们设计出更好的广告位
我想对用户的搜索关键词进行统计,分析出当前的流行趋势
有些数据,存储数据库浪费,直接存储硬盘效率又低 
这些场景都有一个共同点:
数据是由上游模块产生,上游模块,使用上游模块的数据计算、统计、分析,这个时候就可以使用消息系统,尤其是分布式消息系统!
2、Kafka 的定义
What is Kafka:它是一个分布式消息系统,由 linkedin 使用 scala 编写,用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。具有高水平扩展和高吞吐量。
3、Kafka 和其他主流分布式消息系统的对比 

定义解释:
1、Java 和 scala 都是运行在 JVM 上的语言。
2、erlang 和最近比较火的和 go 语言一样是从代码级别就支持高并发的一种语言,所以 RabbitMQ 天生就有很高的并发性能,但是 有 RabbitMQ 严格按照 AMQP 进行实现,受到了很多限制。kafka的设计目标是高吞吐量,所以kafka自己设计了一套高性能但是不通用的协议,他也是仿照 AMQP( Advanced Message Queuing Protocol   高级消息队列协议)设计的。 
3、事物的概念:在数据库中,多个操作一起提交,要么操作全部成功,要么全部失败。举个例子, 在转账的时候付款和收款,就是一个事物的例子,你给一个人转账,你转成功,并且对方正常行收到款项后,这个操作才算成功,有一方失败,那么这个操作就是失败的。 
对应消在息队列中,就是多条消息一起发送,要么全部成功,要么全部失败。3 个中只有 ActiveMQ 支持,这个是因为,RabbitMQ 和 Kafka 为了更高的性能,而放弃了对事物的支持 。
4、集群:多台服务器组成的整体叫做集群,这个整体对生产者和消费者来说,是透明的。其实对消费系统组成的集群添加一台服务器减少一台服务器对生产者和消费者都是无感之的。
5、负载均衡,对消息系统来说负载均衡是大量的生产者和消费者向消息系统发出请求消息,系统必须均衡这些请求使得每一台服务器的请求达到平衡,而不是大量的请求,落到某一台或几台,使得这几台服务器高负荷或超负荷工作,严重情况下会停止服务或宕机。
6、动态扩容是很多公司要求的技术之一,不支持动态扩容就意味着停止服务,这对很多公司来说是不可以接受的。 
注:
阿里巴巴的 Metal,RocketMQ 都有 Kafka 的影子,他们要么改造了 Kafka 或者借鉴了 Kafka,最后 Kafka 的动态扩容是通过Zookeeper来实现的。 
 
Zookeeper是一种在分布式系统中被广泛用来作为:分布式状态管理、分布式协调管理、分布式配置管理、和分布式锁服务的集群。kafka 增加和减少服务器都会在Zookeeper节点上触发相应的事件kafka系统会捕获这些事件,进行新一轮的负载均衡,客户端也会捕获这些事件来进行新一轮的处理。
Kafka 相关概念
1、 AMQP 协议

Advanced Message Queuing Protocol (高级消息队列协议)
The Advanced Message Queuing Protocol (AMQP):是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议。AMQP 定义了通过网络发送的字节流的数据格式。因此兼容性非常好,任何实现 AMQP 协议的程序都可以和与 AMQP 协议兼容的其他程序交互,可以很容易做到跨语言,跨平台。
 
上面说的 3 种比较流行的消息队列协议,要么支持 AMQP 协议,要么借鉴了 AMQP 协议的思想进行了开发、实现、设计。
2、 一些基本的概念
1、消费者:(Consumer):从消息队列中请求消息的客户端应用程序
2、生产者:(Producer)  :向 broker 发布消息的应用程序
3、AMQP 服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于 fafka 将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个 broker 就是一个应用程序的实例
kafka支持的客户端语言:Kafka 客户端支持当前大部分主流语言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可以使用以上任何一种语言和kafka服务器进行通信(即辨析自己的 consumer 从 kafka 集群订阅消息也可以自己写 producer 程序) 
3、Kafka 架构
生产者生产消息、kafka 集群、消费者获取消息这样一种架构,如下图:

kafka 集群中的消息,是通过 Topic(主题)来进行组织的,如下图:

一些基本的概念:
1、主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。
2、分区(Partition):一个 Topic 中的消息数据按照多个分区组织,分区是 kafka 消息队列组织的最小单位,一个分区可以看作是一个 FIFO( First Input First Output 的缩写,先入先出队列)的队列。
kafka 分区是提高 kafka 性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加 Topic 的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。
 
工作图:

 
 
备份(Replication):为了保证分布式可靠性,kafka0.8 开始对每个分区的数据进行备份(不同的 Broker 上),防止其中一个 Broker 宕机造成分区上的数据不可用。
kafka0.7 是一个很大的改变:1、增加了备份 2、增加了控制借点概念,增加了集群领导者选举 。
Zookeeper集群搭建
Kafka 集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群。
1、软件环境
(3 台服务器-我的测试)
192.168.7.100 server1
192.168.7.101 server2
192.168.7.107 server3
1、Linux 服务器一台、三台、五台、(2n+1),Zookeeper 集群的工作是超过半数才能对外提供服务,3 台中超过两台超过半数,允许 1 台挂掉 ,是否可以用偶数,其实没必要。 如果有四台那么挂掉一台还剩下三台服务器,如果在挂掉一个就不行了,这里记住是超过半数。 2、Java jdk1.7 zookeeper 是用 java 写的所以他的需要 JAVA 环境,java 是运行在 java 虚拟机上的 3、Zookeeper 的稳定版本 Zookeeper 3.4.6 版本  2、配置&安装 Zookeeper 下面的操作是:3 台服务器统一操作 1、安装 Java yum list java
yum -y install java-1.7.0-openjdk*
2、下载 Zookeeper

首先要注意在生产环境中目录结构要定义好,防止在项目过多的时候找不到所需的项目

我的目录统一放在/opt 下面

首先创建 Zookeeper 项目目录

mkdir zookeeper #项目目录
mkdir zkdata #存放快照日志
mkdir zkdatalog#存放事物日志
下载 Zookeeper

下载软件

cd /opt/zookeeper/

wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

解压软件

tar -zxvf zookeeper-3.4.6.tar.gz

3、修改配置文件

进入到解压好的目录里面的 conf 目录中,查看

进入 conf 目录

/opt/zookeeper/zookeeper-3.4.6/conf

查看

[root@192.168.7.107]

$ ll
-rw-rw-r–. 1 1000 1000 535 Feb 20 2014 configuration.xsl
-rw-rw-r–. 1 1000 1000 2161 Feb 20 2014 log4j.properties
-rw-rw-r–. 1 1000 1000 922 Feb 20 2014 zoo_sample.cfg

zoo_sample.cfg  这个文件是官方给我们的 zookeeper 的样板文件,给他复制一份命名为 zoo.cfg,zoo.cfg 是官方指定的文件命名规则。

3 台服务器的配置文件

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/zkdata
dataLogDir=/opt/zookeeper/zkdatalog
clientPort=12181
server.1=192.168.7.100:12888:13888
server.2=192.168.7.101:12888:13888
server.3=192.168.7.107:12888:13888

server.1 这个 1 是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面 myid 文件里

192.168.7.107 为集群里的 IP 地址,第一个端口是 master 和 slave 之间的通信端口,默认是 2888,第二个端口是 leader 选举的端口,集群刚启动的时候选举或者 leader 挂掉之后进行新的选举的端口默认是 3888

配置文件解释:

tickTime:

这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

initLimit:

这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒

syncLimit:

这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 5*2000=10 秒

dataDir:

快照日志的存储路径

dataLogDir:

事物日志的存储路径,如果不配置这个那么事物日志会默认存储到 dataDir 制定的目录,这样会严重影响 zk 的性能,当 zk 吞吐量较大的时候,产生的事物日志、快照日志太多

clientPort:

这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点

创建 myid 文件

server1

echo “1” > /opt/zookeeper/zkdata/myid

server2

echo “2” > /opt/zookeeper/zkdata/myid

server3

echo “3” > /opt/zookeeper/zkdata/myid
 4、重要配置说明

1、myid 文件和 server.myid  在快照目录下存放的标识本台服务器的文件,他是整个 zk 集群用来发现彼此的一个重要标识。

2、zoo.cfg 文件是 zookeeper 配置文件 在 conf 目录里。

3、log4j.properties 文件是 zk 的日志输出文件 在 conf 目录里用 java 写的程序基本上有个共同点日志都用 log4j,来进行管理。

 configuration for log4j
4、zkEnv.sh 和 zkServer.sh 文件

zkServer.sh 主的管理程序文件
zkEnv.sh 是主要配置,zookeeper 集群启动时配置环境变量的文件
5、还有一个需要注意
ZooKeeper server will not remove old snapshots and log files when using the default configuration (see autopurge below), this is the responsibility of the operator
zookeeper 不会主动的清除旧的快照和日志文件,这个是操作者的责任。
但是可以通过命令去定期的清理。

!/bin/bash

snapshot file dir

dataDir=/opt/zookeeper/zkdata/version-2

tran log dir

dataLogDir=/opt/zookeeper/zkdatalog/version-2

Leave 66 files

count=66
count=$[$count+1]
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f

以上这个脚本定义了删除对应两个目录中的文件,保留最新的 66 个文件,可以将他写到 crontab 中,设置为每天凌晨 2 点执行一次就可以了。

zk log dir del the zookeeper log

logDir=

ls -t $logDir/zookeeper.log.* | tail -n +$count | xargs rm -f

其他方法:

第二种:使用 ZK 的工具类 PurgeTxnLog,它的实现了一种简单的历史文件清理策略,可以在这里看一下他的使用方法 http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html 

第三种:对于上面这个执行,ZK 自己已经写好了脚本,在 bin/zkCleanup.sh 中,所以直接使用这个脚本也是可以执行清理工作的。

第四种:从 3.4.0 开始,zookeeper 提供了自动清理 snapshot 和事务日志的功能,通过配置 autopurge.snapRetainCount 和 autopurge.purgeInterval 这两个参数能够实现定时清理了。这两个参数都是在 zoo.cfg 中配置的:

autopurge.purgeInterval  这个参数指定了清理频率,单位是小时,需要填写一个 1 或更大的整数,默认是 0,表示不开启自己清理功能。
autopurge.snapRetainCount 这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留 3 个。
 
推荐使用第一种方法,对于运维人员来说,将日志清理工作独立出来,便于统一管理也更可控。毕竟 zk 自带的一些工具并不怎么给力。
5、启动服务并查看
1、启动服务

进入到 Zookeeper 的 bin 目录下

cd /opt/zookeeper/zookeeper-3.4.6/bin

启动服务(3 台都需要操作)

./zkServer.sh start
2、检查服务状态

检查服务器状态

./zkServer.sh status
通过 status 就能看到状态:

./zkServer.sh status
JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg #配置文件
Mode: follower #他是否为领导
zk 集群一般只有一个 leader,多个 follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从 follower 里投票选举一个 leader 出来。

可以用“jps”查看 zk 的进程,这个是 zk 的整个工程的 main

执行命令 jps

20348 Jps
4233 QuorumPeerMain
Kafka 集群搭建
1、软件环境
1、linux 一台或多台,大于等于 2
2、已经搭建好的 zookeeper 集群
3、软件版本 kafka_2.11-0.9.0.1.tgz
2、创建目录并下载安装软件

创建目录

cd /opt/
mkdir kafka #创建项目目录
cd kafka
mkdir kafkalogs #创建 kafka 消息目录,主要存放 kafka 消息

下载软件

wget http://apache.opencas.org/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz

解压软件

tar -zxvf kafka_2.11-0.9.0.1.tgz

3、修改配置文件

进入到 config 目录
cd /opt/kafka/kafka_2.11-0.9.0.1/config/
主要关注:server.properties 这个文件即可,我们可以发现在目录下:

有很多文件,这里可以发现有 Zookeeper 文件,我们可以根据 Kafka 内带的 zk 集群来启动,但是建议使用独立的 zk 集群

-rw-r–r–. 1 root root 5699 Feb 22 09:41 192.168.7.101
-rw-r–r–. 1 root root 906 Feb 12 08:37 connect-console-sink.properties
-rw-r–r–. 1 root root 909 Feb 12 08:37 connect-console-source.properties
-rw-r–r–. 1 root root 2110 Feb 12 08:37 connect-distributed.properties
-rw-r–r–. 1 root root 922 Feb 12 08:38 connect-file-sink.properties
-rw-r–r–. 1 root root 920 Feb 12 08:38 connect-file-source.properties
-rw-r–r–. 1 root root 1074 Feb 12 08:37 connect-log4j.properties
-rw-r–r–. 1 root root 2055 Feb 12 08:37 connect-standalone.properties
-rw-r–r–. 1 root root 1199 Feb 12 08:37 consumer.properties
-rw-r–r–. 1 root root 4369 Feb 12 08:37 log4j.properties
-rw-r–r–. 1 root root 2228 Feb 12 08:38 producer.properties
-rw-r–r–. 1 root root 5699 Feb 15 18:10 server.properties
-rw-r–r–. 1 root root 3325 Feb 12 08:37 test-log4j.properties
-rw-r–r–. 1 root root 1032 Feb 12 08:37 tools-log4j.properties
-rw-r–r–. 1 root root 1023 Feb 12 08:37 zookeeper.properties

修改配置文件:

broker.id=0 #当前机器在集群中的唯一标识,和 zookeeper 的 myid 性质一样
port=19092 #当前 kafka 对外提供服务的端口默认是 9092
host.name=192.168.7.100 #这个参数默认是关闭的,在 0.8.1 有个 bug,DNS 解析问题,失败率的问题。
num.network.threads=3 #这个是 borker 进行网络处理的线程数
num.io.threads=8 #这个是 borker 进行 I/O 处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的 num.io.threads 要大于这个目录的个数这个目录,如果配置多个目录,新创建的 topic 他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区 buffer 大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka 接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向 kafka 请求消息或者向 kafka 发送消息的请请求的最大数,这个值不能超过 java 的堆栈大小
num.partitions=1 #默认的分区数,一个 topic 默认 1 个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168 小时,7 天
message.max.byte=5242880 #消息保存的最大值 5M
default.replication.factor=2 #kafka 保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为 kafka 的消息是以追加的形式落地到文件,当超过这个值的时候,kafka 会新起一个文件
log.retention.check.interval.ms=300000 #每隔 300000 毫秒去检查上面配置的 log 失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用 log 压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置 zookeeper 的连接端口

上面是参数的解释,实际的修改项为:

broker.id=0 每台服务器的 broker.id 都不能相同

hostname

host.name=192.168.7.100

在 log.retention.hours=168 下面新增下面三项

message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

设置 zookeeper 的连接端口

zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:12181

4、启动 Kafka 集群并测试

1、启动服务

从后台启动 Kafka 集群(3 台都需要启动)

cd
/opt/kafka/kafka_2.11-0.9.0.1//bin #进入到 kafka 的 bin 目录
./kafka-server-start.sh -daemon ../config/server.properties
2、检查服务是否启动

执行命令 jps

20348 Jps
4233 QuorumPeerMain
18991 Kafka
3、创建 Topic 来验证是否创建成功

更多请看官方文档:http://kafka.apache.org/documentation.html

创建 Topic

./kafka-topics.sh –create –zookeeper 192.168.7.100:12181 –replication-factor 2 –partitions 1 –topic shuaige

解释

–replication-factor 2 #复制两份
–partitions 1 #创建 1 个分区
–topic #主题为 shuaige

”’在一台服务器上创建一个发布者”’

创建一个 broker,发布者

./kafka-console-producer.sh –broker-list 192.168.7.100:19092 –topic shuaige

”’在一台服务器上创建一个订阅者”’
./kafka-console-consumer.sh –zookeeper localhost:12181 –topic shuaige –from-beginning

测试(在发布者那里发布消息看看订阅者那里是否能正常收到~):

4、其他命令

大部分命令可以去官方文档查看

4.1、查看 topic

./kafka-topics.sh –list –zookeeper localhost:12181

就会显示我们创建的所有 topic

4.2、查看 topic 状态

/kafka-topics.sh –describe –zookeeper localhost:12181 –topic shuaige

下面是显示信息

Topic:ssports PartitionCount:1 ReplicationFactor:2 Configs:
Topic: shuaige Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1

分区为为 1 复制因子为 2 他的 shuaige 的分区为 0

Replicas: 0,1 复制的为 0,1

 OKkafka 集群搭建完毕

5、其他说明标注

5.1、日志说明

默认 kafka 的日志是保存在/opt/kafka/kafka_2.10-0.9.0.0/logs 目录下的,这里说几个需要注意的日志

server.log #kafka 的运行日志
state-change.log #kafka 他是用 zookeeper 来保存状态,所以他可能会进行切换,切换的日志就保存在这里

controller.log #kafka 选择一个节点作为“controller”,当发现有节点 down 掉的时候它负责在游泳分区的所有节点中选择新的 leader,这使得 Kafka 可以批量的高效的管理所有分区节点的主从关系。如果 controller down 掉了,活着的节点中的一个会备切换为新的 controller.
5.2、上面的大家你完成之后可以登录 zk 来查看 zk 的目录情况

使用客户端进入 zk

./zkCli.sh -server 127.0.0.1:12181 #默认是不用加’-server‘参数的因为我们修改了他的端口

查看目录情况 执行“ls /”

[zk: 127.0.0.1:12181(CONNECTED) 0]

ls /

显示结果:[consumers, config, controller, isr_change_notification, admin, brokers, zookeeper, controller_epoch]

”’
上面的显示结果中:只有 zookeeper 是,zookeeper 原生的,其他都是 Kafka 创建的
”’

标注一个重要的

[zk: 127.0.0.1:12181(CONNECTED) 1]

get /brokers/ids/0
{“jmx_port”:-1,”timestamp”:”1456125963355″,”endpoints”:[“PLAINTEXT://192.168.7.100:19092″],”host”:”192.168.7.100″,”version”:2,”port”:19092}
cZxid = 0x1000001c1
ctime = Mon Feb 22 15:26:03 CST 2016
mZxid = 0x1000001c1
mtime = Mon Feb 22 15:26:03 CST 2016
pZxid = 0x1000001c1
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x152e40aead20016
dataLength = 139
numChildren = 0

[zk: 127.0.0.1:12181(CONNECTED) 2]

还有一个是查看 partion

[zk: 127.0.0.1:12181(CONNECTED) 7]

get /brokers/topics/shuaige/partitions/0
null
cZxid = 0x100000029
ctime = Mon Feb 22 10:05:11 CST 2016
mZxid = 0x100000029
mtime = Mon Feb 22 10:05:11 CST 2016
pZxid = 0x10000002a
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

[zk: 127.0.0.1:12181(CONNECTED) 8]

作者:my_bai
来源:CSDN
原文:https://blog.csdn.net/my_bai/article/details/68490632
版权声明:本文为博主原创文章,转载请附上博文链接!


露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:zookeeper 和 kafka 集群搭建
喜欢 (0)
[]
分享 (0)
关于作者:
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址