Kafka命令行操作

2023-12-16 04:52:32

版本:3.6.1

1 kafka-topics.sh

Create, delete, describe, or change a topic.

创建、删除、描述或更改主题。

Option(选项)Description(描述)翻译
–alterAlter the number of partitions and replica assignment. Update the configuration of an existing topic via --alter is no longer supported here (the kafka-configs CLI supports altering topic configs with a --bootstrap-server option).更改分区数和副本分配。此处不再支持通过–alter更新现有主题的配置(kafka configs CLI支持使用–bootstrap server选项更改主题配置)。
–at-min-isr-partitionsIf set when describing topics, only show partitions whose isr count is equal to the configured minimum.如果在描述主题时设置,则仅显示isr计数等于配置的最小值的分区。
–bootstrap-server <String: server to connect to>REQUIRED: The Kafka server to connect to.必需:要连接的Kafka服务器。
–command-config <String: command config property file>Property file containing configs to be passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs.包含要传递给管理客户端的配置的属性文件。这仅与–bootstrap服务器选项一起使用,用于描述和更改代理配置。
–config <String: name=value>A topic configuration override for the topic being created or altered.正在创建或更改的主题的主题配置覆盖。
–createCreate a new topic.创建一个新的主题。
–deleteDelete a topic.删除一个主题。
–delete-config <String: name>A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). Not supported with the --bootstrap-server option.为现有主题删除主题配置。(请参阅–config选项下的配置列表)。不支持–bootstrap-server选项。
–describeList details for the given topics.列出给定主题的详细信息。
–exclude-internalExclude internal topics when running list or describe command. The internal topics will be listed by default.运行list或describe命令时排除内部主题。默认情况下会列出内部主题。
–helpPrint usage information.打印使用信息。
–if-existsif set when altering or deleting or describing topics, the action will only execute if the topic exists.如果在更改、删除或描述主题时设置,则仅当主题存在时才会执行该操作。
–if-not-existsIf set when creating topics, the action will only execute if the topic does not already exist.如果在创建主题时设置,则仅当主题不存在时才会执行该操作。
–listList all available topics.列出所有可用的主题。
–partitions <Integer: # of partitions>The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.正在创建或更改的主题的分区数(警告:如果为具有键的主题增加分区,则分区逻辑或消息顺序将受到影响)。如果未提供用于创建,则默认为集群默认值。
–replica-assignment <String: broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , …>A list of manual partition-to-broker assignments for the topic being created or altered.为正在创建或更改的主题分配给代理程序的手动分区列表。
–replication-factor <Integer: replication factor>The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.正在创建的主题中每个分区的副本。如果未提供,则默认为集群默认值。
–topic <String: topic>The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the ‘’ prefix to escape regular expression symbols; e.g. “test.topic”.要创建、更改、描述或删除的主题。它还接受一个正则表达式,但–create选项除外。将主题名称置于双引号中,并使用“\”前缀转义正则表达式符号;例如“test.topic”。
–topic-id <String: topic-id>The topic-id to describe.This is used only with --bootstrap-server option for describing topics.要描述的主题id。这仅与用于描述主题的–bootstrap-server选项一起使用。
–topics-with-overridesIf set when describing topics, only show topics that have overridden configs.如果在描述主题时设置,则仅显示已重写配置的主题。
–unavailable-partitionsIf set when describing topics, only show partitions whose leader is not available.如果在描述主题时设置,则仅显示其leader不可用的分区。
–under-min-isr-partitionsIf set when describing topics, only show partitions whose isr count is less than the configured minimum.如果在描述主题时设置,则仅显示isr计数小于配置的最小值的分区。
–under-replicated-partitionsIf set when describing topics, only show under replicated partitions.如果在描述主题时设置,则仅显示已复制分区
–versionDisplay Kafka version.显示Kafka版本。

1.1 查看所有topic

kafka-topics.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --list

1.2 创建topic

kafka-topics.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --create --partitions 1 --replication-factor 3 --topic first

–topic 定义topic名

–replication-factor 定义副本数

–partitions 定义分区数

1.3 描述topic详情

kafka-topics.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --describe --topic first

1.4 修改topic分区数

kafka-topics.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --alter --topic first --partitions 3

1.5 删除topic

kafka-topics.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --delete --topic first

2 kafka-console-producer.sh

This tool helps to read data from standard input and publish it to Kafka.

该工具从标准输入中读取数据并将其发布到Kafka。

Option(选项)Description(描述)翻译
–batch-size <Integer: size>Number of messages to send in a single batch if they are not being sent synchronously. please note that this option will be replaced if max-partition-memory-bytes is also set (default: 16384)如果不是同步发送,则在单个批处理中发送的消息数。请注意,如果还设置了最大分区内存字节数(max-partition-memory-bytes)(默认值:16384),则此选项将被替换
–bootstrap-server <String: server to connect to>REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.必需,除非指定了–broker-list(已弃用)。要连接到的服务器。形式为HOST1:PORT1,HOST2:PORT2的broker列表字符串。
–broker-list <String: broker-list>DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.已弃用,请改用–bootstrap-server;如果指定了–bootstrap-server,则忽略。形式为HOST1:PORT1,HOST2:PORT2的broker列表字符串。
–compression-codec [String:compression-codec]The compression codec: either ‘none’, ‘gzip’, ‘snappy’, ‘lz4’, or ‘zstd’.If specified without value, then it defaults to ‘gzip’压缩编解码器:“none”、“gzip”、“snappy”、“lz4”或“zstd”。如果未指定值,则默认为“gzip”
–helpPrint usage information.打印使用信息。
–line-reader <String: reader_class>The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka.tools.ConsoleProducer$LineMessageReader)用于从中的标准读取行的类的类名。默认情况下,每一行都作为一条单独的消息读取。(默认值:kafka.tools.ConsoleProducer$LineMessageReader)
–max-block-ms <Long: max block on send>The max time that the producer will block for during a send request.(default: 60000)生产者在发送请求期间阻止的最长时间。(默认值:60000)
–max-memory-bytes <Long: total memory in bytes>The total memory used by the producer to buffer records waiting to be sent to the server. This is the option to control buffer.memory in producer configs. (default: 33554432)生产者用于缓冲等待发送到服务器的记录的总内存。这是在生产者配置中控制“buffer.memory”的选项。(默认值:33554432)
–max-partition-memory-bytes <Integer: memory in bytes per partition>The buffer size allocated for a partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. This is the option to control batch.size in producer configs. (default: 16384)为一个分区分配的缓冲区大小。当接收到小于此大小的记录时,生产者将尝试乐观地将它们组合在一起,直到达到此大小。这是在生产者配置中控制batch.size的选项。(默认值:16384)
–message-send-max-retries Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. This is the option to control retries in producer configs. (default: 3)Brokers可能由于多种原因无法接收消息,而暂时不可用只是其中之一。此属性指定生产者放弃并丢弃此消息之前的重试次数。这是在生产者配置中控制·retries·的选项。(默认值:3)
–metadata-expiry-ms <Long: metadata expiration interval>The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any leadership changes. This is the option to control metadata.max.age.ms in producer configs. (default:300000)以毫秒为单位的一段时间,在此之后,即使我们没有看到任何领导层变动,我们也会强制刷新元数据。这是在生产者配置中控制metadata.max.age.ms的选项。(默认值:300000)
–producer-property <String: producer_prop>A mechanism to pass user-defined properties in the form key=value to the producer.将key=value形式的用户定义属性传递给生产者的机制。
–producer.config <String: config file>Producer config properties file. Note that [producer-property] takes precedence over this config.生产者配置属性文件。请注意,[producer-property]优先于此配置。
–property <String: prop>A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader.将key=value形式的用户定义属性传递给消息读取器的机制。这允许对用户定义的消息读取器进行自定义配置。
–reader-config <String: config file>Config properties file for the message reader. Note that [property] takes precedence over this config.配置消息读取器的属性文件。请注意,[property]优先于此配置。
–request-required-acks <String: request required acks>The required acks of the producer requests (default: -1)生产者请求所需的“acks”(默认值:-1)
–request-timeout-ms <Integer: request timeout ms>The ack timeout of the producer requests. Value must be non-negative and non-zero. (default: 1500)生产者请求的ack超时。值必须为非负且非零。(默认值:1500)
–retry-backoff-ms Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. This is the option to control retry.backoff.ms in producer configs. (default: 100)每次重试之前,生产者都会刷新相关主题的元数据。由于leader选举需要一些时间,因此此属性指定生产者在刷新元数据之前等待的时间。这是在生产者配置中控制“retry.backoff.ms”的选项。(默认值:100)
–socket-buffer-size <Integer: size>The size of the tcp RECV size. This is the option to control send.buffer.bytes in producer configs. (default: 102400)tcp RECV size的大小。这是在生产者配置中控制“send.buffer.bytes”的选项。(默认值:102400)
–syncIf set message send requests to the brokers are synchronously, one at a time as they arrive.如果设置消息发送请求到brokers是同步的,则在它们到达时一次一个。
–timeout <Long: timeout_ms>If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. This is the option to control linger.ms in producer configs. (default: 1000)如果设置了,并且生产者以异步模式运行,这将为消息排队等待足够的批处理大小提供最长时间。该值以毫秒为单位。这是在生产者配置中控制linger.ms的选项`。(默认值:1000)
–topic <String: topic>REQUIRED: The topic id to produce messages to.REQUIRED:要向其生成消息的主题id。
–versionDisplay Kafka version.显示Kafka版本。

2.1 发送消息

kafka-console-producer.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --topic first

3 kafka-console-consumer.sh

This tool helps to read data from Kafka topics and outputs it to standard output.

该工具读取Kafka主题中的数据,并将其输出到标准输出。

Option(选项)Description(描述)翻译
–bootstrap-server <String: server to connect to>REQUIRED: The server(s) to connect to.必需:要连接到的服务器。
–consumer-property <String: consumer_prop>A mechanism to pass user-defined properties in the form key=value to the consumer.将key=value形式的用户定义属性传递给消费者的机制。
–consumer.config <String: config file>Consumer config properties file. Note that [consumer-property] takes precedence over this config.消费者配置属性文件。请注意,[consumer-property]优先于此配置。
–enable-systest-eventsLog lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)除了记录所消费的消息之外,还记录消费者的生命周期事件。(这是针对系统测试的。)
–formatter <String: class>The name of a class to use for formatting kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)用于格式化要显示的kafka消息的类的名称。(默认值:kafka.tools.DefaultMessageFormatter)
–formatter-config <String: config file>Config properties file to initialize the message formatter. Note that [property] takes precedence over this config.配置属性文件以初始化消息格式化程序。请注意,[property]优先于此配置。
–from-beginningIf the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.如果消费者还没有建立的偏移量,从日志中出现的最早消息开始,而不是从最新消息开始。
–group <String: consumer group id>The consumer group id of the consumer.消费者的消费者组id。
–helpPrint usage information.打印使用信息。
–include <String: Java regex (String)>Regular expression specifying list of topics to include for consumption.指定要包含以供使用的主题列表的正则表达式。
–isolation-level Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default: read_uncommitted)设置为read_committed,以便筛选出未提交的事务性消息。设置为read_uncommitted可读取所有消息。(默认值:read_uncommitted)
–key-deserializer <String: deserializer for key>
–max-messages <Integer: num_messages>The maximum number of messages to consume before exiting. If not set, consumption is continual.退出前要消费的最大消息数。如果未设置,则消费是连续的。
–offset <String: consume offset>The offset to consume from (a non-negative number), or ‘earliest’ which means from beginning, or ‘latest’ which means from end (default: latest)从(非负数)开始消费的偏移量,或“earliest”表示从开始,或“latest”表示从结束(默认值:最新)
–partition <Integer: partition>The partition to consume from. Consumption starts from the end of the partition unless ‘–offset’ is specified.要从中消费的分区。除非指定了“–offset”,否则消费从分区的末尾开始。
–property <String: prop>The properties to initialize the message formatter.用于初始化消息格式化程序的配置文件。
–skip-message-on-errorIf there is an error when processing a message, skip it instead of halt.如果在处理消息时出现错误,跳过它而不是停止。
–timeout-ms <Integer: timeout_ms>If specified, exit if no message is available for consumption for the specified interval.如果指定了,如果在指定的时间间隔内没有消息可供消费,则退出。
–topic <String: topic>The topic to consume on.要消费的主题。
–value-deserializer <String: deserializer for values>
–versionDisplay Kafka version.显示Kafka版本。
–whitelist <String: Java regex (String)>DEPRECATED, use --include instead; ignored if --include specified. Regular expression specifying list of topics to include for consumption.弃用,请改用–include;如果指定了–include,则忽略。指定要包含以供使用的主题列表的正则表达式。

3.1 消费主题中的数据

kafka-console-consumer.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --topic first

3.2 从头开始消费数据

kafka-console-consumer.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --from-beginning --topic first

3.3 指定消费者组消费

kafka-console-consumer.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --from-beginning --topic first --group first_group

3.4 打印时间戳

kafka-console-consumer.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --from-beginning --topic first --property print.timestamp=true

4 kafka-consumer-groups.sh

Option(选项)Description(描述)翻译
–all-groupsApply to all consumer groups.应用于所有消费者组。
–all-topicsConsider all topics assigned to a group in the reset-offsets process.reset-offsets过程中,应用于一个组的所有主题。
–bootstrap-server <String: server to connect to>REQUIRED: The server(s) to connect to.必需:要连接到的服务器。
–by-duration <String: duration>Reset offsets to offset by duration from current timestamp. Format: ‘PnDTnHnMnS’将偏移量重置为当前时间戳的持续时间偏移量。格式:“PnDTnHnMnS”
–command-config <String: command config property file>Property file containing configs to be passed to Admin Client and Consumer.包含要传递给管理客户端和消费者的配置的属性文件。
–deletePass in groups to delete topic partition offsets and ownership information over the entire consumer group. For instance --group g1 --group g2传入消费者组以删除整个消费者组的主题分区偏移和所有权信息。例如 --group g1 --group g2
–delete-offsetsDelete offsets of consumer group. Supports one consumer group at the time, and multiple topics.删除消费者组的偏移量。一次支持一个消费者组和多个主题。
–describeDescribe consumer group and list offset lag (number of messages not yet processed) related to given group.描述消费者组并列出与给定组相关的偏移滞后(尚未处理的消息数)。
–dry-runOnly show results without executing changes on Consumer Groups. Supported operations: reset-offsets.仅显示结果,而不对消费者组执行更改。支持的操作:reset-offsets。
–executeExecute operation. Supported operations: reset-offsets.执行操作。支持的操作:reset-offsets。
–exportExport operation execution to a CSV file. Supported operations: reset-offsets.将操作执行导出到CSV文件。支持的操作:reset-offsets。
–from-file <String: path to CSV file>Reset offsets to values defined in CSV file.将偏移重置为CSV文件中定义的值。
–group <String: consumer group>The consumer group we wish to act on.我们希望采取行动的消费者组。
–helpPrint usage information.打印使用信息。
–listList all consumer groups.列出所有消费者组。
–membersDescribe members of the group. This option may be used with ‘–describe’ and ‘–bootstrap-server’ options only.
Example: --bootstrap-server localhost: 9092 --describe --group group1 --members
描述小组成员。此选项只能与“–description”和“–bootstrap server”选项一起使用。
示例:–bootstrap-server localhost:9092 --describe --group group1–members
–offsetsDescribe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of and may be used with ‘–describe’ and ‘–bootstrap-server’ options only.
Example: --bootstrap-server localhost: 9092 --describe --group group1 --offsets
描述组并列出组中的所有主题分区及其偏移滞后。这是的默认子操作,只能与“–description”和“–bootstrap server”选项一起使用。
示例:–bootstrap-server localhost: 9092 --describe --group group1 --offsets
–reset-offsetsReset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive.
Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. Additionally, the --export option is used to export the results to a CSV format.
You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, --to-latest, --shift-by, --from-file, --to-current, --to-offset.
To define the scope use --all-topics or --topic. One scope must be specified unless you use ‘–from-file’.
重置消费者组的偏移量。一次支持一个消费者组,并且实例应处于非活动状态。
有两个执行选项:–dry-run(默认)计划要重置的偏移量,–execute更新偏移量。此外,–export选项用于将结果导出为CSV格式。
您必须选择以下重置规范之一:–to-datetime, --by-duration, --to-earliest, --to-latest, --shift-by, --from-file, --to-current, --to-offset。
要定义范围,请使用—all-topics或–topic。除非使用“–from-file”,否则必须指定一个作用域。
–shift-by <Long: number-of-offsets>Reset offsets shifting current offset by ‘n’, where ‘n’ can be positive or negative.重置偏移量将当前偏移量偏移“n”,其中“n”可以是正或负。
–state [String]When specified with ‘–describe’, includes the state of the group.
Example: --bootstrap-server localhost:9092 --describe --group group1 --state
When specified with ‘–list’, it displays the state of all groups. It can also be used to list groups with specific states.
Example: --bootstrap-server localhost:9092 --list --state stable,empty
This option may be used with ‘–describe’, ‘–list’ and ‘–bootstrap-server’ options only.
当使用“–describe”指定时,包括组的状态。
示例:–bootstrap-server localhost:9092 --describe --group group1 --state
当使用“–list”指定时,它将显示所有组的状态。它还可以用于列出具有特定状态的组。
示例: --bootstrap-server localhost:9092 --list --state stable,empty
此选项只能与“–description”、“–list”和“–bootstrap server”选项一起使用。
–timeout <Long: timeout (ms)>The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000)可以为某些用例设置的超时。例如,在描述组时,可以使用它来指定在组稳定之前(当组刚刚创建或正在进行一些更改时)等待的最大时间(以毫秒为单位)。(默认值:5000)
–to-currentReset offsets to current offset.将偏移重置为当前偏移。
–to-datetime <String: datetime>Reset offsets to offset from datetime. Format: ‘YYYY-MM-DDTHH:mm:SS.sss’将偏移量重置为来自日期时间的偏移量。格式:‘YYYY-MM-DDTHH:mm:SS.sss’
–to-earliestReset offsets to earliest offset.将偏移重置为最早偏移。
–to-latestReset offsets to latest offset.将偏移重置为最新偏移。
–to-offset <Long: offset>Reset offsets to a specific offset.将偏移重置为特定偏移。
–topic <String: topic>The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. In reset-offsets case, partitions can be specified using this format: topic1:0,1,2, where 0,1,2 are the partition to be included in the process. Reset-offsets also supports multiple topic inputs.应删除其消费者组信息的主题或应将其包含在重置偏移过程中的主题。在reset-offsets的情况下,可以使用以下格式指定分区:topic1:0,1,2`,其中0,1,2是要包含在进程中的分区。重置偏移量还支持多个主题输入。
–verboseProvide additional information, if any, when describing the group. This option may be used with ‘–offsets’/‘–members’/‘–state’ and ‘–bootstrap-server’ options only.
Example: --bootstrap-server localhost:9092 --describe --group group1 --members --verbose
在描述消费者组时,如有其他信息,则提供。此选项只能与“–offsets”/“–members”/“–state”和“–bootstrap-server”选项一起使用。
示例:–bootstrap-server localhost:9092 --describe --group group1 --members --verbose
–versionDisplay Kafka version.显示Kafka版本。

4.1 查看所有消费者组

kafka-consumer-groups.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --list

4.2 描述指定消费者组

kafka-consumer-groups.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --describe --group first_group

4.3 重置消费偏移

kafka-consumer-groups.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --group fist_group --topic first --reset-offsets --to-datetime 2023-10-08T00:00:00.000 -execute

文章来源:https://blog.csdn.net/weixin_43724577/article/details/134895328
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。