Kafka部署
kafka部署
一、概念
1.1 角色
Broker:kafka集群包含一个或多个服务器,这种服务器被称为broker。
Topic:每条发布到kafka集群的信息都有一个类别,这个类型被称为Topic。在物理上不同topic的消息分开存储在不同文件夹,逻辑上一个topic的消息虽然保存于一个或多个broker上,但用户只需指定topic即可生产或消费数据,而不必关心数据存于何处。topic在逻辑上对record进行分组保存,消费者需要订阅相应的topic才能消费topic中的消息。
partition:是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定partition数量,每个partition对应一个文件夹,该文件夹下存储该partition的数据和索引文件,为了实现数据的高可用,比如将分区0的数据分散到不同的kafka节点,每个分区都有一个broker作为leader和一个broker作为follower。
producter:负责发布消息到kafka。
consumer:每一个consumer属于一个特定的sonsumer group(若不指定则属于默认组)。使用consumer high level API时,同一个topic的一条消息只能被同一个consumer group内的consumer消费,但多个consumer group可以同时消费这一消息。
二、部署
2.1 下载
https://kafka.apache.org/downloads
2.2 解压安装
tar xf kafka_2.13-3.2.3.tgz
#解压缩
cp -r kafka_2.13-3.2.3 /apps/
#复制到部署目录2.3 配置文件
vim config/server.properties
#编辑配置文件
broker.id=1
#brokerID节点,值唯一。
listeners=PLAINTEXT://192.168.1.1:9092
#配置监听端口
log.dirs=/data/kafka-data
#kafka数据目录
num.partitions=1
#partition数量,建议设置为集群节点数量。
log.retention.hours=168
#数据保留时间,单位为小时。
zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
#zookeeper集群地址2.4 启动集群
/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties &
#启动kafka2.5 使用systemd管理服务
cat << EOF > /usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target
After=network.target
[Service]
#EnvironmentFile=-/etc/sysconfig/kafka
Type=simple
User=root
Group=root
ExecStart=/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
ExecStop=/apps/kafka/bin/kafka-server-stop.sh
[Install]
WantedBy=multi-user.target
EOF
#创建service文件
systemctl daemon-reload
systemctl enable --now kafka
#启动服务三、操作
3.1 操作命令
创建topic
./kafka-topics.sh --create --zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 --replication-factor 3 --partitions 3 --topic test查看topic
./kafka-topics.sh --describe --zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 --topic test列出所有topic
./kafka-topics.sh --list --zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181删除topic
./kafka-topics.sh --delete --zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 --topic test创建发布者
./kafka-console-producer.sh --broker-list 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092 --topic test创建消费者
./kafka-console-consumer.sh --zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 --from-beginning --topic test四、管理
3.1 使用客户端工具
http://www.kafkatool.com/download.html
3.2 kafka-manager
3.3 开启JMX
vim /apps/kafka/bin/kafka-server-start.sh
#编辑启动脚本
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="9999"
fi
#添加JMX配置