Upgrading from 2.3 to 3.0

kafka-python 3.0 is a major release with several breaking changes. This guide walks through everything an application upgrading from 2.3 will typically need to look at, in roughly the order of how likely it is to matter.

Most applications using the public KafkaProducer / KafkaConsumer / KafkaAdminClient APIs at default settings should still work without code changes. The biggest change is that kafka-python no longer works on Python 2. Python 3.8 is now the minimum supported python interpreter. Other use cases that will need changes are: catching NoBrokersAvailableError (error removed), implementing a custom 'kafka_client' (internals have changed substantially), using a non-default Serializer / Deserializer, producer Partitioner, or consumer AbstractPartitionAssignor (minor abstract interface changes), or using the 'sasl_oauth_token_provider' configuration (import rename).

A full list of changes is in the Changelog. This page covers only the user-visible breaking changes and the most useful additions.

Python compatibility

Python 2 is no longer supported. 3.0 requires Python 3.8 or newer. If you still need Python 2.7, pin to kafka-python<3.0; the 2.3.x line remains compatible.

Configuration changes

Producer defaults

KafkaProducer now enables idempotence and full acks by default (KIP-679):

  • enable_idempotence now defaults to True (was False).

  • acks now effectively defaults to 'all' (-1) (was 1).

These together mean the producer waits for full ISR acknowledgment and deduplicates retries on the broker side, giving exactly-once-per-broker delivery semantics out of the box.

To restore 2.3 behavior, pass enable_idempotence=False and let acks default, or set acks=1 explicitly:

KafkaProducer(
    bootstrap_servers=...,
    enable_idempotence=False,
    acks=1,
)

If you explicitly pass acks=0 or acks=1 (or max_in_flight_requests_per_connection > 5) without also passing enable_idempotence=False, KafkaProducer raises KafkaConfigurationError rather than silently dropping idempotence.

Consumer session_timeout_ms

The default consumer session_timeout_ms is now 45000 (45s), up from 10000 (10s), tracking the upstream change in KIP-735. This reduces spurious rebalances under transient broker / network disruptions. Set session_timeout_ms=10000 to restore the old default.

api_version_auto_timeout_ms renamed to bootstrap_timeout_ms

The config previously named api_version_auto_timeout_ms is now bootstrap_timeout_ms, applies to the entire bootstrap process (not just the API-version probe), and defaults to 30000 (30s). The old name is no longer accepted.

sasl_oauth_token_provider abstract baseclass / kafka.sasl module moved

The kafka.sasl module has been moved to kafka.net.sasl. Users that have implemented an AbstractTokenProvider (or implemented a custom SASL mechanism) will need to modify imports:

from kafka.net.sasl.oauth import AbstractTokenProvider

buffer_memory removed

The buffer_memory config has been removed from KafkaProducer; the deprecation warning emitted in 2.x is now a hard error. Use max_request_size (per-request bound) and let the accumulator manage in-flight memory naturally.

Serializer / Deserializer interface

The Serializer and Deserializer abstract interfaces now receive headers alongside the topic and the payload:

# old
class MySerializer(Serializer):
    def serialize(self, topic, data):
        ...

# new
class MySerializer(Serializer):
    def serialize(self, topic, headers, data):
        ...

Old single-argument callables (or two-argument (topic, data) serializers) still work - KafkaProducer / KafkaConsumer wrap them automatically - but emit a DeprecationWarning. Update your classes to accept the headers arg.

Two helper classes are now shipped: DefaultSerializer (a UTF-8 serializer/deserializer) and JsonSerializer. Importable from kafka.serializer. Both serialize/deserialize None<->None to maintain expected key partitioning behavior.

Partitioner interface

The partitioner contract changed shape. In 2.3 a partitioner was a callable invoked as partitioner(serialized_key, all_partitions, available). In 3.0 partitioners are instances of the kafka.partitioner.Partitioner ABC with a partition(...) method that receives the topic, both the raw and serialized key / value, and the live cluster snapshot:

# old (2.3) - a callable
class MyPartitioner:
    def __call__(self, key, all_partitions, available):
        ...

# new (3.0)
from kafka.partitioner import Partitioner

class MyPartitioner(Partitioner):
    def partition(self, topic, key, serialized_key, value, serialized_value, cluster):
        partitions = sorted(cluster.partitions_for_topic(topic))
        available = list(cluster.available_partitions_for_topic(topic))
        ...

Legacy callables that match the old shape still work (with a DeprecationWarning); subclasses of the new ABC must implement the new signature.

The default partitioner is still DefaultPartitioner (murmur2 hash on the serialized key; null keys go to a random available partition) - unchanged routing behavior for callers using the default.

A sticky partitioner (KIP-480) is now shipped and is opt-in: for records with a null key, it sticks to one partition per topic until the current batch fills, then rotates. Enable it by passing an instance:

from kafka.partitioner import StickyPartitioner

KafkaProducer(
    bootstrap_servers=...,
    partitioner=StickyPartitioner(),
)

Keyed-record routing is unchanged under either partitioner.

Admin client API

Response shapes

Admin client methods now return plain dict / list structures derived from the protocol response, instead of namedtuples or custom response classes. If your code accesses fields by attribute (response.topics[0].name), switch to dict access (response['topics'][0]['name']).

create_topics / create_partitions

create_topics now accepts any of the following:

  • a list of topic name strings (uses broker defaults for partitions / replication on broker >= 2.4)

  • a dict {name: {num_partitions, replication_factor, assignments, configs}}

  • a list of NewTopic instances (deprecated)

The dict form is the recommended shape going forward. NewTopic and NewPartitions are still exported and still work, but emit deprecation warnings.

Group APIs renamed

The group management APIs were renamed and expanded; consult the KafkaAdminClient reference for the current method names. The most common 2.x names (describe_consumer_groups, list_consumer_groups, delete_consumer_groups) continue to work, but were renamed to describe_groups, list_groups, etc… New: list_group_offsets, list_group_members, reset_group_offsets (with extended options), remove_group_members.

Consumer API

partition_assignment_strategy

Assignor classes passed as partition_assignment_strategy are now always instantiated before use. If you have a custom AbstractPartitionAssignor class that uses @classmethod you will need to drop the decorators and update to instance method definitions.

Incremental cooperative rebalance

KIP-429 cooperative rebalancing is now supported. Existing ConsumerRebalanceListener callbacks (on_partitions_revoked / on_partitions_assigned) continue to work. New: on_partitions_lost hook, AsyncConsumerRebalanceListener base class for listeners that need to await rather than block, and RebalanceListener is now called on consumer.close().

Exceptions raised from a rebalance listener now propagate, instead of being swallowed. Audit any custom listener code for accidental raises.

Error hierarchy

  • KafkaError now subclasses Exception (was RuntimeError). Code that catches RuntimeError to handle kafka-python errors will no longer match; catch KafkaError (or Exception).

  • IncompatibleBrokerVersion is now a subclass of UnsupportedVersionError. Code catching either will keep working; code that distinguished them by type will not.

  • NoBrokersAvailableError has been removed. Connection-failure scenarios that previously raised it now raise KafkaTimeoutError (during bootstrap) or specific connection / authentication errors at send time.

  • KafkaProtocolError is no longer marked retriable; if you used err.retriable to drive retry loops, this changes behavior on protocol-level errors.

  • New base classes RetriableError and InvalidMetadataError are available for catching whole categories of errors at once.

Broker version checks

kafka-python now relies exclusively on ApiVersionsRequest to determine broker verions. For early brokers older than 0.10 you must now pass an explicit api_version in order to connect.

In addition the configuration api_version_auto_timeout_ms= which previously was used to manage timeouts during the broker version check process has been removed. It has been replaced with the more general bootstrap_timeout_ms (see above).

Removed and relocated internals

If your code imported from kafka-python internals (not the public top- level API), several modules have moved or been removed:

  • kafka.client_async and most of kafka.conn are removed. Their responsibilities now live in kafka.net (selector / event loop, connection pool, transport). A compatibility shim kafka.net.compat.KafkaNetClient exposes the legacy client.poll() / client.send() shape for callers that have not migrated.

  • Users that implemented a custom kafka_client will need to evaluate independently. Due to the migration to kafka.net most internals no longer directly rely on the kafka_client interface, instead using the simpler kafka.net.manager connection manager and kafka.net.selector IO event loop.

  • HeartbeatThread is gone; consumer heartbeats now run as an async def coroutine on a shared IO thread.

  • The hand-written protocol classes in kafka.protocol.* have been replaced by classes generated from the upstream Apache Kafka JSON schemas. The legacy hand-written protocol types are still available under kafka.protocol.old for the small number of places that reach into protocol internals.

Version probes for pre-0.10 brokers have been removed. kafka-python 3.0 always sends ApiVersionsRequest on connect and uses the result to negotiate per-API versions. Brokers older than 0.10 no longer auto-detect; pass api_version=(0, 9) (or your specific version) explicitly if you need to talk to one.

New conveniences

A few additions that may be useful:

  • Context managers. KafkaProducer, KafkaConsumer, and KafkaAdminClient all now support with syntax:

    with KafkaProducer(bootstrap_servers=...) as producer:
        producer.send('my-topic', b'hello')
    
  • CLI. A single kafka-python entry point wraps the admin / consumer / producer subcommands (also runnable as python -m kafka.admin etc.). See Command-Line Interface for examples.

  • New public exports. OffsetSpec and IsolationLevel are now importable directly from kafka.

  • HTTP CONNECT proxy support. Pass proxy_url='http://...' to any client to tunnel through an HTTP CONNECT proxy (RFC 7231 S4.3.6).

  • TCP keepalive is now enabled by default on broker sockets.

  • TLS minimum version is now TLS 1.2; the default SSLContext uses PROTOCOL_TLS_CLIENT.

Upgrade checklist

In order, the things most likely to bite an upgrading 2.3 application:

  1. Make sure you are using Python 3.8+.

  2. If you pass buffer_memory= to KafkaProducer, remove it.

  3. If you pass api_version_auto_timeout_ms=, rename to bootstrap_timeout_ms.

  4. If your application can’t tolerate idempotent / acks=all semantics - typically because you explicitly want acks=0 / acks=1 - pass enable_idempotence=False to KafkaProducer.

  5. If you catch NoBrokersAvailableError, replace it with KafkaTimeoutError.

  6. If you catch RuntimeError from kafka-python code, switch to KafkaError (or Exception).

  7. If you implement a custom Serializer / Deserializer, update the signature to (self, topic, headers, data).

  8. If you implement a custom Partitioner, update the signature to (self, topic, key, serialized_key, value, serialized_value, cluster).

  9. If you consume admin client responses by attribute access, switch to dict access.

  10. If you import from kafka.client_async or kafka.conn, migrate to kafka.net (or use the KafkaNetClient compat shim).