kafka.coordinator package

Submodules

kafka.coordinator.base module

class kafka.coordinator.base.BaseCoordinator(client, metrics, **configs)[source]

Bases: object

BaseCoordinator implements group management for a single group member by interacting with a designated Kafka broker (the coordinator). Group semantics are provided by extending this class. See ConsumerCoordinator for example usage.

From a high level, Kafka’s group management protocol consists of the following sequence of actions:

  1. Group Registration: Group members register with the coordinator providing their own metadata (such as the set of topics they are interested in).
  2. Group/Leader Selection: The coordinator selects the members of the group and chooses one member as the leader.
  3. State Assignment: The leader collects the metadata from all the members of the group and assigns state.
  4. Group Stabilization: Each member receives the state assigned by the leader and begins processing.

To leverage this protocol, an implementation must define the format of the metadata that each member provides for group registration in group_protocols(), and the format of the state assignment that the leader provides in _perform_assignment(). That assignment becomes available to members in _on_join_complete().
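The four steps above can be sketched with a toy, in-memory model. This is only an illustration of the message flow under simplifying assumptions: ToyCoordinator, join, sync, and perform_assignment are hypothetical names that are not part of kafka-python, no network is involved, and the first member to join is picked as leader purely for simplicity.

```python
class ToyCoordinator:
    """Hypothetical in-memory stand-in for the broker-side coordinator."""

    def __init__(self):
        self.members = {}      # member_id -> metadata supplied at registration
        self.leader_id = None
        self.assignment = {}   # member_id -> state assigned by the leader

    def join(self, member_id, metadata):
        # Step 1: a member registers and supplies its metadata.
        self.members[member_id] = metadata
        # Step 2: choose a leader (assumption: the first member to join).
        if self.leader_id is None:
            self.leader_id = member_id

    def sync(self, member_id, assignment=None):
        # Step 3: only the leader submits an assignment for the whole group.
        if member_id == self.leader_id and assignment is not None:
            self.assignment = assignment
        # Step 4: each member receives its own share of the assigned state.
        return self.assignment.get(member_id, [])


def perform_assignment(members):
    """Leader-side logic: give every member the topics it asked for."""
    return {member_id: sorted(topics) for member_id, topics in members.items()}


coordinator = ToyCoordinator()
coordinator.join("member-a", {"orders", "payments"})
coordinator.join("member-b", {"orders"})

leader_plan = perform_assignment(coordinator.members)
state_a = coordinator.sync("member-a", leader_plan)  # leader sends the plan
state_b = coordinator.sync("member-b")               # follower only receives
print(state_a, state_b)
```

In this model, the metadata passed to join plays the role of what group_protocols() supplies, and the value returned by sync plays the role of what a member sees in _on_join_complete().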

DEFAULT_CONFIG = {'metric_group_prefix': '', 'session_timeout_ms': 30000, 'retry_backoff_ms': 100, 'heartbeat_interval_ms': 3000, 'group_id': 'kafka-python-default-group', 'api_version': (0, 9)}
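A defaults dictionary like this is typically combined with the keyword arguments passed to the constructor. The sketch below shows one plausible way to do that. merge_config is a hypothetical helper, and ignoring unrecognized keys is an assumption about the merging behavior, not something this page states.

```python
import copy

DEFAULT_CONFIG = {
    'metric_group_prefix': '',
    'session_timeout_ms': 30000,
    'retry_backoff_ms': 100,
    'heartbeat_interval_ms': 3000,
    'group_id': 'kafka-python-default-group',
    'api_version': (0, 9),
}


def merge_config(defaults, **configs):
    """Return a copy of defaults with recognized keys overridden by configs."""
    merged = copy.copy(defaults)
    for key in merged:
        if key in configs:
            merged[key] = configs[key]
    # Keys that do not appear in defaults are dropped (assumption).
    return merged


config = merge_config(
    DEFAULT_CONFIG,
    group_id='billing',
    session_timeout_ms=10000,
    not_a_setting=1,
)
print(config['group_id'], config['session_timeout_ms'])
```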
close()[source]

Close the coordinator, leave the current group, and reset local generation / member_id.

coordinator_dead(error)[source]

Mark the current coordinator as dead.

coordinator_unknown()[source]

Check if we know who the coordinator is and have an active connection

Side-effect: reset coordinator_id to None if connection failed

Returns: True if the coordinator is unknown
Return type: bool
ensure_active_group()[source]

Ensure that the group is active (i.e. joined and synced)

ensure_coordinator_known()[source]

Block until the coordinator for this group is known (and we have an active connection; the Java client uses an unsent queue).

group_protocols()[source]

Return the list of supported group protocols and metadata.

This list is submitted by each group member via a JoinGroupRequest. The order of the protocols in the list indicates the preference of the protocol (the first entry is the most preferred). The coordinator takes this preference into account when selecting the generation protocol (generally more preferred protocols will be selected as long as all members support them and there is no disagreement on the preference).

Note: metadata must be type bytes or support an encode() method

Returns: [(protocol, metadata), ...]
Return type: list
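To make the preference ordering concrete, here is a rough sketch of how a coordinator could pick one protocol from the lists submitted by each member. The selection logic is not described on this page, so the approach shown (restrict to protocols every member supports, then let each member vote for its most preferred candidate) is an assumption, and select_protocol is a hypothetical name. The sketch also assumes the group has at least one member.

```python
from collections import Counter


def select_protocol(member_protocols):
    """Pick a protocol from {member_id: [(protocol, metadata), ...]}.

    Each member's list is ordered from most to least preferred.
    """
    names = [
        [name for name, _metadata in protocols]
        for protocols in member_protocols.values()
    ]
    # Only protocols supported by every member are eligible.
    candidates = set(names[0]).intersection(*names[1:])
    if not candidates:
        raise ValueError("no protocol is supported by every member")
    # Each member votes for the first eligible protocol in its own list.
    votes = Counter(
        next(name for name in member_names if name in candidates)
        for member_names in names
    )
    return votes.most_common(1)[0][0]


# Metadata values are bytes, as required by the note above.
members = {
    'member-a': [('range', b''), ('roundrobin', b'')],
    'member-b': [('roundrobin', b''), ('range', b'')],
    'member-c': [('range', b'')],
}
chosen = select_protocol(members)
print(chosen)
```

Here only 'range' is supported by all three members, so it is the only eligible candidate even though member-b lists 'roundrobin' first.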
need_rejoin()[source]

Check whether the group should be rejoined (e.g. if metadata changes)

Returns: True if it should, False otherwise
Return type: bool
protocol_type()[source]

Unique identifier for the class of protocols implemented (e.g. “consumer” or “connect”).

Returns: protocol type name
Return type: str
class kafka.coordinator.base.GroupCoordinatorMetrics(heartbeat, metrics, prefix, tags=None)[source]

Bases: object

class kafka.coordinator.base.HeartbeatTask(coordinator)[source]

Bases: object

disable()[source]
reset()[source]

kafka.coordinator.consumer module

class kafka.coordinator.consumer.AutoCommitTask(coordinator, interval)[source]

Bases: object

reschedule(at=None)[source]
class kafka.coordinator.consumer.ConsumerCoordinator(client, subscription, metrics, **configs)[source]

Bases: kafka.coordinator.base.BaseCoordinator

This class manages the coordination process with the consumer coordinator.

DEFAULT_CONFIG = {'assignors': (<class 'kafka.coordinator.assignors.range.RangePartitionAssignor'>, <class 'kafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor'>), 'metric_group_prefix': 'consumer', 'exclude_internal_topics': True, 'session_timeout_ms': 30000, 'auto_commit_interval_ms': 5000, 'retry_backoff_ms': 100, 'default_offset_commit_callback': <function <lambda>>, 'enable_auto_commit': True, 'heartbeat_interval_ms': 3000, 'group_id': 'kafka-python-default-group', 'api_version': (0, 9)}
close(autocommit=True)[source]

Close the coordinator, leave the current group, and reset local generation / member_id.

Keyword Arguments:
 autocommit (bool) – If auto-commit is configured for this consumer, this optional flag causes the consumer to attempt to commit any pending consumed offsets prior to close. Default: True
commit_offsets_async(offsets, callback=None)[source]

Commit specific offsets asynchronously.

Parameters:
  • offsets (dict {TopicPartition: OffsetAndMetadata}) – what to commit
  • callback (callable, optional) – called as callback(offsets, response), where response is either an Exception or an OffsetCommitResponse struct. This callback can be used to trigger custom actions when a commit request completes.
Returns: Future indicating whether the commit was successful or not
Return type: Future
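The callback contract described above (response is either an Exception or a response struct) can be handled with a simple type check. The following is a minimal sketch that runs without a broker: the two namedtuples are local stand-ins for kafka-python's TopicPartition and OffsetAndMetadata (their field names here are assumptions), on_commit and failed_commits are hypothetical names, and a plain object() stands in for a successful OffsetCommitResponse.

```python
from collections import namedtuple

# Local stand-ins so the example runs without kafka-python installed.
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
OffsetAndMetadata = namedtuple('OffsetAndMetadata', ['offset', 'metadata'])

failed_commits = []


def on_commit(offsets, response):
    """Record failed commits; return True when the commit succeeded."""
    if isinstance(response, Exception):
        failed_commits.append((offsets, response))
        return False
    return True


offsets = {TopicPartition('orders', 0): OffsetAndMetadata(42, '')}

# Simulate the two kinds of response the callback may receive.
ok = on_commit(offsets, object())                         # success placeholder
not_ok = on_commit(offsets, RuntimeError('commit failed'))
print(ok, not_ok, len(failed_commits))
```

A function with this shape could be passed as the callback argument of commit_offsets_async.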

commit_offsets_sync(offsets)[source]

Commit specific offsets synchronously.

This method will retry until the commit completes successfully or an unrecoverable error is encountered.

Parameters: offsets (dict {TopicPartition: OffsetAndMetadata}) – what to commit

Raises an error on failure.

fetch_committed_offsets(partitions)[source]

Fetch the current committed offsets for specified partitions

Parameters: partitions (list of TopicPartition) – partitions to fetch
Returns: {TopicPartition: OffsetAndMetadata}
Return type: dict
group_protocols()[source]

Return the list of preferred (protocol, metadata) pairs.

need_rejoin()[source]

Check whether the group should be rejoined

Returns: True if consumer should rejoin group, False otherwise
Return type: bool
protocol_type()[source]
refresh_committed_offsets_if_needed()[source]

Fetch committed offsets for assigned partitions.

class kafka.coordinator.consumer.ConsumerCoordinatorMetrics(metrics, metric_group_prefix, subscription)[source]

Bases: object

kafka.coordinator.heartbeat module

class kafka.coordinator.heartbeat.Heartbeat(**configs)[source]

Bases: object

DEFAULT_CONFIG = {'session_timeout_ms': 30000, 'heartbeat_interval_ms': 3000}
received_heartbeat()[source]
reset_session_timeout()[source]
sent_heartbeat()[source]
session_expired()[source]
should_heartbeat()[source]
ttl()[source]
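The methods above are listed without descriptions. As a rough guide to how the two settings in DEFAULT_CONFIG could drive them, here is a simplified timing model. ToyHeartbeat is a hypothetical class, not the library's implementation. The interpretation of each method is an assumption inferred from its name, and the explicit now argument (in seconds) is added only to keep the example deterministic.

```python
class ToyHeartbeat:
    """Illustrative heartbeat timer; all times are in seconds."""

    def __init__(self, session_timeout_ms=30000, heartbeat_interval_ms=3000,
                 now=0.0):
        self.timeout = session_timeout_ms / 1000.0
        self.interval = heartbeat_interval_ms / 1000.0
        self.last_send = float('-inf')
        self.last_receive = float('-inf')
        self.last_reset = now

    def sent_heartbeat(self, now):
        self.last_send = now

    def received_heartbeat(self, now):
        self.last_receive = now

    def reset_session_timeout(self, now):
        self.last_reset = now

    def ttl(self, now):
        # Seconds remaining until the next heartbeat is due.
        last_beat = max(self.last_send, self.last_reset)
        return max(0.0, last_beat + self.interval - now)

    def should_heartbeat(self, now):
        return self.ttl(now) == 0.0

    def session_expired(self, now):
        # Expired when no response arrived within the session timeout.
        last_recv = max(self.last_receive, self.last_reset)
        return now - last_recv > self.timeout


hb = ToyHeartbeat(now=0.0)
remaining = hb.ttl(1.0)            # 2 seconds left of the 3 second interval
due = hb.should_heartbeat(3.0)     # interval elapsed, heartbeat is due
hb.sent_heartbeat(3.0)
hb.received_heartbeat(3.0)
due_again = hb.should_heartbeat(4.0)
expired_early = hb.session_expired(30.0)
expired_late = hb.session_expired(34.0)
print(remaining, due, due_again, expired_early, expired_late)
```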

Module contents