kafka.coordinator package
Subpackages
Submodules
kafka.coordinator.base module
class kafka.coordinator.base.BaseCoordinator(client, metrics, **configs)
Bases: object
BaseCoordinator implements group management for a single group member by interacting with a designated Kafka broker (the coordinator). Group semantics are provided by extending this class. See ConsumerCoordinator for example usage.
From a high level, Kafka’s group management protocol consists of the following sequence of actions:
- Group Registration: Group members register with the coordinator providing their own metadata (such as the set of topics they are interested in).
- Group/Leader Selection: The coordinator selects the members of the group and chooses one member as the leader.
- State Assignment: The leader collects the metadata from all the members of the group and assigns state.
- Group Stabilization: Each member receives the state assigned by the leader and begins processing.
To leverage this protocol, an implementation must define the format of the metadata provided by each member for group registration in group_protocols(), and the format of the state assignment, which is provided by the leader in _perform_assignment() and becomes available to members in _on_join_complete().
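The four steps above can be sketched as a toy in-memory simulation. This is not kafka-python code: the Member tuple, the run_group_round function, the "first id in sorted order" leader rule, and the round-robin assignment are all assumptions made for illustration. In a real implementation the metadata and assignment formats are whatever group_protocols() and _perform_assignment() define.

```python
from collections import namedtuple

# Hypothetical stand-in for what a member sends when it registers.
Member = namedtuple("Member", ["member_id", "topics"])


def run_group_round(members, partitions_by_topic):
    """Toy model of one group round; returns (leader_id, assignment)."""
    # 1. Group registration: each member submits its metadata (its topics).
    registered = {m.member_id: sorted(m.topics) for m in members}

    # 2. Group/leader selection: this toy coordinator picks the first id
    #    in sorted order (an assumption, not the broker's rule).
    leader_id = sorted(registered)[0]

    # 3. State assignment: the leader sees everyone's metadata and deals
    #    partitions round-robin among the members subscribed to each topic.
    assignment = {member_id: [] for member_id in registered}
    for topic in sorted(partitions_by_topic):
        subscribers = sorted(
            mid for mid, topics in registered.items() if topic in topics
        )
        if not subscribers:
            continue
        for index, partition in enumerate(partitions_by_topic[topic]):
            owner = subscribers[index % len(subscribers)]
            assignment[owner].append((topic, partition))

    # 4. Group stabilization: every member receives its share of the state.
    return leader_id, assignment


members = [Member("m-2", ["orders"]), Member("m-1", ["orders", "audit"])]
leader, state = run_group_round(members, {"orders": [0, 1, 2], "audit": [0]})
```

Here the leader only computes the assignment; delivering each member's share is the coordinator's job in the real protocol.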
DEFAULT_CONFIG = {'api_version': (0, 9), 'group_id': 'kafka-python-default-group', 'heartbeat_interval_ms': 3000, 'metric_group_prefix': '', 'retry_backoff_ms': 100, 'session_timeout_ms': 30000}
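As an illustration of how keyword overrides could be layered on these defaults, here is a minimal sketch. The merge_config helper is hypothetical and is not part of kafka-python. Rejecting unknown keys and checking that the heartbeat interval stays below the session timeout are design choices of this sketch, not behavior documented on this page.

```python
import copy

# Defaults copied from the DEFAULT_CONFIG listing for BaseCoordinator.
DEFAULT_CONFIG = {
    "api_version": (0, 9),
    "group_id": "kafka-python-default-group",
    "heartbeat_interval_ms": 3000,
    "metric_group_prefix": "",
    "retry_backoff_ms": 100,
    "session_timeout_ms": 30000,
}


def merge_config(defaults, **overrides):
    """Hypothetical helper: overlay recognized keys on a copy of the defaults."""
    config = copy.copy(defaults)
    unknown = sorted(set(overrides) - set(defaults))
    if unknown:
        raise KeyError("unrecognized config keys: %s" % ", ".join(unknown))
    config.update(overrides)
    # Sanity check chosen for this sketch: a member must heartbeat more
    # often than its session expires.
    if config["heartbeat_interval_ms"] >= config["session_timeout_ms"]:
        raise ValueError("heartbeat_interval_ms must be below session_timeout_ms")
    return config


config = merge_config(DEFAULT_CONFIG, group_id="billing", session_timeout_ms=10000)
```

Copying the defaults first keeps the class-level dictionary unchanged, so several coordinators can be configured independently.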
close()
Close the coordinator, leave the current group, and reset the local generation and member_id.
coordinator_unknown()
Check whether we know who the coordinator is and have an active connection.
Side effect: resets coordinator_id to None if the connection failed.
Returns: True if the coordinator is unknown
Return type: bool
ensure_coordinator_known()
Block until the coordinator for this group is known and we have an active connection to it. (The Java client uses an unsent queue.)
group_protocols()
Return the list of supported group protocols and metadata.
This list is submitted by each group member via a JoinGroupRequest. The order of the protocols in the list indicates the preference of the protocol (the first entry is the most preferred). The coordinator takes this preference into account when selecting the generation protocol (generally, more preferred protocols will be selected as long as all members support them and there is no disagreement on the preference).
Note: metadata must be of type bytes or support an encode() method.
Returns: [(protocol, metadata), …]
Return type: list
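To make the return format and the preference ordering concrete, here is a simplified model that runs without a broker. Both encode_metadata and select_protocol are hypothetical helpers written for this sketch. The voting rule (each member votes for its most preferred protocol among those every member supports) and the alphabetical tie-break are assumptions, not a statement of the coordinator's exact algorithm.

```python
from collections import Counter


def encode_metadata(metadata):
    """Return metadata as bytes, accepting bytes or anything with encode()."""
    if isinstance(metadata, bytes):
        return metadata
    return metadata.encode()


def select_protocol(member_protocols):
    """Simplified model of protocol selection.

    member_protocols maps member id -> [(protocol, metadata), ...] in
    preference order, the same shape group_protocols() returns.
    """
    supported = [
        set(name for name, _ in protocols)
        for protocols in member_protocols.values()
    ]
    # Only protocols that every member supports are candidates.
    candidates = set.intersection(*supported) if supported else set()
    if not candidates:
        raise ValueError("members share no common protocol")
    votes = Counter()
    for protocols in member_protocols.values():
        for name, _ in protocols:
            if name in candidates:
                votes[name] += 1  # vote for the most preferred candidate
                break
    # Ties are broken alphabetically here; that rule is an assumption.
    return min(votes, key=lambda name: (-votes[name], name))


group = {
    "a": [("roundrobin", b""), ("range", b"")],
    "b": [("roundrobin", "meta"), ("range", "meta")],
    "c": [("range", b""), ("roundrobin", b"")],
}
chosen = select_protocol(group)
```

In this example all three members support both protocols, and two of them list roundrobin first, so roundrobin wins the vote.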
kafka.coordinator.consumer module
class kafka.coordinator.consumer.ConsumerCoordinator(client, subscription, metrics, **configs)
Bases: kafka.coordinator.base.BaseCoordinator
This class manages the coordination process with the consumer coordinator.
DEFAULT_CONFIG = {'api_version': (0, 9), 'assignors': (<class 'kafka.coordinator.assignors.range.RangePartitionAssignor'>, <class 'kafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor'>), 'auto_commit_interval_ms': 5000, 'default_offset_commit_callback': <function <lambda> at 0x7f7766a70578>, 'enable_auto_commit': True, 'exclude_internal_topics': True, 'group_id': 'kafka-python-default-group', 'heartbeat_interval_ms': 3000, 'metric_group_prefix': 'consumer', 'retry_backoff_ms': 100, 'session_timeout_ms': 30000}
close(autocommit=True)
Close the coordinator, leave the current group, and reset the local generation and member_id.
Keyword Arguments: autocommit (bool) – If auto-commit is configured for this consumer, this optional flag causes the consumer to attempt to commit any pending consumed offsets prior to close. Default: True
commit_offsets_async(offsets, callback=None)
Commit specific offsets asynchronously.
Parameters:
- offsets (dict {TopicPartition: OffsetAndMetadata}) – what to commit
- callback (callable, optional) – called as callback(offsets, response), where response is either an Exception or an OffsetCommitResponse struct. This callback can be used to trigger custom actions when a commit request completes.
Returns: a future indicating whether the commit was successful or not
Return type: Future
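A callback matching the callback(offsets, response) contract might look like the sketch below. To keep it runnable without kafka-python installed, TopicPartition and OffsetAndMetadata are local stand-in tuples whose field names are assumptions, and the callback is invoked by hand instead of by a real coordinator.

```python
from collections import namedtuple

# Stand-ins so the sketch runs without kafka-python; the field names
# are assumptions made for this example.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"])

failed_commits = []
completed_commits = []


def on_commit(offsets, response):
    """Called as callback(offsets, response) when a commit request completes."""
    if isinstance(response, Exception):
        # The commit failed; keep the offsets so they can be retried or logged.
        failed_commits.append((offsets, response))
    else:
        # Any non-exception value is treated as a response struct.
        completed_commits.append(offsets)


offsets = {TopicPartition("orders", 0): OffsetAndMetadata(42, "")}

# Simulate the two outcomes the callback has to handle.
on_commit(offsets, RuntimeError("simulated commit failure"))
on_commit(offsets, object())
```

With a real coordinator, the function would be passed as commit_offsets_async(offsets, callback=on_commit).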
commit_offsets_sync(offsets)
Commit specific offsets synchronously.
This method will retry until the commit completes successfully or an unrecoverable error is encountered.
Parameters: offsets (dict {TopicPartition: OffsetAndMetadata}) – what to commit
Raises: an error on failure
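The retry behavior described above can be sketched as a generic loop. This is not the library's implementation: RetriableCommitError, commit_with_retry, and the send_commit callable are hypothetical names, and the fixed backoff simply reuses the retry_backoff_ms default from DEFAULT_CONFIG.

```python
import time


class RetriableCommitError(Exception):
    """Hypothetical marker for errors that are worth retrying."""


def commit_with_retry(send_commit, offsets, retry_backoff_ms=100, sleep=time.sleep):
    """Call send_commit(offsets) until it succeeds or fails unrecoverably."""
    while True:
        try:
            return send_commit(offsets)
        except RetriableCommitError:
            # Back off before the next attempt. Any other exception is
            # treated as unrecoverable and propagates to the caller.
            sleep(retry_backoff_ms / 1000.0)


attempts = []


def flaky_send(offsets):
    """Simulated sender that fails twice before succeeding."""
    attempts.append(offsets)
    if len(attempts) < 3:
        raise RetriableCommitError("coordinator not available")
    return "committed"


pauses = []  # records requested sleeps instead of actually sleeping
result = commit_with_retry(flaky_send, {"orders-0": 42}, sleep=pauses.append)
```

Passing the sleep function as a parameter keeps the example fast to run and makes the backoff easy to observe.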
fetch_committed_offsets(partitions)
Fetch the current committed offsets for the specified partitions.
Parameters: partitions (list of TopicPartition) – partitions to fetch
Returns: {TopicPartition: OffsetAndMetadata}
Return type: dict
need_rejoin()
Check whether the group should be rejoined.
Returns: True if the consumer should rejoin the group, False otherwise
Return type: bool
-