Kafka Environment Setup and Basic Usage

2023-12-13 16:54:57

1. Environment Preparation

Official website: https://kafka.apache.org/

Download page: https://kafka.apache.org/downloads

A Java 8 or later runtime must already be installed locally.
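Before going further, it can help to confirm which Java version is on the PATH. The sketch below is one possible check, not part of the original walkthrough: the helper name java_major_version is made up for illustration, and it assumes the usual version string formats ("1.8.0_392" for Java 8, "17.0.9" for newer releases).

```shell
#!/usr/bin/env bash
# Convert a Java version string into its major version number.
# Legacy scheme: "1.8.0_392" means Java 8. Modern scheme: "17.0.9" means Java 17.
java_major_version() {
  local v="$1"
  case "$v" in
    1.*) v="${v#1.}" ;;   # drop the legacy "1." prefix
  esac
  echo "${v%%[._-]*}"     # keep everything before the first separator
}

# If a JVM is installed, read its version string and report the major version.
if command -v java >/dev/null 2>&1; then
  raw=$(java -version 2>&1 | awk -F'"' '/version/ {print $2; exit}')
  echo "Detected Java major version: $(java_major_version "$raw")"
else
  echo "No java executable found on PATH"
fi
```

A reported major version of 8 or higher satisfies the requirement above.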

Download the Kafka package, upload it to the kafka directory, and extract it there.

$ mkdir /kafka
$ chmod -R 777 /kafka
$ cd /kafka
$ tar -xzf kafka_2.13-3.6.1.tgz
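The archive name carries two version numbers: the Scala version the build targets (2.13) and the Kafka release itself (3.6.1). The following sketch splits the file name with plain shell parameter expansion; the variable names are illustrative only.

```shell
#!/usr/bin/env bash
# Split "kafka_<scala>-<kafka>.tgz" into its two version components.
archive="kafka_2.13-3.6.1.tgz"

base="${archive%.tgz}"            # strip the extension
versions="${base#kafka_}"         # strip the "kafka_" prefix
scala_version="${versions%%-*}"   # part before the first dash
kafka_version="${versions#*-}"    # part after the first dash

echo "Scala build: ${scala_version}"
echo "Kafka release: ${kafka_version}"
echo "Extracted directory: /kafka/${base}"
```

The extracted directory name is the archive name without the .tgz suffix, which is why later steps use /kafka/kafka_2.13-3.6.1.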

2. Starting ZooKeeper

Edit the ZooKeeper configuration

$ cd /kafka/kafka_2.13-3.6.1/config
$ vim zookeeper.properties
$ cat zookeeper.properties
dataDir=/kafka/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=true
admin.serverPort=8185
# separate multiple nodes with commas
zookeeper.connect=localhost:2181
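To double-check what was actually written to the file, a small helper can print individual settings. This is a minimal sketch: get_prop is a hypothetical helper name, and the demo runs against a temporary copy of the values shown above rather than the real file.

```shell
#!/usr/bin/env bash
# Print the value of one key from a Java-style .properties file.
# Comment lines are skipped; if a key appears twice, the last value wins.
get_prop() {
  awk -F= -v key="$2" '
    /^[[:space:]]*#/ { next }
    $1 == key { sub(/^[^=]*=/, ""); value = $0 }
    END { if (value != "") print value }
  ' "$1"
}

# Demo on a temporary file holding the settings from this section.
conf=$(mktemp)
cat > "$conf" <<'EOF'
dataDir=/kafka/zookeeper
# the port at which the clients will connect
clientPort=2181
maxClientCnxns=0
admin.enableServer=true
admin.serverPort=8185
EOF

echo "dataDir    = $(get_prop "$conf" dataDir)"
echo "clientPort = $(get_prop "$conf" clientPort)"
```

Against the real installation, the first argument would be /kafka/kafka_2.13-3.6.1/config/zookeeper.properties.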

Change to the bin directory and start the ZooKeeper service

$ cd /kafka/kafka_2.13-3.6.1/bin
$ ./zookeeper-server-start.sh ../config/zookeeper.properties
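The command above runs in the foreground, so the next steps need a second terminal. From there, one way to confirm that ZooKeeper is accepting connections on clientPort is to poll the port. The sketch below assumes bash (it relies on bash's /dev/tcp pseudo-device); wait_for_port is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Poll a TCP port until it accepts a connection or the attempt budget runs out.
# Arguments: host, port, maximum number of attempts (one per second, default 30).
wait_for_port() {
  local host="$1" port="$2" attempts="${3:-30}" tried=0
  # The subshell opens and immediately discards the connection.
  until (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; do
    tried=$((tried + 1))
    if [ "$tried" -ge "$attempts" ]; then
      return 1
    fi
    sleep 1
  done
  return 0
}

# Quick probe of the ZooKeeper client port with a small attempt budget.
if wait_for_port localhost 2181 2; then
  echo "ZooKeeper port 2181 is accepting connections"
else
  echo "ZooKeeper port 2181 is not reachable yet"
fi
```

The same helper could be pointed at port 9092 once the broker has been started.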

3. Starting Kafka

Edit the Kafka configuration

$ cd /kafka/kafka_2.13-3.6.1/config
$ vim server.properties
$ cat server.properties

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.248.139:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092
num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600


############################# Log Basics #############################

log.dirs=/tmp/kafka-logs

num.partitions=1

num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Retention Policy #############################
log.retention.hours=168
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

zookeeper.connect=localhost:2181

zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
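Because listeners is bound to a specific address here (192.168.248.139:9092), clients probably need to use that address. If a later command using localhost:9092 is refused, this is the likely reason. The sketch below pulls the host:port pair out of the listeners line so it can be reused as the --bootstrap-server value. bootstrap_from_listeners is a hypothetical helper, and the demo uses a temporary file instead of the real server.properties.

```shell
#!/usr/bin/env bash
# Print "host:port" of the first listener configured in a server.properties file.
# Commented-out example lines are ignored because they do not start with "listeners=".
bootstrap_from_listeners() {
  sed -n 's|^listeners=[A-Za-z_0-9]*://\([^,]*\).*|\1|p' "$1" | head -n 1
}

# Demo on a temporary file that mirrors the relevant lines above.
conf=$(mktemp)
cat > "$conf" <<'EOF'
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.248.139:9092
#advertised.listeners=PLAINTEXT://your.host.name:9092
EOF

server=$(bootstrap_from_listeners "$conf")
echo "--bootstrap-server ${server}"
```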

Start Kafka

$ cd /kafka/kafka_2.13-3.6.1/bin
$ ./kafka-server-start.sh ../config/server.properties

After a successful start, log output like the following appears:

[2023-12-11 01:02:43,622] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2023-12-11 01:02:43,624] INFO [Controller id=0, targetBrokerId=0] Client requested connection close from node 0 (org.apache.kafka.clients.NetworkClient)
[2023-12-11 01:02:43,635] INFO Kafka version: 3.6.1 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-11 01:02:43,635] INFO Kafka commitId: 5e3c2b738d253ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-11 01:02:43,635] INFO Kafka startTimeMs: 1702278163631 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-11 01:02:43,638] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2023-12-11 01:02:43,809] INFO [zk-broker-0-to-controller-alter-partition-channel-manager]: Recorded new controller, from now on will use node ubuntu-Jasper-Lake-Client-Platform:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-12-11 01:02:43,849] INFO [zk-broker-0-to-controller-forwarding-channel-manager]: Recorded new controller, from now on will use node ubuntu-Jasper-Lake-Client-Platform:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
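The line to look for is "[KafkaServer id=0] started". A script can check for it instead of a person reading the log. The sketch below is one way to do that. broker_started is a hypothetical helper, and the demo searches a temporary file holding two of the lines above. With the default logging setup the broker log is usually logs/server.log under the Kafka directory, but treat that path as an assumption to verify.

```shell
#!/usr/bin/env bash
# Succeed if the given log file contains the broker's "started" marker.
broker_started() {
  grep -Eq '\[KafkaServer id=[0-9]+\] started' "$1"
}

# Demo on a temporary file containing two lines from the output above.
log=$(mktemp)
cat > "$log" <<'EOF'
[2023-12-11 01:02:43,635] INFO Kafka version: 3.6.1 (org.apache.kafka.common.utils.AppInfoParser)
[2023-12-11 01:02:43,638] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
EOF

if broker_started "$log"; then
  echo "broker reported a successful start"
else
  echo "no start marker found yet"
fi
```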

On the ZooKeeper side, a log line like the following appears:

[2023-12-11 01:02:41,951] INFO Creating new log file: log.4a (org.apache.zookeeper.server.persistence.FileTxnLog)

3.1 Handling startup failures

If Kafka fails to start with an error like the following:

[2023-12-11 00:33:53,100] ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
kafka.common.InconsistentClusterIdException: The Cluster ID jB8a1sbFS8i5ta-gh4JecA doesn't match stored clusterId Some(M_JKap02Ra2ERDEJiT6WrQ) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
        at kafka.server.KafkaServer.startup(KafkaServer.scala:243)
        at kafka.Kafka$.main(Kafka.scala:113)
        at kafka.Kafka.main(Kafka.scala)
[2023-12-11 00:33:53,102] INFO shutting down (kafka.server.KafkaServer)

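This error means the cluster ID recorded in meta.properties under the broker's log.dirs directory differs from the one stored in ZooKeeper. It may be caused by starting Kafka repeatedly or by an abnormal shutdown. To recover, find the path configured for log.dirs in server.properties, delete everything under it, and restart Kafka. Deleting this directory also removes all topic data the broker has stored, so this fix is only appropriate for a test environment.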

$ rm -rf /tmp/kafka-logs/
$ ./kafka-server-start.sh ../config/server.properties
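Before deleting anything, it may be worth confirming that the stored ID really is the one named in the error. The sketch below reads cluster.id from meta.properties. stored_cluster_id is a hypothetical helper, and the demo uses a throwaway directory that mimics log.dirs, with the IDs copied from the error message above.

```shell
#!/usr/bin/env bash
# Print the cluster.id recorded in <dir>/meta.properties, or nothing if absent.
stored_cluster_id() {
  sed -n 's/^cluster\.id=//p' "$1/meta.properties" 2>/dev/null
}

# Demo: a temporary directory standing in for /tmp/kafka-logs.
log_dir=$(mktemp -d)
cat > "$log_dir/meta.properties" <<'EOF'
cluster.id=M_JKap02Ra2ERDEJiT6WrQ
version=0
broker.id=0
EOF

expected="jB8a1sbFS8i5ta-gh4JecA"   # ID the broker received from ZooKeeper, per the error
stored=$(stored_cluster_id "$log_dir")

if [ "$stored" != "$expected" ]; then
  echo "mismatch: stored=${stored} expected=${expected}"
else
  echo "cluster IDs match"
fi
```

A commonly suggested gentler alternative is to remove only meta.properties rather than the whole directory. Treat that as an assumption to verify on a test system first.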

4. Using Kafka

4.1 Creating a topic

$ ./kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.
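The command above relies on broker defaults (num.partitions=1 in the configuration shown earlier). kafka-topics.sh also accepts --partitions, --replication-factor, and --if-not-exists, so the counts can be stated explicitly. The sketch below only prints the command for review rather than running it; topic_create_cmd is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Build (but do not run) a kafka-topics.sh command with explicit counts.
# Arguments: topic, partitions (default 1), replication factor (default 1),
# bootstrap server (default localhost:9092).
topic_create_cmd() {
  local topic="$1" partitions="${2:-1}" replicas="${3:-1}" server="${4:-localhost:9092}"
  echo "./kafka-topics.sh --create --if-not-exists --topic ${topic}" \
       "--partitions ${partitions} --replication-factor ${replicas}" \
       "--bootstrap-server ${server}"
}

# Print the command for the topic used in this walkthrough.
topic_create_cmd quickstart-events 1 1
```

On a single-broker setup like this one, the replication factor cannot exceed 1.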

4.2 Sending data to the topic

$ cd /kafka/kafka_2.13-3.6.1/bin
$ ./kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>11
>12
>13566554
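The console producer reads records from standard input, one per line, and the ">" shown above is its interactive prompt. That means the same messages can be prepared in a file and redirected in, which is handy for scripting. A minimal sketch follows; the file is temporary, and the actual send is left as a comment because it needs a running broker.

```shell
#!/usr/bin/env bash
# Write the three sample messages to a file, one record per line.
messages=$(mktemp)
printf '%s\n' 11 12 13566554 > "$messages"

# With a running broker, the file could be sent non-interactively:
# ./kafka-console-producer.sh --topic quickstart-events \
#   --bootstrap-server localhost:9092 < "$messages"

count=$(wc -l < "$messages" | tr -d ' ')
echo "Prepared ${count} messages in ${messages}"
```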

4.3 Reading data from the topic

$ cd /kafka/kafka_2.13-3.6.1/bin
$ ./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
11
12
13566554
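The consumer above keeps running until interrupted. For a scripted round-trip check, the console consumer's --max-messages option makes it exit after a fixed number of records, and its output can then be compared with what was sent. The sketch below shows the comparison step only; same_messages is a hypothetical helper, and the "received" file is a stand-in for real consumer output.

```shell
#!/usr/bin/env bash
# Succeed if two files contain the same lines, ignoring order.
same_messages() {
  [ "$(sort "$1")" = "$(sort "$2")" ]
}

sent=$(mktemp)
received=$(mktemp)
printf '%s\n' 11 12 13566554 > "$sent"
printf '%s\n' 11 12 13566554 > "$received"   # stand-in for consumer output

# With a running broker, the received file could be filled like this:
# ./kafka-console-consumer.sh --topic quickstart-events --from-beginning \
#   --max-messages 3 --bootstrap-server localhost:9092 > "$received"

if same_messages "$sent" "$received"; then
  echo "round trip OK"
else
  echo "sent and received messages differ"
fi
```

Sorting before the comparison is a deliberate choice: with more than one partition, records are not guaranteed to arrive in the order they were sent.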

Source: https://blog.csdn.net/panyongjie2577/article/details/134928964