kafka-python
############

.. image:: https://img.shields.io/pypi/v/kafka-python.svg
    :target: https://pypi.org/project/kafka-python
.. image:: https://img.shields.io/badge/kafka-4.3--0.8-brightgreen.svg
    :target: https://kafka-python.readthedocs.io/en/master/compatibility.html
.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
    :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE
.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
    :target: https://pypi.python.org/pypi/kafka-python

kafka-python is a pure-python client library for Apache Kafka, the distributed
stream processing engine. It has no external dependencies and no Cython/C/Rust
core, making installation across a wide variety of environments simple and easy
to manage. It provides high-level class components for consumer, producer, and
admin clients, as well as CLI scripts for quick interactive tasks.

``kafka-python admin`` serves as a simple alternative to the Apache Kafka bin/
scripts, particularly when you do not have easy access to an installed and
compatible JVM. The CLI for admin commands is provided as ``kafka-python admin``
and ``python -m kafka.admin``.

Users looking to add more raw throughput can ``pip install crc32c`` as an
optional dependency, offloading one of the most CPU-intensive subsystems to an
optimized C library.

.. code-block:: bash

    pip install kafka-python

    # callable as module or as cli-script
    kafka-python admin -b localhost:9092 cluster describe

    # Create a topic with the admin cli
    python -m kafka.admin -b localhost:9092 topics create -t foo-topic

    # Produce messages
    echo "foo message" | python -m kafka.producer -b localhost:9092 -t foo-topic

    # Consume messages
    python -m kafka.consumer -b localhost:9092 -C auto_offset_reset=earliest -C consumer_timeout_ms=1000 -g foo-group -t foo-topic


What's New in 3.0
*****************

- Protocol Stack dynamically generated from Apache Kafka json message schemas.
- Encode/decode performance optimizations with compiled/cached python bytecode.
- Expanded KIP feature support, including Cooperative Rebalance (KIP-429),
  Rack-aware Fetch (KIP-392), Log-Truncation detection (KIP-320),
  Transactional Producer improvements (KIP-360, KIP-447, KIP-654),
  Sticky Partitioner (KIP-480), and splitting oversized producer batches (KIP-126).
- Full refactor and expansion of KafkaAdminClient.
- Networking changes to leverage kafka.net event-loop and async/await syntax.
- Python 3.8+ required


KafkaConsumer
*************

:class:`~kafka.KafkaConsumer` is a high-level message consumer, intended to
operate as similarly as possible to the official java client.

See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
for API and configuration details.

The consumer iterator returns ConsumerRecords, which are simple namedtuples
that expose basic message attributes: topic, partition, offset, key, and value:

.. code-block:: python

    from kafka import KafkaConsumer
    consumer = KafkaConsumer('my_favorite_topic')
    for msg in consumer:
        print(msg)

.. code-block:: python

    # join a consumer group for dynamic partition assignment and offset commits
    from kafka import KafkaConsumer
    consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
    for msg in consumer:
        print(msg)

.. code-block:: python

    # manually assign the partition list for the consumer
    from kafka import KafkaConsumer, TopicPartition
    consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
    consumer.assign([TopicPartition('foobar', 2)])
    msg = next(consumer)

Keys and values returned by KafkaConsumer are raw bytes by default.
Use a ``value_deserializer`` to automatically decode them into something else.
Helpers are available for simple utf-8 string decoding (``DefaultSerializer``)
and json (``JsonSerializer``).

.. code-block:: python

    # Deserialize json-encoded values
    from kafka import KafkaConsumer, JsonSerializer
    consumer = KafkaConsumer(value_deserializer=JsonSerializer())
    consumer.subscribe(['json-foo'])
    for msg in consumer:
        assert isinstance(msg.value, dict)

.. code-block:: python

    # Access record headers. The returned value is a list of
    # (str, bytes) tuples, representing the header key and value.
    for msg in consumer:
        print(msg.headers)

.. code-block:: python

    # Read only committed messages from transactional topic
    from kafka import KafkaConsumer, IsolationLevel
    consumer = KafkaConsumer(isolation_level=IsolationLevel.READ_COMMITTED)
    consumer.subscribe(['txn_topic'])
    for msg in consumer:
        print(msg)

.. code-block:: python

    # Get consumer metrics
    metrics = consumer.metrics()


KafkaProducer
*************

:class:`~kafka.KafkaProducer` is a high-level, asynchronous message producer.
The class is intended to operate as similarly as possible to the official java
client. See the KafkaProducer API documentation for more details.

.. code-block:: python

    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='localhost:1234')
    for _ in range(100):
        # Fire-and-forget: send() is async and returns before delivery
        producer.send('foobar', b'some_message_bytes')

.. code-block:: python

    # To check the status of an async message delivery, use .get()
    future = producer.send('foobar', b'another_message')
    # future.get() will block until it can return the result or raise on error
    result = future.get(timeout=60)

.. code-block:: python

    # Block until all pending messages are at least put on the network
    # NOTE: This does not guarantee delivery or success! It is really
    # only useful if you configure internal batching using linger_ms
    producer.flush()

Message keys are used to hash messages with the same key to the same partition.
Both keys and values should be raw bytes unless a serializer is configured.

.. code-block:: python

    # Use a key for hashed-partitioning
    producer.send('foobar', key=b'foo', value=b'bar')

.. code-block:: python

    # Serialize json messages
    from kafka import KafkaProducer, JsonSerializer
    producer = KafkaProducer(value_serializer=JsonSerializer())
    producer.send('fizzbuzz', {'foo': 'bar'})

.. code-block:: python

    # Serialize string keys
    from kafka import KafkaProducer, DefaultSerializer
    producer = KafkaProducer(key_serializer=DefaultSerializer())
    producer.send('flipflap', key='ping', value=b'1234')

Compression can be used to reduce message size on the wire. Gzip is supported
via the python stdlib. For other compression types you must install optional
dependencies.

.. code-block:: python

    # Compress messages
    producer = KafkaProducer(compression_type='gzip')
    for i in range(1000):
        producer.send('foobar', b'msg %d' % i)

KafkaProducer also supports transactions and message headers when needed.

.. code-block:: python

    # Use transactions
    producer = KafkaProducer(transactional_id='fizzbuzz')
    producer.init_transactions()
    producer.begin_transaction()
    future = producer.send('txn_topic', value=b'yes')
    future.get()  # wait for successful produce
    producer.commit_transaction()  # commit the transaction

    producer.begin_transaction()
    future = producer.send('txn_topic', value=b'no')
    future.get()  # wait for successful produce
    producer.abort_transaction()  # abort the transaction

.. code-block:: python

    # Include record headers. The format is list of tuples with string key
    # and bytes value.
    producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])

.. code-block:: python

    # Get producer performance metrics
    metrics = producer.metrics()


Module CLI Interface
********************

kafka-python also provides simple command-line interfaces for consumer,
producer, and admin clients. Access via ``python -m kafka.consumer``,
``python -m kafka.producer``, and ``python -m kafka.admin``.
See the CLI documentation for the full reference.


Compression
***********

kafka-python supports the following compression formats:

- gzip (via stdlib)
- LZ4 (via `python-lz4`, `lz4tools`, or `py-lz4framed`)
- Snappy (via `python-snappy`)
- Zstandard (via `python-zstandard`)

gzip is supported natively; the others require installing additional libraries.
See the Install documentation for more information.


Optimized CRC32 Validation
**************************

Kafka uses CRC32 checksums to validate messages. kafka-python includes a pure
python implementation for compatibility. To improve performance for
high-throughput applications, kafka-python will use `crc32c` for optimized
native code if it is installed. See the Install documentation for installation
instructions. See https://pypi.org/project/crc32c/ for details on the
underlying crc32c lib.


Protocol
********

A secondary goal of kafka-python is to provide an easy-to-use protocol layer
for interacting with kafka brokers via the python repl. This is useful for
testing, probing, and general experimentation. In version 3.0 the protocol
layer was rewritten to generate encoder/decoder classes using json message
definitions imported directly from the Apache Kafka project source.


Debugging
*********

Use python's `logging` module to view internal operational events.
See https://docs.python.org/3/howto/logging.html for an overview and howto.

.. code-block:: python

    import logging
    logging.basicConfig(level=logging.DEBUG)


.. toctree::
    :hidden:
    :maxdepth: 2

    High-level Clients
    Command Line Interface
    API
    install
    tests
    compatibility
    Supported KIPs
    support
    license
    Upgrading to 3.0
    changelog

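
Setting DEBUG on the root logger, as in the Debugging example, also enables
debug output from every other library in the process. A minimal sketch of a
narrower setup follows. It assumes that kafka-python's loggers are named under
the ``kafka`` package (the usual ``logging.getLogger(__name__)`` convention);
the helper name ``enable_kafka_debug`` is hypothetical and not part of the
library.

.. code-block:: python

    import logging


    def enable_kafka_debug(level=logging.DEBUG):
        """Raise verbosity for the 'kafka' logger namespace only (hypothetical helper)."""
        # Attach a default stderr handler to the root logger if none exists yet;
        # this call is a no-op when the application has already configured logging.
        logging.basicConfig(level=logging.WARNING)
        # Assumption: kafka-python loggers are children of 'kafka' and inherit its level.
        kafka_logger = logging.getLogger("kafka")
        kafka_logger.setLevel(level)
        return kafka_logger


    kafka_logger = enable_kafka_debug()

Records from child loggers propagate to the root handler, so debug messages
from the ``kafka`` namespace are emitted while other libraries keep their
existing levels.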