Skip to content

Kafka

qa_pytest_kafka

__all__ = ['KafkaConfiguration', 'KafkaHandler', 'KafkaSteps', 'KafkaTests', 'Message'] module-attribute

KafkaConfiguration

Bases: BaseConfiguration

Kafka-specific test configuration. Provides access to the Kafka bootstrap servers from the configuration parser.

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_configuration.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class KafkaConfiguration(BaseConfiguration):
    """
    Kafka-specific test configuration.

    Parses a single ``[kafka] url`` config entry of the form
    ``scheme://host:port,.../topic?group_id=...`` and exposes its parts
    (bootstrap servers, topic, group id) as cached properties.
    """

    @cached_property
    def _kafka_url(self) -> ParseResult:
        """Parses and validates the configured Kafka URL.

        Returns:
            The parsed URL.

        Raises:
            ValueError: if the URL lacks a scheme, a host part, or a
                non-empty topic path.
        """
        url = self.parser.get("kafka", "url")
        parsed = urlparse(url)
        # A bare "/" path used to pass the plain `not parsed.path` check
        # and silently yield an empty topic; stripping slashes rejects it.
        if not parsed.scheme or not parsed.netloc or not parsed.path.lstrip("/"):
            raise ValueError(f"malformed kafka url: {url}")
        return parsed

    @cached_property
    def bootstrap_servers(self) -> str:
        """The ``host[:port]`` (netloc) part of the Kafka URL."""
        return self._kafka_url.netloc

    @cached_property
    def topic(self) -> str:
        """The topic name: the URL path without its leading slash."""
        return self._kafka_url.path.lstrip("/")

    @cached_property
    def group_id(self) -> str:
        """The ``group_id`` query parameter of the Kafka URL.

        Raises:
            ValueError: if the URL has no ``group_id`` query parameter.
        """
        group_ids = parse_qs(self._kafka_url.query).get("group_id", [])
        if group_ids:
            return group_ids[0]
        raise ValueError("missing group_id in kafka url")

bootstrap_servers cached property

group_id cached property

topic cached property

KafkaHandler dataclass

Bases: LoggerMixin

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@final
@dataclass
class KafkaHandler[K, V](LoggerMixin):
    """
    Test helper pairing a Kafka Consumer and Producer for one topic.

    On construction a daemon worker thread starts polling the consumer
    and stores each received message in a dict keyed by ``indexing_by``;
    ``received_messages`` exposes snapshots of that dict. Also usable as
    a context manager, in which case ``close()`` runs on exit.
    """
    # Pre-configured confluent-kafka clients supplied by the caller.
    consumer: Final[Consumer]
    producer: Final[Producer]
    # Topic used both for subscribing and for publishing.
    topic: Final[str]
    # Derives the index key under which a received message is stored.
    indexing_by: Final[Callable[[Message[V]], K]]
    # Deserializes a raw consumed payload into the business value type.
    consuming_by: Final[Callable[[bytes], V]]
    # Serializes a business value into the raw payload to publish.
    publishing_by: Final[Callable[[V], bytes]]

    # Messages received so far, keyed by indexing_by; guarded by _lock.
    _received_messages: Final[dict[K, Message[V]]] = field(
        default_factory=lambda: dict(), init=False)
    # Background poller thread; created in __post_init__.
    _worker_thread: threading.Thread = field(init=False)
    # Signals the worker loop to stop.
    _shutdown_event: threading.Event = field(
        default_factory=threading.Event, init=False)
    # Protects _received_messages across worker and caller threads.
    _lock: threading.RLock = field(default_factory=threading.RLock, init=False)

    def __post_init__(self) -> None:
        """Starts the consumer thread for handling asynchronous Kafka operations."""
        # Daemon thread: a forgotten close() cannot block interpreter exit.
        self._worker_thread = threading.Thread(
            target=self._worker_loop, name="kafka-consumer", daemon=True)
        self._worker_thread.start()

    def __enter__(self) -> "KafkaHandler[K, V]":
        """Context manager entry. Returns self."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None
    ) -> None:
        """Context manager exit. Ensures the handler is closed."""
        self.close()

    def _worker_loop(self) -> None:
        """Internal consumer loop for processing Kafka messages."""
        try:
            self.consumer.subscribe([self.topic])
            while not self._shutdown_event.is_set():
                # Short poll timeout keeps shutdown responsive (~0.5s lag max).
                msg = self.consumer.poll(0.5)
                if msg is None:
                    continue
                error = msg.error()
                if error is not None:
                    # Per-message errors are logged and skipped, not raised.
                    self.log.warning(f"Kafka error: {error}")
                    continue
                message = Message.from_kafka(msg, self.consuming_by)
                key = self.indexing_by(message)
                with self._lock:
                    # A later message with the same key overwrites the earlier one.
                    self._received_messages[key] = message
                self.log.debug(f"received {key}")
        except Exception as e:
            # The worker must not die silently; log the failure and end the loop.
            self.log.error(f"Unhandled error in consumer thread: {e}")

    def publish(self, messages: Iterator[Message[V]]) -> None:
        """Serializes and publishes the given messages, then flushes the producer.

        Args:
            messages: Messages to publish; a message with ``content=None``
                is sent with a ``None`` payload (no serialization).
        """
        for message in messages:
            value = self.publishing_by(
                message.content) if message.content is not None else None
            self.producer.produce(self.topic, value, message.key,
                                  headers=list(message.headers.items()))
            self.log.debug(f"published {message}")
        # Block until all queued messages have been handed to the broker.
        self.producer.flush()

    def publish_values(self, values: Iterator[V]) -> None:
        """Wraps each value in a key-less, header-less Message and publishes it."""
        self.publish((Message(content=value) for value in values))

    def close(self) -> None:
        """Gracefully shuts down the consumer thread and closes Kafka clients."""
        self._shutdown_event.set()
        # Bounded join so shutdown cannot hang forever on a stuck poll loop.
        self._worker_thread.join(timeout=5.0)
        self.consumer.close()

    @property
    def received_messages(self) -> Mapping[K, Message[V]]:
        """Returns a snapshot of all received messages, indexed by key."""
        with self._lock:
            # Copy so callers can iterate without holding the lock.
            return dict(self._received_messages)

consumer instance-attribute

consuming_by instance-attribute

indexing_by instance-attribute

producer instance-attribute

publishing_by instance-attribute

received_messages property

Returns a snapshot of all received messages, indexed by key.

topic instance-attribute

__enter__()

Context manager entry. Returns self.

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
71
72
73
def __enter__(self) -> "KafkaHandler[K, V]":
    """Context manager entry. Returns self."""
    # No setup needed here: the worker thread already started in __post_init__.
    return self

__exit__(exc_type, exc_value, traceback)

Context manager exit. Ensures the handler is closed.

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
75
76
77
78
79
80
81
82
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None
) -> None:
    """Context manager exit. Ensures the handler is closed."""
    # Implicit None return (falsy) lets any in-flight exception propagate.
    self.close()

__init__(consumer, producer, topic, indexing_by, consuming_by, publishing_by)

__post_init__()

Starts the consumer thread for handling asynchronous Kafka operations.

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
65
66
67
68
69
def __post_init__(self) -> None:
    """Starts the consumer thread for handling asynchronous Kafka operations."""
    # Daemon thread: a forgotten close() cannot block interpreter exit.
    self._worker_thread = threading.Thread(
        target=self._worker_loop, name="kafka-consumer", daemon=True)
    self._worker_thread.start()

close()

Gracefully shuts down the consumer thread and closes Kafka clients.

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
116
117
118
119
120
def close(self) -> None:
    """Gracefully shuts down the consumer thread and closes Kafka clients."""
    self._shutdown_event.set()
    # Bounded join so shutdown cannot hang forever on a stuck poll loop.
    self._worker_thread.join(timeout=5.0)
    self.consumer.close()

publish(messages)

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
104
105
106
107
108
109
110
111
def publish(self, messages: Iterator[Message[V]]) -> None:
    """Serializes and publishes each message, then flushes the producer."""
    for message in messages:
        # A message with content=None is sent with a None payload.
        value = self.publishing_by(
            message.content) if message.content is not None else None
        self.producer.produce(self.topic, value, message.key,
                              headers=list(message.headers.items()))
        self.log.debug(f"published {message}")
    # Block until all queued messages have been handed to the broker.
    self.producer.flush()

publish_values(values)

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
113
114
def publish_values(self, values: Iterator[V]) -> None:
    """Wraps each value in a key-less, header-less Message and publishes it."""
    self.publish((Message(content=value) for value in values))

KafkaSteps

Bases: GenericSteps[KafkaSteps[TConfiguration]]

BDD-style step definitions for Kafka operations.

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_steps.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class KafkaSteps[K, V, TConfiguration: KafkaConfiguration](
        GenericSteps[TConfiguration]):
    """
    BDD-style step definitions for Kafka operations.

    Type Parameters:
        K: The type of the message key.
        V: The type of the message content.
        TConfiguration: The configuration type, must be a KafkaConfiguration.
    """
    _kafka_handler: KafkaHandler[K, V]

    @Context.traced
    @final
    def a_kafka_handler(self, kafka_handler: KafkaHandler[K, V]) -> Self:
        self._kafka_handler = kafka_handler
        return self

    @Context.traced
    @final
    def publishing(self, messages: Iterable[Message[V]]) -> Self:
        self._kafka_handler.publish(iter(messages))
        return self

    @Context.traced
    @final
    def the_received_messages(
            self, by_rule: Matcher[Iterator[Message[V]]]) -> Self:
        return self.eventually_assert_that(
            lambda: iter(self._kafka_handler.received_messages.values()),
            by_rule)

    @Context.traced
    @final
    def the_message_by_key(self, key: K, by_rule: Matcher[Message[V]]) -> Self:
        return self.eventually_assert_that(
            lambda: require_not_none(
                self._kafka_handler.received_messages.get(key)),
            by_rule)

a_kafka_handler(kafka_handler)

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_steps.py
25
26
27
28
29
@Context.traced
@final
def a_kafka_handler(self, kafka_handler: KafkaHandler[K, V]) -> Self:
    """Fixture step: remembers the handler used by subsequent steps."""
    self._kafka_handler = kafka_handler
    return self

publishing(messages)

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_steps.py
31
32
33
34
35
@Context.traced
@final
def publishing(self, messages: Iterable[Message[V]]) -> Self:
    """Action step: publishes the given messages via the stored handler."""
    self._kafka_handler.publish(iter(messages))
    return self

the_message_by_key(key, by_rule)

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_steps.py
45
46
47
48
49
50
51
@Context.traced
@final
def the_message_by_key(self, key: K, by_rule: Matcher[Message[V]]) -> Self:
    """Verification step: eventually asserts the message stored under
    ``key`` exists and satisfies ``by_rule``."""
    return self.eventually_assert_that(
        lambda: require_not_none(
            self._kafka_handler.received_messages.get(key)),
        by_rule)

the_received_messages(by_rule)

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_steps.py
37
38
39
40
41
42
43
@Context.traced
@final
def the_received_messages(
        self, by_rule: Matcher[Iterator[Message[V]]]) -> Self:
    """Verification step: eventually asserts the stream of received
    messages satisfies ``by_rule``."""
    return self.eventually_assert_that(
        lambda: iter(self._kafka_handler.received_messages.values()),
        by_rule)

KafkaTests

Bases: AbstractTestsBase[KafkaTests[TSteps], KafkaTests[TConfiguration]]

Base class for BDD-style Kafka integration tests. Manages the lifecycle of a Kafka handler for test scenarios.

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_tests.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class KafkaTests[
    K,
    V,
    TSteps: KafkaSteps[Any, Any, Any],
    TConfiguration: KafkaConfiguration
](AbstractTestsBase[TSteps, TConfiguration]):
    """
    Base class for BDD-style Kafka integration tests.
    Manages the lifecycle of a Kafka handler for test scenarios.

    Type Args:
        K: The type of the message key.
        V: The type of the message content.
        TSteps: The steps implementation type.
        TConfiguration: The configuration type, must be a KafkaConfiguration.
    """
    _handler: KafkaHandler[K, V]

    @override
    def teardown_method(self):
        """
        Tears down the Kafka handler after each test method.
        """
        try:
            self._handler.close()
        finally:
            super().teardown_method()

teardown_method()

Tears down the Kafka handler after each test method.

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_tests.py
31
32
33
34
35
36
37
38
39
@override
def teardown_method(self):
    """
    Tears down the Kafka handler after each test method.
    """
    try:
        self._handler.close()
    finally:
        # Base-class cleanup must run even if close() raises.
        super().teardown_method()

Message dataclass

Represents a Kafka message with business content and metadata.

Fields

content: The business payload of the message (None if the message is empty). headers: Optional headers as a dictionary. key: Optional message key (used for partitioning). offset: Kafka-assigned sequence number within a partition — not part of business equality or hash: it is excluded from `__eq__` and `__hash__` so that tests and business logic compare messages by content, key, and headers only, and serves consumer-progress and ordering tracking rather than business identity.

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@dataclass(frozen=True)
class Message[V]:
    """
    Represents a Kafka message with business content and metadata.

    Fields:
        content: The business payload of the message (None if message is empty).
        headers: Optional headers as a dictionary.
        key: Optional message key (for partitioning).
        offset: Kafka-assigned sequence number within a partition.
            - Not part of business equality or hash.
            - Excluded from __eq__ and __hash__ to ensure tests and business logic
              compare messages by content, key, and headers only.
            - Used for tracking consumer progress and ordering, not for business identity.
    """
    content: V | None
    headers: dict[str, bytes | str | None] = field(default_factory=lambda: {})
    key: bytes | None = None
    offset: int | None = field(default=None, compare=False, hash=False)

    @staticmethod
    def from_kafka[VV](
            msg: KafkaMessage,
            deserializer: Callable[[bytes], VV]) -> "Message[VV]":
        """Factory method to construct a Message from a confluent_kafka message."""
        raw_value = msg.value()
        return Message(
            content=deserializer(raw_value) if raw_value is not None else None,
            headers=dict(msg.headers() or []),
            key=msg.key(),
            offset=msg.offset()
        )

content instance-attribute

headers = field(default_factory=(lambda: {})) class-attribute instance-attribute

key = None class-attribute instance-attribute

offset = field(default=None, compare=False, hash=False) class-attribute instance-attribute

__init__(content, headers={}, key=None, offset=None) — headers defaults to a fresh empty dict per instance

from_kafka(msg, deserializer) staticmethod

Factory method to construct a Message from a confluent_kafka message.

Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
34
35
36
37
38
39
40
41
42
43
44
45
@staticmethod
def from_kafka[VV](
        msg: KafkaMessage,
        deserializer: Callable[[bytes], VV]) -> "Message[VV]":
    """Factory method to construct a Message from a confluent_kafka message."""
    raw_value = msg.value()
    # A None payload maps to content=None; missing headers become {}.
    return Message(
        content=deserializer(raw_value) if raw_value is not None else None,
        headers=dict(msg.headers() or []),
        key=msg.key(),
        offset=msg.offset()
    )