kafka.coordinator package

Submodules

kafka.coordinator.base module

class kafka.coordinator.base.BaseCoordinator(client, metrics, **configs)[source]

Bases: object

BaseCoordinator implements group management for a single group member by interacting with a designated Kafka broker (the coordinator). Group semantics are provided by extending this class. See ConsumerCoordinator for example usage.

From a high level, Kafka’s group management protocol consists of the following sequence of actions:

  1. Group Registration: Group members register with the coordinator providing their own metadata (such as the set of topics they are interested in).
  2. Group/Leader Selection: The coordinator selects the members of the group and chooses one member as the leader.
  3. State Assignment: The leader collects the metadata from all the members of the group and assigns state.
  4. Group Stabilization: Each member receives the state assigned by the leader and begins processing.

To leverage this protocol, an implementation must define the format of the metadata that each member provides for group registration in group_protocols(), and the format of the state assignment that the leader provides in _perform_assignment(). The assignment becomes available to members in _on_join_complete().
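
The four steps can be illustrated with a toy, in-memory walk-through. This is only a sketch: ToyCoordinator and round_robin_assign are hypothetical names, no broker or kafka-python class is involved, and the leader choice (lowest member id) and the round-robin assignment are assumptions made for the example.

```python
class ToyCoordinator:
    """Hypothetical stand-in for the broker-side coordinator."""

    def __init__(self):
        self.members = {}    # member_id -> metadata (here: subscribed topics)
        self.generation = 0

    def join(self, member_id, topics):
        # Step 1: a member registers and provides its metadata.
        self.members[member_id] = sorted(topics)

    def select_leader(self):
        # Step 2: fix the membership and pick a leader (assumed: lowest id).
        self.generation += 1
        return min(self.members)


def round_robin_assign(members, partitions):
    # Step 3: the leader assigns state using every member's metadata.
    assignment = {member_id: [] for member_id in members}
    for topic, count in sorted(partitions.items()):
        interested = sorted(m for m, topics in members.items() if topic in topics)
        for partition in range(count):
            if interested:
                owner = interested[partition % len(interested)]
                assignment[owner].append((topic, partition))
    return assignment


coordinator = ToyCoordinator()
coordinator.join("member-a", ["orders"])
coordinator.join("member-b", ["orders", "payments"])
leader = coordinator.select_leader()
assignment = round_robin_assign(coordinator.members, {"orders": 3, "payments": 1})

# Step 4: each member receives its share and begins processing.
for member_id in sorted(assignment):
    print(member_id, assignment[member_id])
```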

Note on locking: this class shares state between the caller and a background thread which is used for sending heartbeats after the client has joined the group. All mutable state as well as state transitions are protected with the class’s monitor. Generally this means acquiring the lock before reading or writing the state of the group (e.g. generation, member_id) and holding the lock when sending a request that affects the state of the group (e.g. JoinGroup, LeaveGroup).
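
The locking discipline described above can be sketched with a plain threading.Condition. ToyMember and its attributes are hypothetical and much simpler than BaseCoordinator; the point is only that the caller and the background thread both take the same lock before reading or changing group state.

```python
import threading


class ToyMember:
    """Hypothetical member whose state is shared with a heartbeat thread."""

    def __init__(self):
        self._lock = threading.Condition()
        self.generation_id = -1     # -1 means "not joined" in this sketch
        self.member_id = ""
        self.heartbeats_sent = 0
        self._closed = False

    def on_joined(self, generation_id, member_id):
        # Caller thread: state transitions happen while holding the lock.
        with self._lock:
            self.generation_id = generation_id
            self.member_id = member_id
            self._lock.notify_all()

    def close(self):
        with self._lock:
            self._closed = True
            self._lock.notify_all()

    def heartbeat_loop(self, limit):
        # Background thread: reads and updates state under the same lock.
        while True:
            with self._lock:
                while self.generation_id < 0 and not self._closed:
                    self._lock.wait()
                if self._closed or self.heartbeats_sent >= limit:
                    return
                self.heartbeats_sent += 1   # stands in for sending a heartbeat


member = ToyMember()
thread = threading.Thread(target=member.heartbeat_loop, args=(3,))
thread.start()
member.on_joined(1, "member-a")
thread.join(timeout=5)
member.close()
```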

DEFAULT_CONFIG = {'metric_group_prefix': '', 'session_timeout_ms': 10000, 'retry_backoff_ms': 100, 'max_poll_interval_ms': 300000, 'heartbeat_interval_ms': 3000, 'group_id': 'kafka-python-default-group', 'api_version': (0, 10, 1)}
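
As a rough sanity check on the timing values above, the sketch below compares heartbeat_interval_ms with session_timeout_ms. check_group_timing is a hypothetical helper, not part of kafka-python, and the "at most one third" rule is the guidance commonly given for Kafka consumers, taken here as an assumption.

```python
def check_group_timing(config):
    """Return a list of warnings for a group timing configuration (hypothetical helper)."""
    warnings = []
    heartbeat = config["heartbeat_interval_ms"]
    session = config["session_timeout_ms"]
    if heartbeat >= session:
        warnings.append("heartbeat_interval_ms must be lower than session_timeout_ms")
    elif heartbeat * 3 > session:
        warnings.append("heartbeat_interval_ms is usually at most 1/3 of session_timeout_ms")
    return warnings


# Timing values copied from DEFAULT_CONFIG above.
defaults = {
    "session_timeout_ms": 10000,
    "heartbeat_interval_ms": 3000,
    "max_poll_interval_ms": 300000,
}
print(check_group_timing(defaults))
```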
close()[source]

Close the coordinator, leave the current group, and reset local generation / member_id

coordinator()[source]

Get the current coordinator

Returns: the current coordinator id or None if it is unknown

coordinator_dead(error)[source]

Mark the current coordinator as dead.

coordinator_unknown()[source]

Check if we know who the coordinator is and have an active connection

Side-effect: reset coordinator_id to None if connection failed

Returns: True if the coordinator is unknown
Return type: bool
ensure_active_group()[source]

Ensure that the group is active (i.e. joined and synced)

ensure_coordinator_ready()[source]

Block until the coordinator for this group is known and we have an active connection to it. (The Java client uses an unsent-request queue instead of requiring an active connection.)

generation()[source]

Get the current generation state if the group is stable.

Returns: the current generation or None if the group is unjoined/rebalancing

group_protocols()[source]

Return the list of supported group protocols and metadata.

This list is submitted by each group member via a JoinGroupRequest. The order of the protocols in the list indicates the preference of the protocol (the first entry is the most preferred). The coordinator takes this preference into account when selecting the generation protocol (generally more preferred protocols will be selected as long as all members support them and there is no disagreement on the preference).

Note: metadata must be type bytes or support an encode() method

Returns: [(protocol, metadata), …]
Return type: list
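
To make the preference ordering concrete, here is one plausible selection rule consistent with the description above: restrict to protocols that every member supports, let each member vote for its most preferred one, and pick the protocol with the most votes. select_protocol is a hypothetical function, and the tie-break by name is an assumption; the rule actually used by the coordinator may differ in detail.

```python
from collections import Counter


def select_protocol(member_protocols):
    """Pick a protocol from {member_id: [protocol names, most preferred first]}."""
    if not member_protocols:
        raise ValueError("group has no members")
    # Only protocols supported by all members are candidates.
    common = set.intersection(*(set(names) for names in member_protocols.values()))
    if not common:
        raise ValueError("members share no protocol")
    # Each member votes for its most preferred candidate.
    votes = Counter(
        next(name for name in preferences if name in common)
        for preferences in member_protocols.values()
    )
    best = max(votes.values())
    # Assumed tie-break: alphabetical order, to keep the result deterministic.
    return min(name for name, count in votes.items() if count == best)


chosen = select_protocol({
    "member-a": ["range", "roundrobin"],
    "member-b": ["roundrobin", "range"],
    "member-c": ["roundrobin", "range"],
})
print(chosen)
```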
lookup_coordinator()[source]
maybe_leave_group()[source]

Leave the current group and reset local generation / member_id.

need_rejoin()[source]

Check whether the group should be rejoined (e.g. if metadata changes)

Returns: True if it should, False otherwise
Return type: bool
poll_heartbeat()[source]

Check the status of the heartbeat thread (if it is active) and indicate the liveness of the client. This must be called periodically after joining with ensure_active_group() to ensure that the member stays in the group. If more time than the configured rebalance timeout (max_poll_interval_ms) elapses between calls to this method, the client will proactively leave the group.

Raises: RuntimeError for unexpected errors raised from the heartbeat thread
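
The poll-interval rule can be sketched with a small timer that takes an injectable clock. ToyPollTimer is a hypothetical class written for illustration; it is not kafka.coordinator.heartbeat.Heartbeat, although the method names mirror the ones listed for that class.

```python
import time


class ToyPollTimer:
    """Hypothetical timer that tracks the time since the last poll."""

    def __init__(self, max_poll_interval_ms=300000, clock=None):
        self.max_poll_interval = max_poll_interval_ms / 1000.0   # seconds
        self._clock = clock or time.monotonic
        self.last_poll = self._clock()

    def poll(self):
        # Called by the application to signal that it is still alive.
        self.last_poll = self._clock()

    def poll_timeout_expired(self):
        # True once more than max_poll_interval has passed since the last poll.
        return self._clock() - self.last_poll > self.max_poll_interval


now = [0.0]                                  # fake clock, in seconds
timer = ToyPollTimer(clock=lambda: now[0])

now[0] = 299.0
within_interval = timer.poll_timeout_expired()

now[0] = 301.0
after_interval = timer.poll_timeout_expired()

timer.poll()
after_poll = timer.poll_timeout_expired()
```

In this sketch a member whose timer has expired would leave the group; calling poll() in time resets the interval.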

protocol_type()[source]

Unique identifier for the class of supported protocols (e.g. “consumer” or “connect”).

Returns: protocol type name
Return type: str
request_rejoin()[source]
reset_generation()[source]

Reset the generation and member_id because we have fallen out of the group.

time_to_next_heartbeat()[source]
class kafka.coordinator.base.Generation(generation_id, member_id, protocol)[source]

Bases: object

NO_GENERATION = <kafka.coordinator.base.Generation object>
class kafka.coordinator.base.GroupCoordinatorMetrics(heartbeat, metrics, prefix, tags=None)[source]

Bases: object

class kafka.coordinator.base.HeartbeatThread(coordinator)[source]

Bases: threading.Thread

close()[source]
disable()[source]
enable()[source]
run()[source]
class kafka.coordinator.base.MemberState[source]

Bases: object

REBALANCING = '<rebalancing>'
STABLE = '<stable>'
UNJOINED = '<unjoined>'
exception kafka.coordinator.base.UnjoinedGroupException[source]

Bases: kafka.errors.KafkaError

retriable = True

kafka.coordinator.consumer module

class kafka.coordinator.consumer.ConsumerCoordinator(client, subscription, metrics, **configs)[source]

Bases: kafka.coordinator.base.BaseCoordinator

This class manages the coordination process with the consumer coordinator.

DEFAULT_CONFIG = {'assignors': (<class 'kafka.coordinator.assignors.range.RangePartitionAssignor'>, <class 'kafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor'>), 'max_poll_interval_ms': 300000, 'metric_group_prefix': 'consumer', 'exclude_internal_topics': True, 'session_timeout_ms': 10000, 'auto_commit_interval_ms': 5000, 'retry_backoff_ms': 100, 'default_offset_commit_callback': None, 'enable_auto_commit': True, 'heartbeat_interval_ms': 3000, 'group_id': 'kafka-python-default-group', 'api_version': (0, 10, 1)}
close(autocommit=True)[source]

Close the coordinator, leave the current group, and reset local generation / member_id.

Keyword Arguments:
 autocommit (bool) – If auto-commit is configured for this consumer, this optional flag causes the consumer to attempt to commit any pending consumed offsets prior to close. Default: True
commit_offsets_async(offsets, callback=None)[source]

Commit specific offsets asynchronously.

Parameters:
  • offsets (dict {TopicPartition: OffsetAndMetadata}) – what to commit
  • callback (callable, optional) – called as callback(offsets, response), where response is either an Exception or an OffsetCommitResponse struct. This callback can be used to trigger custom actions when a commit request completes.
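
A commit callback might look like the sketch below. The two namedtuples are local stand-ins for kafka-python's TopicPartition and OffsetAndMetadata (assumed field names, so the example runs without the library), and on_commit, committed and failed are hypothetical names. The callback is invoked by hand here; with a real coordinator it would be passed as callback=on_commit.

```python
from collections import namedtuple

# Local stand-ins for the kafka-python structs (assumption, for illustration only).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"])

committed = {}   # last successfully committed offset per partition
failed = []      # (offsets, exception) pairs for failed commits


def on_commit(offsets, response):
    # response is either an Exception or a response struct.
    if isinstance(response, Exception):
        failed.append((offsets, response))
        return
    for partition, offset_and_metadata in offsets.items():
        committed[partition] = offset_and_metadata.offset


offsets = {TopicPartition("orders", 0): OffsetAndMetadata(42, "")}

# Simulate both outcomes; object() stands in for a response struct.
on_commit(offsets, object())
on_commit(offsets, RuntimeError("commit failed"))
```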
commit_offsets_sync(offsets)[source]

Commit specific offsets synchronously.

This method will retry until the commit completes successfully or an unrecoverable error is encountered.

Parameters: offsets (dict {TopicPartition: OffsetAndMetadata}) – what to commit

Raises error on failure

fetch_committed_offsets(partitions)[source]

Fetch the current committed offsets for specified partitions

Parameters: partitions (list of TopicPartition) – partitions to fetch
Returns: {TopicPartition: OffsetAndMetadata}
Return type: dict
group_protocols()[source]

Returns list of preferred (protocols, metadata)

need_rejoin()[source]

Check whether the group should be rejoined

Returns: True if consumer should rejoin group, False otherwise
Return type: bool
poll()[source]

Poll for coordinator events. Only applicable if group_id is set, and broker version supports GroupCoordinators. This ensures that the coordinator is known, and if using automatic partition assignment, ensures that the consumer has joined the group. This also handles periodic offset commits if they are enabled.

protocol_type()[source]
refresh_committed_offsets_if_needed()[source]

Fetch committed offsets for assigned partitions.

time_to_next_poll()[source]

Return seconds (float) remaining until poll() should be called again

class kafka.coordinator.consumer.ConsumerCoordinatorMetrics(metrics, metric_group_prefix, subscription)[source]

Bases: object

kafka.coordinator.heartbeat module

class kafka.coordinator.heartbeat.Heartbeat(**configs)[source]

Bases: object

DEFAULT_CONFIG = {'retry_backoff_ms': 100, 'group_id': None, 'session_timeout_ms': 10000, 'heartbeat_interval_ms': 3000, 'max_poll_interval_ms': 300000}
fail_heartbeat()[source]
poll()[source]
poll_timeout_expired()[source]
received_heartbeat()[source]
reset_timeouts()[source]
sent_heartbeat()[source]
session_timeout_expired()[source]
should_heartbeat()[source]
time_to_next_heartbeat()[source]

Returns seconds (float) remaining before next heartbeat should be sent

Module contents