High-Level Clients
KafkaConsumer
from kafka import KafkaConsumer, OffsetAndMetadata, TopicPartition, JsonSerializer, DefaultSerializer
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
# Manually commit offsets (disable auto-commit)
consumer = KafkaConsumer('my-topic',
group_id='my-group',
enable_auto_commit=False,
bootstrap_servers=['localhost:9092'])
for message in consumer:
# process message
process_message(message)
# TopicPartition for this record
tp = TopicPartition(message.topic, message.partition)
# Note: When committing offsets manually, commit the next offset the consumer
# should read. For example, after successfully processing a message at
# offset 42, commit offset 43.
consumer.commit({
tp: OffsetAndMetadata(message.offset + 1, '', -1)
})
# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
# consume json messages
KafkaConsumer(value_deserializer=JsonSerializer())
# consume utf-8
KafkaConsumer(value_deserializer=DefaultSerializer())
# consume utf-16
KafkaConsumer(value_deserializer=DefaultSerializer('utf-16'))
# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)
# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')
# Use multiple consumers in parallel
# (run each on a different server / process / CPU)
consumer1 = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers='my.server.com')
There are many configuration options for the consumer class. See
KafkaConsumer API documentation for more details.
KafkaProducer
from kafka import KafkaProducer, JsonSerializer, DefaultSerializer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['broker1:1234'])
# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')
# Block for 'synchronous' sends
try:
record_metadata = future.get(timeout=10)
except KafkaError:
# Decide what to do if produce request failed...
log.exception()
pass
# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
# produce keyed messages to enable hashed partitioning
producer.send('keyed-topic', key=b'foo', value=b'bar')
# encode str with utf-8 encoding
producer = KafkaProducer(value_serializer=DefaultSerializer())
producer.send('utf-8-topic', 'value_str')
# encode str with utf-16 encoding
producer = KafkaProducer(value_serializer=DefaultSerializer('utf-16'))
producer.send('utf-16-topic', '懂不懂')
# produce json messages
producer = KafkaProducer(value_serializer=JsonSerializer())
producer.send('json-topic', {'key': 'value'})
# produce asynchronously
for _ in range(100):
producer.send('my-topic', b'msg')
def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def on_send_error(excp):
log.error('I am an errback', exc_info=excp)
# handle exception
# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)
# block until all async messages are sent
producer.flush()
# configure multiple retries
producer = KafkaProducer(retries=5)
KafkaAdminClient
from kafka import KafkaAdminClient
admin = KafkaAdminClient(bootstrap_servers=['broker1:1234'])
# create topics with defaults (requires kafka 2.4+)
admin.create_topics(['testtopic1'], timeout_ms=None, validate_only=False)
# create a new topic with details
new_topics = {
'num_partitions': 1,
'replication_factor': 1,
'assignments': {0: [1]}, # assign partition 0 to broker id 1
'configs': {'max_message_bytes': '1000000'}, # set non-default configs
}
admin.create_topics(new_topics, timeout_ms=None, validate_only=False)
# delete a topic
admin.delete_topics(['testtopic1'])
# list consumer groups
print(admin.list_groups())
# get consumer group details
print(admin.describe_groups(['cft-plt-qa.connect']))
# get consumer group offset
print(admin.list_group_offsets(['cft-plt-qa.connect']))