kafka环境搭建与基本使用
2023-12-13 16:54:57
一、环境准备
本地需要提前安装好Java8环境
下载kafka
安装包,然后上传kafka安装包到kafka
文件夹下并进行解压。
$ mkdir kafka
$ chmod -R 777 kafka
$ tar -xzf kafka_2.13-3.6.1.tgz
二、启动zookeeper
修改zookeeper配置
$ cd /kafka/kafka_2.13-3.6.1/config
$ vim zookeeper.properties
$ cat zookeeper.properties
dataDir=/kafka/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=true
admin.serverPort=8185
# 多个节点使用,分隔
zookeeper.connect=localhost:2181
进入bin目录, 启动zookeeper服务
$ cd /kafka/kafka_2.13-3.6.1/bin
$ ./zookeeper-server-start.sh ../config/zookeeper.properties
三、启动kafka
修改kafka配置
$ cd /kafka/kafka_2.13-3.6.1/config
$ vim server.properties
$ cat server.properties
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.248.139:9092
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Retention Policy #############################
log.retention.hours=168
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
启动kafka
$ cd /kafka/kafka_2.13-3.6.1/bin
$ ./kafka-server-start.sh ../config/server.properties
正常启动成功后,可以看到如下日志
[2023-12-11 01:02:43,622] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2023-12-11 01:02:43,624] INFO [Controller id=0, targetBrokerId=0] Client requested connection close from node 0 (org.apache.kafka.clients.NetworkClient)
[2023-12-11 01:02:43,635] INFO Kafka version: 3.6.1 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-11 01:02:43,635] INFO Kafka commitId: 5e3c2b738d253ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-11 01:02:43,635] INFO Kafka startTimeMs: 1702278163631 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-11 01:02:43,638] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2023-12-11 01:02:43,809] INFO [zk-broker-0-to-controller-alter-partition-channel-manager]: Recorded new controller, from now on will use node ubuntu-Jasper-Lake-Client-Platform:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-12-11 01:02:43,849] INFO [zk-broker-0-to-controller-forwarding-channel-manager]: Recorded new controller, from now on will use node ubuntu-Jasper-Lake-Client-Platform:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
在zookeeper上可以看到如下日志
[2023-12-11 01:02:41,951] INFO Creating new log file: log.4a (org.apache.zookeeper.server.persistence.FileTxnLog)
3.1 启动失败处理
如果看到启动kafka报错如下
[2023-12-11 00:33:53,100] ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
kafka.common.InconsistentClusterIdException: The Cluster ID jB8a1sbFS8i5ta-gh4JecA doesn't match stored clusterId Some(M_JKap02Ra2ERDEJiT6WrQ) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
at kafka.server.KafkaServer.startup(KafkaServer.scala:243)
at kafka.Kafka$.main(Kafka.scala:113)
at kafka.Kafka.main(Kafka.scala)
[2023-12-11 00:33:53,102] INFO shutting down (kafka.server.KafkaServer)
在server.properties 配置文件里面 找到 log.dirs 配置的路径,将该路径下的文件全部删除[可能是由于kafka重复启动或者非正常关闭造成的
],重新启动kafka。
$ rm -rf /tmp/kafka-logs/
$ ./kafka-server-start.sh ../config/server.properties
四、kafka使用
4.1 创建主题
$ ./kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.
4.2 发送主题数据
$ cd/kafka/kafka_2.13-3.6.1/bin
$ ./kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>11
>12
>13566554
4.2 读取主题数据
$ cd /kafka/kafka_2.13-3.6.1/bin
$ ./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
11
12
13566554
文章来源:https://blog.csdn.net/panyongjie2577/article/details/134928964
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!