kafka-python admin

kafka-python admin exposes KafkaAdminClient operations as a command-line tool. Commands are grouped by the kind of resource they act on (topics, partitions, configs, …). Each group contains one or more subcommands.

Output is printed to stdout. --format raw (the default) uses pprint; --format json emits a single JSON document, useful for piping to jq or other tooling.

kafka-python admin -b localhost:9092 cluster describe
kafka-python admin -b localhost:9092 --format json topics list
python -m kafka.admin -b localhost:9092 topics create -t foo --num-partitions 3
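
For scripting, the JSON output can also be consumed from Python. The following is a minimal sketch and not part of kafka-python: build_argv and run_admin are hypothetical helpers, and they assume the kafka-python console script is on PATH and that a broker is reachable at the given address. The shape of the decoded document depends on the subcommand and is not assumed here.

```python
import json
import subprocess


def build_argv(bootstrap, group, command, *extra):
    """Build the argument vector; global options must precede GROUP."""
    return ["kafka-python", "admin", "-b", bootstrap,
            "--format", "json", group, command, *extra]


def run_admin(bootstrap, group, command, *extra, runner=subprocess.run):
    """Run one admin subcommand and decode its single JSON document.

    runner is injectable so the helper can be exercised without a broker.
    """
    argv = build_argv(bootstrap, group, command, *extra)
    result = runner(argv, capture_output=True, text=True, check=True)
    return json.loads(result.stdout)
```

For example, run_admin("localhost:9092", "topics", "list") would run the second command shown above and return the decoded result.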

Global options

Kafka Admin Client

usage: kafka-python admin [-h] [-b BOOTSTRAP_SERVERS] [-S SECURITY_PROTOCOL]
                          [-M SASL_MECHANISM] [-U SASL_USER]
                          [-P SASL_PASSWORD] [-l LOG_LEVEL] [-L ENABLE_LOGGER]
                          [-D DISABLE_LOGGER] [-C EXTRA_CONFIG]
                          [--format FORMAT]
                          GROUP ...

connection

-b, --bootstrap-servers

host:port for cluster bootstrap server. Can be provided multiple times.

-S, --security-protocol

PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL

Default: 'PLAINTEXT'

-M, --sasl-mechanism

PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512

Default: 'PLAIN'

-U, --sasl-user
-P, --sasl-password

logging

-l, --log-level

logging level, passed to logging.basicConfig

Default: 'CRITICAL'

-L, --enable-logger

enable a specific logger. Can be provided multiple times. If not provided, all loggers are enabled.

-D, --disable-logger

disable a specific logger. Can be provided multiple times.

extended

-C, --extra-config

additional configuration properties for client in "key=val" format. Can be provided multiple times.
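
As an illustration of the key=val format, the sketch below shows one way repeated -C values could be turned into a dict. It is an assumption about the format only, not the tool's actual parser: parse_extra_config is a hypothetical name, the keys in the usage note are placeholders, and values are kept as strings because the source does not say how they are converted.

```python
def parse_extra_config(pairs):
    """Turn repeated key=val strings into a dict.

    Splits on the first '=' only, so values may themselves contain '='.
    Values are kept as strings; any type conversion is left to the caller.
    """
    config = {}
    for pair in pairs:
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"expected key=val, got {pair!r}")
        config[key] = value
    return config
```

With this sketch, parse_extra_config(["client_id=ops", "tag=a=b"]) yields {"client_id": "ops", "tag": "a=b"}.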

output

--format

output format: raw|json

Default: 'raw'

Available command groups

GROUP

Possible choices: acls, cluster, configs, topics, partitions, groups, transactions, users

acls

usage: kafka-python admin acls [-h] COMMAND ...

Available commands

COMMAND

Possible choices: describe, create, delete

Sub-commands

describe

Describe Kafka ACLs

kafka-python admin acls describe [-h] [--principal PRINCIPAL] [--host HOST]
                                 [--operation {unknown,any,all,read,write,create,delete,alter,describe,cluster_action,describe_configs,alter_configs,idempotent_write,create_tokens,describe_tokens}]
                                 [--permission-type {unknown,any,deny,allow}]
                                 [--resource-type {unknown,any,topic,group,cluster,transactional_id,delegation_token,user}]
                                 [--resource-name RESOURCE_NAME]
                                 [--pattern-type {unknown,any,match,literal,prefixed}]
command options
--principal

ACL principal (e.g. User:alice)

--host

ACL host (default: match any)

--operation

Possible choices: unknown, any, all, read, write, create, delete, alter, describe, cluster_action, describe_configs, alter_configs, idempotent_write, create_tokens, describe_tokens

ACL operation (default: any)

Default: 'any'

--permission-type

Possible choices: unknown, any, deny, allow

ACL permission type (default: any)

Default: 'any'

--resource-type

Possible choices: unknown, any, topic, group, cluster, transactional_id, delegation_token, user

Resource type (default: any)

Default: 'any'

--resource-name

Resource name (* wildcards accepted)

--pattern-type

Possible choices: unknown, any, match, literal, prefixed

Resource pattern type (default: any)

Default: 'any'

create

Create Kafka ACLs

kafka-python admin acls create [-h] --principal PRINCIPAL [--host HOST]
                               --operation
                               {unknown,any,all,read,write,create,delete,alter,describe,cluster_action,describe_configs,alter_configs,idempotent_write,create_tokens,describe_tokens}
                               [--permission-type {unknown,any,deny,allow}]
                               --resource-type
                               {unknown,any,topic,group,cluster,transactional_id,delegation_token,user}
                               --resource-name RESOURCE_NAME
                               [--pattern-type {unknown,any,match,literal,prefixed}]
command options
--principal

ACL principal (e.g. User:alice)

--host

ACL host (default: *)

Default: '*'

--operation

Possible choices: unknown, any, all, read, write, create, delete, alter, describe, cluster_action, describe_configs, alter_configs, idempotent_write, create_tokens, describe_tokens

ACL operation

--permission-type

Possible choices: unknown, any, deny, allow

ACL permission type (default: allow)

Default: 'allow'

--resource-type

Possible choices: unknown, any, topic, group, cluster, transactional_id, delegation_token, user

Resource type

--resource-name

Resource name (* wildcards accepted)

--pattern-type

Possible choices: unknown, any, match, literal, prefixed

Resource pattern type (default: literal)

Default: 'literal'

delete

Delete Kafka ACLs

kafka-python admin acls delete [-h] [--principal PRINCIPAL] [--host HOST]
                               [--operation {unknown,any,all,read,write,create,delete,alter,describe,cluster_action,describe_configs,alter_configs,idempotent_write,create_tokens,describe_tokens}]
                               [--permission-type {unknown,any,deny,allow}]
                               [--resource-type {unknown,any,topic,group,cluster,transactional_id,delegation_token,user}]
                               [--resource-name RESOURCE_NAME]
                               [--pattern-type {unknown,any,match,literal,prefixed}]
command options
--principal

ACL principal (e.g. User:alice)

--host

ACL host (default: match any)

--operation

Possible choices: unknown, any, all, read, write, create, delete, alter, describe, cluster_action, describe_configs, alter_configs, idempotent_write, create_tokens, describe_tokens

ACL operation (default: any)

Default: 'any'

--permission-type

Possible choices: unknown, any, deny, allow

ACL permission type (default: any)

Default: 'any'

--resource-type

Possible choices: unknown, any, topic, group, cluster, transactional_id, delegation_token, user

Resource type (default: any)

Default: 'any'

--resource-name

Resource name (* wildcards accepted)

--pattern-type

Possible choices: unknown, any, match, literal, prefixed

Resource pattern type (default: any)

Default: 'any'

cluster

usage: kafka-python admin cluster [-h] COMMAND ...

Available commands

COMMAND

Possible choices: describe, describe-quorum, api-versions, broker-version, describe-features, update-features, describe-log-dirs, alter-log-dirs

Sub-commands

describe

Describe Kafka Cluster

kafka-python admin cluster describe [-h]

describe-quorum

Describe the KRaft metadata quorum

kafka-python admin cluster describe-quorum [-h]

api-versions

Get Supported Api Versions

kafka-python admin cluster api-versions [-h] [-k API_KEYS] [--raw]
command options
-k, --api-key
--raw

Default: False

broker-version

Get Version for Broker

kafka-python admin cluster broker-version [-h] --broker BROKER
command options
--broker

describe-features

Describe Features of Kafka Cluster

kafka-python admin cluster describe-features [-h] [-f FEATURES]
command options
-f, --feature

Show one or more specific features. If not provided, returns all features.

Default: []

update-features

Update Features of Kafka Cluster

kafka-python admin cluster update-features [-h] [-f FEATURES] [--downgrade]
                                           [--unsafe] [--timeout TIMEOUT]
                                           [--validate-only]
command options
-f, --feature

set feature=value

Default: []

--downgrade

Default: False

--unsafe

Default: False

--timeout

Default: 60

--validate-only

Default: False

describe-log-dirs

Get topic log directories and stats

kafka-python admin cluster describe-log-dirs [-h] [--broker BROKERS]
                                             [--topic TOPICS]
command options
--broker

Query specific broker(s)

--topic

Get data about specific topic(s)

alter-log-dirs

Move replicas between log directories on their hosting brokers

kafka-python admin cluster alter-log-dirs [-h] -a ASSIGNMENTS
command options
-a, --assignment

TOPIC:PARTITION:BROKER_ID=/absolute/log/dir/path (repeatable). Instructs BROKER_ID to move its replica of TOPIC:PARTITION into the given log directory.

Default: []
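
The assignment format can be read as three colon-separated fields followed by a path. The sketch below is a hypothetical parser for that format (parse_assignment and LogDirAssignment are illustrative names, not kafka-python APIs). It assumes POSIX-style absolute paths and splits from the right so that only the last two fields are treated as integers.

```python
from typing import NamedTuple


class LogDirAssignment(NamedTuple):
    topic: str
    partition: int
    broker_id: int
    log_dir: str


def parse_assignment(text):
    """Parse TOPIC:PARTITION:BROKER_ID=/absolute/log/dir/path."""
    target, sep, log_dir = text.partition("=")
    if not sep or not log_dir.startswith("/"):
        raise ValueError(f"expected an absolute log dir after '=': {text!r}")
    # rsplit keeps everything before the last two ':' as the topic name.
    topic, partition, broker_id = target.rsplit(":", 2)
    return LogDirAssignment(topic, int(partition), int(broker_id), log_dir)
```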

configs

usage: kafka-python admin configs [-h] COMMAND ...

Available commands

COMMAND

Possible choices: describe, alter, list, reset

Sub-commands

describe

Describe Kafka Configs

kafka-python admin configs describe [-h] -r RESOURCE_TYPE -n RESOURCE_NAMES
                                    [-c CONFIGS] [--dynamic] [--modified]
                                    [--static] [--default]
command options
-r, --resource-type

Type of resource to describe: topic, broker, broker_logger, client_metrics, group.

-n, --resource-name

Name of resource(s) to describe. May be repeated.

-c, --config
--dynamic

Default: False

--modified

Default: False

--static

Default: False

--default

Default: False

alter

Alter Kafka Configs

kafka-python admin configs alter [-h] -r RESOURCE_TYPE -n RESOURCE_NAMES -c
                                 CONFIGS [-v] [--allow-unknown]
                                 [--force-incremental | --force-alter]
command options
-r, --resource-type

Type of resource to alter: topic, broker, broker_logger, client_metrics, group.

-n, --resource-name

Name of resource(s) to alter. May be repeated.

-c, --config

key=value to alter

-v, --validate-only

Default: False

--allow-unknown

Default: True

--force-incremental
--force-alter

list

List config resources known to the cluster (requires broker >= 4.1 for types other than client_metrics)

kafka-python admin configs list [-h] [-r RESOURCE_TYPES]
command options
-r, --resource-type

Filter by resource type (repeatable): topic, broker, broker_logger, client_metrics, group. Omit to list all supported types.

Default: []

reset

Reset Kafka Configs

kafka-python admin configs reset [-h] -r RESOURCE_TYPE -n RESOURCE_NAMES
                                 [-c CONFIGS] [-v] [--allow-unknown]
command options
-r, --resource-type

Type of resource to reset: topic, broker, broker_logger, client_metrics, group.

-n, --resource-name

Name of resource(s) to reset. May be repeated.

-c, --config

key to reset

Default: []

-v, --validate-only

Default: False

--allow-unknown

Default: True

topics

usage: kafka-python admin topics [-h] COMMAND ...

Available commands

COMMAND

Possible choices: list, describe, create, delete

Sub-commands

list

List Kafka Topics

kafka-python admin topics list [-h]

describe

Describe Kafka Topics

kafka-python admin topics describe [-h] [-t TOPICS] [--id TOPIC_IDS]
command options
-t, --topic

topic name

Default: []

--id

topic UUID (requires broker >= 2.8, KIP-516)

Default: []

create

Create a Kafka Topic

kafka-python admin topics create [-h] -t TOPIC
                                 [--num-partitions NUM_PARTITIONS]
                                 [--replication-factor REPLICATION_FACTOR]
command options
-t, --topic
--num-partitions

Default: -1

--replication-factor

Default: -1

delete

Delete Kafka Topics

kafka-python admin topics delete [-h] [-t TOPICS] [--id TOPIC_IDS]
command options
-t, --topic

topic name

Default: []

--id

topic UUID

Default: []

partitions

usage: kafka-python admin partitions [-h] COMMAND ...

Available commands

COMMAND

Possible choices: create, describe, list-offsets, list-reassignments, alter-reassignments, delete-records, elect-leaders

Sub-commands

create

Create additional partitions for existing topics

kafka-python admin partitions create [-h] -p TOPIC_PARTITIONS
                                     [--timeout-ms TIMEOUT_MS]
                                     [--validate-only]
command options
-p, --topic-partitions

TOPIC:TOTAL_PARTITION_COUNT pair (repeatable)

Default: []

--timeout-ms

Request timeout in milliseconds

--validate-only

Validate the request without actually creating partitions

Default: False

describe

Describe topic partitions with pagination (KIP-966, broker >= 3.9)

kafka-python admin partitions describe [-h] -t TOPICS
                                       [--response-partition-limit RESPONSE_PARTITION_LIMIT]
                                       [--cursor-topic CURSOR_TOPIC]
                                       [--cursor-partition CURSOR_PARTITION]
command options
-t, --topic

Topic to describe (repeatable)

Default: []

--response-partition-limit

Maximum number of partitions to include in the response (default: 2000)

Default: 2000

--cursor-topic

Topic name to start pagination from

--cursor-partition

Partition index to start pagination from

list-offsets

List offsets for partitions by spec (earliest/latest/timestamp)

kafka-python admin partitions list-offsets [-h] [-t TOPIC] [-s SPEC]
                                           [-p PARTITIONS]
command options
-t, --topic
-s, --spec

Spec may be one of earliest, latest, max-timestamp, earliest-local, latest-tiered, or a millisecond timestamp.

-p, --partition

TOPIC:PARTITION:SPEC triple (repeatable). PARTITION may be a single partition, a closed range (0-2), an open range (1-), or a single wildcard "*" for all partitions. SPEC may be one of earliest, latest, max-timestamp, earliest-local, latest-tiered, or a millisecond timestamp.

Default: []
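
To make the PARTITION forms concrete, here is a minimal sketch of how such an expression could be expanded. The function names are hypothetical, and two points are assumptions rather than documented behavior: a closed range is treated as inclusive on both ends, and an open range or wildcard is resolved against a partition count that the caller supplies.

```python
def expand_partitions(expr, num_partitions):
    """Expand a PARTITION expression into a list of partition indexes.

    Supports a single index ("3"), a closed range ("0-2", assumed inclusive),
    an open range ("1-", through the last partition), and "*" for all.
    """
    if expr == "*":
        return list(range(num_partitions))
    if "-" in expr:
        start, _, end = expr.partition("-")
        last = int(end) if end else num_partitions - 1
        return list(range(int(start), last + 1))
    return [int(expr)]


def parse_offset_spec(text, num_partitions):
    """Parse TOPIC:PARTITION:SPEC into (topic, partitions, spec)."""
    topic, partition_expr, spec = text.rsplit(":", 2)
    return topic, expand_partitions(partition_expr, num_partitions), spec
```

For a six-partition topic, "foo:4-:latest" would select partitions 4 and 5 under these assumptions.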

list-reassignments

List the current ongoing partition reassignments

kafka-python admin partitions list-reassignments [-h] [-p TOPIC_PARTITIONS]
                                                 [--timeout-ms TIMEOUT_MS]
command options
-p, --topic-partition

TOPIC:PARTITION pair (repeatable). Omit to list reassignments for all partitions.

Default: []

--timeout-ms

Request timeout in milliseconds

alter-reassignments

Alter replica assignments for partitions

kafka-python admin partitions alter-reassignments [-h] -r REASSIGNMENTS
                                                  [--timeout-ms TIMEOUT_MS]
command options
-r, --reassign

TOPIC:PARTITION=BROKER_ID[,BROKER_ID…] to set a new replica set, or TOPIC:PARTITION=cancel to cancel an in-progress reassignment for that partition. Repeatable.

Default: []
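
The two forms of the reassignment argument can be distinguished by the text after the equals sign. The following is an illustrative sketch only (parse_reassignment is a hypothetical name); representing a cancellation as None is a choice made here, not something the source specifies.

```python
def parse_reassignment(text):
    """Parse TOPIC:PARTITION=BROKER_ID[,BROKER_ID...] or TOPIC:PARTITION=cancel.

    Returns (topic, partition, replicas), where replicas is None for a cancel.
    """
    target, sep, value = text.partition("=")
    if not sep or not value:
        raise ValueError(f"expected TOPIC:PARTITION=..., got {text!r}")
    topic, partition = target.rsplit(":", 1)
    replicas = None if value == "cancel" else [int(b) for b in value.split(",")]
    return topic, int(partition), replicas
```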

--timeout-ms

Request timeout in milliseconds

delete-records

Delete records from partitions up to a given offset

kafka-python admin partitions delete-records [-h] -r RECORDS
                                             [--timeout-ms TIMEOUT_MS]
                                             [--partition-leader-id PARTITION_LEADER_ID]
command options
-r, --record

TOPIC:PARTITION:OFFSET triple (repeatable). Use -1 as OFFSET to delete up to the current high-water mark.

Default: []

--timeout-ms

Request timeout in milliseconds

--partition-leader-id

Send all delete requests to this broker id, skipping metadata lookup

elect-leaders

Trigger leader election for partitions

kafka-python admin partitions elect-leaders [-h]
                                            [--election-type {preferred,unclean}]
                                            [-p TOPIC_PARTITIONS] [-t TOPICS]
                                            [--timeout-ms TIMEOUT_MS]
                                            [--no-raise-errors]
command options
--election-type

Possible choices: preferred, unclean

Election type (default: preferred)

Default: 'preferred'

-p, --topic-partition

TOPIC:PARTITION pair (repeatable). Omit to elect leaders for all partitions of all topics.

Default: []

-t, --topic

Elect leaders for all partitions of TOPIC (repeatable). Mutually exclusive with --topic-partition.

Default: []

--timeout-ms

Request timeout in milliseconds

--no-raise-errors

Do not raise on partition-level errors; return the response instead

Default: True

groups

usage: kafka-python admin groups [-h] COMMAND ...

Available commands

COMMAND

Possible choices: list, describe, delete, list-offsets, alter-offsets, reset-offsets, delete-offsets, remove-members

Sub-commands

list

List Groups

kafka-python admin groups list [-h] [--state STATES_FILTER]
                               [--type TYPES_FILTER]
command options
--state

Filter by group state (repeatable). One of: Assigning, CompletingRebalance, Dead, Empty, PreparingRebalance, Reconciling, Stable, Unknown. Case-insensitive; names also accepted. Requires broker >= 3.0 (KIP-518).

Default: []

--type

Filter by group type (repeatable). One of: Unknown, classic, consumer, share. Requires broker >= 4.0 (KIP-848).

Default: []

describe

Describe Groups

kafka-python admin groups describe [-h] -g GROUPS
command options
-g, --group-id

delete

Delete Groups

kafka-python admin groups delete [-h] -g GROUPS
command options
-g, --group-id

list-offsets

List Offsets for Group

kafka-python admin groups list-offsets [-h] -g GROUP_ID
command options
-g, --group-id

alter-offsets

Alter committed offsets for a consumer group

kafka-python admin groups alter-offsets [-h] -g GROUP_ID -o OFFSETS
                                        [--group-coordinator-id GROUP_COORDINATOR_ID]
command options
-g, --group-id
-o, --offset

TOPIC:PARTITION:OFFSET triple (repeatable).

Default: []

--group-coordinator-id

Send directly to this broker id, skipping coordinator lookup

reset-offsets

Reset committed offsets for a consumer group

kafka-python admin groups reset-offsets [-h] -g GROUP_ID [-p PARTITIONS]
                                        (-s SPEC | --to-offset TO_OFFSET | --shift-by SHIFT_BY | --by-duration BY_DURATION | --to-datetime TO_DATETIME | --to-current)
command options
-g, --group-id
-p, --partition

TOPIC:PARTITION pair (repeatable). Scopes the reset to these partitions. If omitted, the reset applies to every partition currently committed by the group.

Default: []

-s, --spec

Spec may be one of earliest, latest, max-timestamp, earliest-local, latest-tiered, or a millisecond timestamp.

--to-offset

Reset all in-scope partitions to this explicit offset (clamped to [earliest, latest]).

--shift-by

Shift each in-scope committed offset by N positions (may be negative); clamped to [earliest, latest]. Requires a current commit for each in-scope partition.

--by-duration

Reset to the offset at (now - DURATION). DURATION is ISO-8601, e.g. P7D, PT1H, PT30M.

--to-datetime

Reset to the offset at the given ISO-8601 datetime (UTC assumed if no tz offset is provided).
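
Both --by-duration and --to-datetime ultimately name a point in time. As a rough sketch of that conversion (hypothetical helpers, not the tool's implementation), the code below turns either input into epoch milliseconds. It handles only the day, hour, minute, and second parts of an ISO-8601 duration, and it applies the UTC assumption described above to datetimes without an offset.

```python
import re
from datetime import datetime, timedelta, timezone

# Subset of ISO-8601 durations: days, hours, minutes, seconds (no years or months).
_DURATION = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_duration(text):
    """Parse durations such as P7D, PT1H, or PT30M into a timedelta."""
    match = _DURATION.match(text)
    parts = {k: int(v) for k, v in match.groupdict().items() if v} if match else {}
    if not parts:
        raise ValueError(f"unsupported ISO-8601 duration: {text!r}")
    return timedelta(**parts)


def to_epoch_ms(moment):
    """Convert a datetime to epoch milliseconds, assuming UTC when naive."""
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    return (moment - _EPOCH) // timedelta(milliseconds=1)


def by_duration_ms(text, now):
    """Timestamp for (now - DURATION), as used by --by-duration."""
    return to_epoch_ms(now - parse_duration(text))


def to_datetime_ms(text):
    """Timestamp for an ISO-8601 datetime, as used by --to-datetime."""
    return to_epoch_ms(datetime.fromisoformat(text))
```

The resulting timestamp could then be used like a millisecond-timestamp SPEC to look up the offset to commit.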

--to-current

Re-commit the current committed offsets, clamped to [earliest, latest]. Useful to heal out-of-range offsets. Requires a current commit for each in-scope partition.

Default: False
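
The clamping rule shared by --to-offset, --shift-by, and --to-current can be sketched as follows. This is an illustration under stated assumptions, not the tool's code: reset_target and its mode strings are hypothetical, and earliest and latest stand for the partition's current log start and end offsets.

```python
def clamp(offset, earliest, latest):
    """Clamp an offset into the partition's [earliest, latest] range."""
    return max(earliest, min(offset, latest))


def reset_target(mode, committed, earliest, latest, value=0):
    """Compute the new committed offset for one in-scope partition.

    mode is "to-offset", "shift-by", or "to-current"; value is the explicit
    offset or the shift amount. committed is None when the group has no
    commit for the partition.
    """
    if mode == "to-offset":
        target = value
    elif mode in ("shift-by", "to-current"):
        if committed is None:
            raise ValueError(f"--{mode} requires a current commit")
        target = committed + (value if mode == "shift-by" else 0)
    else:
        raise ValueError(f"unknown mode: {mode!r}")
    return clamp(target, earliest, latest)
```

For a partition spanning offsets 10 to 100, shifting a committed offset of 50 by -60 would land on 10 rather than -10, and re-committing an out-of-range offset of 5 with to-current would also yield 10.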

delete-offsets

Delete committed offsets for a consumer group

kafka-python admin groups delete-offsets [-h] -g GROUP_ID -p PARTITIONS
                                         [--group-coordinator-id GROUP_COORDINATOR_ID]
command options
-g, --group-id
-p, --partition

TOPIC:PARTITION pair (repeatable).

Default: []

--group-coordinator-id

Send directly to this broker id, skipping coordinator lookup

remove-members

Remove members from a consumer group

kafka-python admin groups remove-members [-h] -g GROUP_ID [-m MEMBER_IDS]
                                         [-i GROUP_INSTANCE_IDS]
                                         [--reason REASON]
                                         [--group-coordinator-id GROUP_COORDINATOR_ID]
command options
-g, --group-id
-m, --member-id

Dynamic member id to remove (repeatable).

Default: []

-i, --group-instance-id

Static group.instance.id to remove (repeatable; requires broker LeaveGroup v3+).

Default: []

--reason

Optional reason; sent to broker on LeaveGroup v5+ (KIP-800). Applied to all members.

--group-coordinator-id

Send directly to this broker id, skipping coordinator lookup

transactions

KIP-664 administrative tools for inspecting and recovering from hanging transactions. list and describe require broker >= 3.0; describe-producers requires broker >= 2.8.

usage: kafka-python admin transactions [-h] COMMAND ...

Available commands

COMMAND

Possible choices: list, describe, describe-producers, find-hanging, abort

Sub-commands

list

List transactions across all brokers (broker >= 3.0)

kafka-python admin transactions list [-h] [--broker-id BROKER_IDS]
                                     [--producer-id PRODUCER_ID_FILTERS]
                                     [--state STATE_FILTERS]
                                     [--duration-filter-ms DURATION_FILTER_MS]
                                     [--id-pattern TRANSACTIONAL_ID_PATTERN]
command options
--broker-id

Query only these brokers (repeatable). Default: all brokers.

--producer-id

Only list transactions for these producer ids (repeatable).

--state

Only list transactions in these states (repeatable). Valid values: Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, Dead, PrepareEpochFence.

--duration-filter-ms

Only list transactions running longer than this. Requires broker >= 3.8 (ListTransactions v1+).

--id-pattern

Only list transactions whose id matches this regex. Requires broker >= 4.1 (KIP-1152).

describe

Describe one or more transactional ids (broker >= 3.0)

kafka-python admin transactions describe [-h] --transactional-id
                                         TRANSACTIONAL_IDS
command options
--transactional-id

Transactional id to describe (repeatable).

describe-producers

Describe active producers on a partition (broker >= 2.8)

kafka-python admin transactions describe-producers [-h] -t TOPIC -p PARTITIONS
                                                   [--broker-id BROKER_ID]
command options
-t, --topic

Topic name.

-p, --partition

Partition index (repeatable).

--broker-id

Send to this replica instead of the partition leader.

find-hanging

Find transactions that have been open longer than the broker's maximum transaction timeout plus 5 minutes

kafka-python admin transactions find-hanging [-h] [--broker-id BROKER_IDS]
                                             [--max-transaction-timeout-ms MAX_TRANSACTION_TIMEOUT_MS]
command options
--broker-id

Query only these brokers (repeatable). Default: all.

--max-transaction-timeout-ms

Broker’s max-transaction-timeout (ms). Transactions older than this + 5 min are flagged as hanging. Default: 900000 (Kafka default).

Default: 900000
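
The hanging-transaction threshold described above reduces to a single comparison. The sketch below is illustrative: is_hanging is a hypothetical name, timestamps are epoch milliseconds, and the use of a strict "greater than" comparison is an assumption.

```python
# The fixed 5-minute margin from the description above, in milliseconds.
HANGING_GRACE_MS = 5 * 60 * 1000


def is_hanging(start_ms, now_ms, max_transaction_timeout_ms=900000):
    """Return True when a transaction has been open longer than timeout + 5 min."""
    return now_ms - start_ms > max_transaction_timeout_ms + HANGING_GRACE_MS
```

With the default of 900000 ms, a transaction is flagged once it has been open for more than 20 minutes.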

abort

Administratively abort an open transaction on a partition

kafka-python admin transactions abort [-h] -t TOPIC -p PARTITION --producer-id
                                      PRODUCER_ID --producer-epoch
                                      PRODUCER_EPOCH
                                      [--coordinator-epoch COORDINATOR_EPOCH]
command options
-t, --topic

Topic name.

-p, --partition

Partition index.

--producer-id

Producer id of the hanging transaction.

--producer-epoch

Producer epoch of the hanging transaction.

--coordinator-epoch

Coordinator epoch (default: -1, the admin sentinel).

Default: -1

users

SCRAM credential management. See KIP-554.

usage: kafka-python admin users [-h] COMMAND ...

Available commands

COMMAND

Possible choices: describe-scram-credentials, alter-scram-credentials

Sub-commands

describe-scram-credentials

Describe SCRAM credentials for Kafka users

kafka-python admin users describe-scram-credentials [-h] [--user USERS]
command options
--user

User name to describe (repeatable). If omitted, describes all users.

Default: []

alter-scram-credentials

Alter SCRAM credentials for Kafka users

kafka-python admin users alter-scram-credentials [-h] [--delete DELETIONS]
                                                 [--upsert UPSERTIONS]
                                                 [--iterations ITERATIONS]
command options
--delete

USER:MECHANISM pair to delete (e.g. alice:SCRAM-SHA-256)

Default: []

--upsert

USER:MECHANISM:PASSWORD triple to insert or update

Default: []

--iterations

PBKDF2 iteration count for upsertions (default: 4096)
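
For context on what the iteration count controls, SCRAM (RFC 5802) derives a salted password from the clear-text password with PBKDF2 over HMAC. The sketch below shows that derivation using only the standard library. It is not kafka-python's implementation: parse_upsert and salted_password are hypothetical names, user names are assumed not to contain ':', and the SASLprep normalization that SCRAM specifies for passwords is skipped.

```python
import hashlib


def parse_upsert(text):
    """Parse USER:MECHANISM:PASSWORD; the password may itself contain ':'."""
    user, mechanism, password = text.split(":", 2)
    return user, mechanism, password


def salted_password(mechanism, password, salt, iterations=4096):
    """Derive the SCRAM SaltedPassword (the RFC 5802 Hi function, i.e. PBKDF2-HMAC)."""
    hash_name = {"SCRAM-SHA-256": "sha256", "SCRAM-SHA-512": "sha512"}[mechanism]
    return hashlib.pbkdf2_hmac(hash_name, password.encode("utf-8"), salt, iterations)
```

A random salt, for example from os.urandom(16), would normally be generated per credential. A higher iteration count makes each derivation, and therefore each password guess, proportionally more expensive.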