Kafka
qa_pytest_kafka
__all__ = ['KafkaConfiguration', 'KafkaHandler', 'KafkaSteps', 'KafkaTests', 'Message']
module-attribute
KafkaConfiguration
Bases: BaseConfiguration
Kafka-specific test configuration. Provides access to the Kafka bootstrap servers from the configuration parser.
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_configuration.py
bootstrap_servers
cached
property
group_id
cached
property
topic
cached
property
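The three cached properties above can be pictured with a minimal sketch. It assumes the values live in a `[kafka]` section of a standard `configparser` parser; the section name, the key names, and the class name `SketchKafkaConfiguration` are illustrative assumptions, not the library's actual layout.

```python
import configparser
from functools import cached_property


class SketchKafkaConfiguration:
    """Stand-in for KafkaConfiguration; not the library's implementation."""

    def __init__(self, parser: configparser.ConfigParser) -> None:
        self.parser = parser

    @cached_property
    def bootstrap_servers(self) -> str:
        # Assumed layout: a [kafka] section holding a bootstrap_servers key.
        return self.parser["kafka"]["bootstrap_servers"]

    @cached_property
    def topic(self) -> str:
        return self.parser["kafka"]["topic"]

    @cached_property
    def group_id(self) -> str:
        return self.parser["kafka"]["group_id"]


parser = configparser.ConfigParser()
parser.read_string("""
[kafka]
bootstrap_servers = localhost:9092
topic = test-topic
group_id = test-group
""")
config = SketchKafkaConfiguration(parser)
first_read = config.bootstrap_servers

# Later changes to the parser are not seen: the first read was cached.
parser["kafka"]["bootstrap_servers"] = "other-host:9092"
second_read = config.bootstrap_servers
```

Because each property is cached, the parser is consulted once per instance, so a test run sees stable values even if the underlying parser is modified afterwards.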
KafkaHandler
dataclass
Bases: LoggerMixin
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
consumer
instance-attribute
consuming_by
instance-attribute
indexing_by
instance-attribute
producer
instance-attribute
publishing_by
instance-attribute
received_messages
property
Returns a snapshot of all received messages, indexed by key.
topic
instance-attribute
__enter__()
Context manager entry. Returns self.
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
__exit__(exc_type, exc_value, traceback)
Context manager exit. Ensures the handler is closed.
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
__init__(consumer, producer, topic, indexing_by, consuming_by, publishing_by)
__post_init__()
Starts the consumer thread for handling asynchronous Kafka operations.
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
close()
Gracefully shuts down the consumer thread and closes Kafka clients.
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
publish(messages)
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
publish_values(values)
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
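The lifecycle described for KafkaHandler (a consumer thread started in `__post_init__`, a keyed snapshot of received messages, and a context manager that calls `close()`) can be sketched without a broker. The following is an in-memory stand-in under stated assumptions: a `queue.Queue` replaces the Kafka producer and consumer, messages are plain strings, and `SketchHandler` with its private fields is hypothetical rather than the library's code.

```python
import queue
import threading
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable


@dataclass
class SketchHandler:
    """In-memory stand-in for KafkaHandler; a queue replaces the Kafka clients."""

    indexing_by: Callable[[str], str]
    _broker: "queue.Queue[str]" = field(default_factory=queue.Queue, init=False)
    _received: Dict[str, str] = field(default_factory=dict, init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False)
    _stop: threading.Event = field(default_factory=threading.Event, init=False)

    def __post_init__(self) -> None:
        # Start consuming in the background as soon as the handler exists.
        self._thread = threading.Thread(target=self._consume, daemon=True)
        self._thread.start()

    def _consume(self) -> None:
        while not self._stop.is_set():
            try:
                value = self._broker.get(timeout=0.05)
            except queue.Empty:
                continue
            with self._lock:
                self._received[self.indexing_by(value)] = value
            self._broker.task_done()

    @property
    def received_messages(self) -> Dict[str, str]:
        # Return a copy so callers never observe later mutations.
        with self._lock:
            return dict(self._received)

    def publish_values(self, values: Iterable[str]) -> None:
        for value in values:
            self._broker.put(value)

    def close(self) -> None:
        # Signal the consumer loop to stop, then wait for the thread to finish.
        self._stop.set()
        self._thread.join()

    def __enter__(self) -> "SketchHandler":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()


with SketchHandler(indexing_by=lambda value: value.split(":")[0]) as handler:
    handler.publish_values(["a:1", "b:2"])
    handler._broker.join()  # wait until the consumer thread has indexed both values
    snapshot = handler.received_messages
```

Using the handler in a `with` block means the consumer thread is stopped even when the body raises, which is presumably why the real class exposes `__enter__` and `__exit__` alongside `close()`.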
KafkaSteps
Bases: GenericSteps[KafkaSteps[TConfiguration]]
BDD-style step definitions for Kafka operations.
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_steps.py
a_kafka_handler(kafka_handler)
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_steps.py
publishing(messages)
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_steps.py
the_message_by_key(key, by_rule)
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_steps.py
the_received_messages(by_rule)
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_steps.py
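The four step methods above lend themselves to chained, BDD-style calls. The sketch below shows one way such a chain could read. It is built on assumptions: `SketchSteps` and `FakeHandler` are hypothetical stand-ins, messages are simple `(key, value)` pairs, and `by_rule` is modelled as a plain predicate function, which may differ from the rule type the library actually accepts.

```python
from typing import Callable, Dict, Iterable, Optional, Tuple


class FakeHandler:
    """Hypothetical handler that echoes published values back, indexed by key."""

    def __init__(self) -> None:
        self.received_messages: Dict[str, str] = {}

    def publish(self, messages: Iterable[Tuple[str, str]]) -> None:
        for key, value in messages:
            self.received_messages[key] = value


class SketchSteps:
    """Stand-in for KafkaSteps: every step returns self so calls can be chained."""

    def __init__(self) -> None:
        self._handler: Optional[FakeHandler] = None

    def a_kafka_handler(self, kafka_handler: FakeHandler) -> "SketchSteps":
        self._handler = kafka_handler
        return self

    def publishing(self, messages: Iterable[Tuple[str, str]]) -> "SketchSteps":
        self._handler.publish(messages)
        return self

    def the_received_messages(self, by_rule: Callable) -> "SketchSteps":
        # Assumption: by_rule is a predicate over all received values.
        assert by_rule(list(self._handler.received_messages.values())), "rule not satisfied"
        return self

    def the_message_by_key(self, key: str, by_rule: Callable) -> "SketchSteps":
        # Assumption: by_rule is a predicate over the single message stored under key.
        assert by_rule(self._handler.received_messages[key]), "rule not satisfied"
        return self


steps = (
    SketchSteps()
    .a_kafka_handler(FakeHandler())
    .publishing([("k1", "hello"), ("k2", "world")])
    .the_received_messages(lambda values: len(values) == 2)
    .the_message_by_key("k1", lambda value: value == "hello")
)
```

The fake handler here delivers messages synchronously. Against a real broker, consumption is asynchronous, so verification steps would presumably need to wait or retry before asserting.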
KafkaTests
Bases: AbstractTestsBase[KafkaTests[TSteps], KafkaTests[TConfiguration]]
Base class for BDD-style Kafka integration tests. Manages the lifecycle of a Kafka handler for test scenarios.
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_tests.py
teardown_method()
Tears down the Kafka handler after each test method.
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_tests.py
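pytest calls `teardown_method` after every test method of a class, which is the hook KafkaTests uses to release its handler. A minimal sketch of that pattern follows. Where the handler is created is an assumption (here, in `setup_method`), and `RecordingHandler` and `SketchKafkaTests` are hypothetical names used only for illustration.

```python
class RecordingHandler:
    """Hypothetical handler that only records whether close() was called."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class SketchKafkaTests:
    """Stand-in base class: one handler per test method, always closed afterwards."""

    def setup_method(self) -> None:
        # Assumption: the handler is created before each test method.
        self._handler = RecordingHandler()

    def teardown_method(self) -> None:
        # pytest invokes this after each test method, whether it passed or failed.
        self._handler.close()


# Simulate what pytest does around a single test method.
test = SketchKafkaTests()
test.setup_method()
open_during_test = not test._handler.closed
handler = test._handler
test.teardown_method()
```

Closing the handler per test method keeps consumer threads and client connections from leaking between tests.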
Message
dataclass
Represents a Kafka message with business content and metadata.
Fields
content: The business payload of the message (None if the message is empty).
headers: Optional headers as a dictionary.
key: Optional message key (for partitioning).
offset: Kafka-assigned sequence number within a partition.
- Not part of business equality or hash.
- Excluded from __eq__ and __hash__ to ensure tests and business logic compare messages by content, key, and headers only.
- Used for tracking consumer progress and ordering, not for business identity.
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
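The effect of excluding `offset` from comparison can be shown with a small dataclass that mirrors the documented field defaults. The class name `SketchMessage` and the type annotations are assumptions; only the `field(...)` arguments are taken from the reference above.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class SketchMessage:
    """Stand-in for Message with the documented defaults; types are assumed."""

    content: Optional[Any]
    headers: Dict[str, Any] = field(default_factory=dict)
    key: Optional[str] = None
    # compare=False keeps the offset out of the generated __eq__.
    offset: Optional[int] = field(default=None, compare=False, hash=False)


sent = SketchMessage(content="order-created", key="order-1")
received = SketchMessage(content="order-created", key="order-1", offset=42)
different = SketchMessage(content="order-cancelled", key="order-1", offset=42)

same_business_message = sent == received      # offsets differ, still equal
different_business_message = sent != different  # content differs
```

This lets a test compare the message it published (no offset yet) with the one it consumed (offset assigned by Kafka) using plain equality.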
content
instance-attribute
headers = field(default_factory=(lambda: {}))
class-attribute
instance-attribute
key = None
class-attribute
instance-attribute
offset = field(default=None, compare=False, hash=False)
class-attribute
instance-attribute
__init__(content, headers=(lambda: {})(), key=None, offset=None)
from_kafka(msg, deserializer)
staticmethod
Factory method to construct a Message from a confluent_kafka message.
Source code in qa-pytest-kafka/src/qa_pytest_kafka/kafka_handler.py
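A factory of this kind typically reads the payload, key, headers, and offset through the accessor methods that `confluent_kafka` messages expose (`value()`, `key()`, `headers()`, `offset()`). The sketch below uses a hypothetical `FakeKafkaMessage` in place of a real message so it runs without a broker. How the library treats keys and headers is not stated above, so passing them through undecoded is an assumption.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple


class FakeKafkaMessage:
    """Stand-in exposing the confluent_kafka.Message accessors used below."""

    def __init__(
        self,
        value: Optional[bytes],
        key: Optional[bytes] = None,
        headers: Optional[List[Tuple[str, bytes]]] = None,
        offset: Optional[int] = None,
    ) -> None:
        self._value, self._key, self._headers, self._offset = value, key, headers, offset

    def value(self) -> Optional[bytes]:
        return self._value

    def key(self) -> Optional[bytes]:
        return self._key

    def headers(self) -> Optional[List[Tuple[str, bytes]]]:
        return self._headers

    def offset(self) -> Optional[int]:
        return self._offset


@dataclass
class SketchMessage:
    content: Optional[Any]
    headers: Dict[str, Any] = field(default_factory=dict)
    key: Optional[bytes] = None
    offset: Optional[int] = field(default=None, compare=False, hash=False)

    @staticmethod
    def from_kafka(msg: Any, deserializer: Callable[[bytes], Any]) -> "SketchMessage":
        raw = msg.value()
        return SketchMessage(
            # An empty message carries no payload, so content stays None.
            content=deserializer(raw) if raw is not None else None,
            # headers() may return None; normalise to an empty dict.
            headers=dict(msg.headers() or []),
            key=msg.key(),
            offset=msg.offset(),
        )


raw_message = FakeKafkaMessage(
    b'{"id": 7}', key=b"order-7", headers=[("source", b"test")], offset=3
)
message = SketchMessage.from_kafka(raw_message, json.loads)
empty = SketchMessage.from_kafka(FakeKafkaMessage(None), json.loads)
```

Passing the deserializer as an argument keeps the message type independent of the wire format, so the same factory can serve JSON, plain text, or any other encoding.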