kafka部署

一、概念

1.1 角色

Broker:kafka集群包含一个或多个服务器,这种服务器被称为broker。

Topic:每条发布到kafka集群的信息都有一个类别,这个类型被称为Topic。在物理上不同topic的消息分开存储在不同文件夹,逻辑上一个topic的消息虽然保存于一个或多个broker上,但用户只需指定topic即可生产或消费数据,而不必关心数据存于何处。topic在逻辑上对record进行分组保存,消费者需要订阅相应的topic才能消费topic中的消息。

partition:是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定partition数量,每个partition对应一个文件夹,该文件夹下存储该partition的数据和索引文件,为了实现数据的高可用,比如将分区0的数据分散到不同的kafka节点,每个分区都有一个broker作为leader和一个broker作为follower。

producter:负责发布消息到kafka。

consumer:每一个consumer属于一个特定的sonsumer group(若不指定则属于默认组)。使用consumer high level API时,同一个topic的一条消息只能被同一个consumer group内的consumer消费,但多个consumer group可以同时消费这一消息。

二、部署

2.1 下载

https://kafka.apache.org/downloads

2.2 解压安装

tar xf kafka_2.13-3.2.3.tgz
#解压缩

cp -r kafka_2.13-3.2.3 /apps/
#复制到部署目录

2.3 配置文件

vim config/server.properties
#编辑配置文件

broker.id=1
#brokerID节点,值唯一。

listeners=PLAINTEXT://192.168.1.1:9092
#配置监听端口

log.dirs=/data/kafka-data
#kafka数据目录

num.partitions=1
#partition数量,建议设置为集群节点数量。

log.retention.hours=168
#数据保留时间,单位为小时。


zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
#zookeeper集群地址

2.4 启动集群

/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties &
#启动kafka

2.5 使用systemd管理服务

cat << EOF > /usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target
After=network.target

[Service]
#EnvironmentFile=-/etc/sysconfig/kafka
Type=simple
User=root
Group=root
ExecStart=/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
ExecStop=/apps/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target
EOF
#创建service文件

systemctl daemon-reload
systemctl enable --now kafka
#启动服务

三、操作

3.1 操作命令

创建topic

./kafka-topics.sh --create --zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 --replication-factor 3 --partitions 3 --topic test

查看topic

./kafka-topics.sh --describe --zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181  --topic test

列出所有topic

./kafka-topics.sh --list --zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181

删除topic

./kafka-topics.sh --delete --zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181  --topic test

创建发布者

./kafka-console-producer.sh --broker-list  192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092 --topic test

创建消费者

./kafka-console-consumer.sh --zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 --from-beginning --topic test

四、管理

3.1 使用客户端工具

http://www.kafkatool.com/download.html

3.2 kafka-manager

https://github.com/yahoo/CMAK

3.3 开启JMX

vim /apps/kafka/bin/kafka-server-start.sh
#编辑启动脚本

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
fi
#添加JMX配置

标签: Message Queue, Kafka

添加新评论