Upgrading from 2.3 to 3.0
kafka-python 3.0 is a major release with several breaking changes. This guide walks through everything an application upgrading from 2.3 will typically need to look at, in roughly the order of how likely it is to matter.
Most applications using the public KafkaProducer / KafkaConsumer /
KafkaAdminClient APIs at default settings should still work without
code changes. The biggest change is that kafka-python no longer works on
Python 2. Python 3.8 is now the minimum supported Python version.
Other use cases that will need changes are: catching
NoBrokersAvailableError (error removed), implementing a custom
'kafka_client' (internals have changed substantially), using a
non-default Serializer / Deserializer, producer Partitioner,
or consumer AbstractPartitionAssignor (minor abstract interface changes),
or using the 'sasl_oauth_token_provider' configuration (import rename).
A full list of changes is in the Changelog. This page covers only the user-visible breaking changes and the most useful additions.
Python compatibility
Python 2 is no longer supported. 3.0 requires Python 3.8 or newer. If
you still need Python 2.7, pin to kafka-python<3.0; the 2.3.x line
remains compatible.
Configuration changes
Producer defaults
KafkaProducer now enables idempotence and full acks by default
(KIP-679):
enable_idempotence now defaults to True (was False).
acks now effectively defaults to 'all' (-1) (was 1).
These together mean the producer waits for full ISR acknowledgment and the broker deduplicates retries, so a retried send is not written twice to the same partition.
To restore 2.3 behavior, pass enable_idempotence=False and let
acks default, or set acks=1 explicitly:
KafkaProducer(
    bootstrap_servers=...,
    enable_idempotence=False,
    acks=1,
)
If you explicitly pass acks=0 or acks=1 (or
max_in_flight_requests_per_connection > 5) without also passing
enable_idempotence=False, KafkaProducer raises
KafkaConfigurationError rather than silently dropping idempotence.
Consumer session_timeout_ms
The default consumer session_timeout_ms is now 45000 (45s),
up from 10000 (10s), tracking the upstream change in
KIP-735.
This reduces spurious rebalances under transient broker / network
disruptions. Set session_timeout_ms=10000 to restore the old
default.
api_version_auto_timeout_ms renamed to bootstrap_timeout_ms
The config previously named api_version_auto_timeout_ms is now
bootstrap_timeout_ms, applies to the entire bootstrap process
(not just the API-version probe), and defaults to 30000 (30s).
The old name is no longer accepted.
sasl_oauth_token_provider abstract base class / kafka.sasl module moved
The kafka.sasl module has been moved to kafka.net.sasl. Users who have
implemented an AbstractTokenProvider (or a custom SASL mechanism)
will need to update their imports:
from kafka.net.sasl.oauth import AbstractTokenProvider
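Code that has to run against both 2.3 and 3.0 during a migration could try the new location first and fall back to the old one. The following is a minimal sketch: import_first and load_token_provider_base are hypothetical helper names, and the fallback path kafka.sasl.oauth is an assumption inferred from the module move described above.

```python
import importlib


def import_first(module_names, attr):
    """Return `attr` from the first module in `module_names` that provides it."""
    last_error = None
    for name in module_names:
        try:
            module = importlib.import_module(name)
            return getattr(module, attr)
        except (ImportError, AttributeError) as exc:
            # Module missing, or present without the attribute: try the next one.
            last_error = exc
    raise ImportError(
        "could not import %r from any of %r" % (attr, list(module_names))
    ) from last_error


def load_token_provider_base():
    # 3.0 location first, then the assumed 2.x location.
    return import_first(
        ["kafka.net.sasl.oauth", "kafka.sasl.oauth"],
        "AbstractTokenProvider",
    )
```

Once 2.3 support is dropped, the helper can be replaced by the plain import shown above.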
buffer_memory removed
The buffer_memory config has been removed from KafkaProducer;
the deprecation warning emitted in 2.x is now a hard error. Use
max_request_size (per-request bound) and let the accumulator manage
in-flight memory naturally.
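The configuration changes above can be applied mechanically to an existing dict of producer keyword arguments. The sketch below is one way to do that; migrate_producer_config is a hypothetical helper, not part of kafka-python, and it only encodes the rules stated in this section.

```python
def migrate_producer_config(old):
    """Return a copy of 2.3-style KafkaProducer kwargs adjusted for 3.0."""
    new = dict(old)

    # buffer_memory was removed in 3.0; passing it is now an error.
    new.pop("buffer_memory", None)

    # api_version_auto_timeout_ms was renamed to bootstrap_timeout_ms.
    # The new setting covers the whole bootstrap, so a carried-over value
    # may be too short and is worth reviewing by hand.
    if "api_version_auto_timeout_ms" in new:
        value = new.pop("api_version_auto_timeout_ms")
        new.setdefault("bootstrap_timeout_ms", value)

    # Explicit acks=0/1 or more than 5 in-flight requests conflict with the
    # new idempotence default, so opt out unless the caller already chose.
    conflicts = (
        new.get("acks") in (0, 1)
        or new.get("max_in_flight_requests_per_connection", 0) > 5
    )
    if conflicts:
        new.setdefault("enable_idempotence", False)

    return new
```

The input dict is left unmodified, so the old and new configurations can be logged side by side before switching over.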
Serializer / Deserializer interface
The Serializer and Deserializer abstract interfaces now receive
headers alongside the topic and the payload:
# old
class MySerializer(Serializer):
    def serialize(self, topic, data):
        ...

# new
class MySerializer(Serializer):
    def serialize(self, topic, headers, data):
        ...
Old single-argument callables (or two-argument (topic, data)
serializers) still work - KafkaProducer / KafkaConsumer wrap
them automatically - but emit a DeprecationWarning. Update your
classes to accept the headers arg.
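As an illustration of what the extra argument makes possible, here is a sketch of a serializer that picks its encoding from a record header. ContentTypeSerializer and the "content-type" header are hypothetical, the headers are assumed to arrive as a list of (str, bytes) tuples or None, and the class is written without a base class so it runs without kafka-python installed; in an application it would subclass Serializer.

```python
import json


class ContentTypeSerializer:
    """New-style serializer shape: serialize(topic, headers, data)."""

    def serialize(self, topic, headers, data):
        # Preserve None so null keys and tombstone values stay null.
        if data is None:
            return None
        # Assumed header shape: list of (str, bytes) tuples, possibly None.
        content_type = dict(headers or []).get("content-type", b"application/json")
        if content_type == b"text/plain":
            return str(data).encode("utf-8")
        # Default: JSON with sorted keys for stable output.
        return json.dumps(data, sort_keys=True).encode("utf-8")
```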
Two helper classes are now shipped and importable from kafka.serializer:
DefaultSerializer (a UTF-8 serializer/deserializer) and JsonSerializer.
Both map None to None in each direction to maintain expected key
partitioning behavior.
Partitioner interface
The partitioner contract changed shape. In 2.3 a partitioner was a
callable invoked as partitioner(serialized_key, all_partitions, available).
In 3.0 partitioners are instances of the kafka.partitioner.Partitioner
ABC with a partition(...) method that receives the topic, both the
raw and serialized key / value, and the live cluster snapshot:
# old (2.3) - a callable
class MyPartitioner:
    def __call__(self, key, all_partitions, available):
        ...

# new (3.0)
from kafka.partitioner import Partitioner

class MyPartitioner(Partitioner):
    def partition(self, topic, key, serialized_key, value, serialized_value, cluster):
        partitions = sorted(cluster.partitions_for_topic(topic))
        available = list(cluster.available_partitions_for_topic(topic))
        ...
Legacy callables that match the old shape still work (with a
DeprecationWarning); subclasses of the new ABC must implement the
new signature.
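To make the new signature concrete, the sketch below fills in a partition(...) body that routes records by a tenant prefix in the key. TenantPartitioner, the key format, and FakeCluster are all hypothetical; FakeCluster only mimics the two cluster methods used in the example above so the code runs without a broker, and in an application the class would subclass Partitioner.

```python
import zlib


class TenantPartitioner:
    """Route records by the tenant prefix of the key, e.g. b"acme:order-17"."""

    def partition(self, topic, key, serialized_key, value, serialized_value, cluster):
        partitions = sorted(cluster.partitions_for_topic(topic))
        if serialized_key is None:
            # No key: use the lowest available partition, else the lowest overall.
            available = sorted(cluster.available_partitions_for_topic(topic))
            return (available or partitions)[0]
        # Hash only the tenant prefix so one tenant always lands on one partition.
        tenant = serialized_key.split(b":", 1)[0]
        return partitions[zlib.crc32(tenant) % len(partitions)]


class FakeCluster:
    """Stand-in exposing only the two cluster methods used above."""

    def __init__(self, partitions, available):
        self._partitions = set(partitions)
        self._available = set(available)

    def partitions_for_topic(self, topic):
        return self._partitions

    def available_partitions_for_topic(self, topic):
        return self._available
```

A stand-in cluster like this also makes it straightforward to unit-test a custom partitioner before wiring it into a producer.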
The default partitioner is still DefaultPartitioner (murmur2 hash
on the serialized key; null keys go to a random available partition) -
unchanged routing behavior for callers using the default.
A sticky partitioner (KIP-480) is now shipped and is opt-in: for records with a null key, it sticks to one partition per topic until the current batch fills, then rotates. Enable it by passing an instance:
from kafka.partitioner import StickyPartitioner

KafkaProducer(
    bootstrap_servers=...,
    partitioner=StickyPartitioner(),
)
Keyed-record routing is unchanged under either partitioner.
Admin client API
Response shapes
Admin client methods now return plain dict / list structures
derived from the protocol response, instead of namedtuples or custom
response classes. If your code accesses fields by attribute
(response.topics[0].name), switch to dict access
(response['topics'][0]['name']).
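While a codebase still has to handle both response shapes, a small accessor can hide the difference. This is a sketch only: field and topic_names are hypothetical helpers, and the 'topics' / 'name' keys are taken from the example above rather than from any particular admin method.

```python
def field(obj, name):
    """Read `name` from a dict-style (3.0) or attribute-style (2.3) response."""
    if isinstance(obj, dict):
        return obj[name]
    return getattr(obj, name)


def topic_names(response):
    """Collect topic names from either response shape."""
    return [field(topic, "name") for topic in field(response, "topics")]
```

After the upgrade is complete, calls to field(...) can be replaced with plain dict lookups.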
create_topics / create_partitions
create_topics now accepts any of the following:
a list of topic name strings (uses broker defaults for partitions / replication on broker >= 2.4)
a dict {name: {num_partitions, replication_factor, assignments, configs}}
a list of NewTopic instances (deprecated)
The dict form is the recommended shape going forward. NewTopic and
NewPartitions are still exported and still work, but emit
deprecation warnings.
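The dict form can be built from existing data with a few lines of plain Python. In the sketch below, build_topic_specs is a hypothetical helper and the topic names and values are made up; it fills in num_partitions, replication_factor, and configs only, and whether other keys may be omitted is an assumption to check against the KafkaAdminClient reference.

```python
def build_topic_specs(topics, replication_factor, configs=None):
    """Build a {name: {...}} mapping from a {name: partition_count} dict."""
    return {
        name: {
            "num_partitions": partitions,
            "replication_factor": replication_factor,
            # Copy so each topic gets its own independent configs dict.
            "configs": dict(configs or {}),
        }
        for name, partitions in topics.items()
    }


specs = build_topic_specs(
    {"orders": 6, "payments": 3},
    replication_factor=3,
    configs={"retention.ms": "86400000"},
)
# The result would then be passed to the admin client, e.g.:
# admin.create_topics(specs)
```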
Group APIs renamed
The group management APIs were renamed and expanded; consult the
KafkaAdminClient reference for the current method names.
The most common 2.x names (describe_consumer_groups,
list_consumer_groups, delete_consumer_groups) continue to work,
but have been renamed to describe_groups, list_groups, etc.
New: list_group_offsets, list_group_members,
reset_group_offsets (with extended options),
remove_group_members.
Consumer API
partition_assignment_strategy
Assignor classes passed as partition_assignment_strategy are now
always instantiated before use. If you have a custom AbstractPartitionAssignor
class that uses @classmethod you will need to drop the decorators and update
to instance method definitions.
Incremental cooperative rebalance
KIP-429 cooperative rebalancing is now supported. Existing
ConsumerRebalanceListener callbacks (on_partitions_revoked /
on_partitions_assigned) continue to work. New in 3.0 are an
on_partitions_lost hook and an AsyncConsumerRebalanceListener base
class for listeners that need to await rather than block. The
rebalance listener is also now called on consumer.close().
Exceptions raised from a rebalance listener now propagate, instead of being swallowed. Audit any custom listener code for accidental raises.
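If a listener's failures were previously swallowed and the application relied on that, one option is to make the suppression explicit. The sketch below assumes each callback takes a single collection of partitions (that signature for on_partitions_lost is an assumption); guarded and TrackingListener are hypothetical names, and the class is written without a base class so it runs standalone. In an application it would subclass ConsumerRebalanceListener.

```python
import functools
import logging

log = logging.getLogger("rebalance")


def guarded(callback):
    """Log and suppress exceptions raised by a listener callback."""

    @functools.wraps(callback)
    def wrapper(self, partitions):
        try:
            return callback(self, partitions)
        except Exception:
            # Keep the failure visible in logs instead of aborting the rebalance.
            log.exception("%s failed for %r", callback.__name__, partitions)

    return wrapper


class TrackingListener:
    """Tracks which partitions this consumer currently owns."""

    def __init__(self):
        self.owned = set()

    @guarded
    def on_partitions_assigned(self, assigned):
        self.owned.update(assigned)

    @guarded
    def on_partitions_revoked(self, revoked):
        self.owned.difference_update(revoked)

    @guarded
    def on_partitions_lost(self, lost):
        self.owned.difference_update(lost)
```

Suppressing errors is a design choice, not a recommendation: for callbacks that commit offsets or flush state, letting the exception propagate may be the safer behavior.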
Error hierarchy
KafkaError now subclasses Exception (was RuntimeError). Code that catches RuntimeError to handle kafka-python errors will no longer match; catch KafkaError (or Exception).
IncompatibleBrokerVersion is now a subclass of UnsupportedVersionError. Code catching either will keep working; code that distinguished them by type will not.
NoBrokersAvailableError has been removed. Connection-failure scenarios that previously raised it now raise KafkaTimeoutError (during bootstrap) or specific connection / authentication errors at send time.
KafkaProtocolError is no longer marked retriable; if you used err.retriable to drive retry loops, this changes behavior on protocol-level errors.
New base classes RetriableError and InvalidMetadataError are available for catching whole categories of errors at once.
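For retry loops driven by err.retriable, the sketch below shows the general shape and why the KafkaProtocolError change matters: an error whose retriable attribute is falsy is re-raised on the first attempt. call_with_retries is a hypothetical helper; it catches Exception so it runs without kafka-python installed, whereas an application would more likely catch KafkaError.

```python
import time


def call_with_retries(fn, attempts=3, backoff_s=0.1):
    """Call fn(), retrying only errors that declare themselves retriable."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception as err:
            # Errors without a truthy `retriable` attribute are treated as fatal.
            if not getattr(err, "retriable", False) or attempt == attempts:
                raise
            # Linear backoff between attempts.
            time.sleep(backoff_s * attempt)
```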
Broker version checks
kafka-python now relies exclusively on ApiVersionsRequest to determine
broker versions. For brokers older than 0.10 you must now pass
an explicit api_version in order to connect.
In addition, the api_version_auto_timeout_ms configuration, which
previously managed timeouts during the broker version check, has been
removed. It has been replaced by the more general
bootstrap_timeout_ms (see above).
Removed and relocated internals
If your code imported from kafka-python internals (not the public top-level API), several modules have moved or been removed:
kafka.client_async and most of kafka.conn are removed. Their responsibilities now live in kafka.net (selector / event loop, connection pool, transport). A compatibility shim kafka.net.compat.KafkaNetClient exposes the legacy client.poll() / client.send() shape for callers that have not migrated.
Users that implemented a custom kafka_client will need to evaluate independently. Due to the migration to kafka.net most internals no longer directly rely on the kafka_client interface, instead using the simpler kafka.net.manager connection manager and kafka.net.selector IO event loop.
HeartbeatThread is gone; consumer heartbeats now run as an async def coroutine on a shared IO thread.
The hand-written protocol classes in kafka.protocol.* have been replaced by classes generated from the upstream Apache Kafka JSON schemas. The legacy hand-written protocol types are still available under kafka.protocol.old for the small number of places that reach into protocol internals.
Version probes for pre-0.10 brokers have been removed. kafka-python
3.0 always sends ApiVersionsRequest on connect and uses the result
to negotiate per-API versions. Brokers older than 0.10 no longer
auto-detect; pass api_version=(0, 9) (or your specific version)
explicitly if you need to talk to one.
New conveniences
A few additions that may be useful:
Context managers. KafkaProducer, KafkaConsumer, and KafkaAdminClient all now support with syntax:
with KafkaProducer(bootstrap_servers=...) as producer:
    producer.send('my-topic', b'hello')
CLI. A single kafka-python entry point wraps the admin / consumer / producer subcommands (also runnable as python -m kafka.admin etc.). See Command-Line Interface for examples.
New public exports. OffsetSpec and IsolationLevel are now importable directly from kafka.
HTTP CONNECT proxy support. Pass proxy_url='http://...' to any client to tunnel through an HTTP CONNECT proxy (RFC 7231 §4.3.6).
TCP keepalive is now enabled by default on broker sockets.
TLS minimum version is now TLS 1.2; the default SSLContext uses PROTOCOL_TLS_CLIENT.
Upgrade checklist
In order, the things most likely to bite an upgrading 2.3 application:
Make sure you are using Python 3.8+.
If you pass buffer_memory= to KafkaProducer, remove it.
If you pass api_version_auto_timeout_ms=, rename it to bootstrap_timeout_ms.
If your application can’t tolerate idempotent / acks=all semantics - typically because you explicitly want acks=0 / acks=1 - pass enable_idempotence=False to KafkaProducer.
If you catch NoBrokersAvailableError, replace it with KafkaTimeoutError.
If you catch RuntimeError from kafka-python code, switch to KafkaError (or Exception).
If you implement a custom Serializer / Deserializer, update the signature to (self, topic, headers, data).
If you implement a custom Partitioner, update the signature to (self, topic, key, serialized_key, value, serialized_value, cluster).
If you consume admin client responses by attribute access, switch to dict access.
If you import from kafka.client_async or kafka.conn, migrate to kafka.net (or use the KafkaNetClient compat shim).