Skip to content

RabbitMQ

qa_pytest_rabbitmq

__all__ = ['Message', 'QueueHandler', 'RabbitMqConfiguration', 'RabbitMqSteps', 'RabbitMqTests'] module-attribute

Message dataclass

Represents a message to be published or consumed from a RabbitMQ queue.

Attributes:

Name Type Description
content V

The message payload.

properties BasicProperties

Optional message properties for RabbitMQ.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/queue_handler.py
16
17
18
19
20
21
22
23
24
25
26
27
@to_string()
@dataclass(frozen=True)
class Message[V]:
    """
    Represents a message to be published or consumed from a RabbitMQ queue.

    Attributes:
        content (V): The message payload.
        properties (BasicProperties): Optional message properties for RabbitMQ.
    """
    content: V
    properties: BasicProperties = field(default_factory=BasicProperties)

content instance-attribute

properties = field(default_factory=BasicProperties) class-attribute instance-attribute

__init__(content, properties=BasicProperties())

QueueHandler dataclass

Bases: LoggerMixin

Handles publishing and consuming messages from a RabbitMQ queue in a thread-safe, asynchronous manner.

Parameters:

Name Type Description Default
channel BlockingChannel

The RabbitMQ channel to use.

required
queue_name str

The name of the queue to operate on.

required
indexing_by Callable

Function to extract a key from a message.

required
consuming_by Callable

Function to deserialize message bytes.

required
publishing_by Callable

Function to serialize message content.

required
Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/queue_handler.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
@to_string()
@dataclass
@final
class QueueHandler[K, V](LoggerMixin):
    """
    Handles publishing and consuming messages from a RabbitMQ queue in a thread-safe, asynchronous manner.

    Args:
        channel (BlockingChannel): The RabbitMQ channel to use.
        queue_name (str): The name of the queue to operate on.
        indexing_by (Callable): Function to extract a key from a message.
        consuming_by (Callable): Function to deserialize message bytes.
        publishing_by (Callable): Function to serialize message content.
    """
    channel: Final[BlockingChannel]
    queue_name: Final[str]
    indexing_by: Final[Callable[[Message[V]], K]]
    consuming_by: Final[Callable[[bytes], V]]
    publishing_by: Final[Callable[[V], bytes]]

    _received_messages: Final[dict[K, Message[V]]] = field(
        default_factory=lambda: dict())
    _command_queue: Final[queue.Queue[Callable[[], None]]] = field(
        default_factory=lambda: queue.Queue())

    _worker_thread: threading.Thread = field(init=False)
    _shutdown_event: threading.Event = field(
        default_factory=threading.Event, init=False)
    _consumer_tag: str | None = field(default=None, init=False)
    _lock: threading.RLock = field(default_factory=threading.RLock, init=False)

    def __post_init__(self) -> None:
        """
        Starts the worker thread for handling asynchronous queue operations.
        """
        self._worker_thread = threading.Thread(
            target=self._worker_loop, name="rabbitmq-handler", daemon=True)
        self._worker_thread.start()

    def __enter__(self) -> "QueueHandler[K, V]":
        """
        Context manager entry. Returns self.
        """
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None
    ) -> None:
        """
        Context manager exit. Ensures the handler is closed and resources are released.
        """
        self.close()

    def _worker_loop(self) -> None:
        """
        Internal worker loop for processing commands and RabbitMQ events.
        """
        while not self._shutdown_event.is_set():
            try:
                self.channel.connection.process_data_events()
                try:
                    command = self._command_queue.get_nowait()
                    command()
                except queue.Empty:
                    pass
            except Exception as e:
                self.log.error(f"Unhandled error in worker thread: {e}")

    def _submit(self, fn: Callable[[], None]) -> None:
        """
        Submits a callable to be executed by the worker thread.
        """
        self._command_queue.put(fn)

    def consume(self) -> str:
        """
        Starts consuming messages from the queue asynchronously.
        Returns:
            str: A placeholder consumer tag (actual tag is set internally).
        """
        def _consume():
            def on_message(ch: BlockingChannel, method: Any,
                           props: BasicProperties, body: bytes) -> None:
                try:
                    content = self.consuming_by(body)
                    message = Message(content=content, properties=props)
                    key = self.indexing_by(message)
                    with self._lock:
                        self._received_messages[key] = message
                    ch.basic_ack(
                        delivery_tag=require_not_none(
                            method.delivery_tag))
                    self.log.debug(f"received {key}")
                except Exception as e:
                    self.log.warning(f"skipping message due to error: {e}")
                    ch.basic_reject(
                        delivery_tag=require_not_none(
                            method.delivery_tag),
                        requeue=True)

            self._consumer_tag = self.channel.basic_consume(
                queue=self.queue_name, on_message_callback=on_message
            )
            self.log.debug(f"consumer set up with tag {self._consumer_tag}")

        self._submit(_consume)
        return "pending-tag"

    def cancel(self) -> str:
        """
        Cancels the active consumer, if any.
        Returns:
            str: The previous consumer tag, or an empty string if none.
        """
        def _cancel():
            if self._consumer_tag:
                self.channel.connection.add_callback_threadsafe(
                    self.channel.stop_consuming)
                self._consumer_tag = None
                self.log.debug("consumer cancelled")
        self._submit(_cancel)
        return self._consumer_tag or ""

    def publish(self, messages: Iterator[Message[V]]) -> None:
        """
        Publishes an iterable of Message objects to the queue asynchronously.

        Args:
            messages (Iterator[Message[V]]): The messages to publish.
        """
        def _publish():
            for message in messages:
                body = self.publishing_by(message.content)
                self.channel.basic_publish(
                    exchange=EMPTY_STRING,
                    routing_key=self.queue_name,
                    body=body,
                    properties=message.properties
                )
                self.log.debug(f"published {message}")
        self._submit(_publish)

    def publish_values(self, values: Iterator[V]) -> None:
        """
        Publishes an iterable of values to the queue, wrapping each in a Message.

        Args:
            values (Iterator[V]): The values to publish.
        """
        self.publish((Message(content=value) for value in values))

    def close(self) -> None:
        """
        Gracefully shuts down the handler, cancels consumers, and joins the worker thread.
        """
        self.cancel()
        self._shutdown_event.set()
        self._worker_thread.join(timeout=5.0)

    @property
    def received_messages(self) -> Mapping[K, Message[V]]:
        """
        Returns a snapshot of all received messages, indexed by key.

        Returns:
            Mapping[K, Message[V]]: The received messages.
        """
        with self._lock:
            return dict(self._received_messages)

channel instance-attribute

consuming_by instance-attribute

indexing_by instance-attribute

publishing_by instance-attribute

queue_name instance-attribute

received_messages property

Returns a snapshot of all received messages, indexed by key.

Returns:

Type Description
Mapping[K, Message[V]]

Mapping[K, Message[V]]: The received messages.

__enter__()

Context manager entry. Returns self.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/queue_handler.py
69
70
71
72
73
def __enter__(self) -> "QueueHandler[K, V]":
    """
    Context manager entry. Returns self.
    """
    return self

__exit__(exc_type, exc_value, traceback)

Context manager exit. Ensures the handler is closed and resources are released.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/queue_handler.py
75
76
77
78
79
80
81
82
83
84
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None
) -> None:
    """
    Context manager exit. Ensures the handler is closed and resources are released.
    """
    self.close()

__init__(channel, queue_name, indexing_by, consuming_by, publishing_by, _received_messages=lambda: dict()(), _command_queue=lambda: queue.Queue()())

__post_init__()

Starts the worker thread for handling asynchronous queue operations.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/queue_handler.py
61
62
63
64
65
66
67
def __post_init__(self) -> None:
    """
    Starts the worker thread for handling asynchronous queue operations.
    """
    self._worker_thread = threading.Thread(
        target=self._worker_loop, name="rabbitmq-handler", daemon=True)
    self._worker_thread.start()

cancel()

Cancels the active consumer, if any. Returns: str: The previous consumer tag, or an empty string if none.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/queue_handler.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def cancel(self) -> str:
    """
    Cancels the active consumer, if any.
    Returns:
        str: The previous consumer tag, or an empty string if none.
    """
    def _cancel():
        if self._consumer_tag:
            self.channel.connection.add_callback_threadsafe(
                self.channel.stop_consuming)
            self._consumer_tag = None
            self.log.debug("consumer cancelled")
    self._submit(_cancel)
    return self._consumer_tag or ""

close()

Gracefully shuts down the handler, cancels consumers, and joins the worker thread.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/queue_handler.py
184
185
186
187
188
189
190
def close(self) -> None:
    """
    Gracefully shuts down the handler, cancels consumers, and joins the worker thread.
    """
    self.cancel()
    self._shutdown_event.set()
    self._worker_thread.join(timeout=5.0)

consume()

Starts consuming messages from the queue asynchronously. Returns: str: A placeholder consumer tag (actual tag is set internally).

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/queue_handler.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def consume(self) -> str:
    """
    Starts consuming messages from the queue asynchronously.
    Returns:
        str: A placeholder consumer tag (actual tag is set internally).
    """
    def _consume():
        def on_message(ch: BlockingChannel, method: Any,
                       props: BasicProperties, body: bytes) -> None:
            try:
                content = self.consuming_by(body)
                message = Message(content=content, properties=props)
                key = self.indexing_by(message)
                with self._lock:
                    self._received_messages[key] = message
                ch.basic_ack(
                    delivery_tag=require_not_none(
                        method.delivery_tag))
                self.log.debug(f"received {key}")
            except Exception as e:
                self.log.warning(f"skipping message due to error: {e}")
                ch.basic_reject(
                    delivery_tag=require_not_none(
                        method.delivery_tag),
                    requeue=True)

        self._consumer_tag = self.channel.basic_consume(
            queue=self.queue_name, on_message_callback=on_message
        )
        self.log.debug(f"consumer set up with tag {self._consumer_tag}")

    self._submit(_consume)
    return "pending-tag"

publish(messages)

Publishes an iterable of Message objects to the queue asynchronously.

Parameters:

Name Type Description Default
messages Iterator[Message[V]]

The messages to publish.

required
Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/queue_handler.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def publish(self, messages: Iterator[Message[V]]) -> None:
    """
    Publishes an iterable of Message objects to the queue asynchronously.

    Args:
        messages (Iterator[Message[V]]): The messages to publish.
    """
    def _publish():
        for message in messages:
            body = self.publishing_by(message.content)
            self.channel.basic_publish(
                exchange=EMPTY_STRING,
                routing_key=self.queue_name,
                body=body,
                properties=message.properties
            )
            self.log.debug(f"published {message}")
    self._submit(_publish)

publish_values(values)

Publishes an iterable of values to the queue, wrapping each in a Message.

Parameters:

Name Type Description Default
values Iterator[V]

The values to publish.

required
Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/queue_handler.py
175
176
177
178
179
180
181
182
def publish_values(self, values: Iterator[V]) -> None:
    """
    Publishes an iterable of values to the queue, wrapping each in a Message.

    Args:
        values (Iterator[V]): The values to publish.
    """
    self.publish((Message(content=value) for value in values))

RabbitMqConfiguration

Bases: BaseConfiguration

RabbitMQ-specific test configuration. Provides access to the RabbitMQ connection URI from the configuration parser.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/rabbitmq_configuration.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class RabbitMqConfiguration(BaseConfiguration):
    """
    RabbitMQ-specific test configuration.
    Provides access to the RabbitMQ connection URI from the configuration parser.
    """

    @cached_property
    def connection_uri(self) -> pika.URLParameters:
        """
        Returns the RabbitMQ connection URI as a pika.URLParameters object.

        Returns:
            pika.URLParameters: The connection URI for RabbitMQ.
        """
        return pika.URLParameters(self.parser.get("rabbitmq", "connection_uri"))

connection_uri cached property

Returns the RabbitMQ connection URI as a pika.URLParameters object.

Returns:

Type Description
URLParameters

pika.URLParameters: The connection URI for RabbitMQ.

RabbitMqSteps

Bases: GenericSteps[TConfiguration]

BDD-style step definitions for RabbitMQ queue operations.

Type Parameters

K: The type of the message key. V: The type of the message content. TConfiguration: The configuration type, must be a RabbitMqConfiguration.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/rabbitmq_steps.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class RabbitMqSteps[K, V, TConfiguration: RabbitMqConfiguration](
        GenericSteps[TConfiguration]):
    """
    BDD-style step definitions for RabbitMQ queue operations.

    Type Parameters:
        K: The type of the message key.
        V: The type of the message content.
        TConfiguration: The configuration type, must be a RabbitMqConfiguration.
    """
    _queue_handler: QueueHandler[K, V]

    @Context.traced
    @final
    def a_queue_handler(self, queue_handler: QueueHandler[K, V]) -> Self:
        """
        Sets the queue handler to use for subsequent steps.

        Args:
            queue_handler (QueueHandler[K, V]): The handler instance.
        Returns:
            Self: The current step instance for chaining.
        """
        self._queue_handler = queue_handler
        return self

    @Context.traced
    @final
    def publishing(self, messages: Iterable[Message[V]]) -> Self:
        """
        Publishes the provided messages to the queue.

        Args:
            messages (Iterable[Message[V]]): The messages to publish.
        Returns:
            Self: The current step instance for chaining.
        """
        self._queue_handler.publish(iter(messages))
        return self

    @Context.traced
    @final
    def consuming(self) -> Self:
        """
        Starts consuming messages from the queue.

        Returns:
            Self: The current step instance for chaining.
        """
        self._queue_handler.consume()
        return self

    @Context.traced
    @final
    def the_received_messages(
            self, by_rule: Matcher[Iterator[Message[V]]]) -> Self:
        """
        Asserts that the received messages match the provided matcher rule.

        Args:
            by_rule (Matcher[Iterator[Message[V]]]): Matcher for the received messages iterator.
        Returns:
            Self: The current step instance for chaining.
        """
        return self.eventually_assert_that(
            lambda: iter(self._queue_handler.received_messages.values()),
            by_rule)

    @Context.traced
    @final
    def the_message_by_key(
            self, key: K, by_rule: Matcher[Message[V]]) -> Self:
        """
        Asserts that the message with the given key matches the provided matcher rule.

        Args:
            key (K): The key to look up.
            by_rule (Matcher[Message[V]]): Matcher for the message.
        Returns:
            Self: The current step instance for chaining.
        """
        return self.eventually_assert_that(
            lambda: require_not_none(
                self._queue_handler.received_messages.get(key)),
            by_rule)

a_queue_handler(queue_handler)

Sets the queue handler to use for subsequent steps.

Parameters:

Name Type Description Default
queue_handler QueueHandler[K, V]

The handler instance.

required

Returns: Self: The current step instance for chaining.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/rabbitmq_steps.py
26
27
28
29
30
31
32
33
34
35
36
37
38
@Context.traced
@final
def a_queue_handler(self, queue_handler: QueueHandler[K, V]) -> Self:
    """
    Sets the queue handler to use for subsequent steps.

    Args:
        queue_handler (QueueHandler[K, V]): The handler instance.
    Returns:
        Self: The current step instance for chaining.
    """
    self._queue_handler = queue_handler
    return self

consuming()

Starts consuming messages from the queue.

Returns:

Name Type Description
Self Self

The current step instance for chaining.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/rabbitmq_steps.py
54
55
56
57
58
59
60
61
62
63
64
@Context.traced
@final
def consuming(self) -> Self:
    """
    Starts consuming messages from the queue.

    Returns:
        Self: The current step instance for chaining.
    """
    self._queue_handler.consume()
    return self

publishing(messages)

Publishes the provided messages to the queue.

Parameters:

Name Type Description Default
messages Iterable[Message[V]]

The messages to publish.

required

Returns: Self: The current step instance for chaining.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/rabbitmq_steps.py
40
41
42
43
44
45
46
47
48
49
50
51
52
@Context.traced
@final
def publishing(self, messages: Iterable[Message[V]]) -> Self:
    """
    Publishes the provided messages to the queue.

    Args:
        messages (Iterable[Message[V]]): The messages to publish.
    Returns:
        Self: The current step instance for chaining.
    """
    self._queue_handler.publish(iter(messages))
    return self

the_message_by_key(key, by_rule)

Asserts that the message with the given key matches the provided matcher rule.

Parameters:

Name Type Description Default
key K

The key to look up.

required
by_rule Matcher[Message[V]]

Matcher for the message.

required

Returns: Self: The current step instance for chaining.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/rabbitmq_steps.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@Context.traced
@final
def the_message_by_key(
        self, key: K, by_rule: Matcher[Message[V]]) -> Self:
    """
    Asserts that the message with the given key matches the provided matcher rule.

    Args:
        key (K): The key to look up.
        by_rule (Matcher[Message[V]]): Matcher for the message.
    Returns:
        Self: The current step instance for chaining.
    """
    return self.eventually_assert_that(
        lambda: require_not_none(
            self._queue_handler.received_messages.get(key)),
        by_rule)

the_received_messages(by_rule)

Asserts that the received messages match the provided matcher rule.

Parameters:

Name Type Description Default
by_rule Matcher[Iterator[Message[V]]]

Matcher for the received messages iterator.

required

Returns: Self: The current step instance for chaining.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/rabbitmq_steps.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@Context.traced
@final
def the_received_messages(
        self, by_rule: Matcher[Iterator[Message[V]]]) -> Self:
    """
    Asserts that the received messages match the provided matcher rule.

    Args:
        by_rule (Matcher[Iterator[Message[V]]]): Matcher for the received messages iterator.
    Returns:
        Self: The current step instance for chaining.
    """
    return self.eventually_assert_that(
        lambda: iter(self._queue_handler.received_messages.values()),
        by_rule)

RabbitMqTests

Bases: AbstractTestsBase[TSteps, TConfiguration]

Base class for BDD-style RabbitMQ integration tests. Manages the lifecycle of a RabbitMQ connection for test scenarios.

Type Args

K: The type of the message key. V: The type of the message content. TSteps: The steps implementation type. TConfiguration: The configuration type, must be a RabbitMqConfiguration.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/rabbitmq_tests.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class RabbitMqTests[
    K,
    V,
    TSteps: RabbitMqSteps[Any, Any, Any],
    TConfiguration: RabbitMqConfiguration
](AbstractTestsBase[TSteps, TConfiguration]):
    """
    Base class for BDD-style RabbitMQ integration tests.
    Manages the lifecycle of a RabbitMQ connection for test scenarios.

    Type Args:
        K: The type of the message key.
        V: The type of the message content.
        TSteps: The steps implementation type.
        TConfiguration: The configuration type, must be a RabbitMqConfiguration.
    """
    _connection: pika.BlockingConnection

    @override
    def setup_method(self):
        """
        Sets up the RabbitMQ connection before each test method.
        """
        super().setup_method()
        self._connection = pika.BlockingConnection(
            self._configuration.connection_uri)

    @override
    def teardown_method(self):
        """
        Tears down the RabbitMQ connection after each test method.
        """
        try:
            self._connection.close()
        finally:
            super().teardown_method()
            super().teardown_method()

setup_method()

Sets up the RabbitMQ connection before each test method.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/rabbitmq_tests.py
29
30
31
32
33
34
35
36
@override
def setup_method(self):
    """
    Sets up the RabbitMQ connection before each test method.
    """
    super().setup_method()
    self._connection = pika.BlockingConnection(
        self._configuration.connection_uri)

teardown_method()

Tears down the RabbitMQ connection after each test method.

Source code in qa-pytest-rabbitmq/src/qa_pytest_rabbitmq/rabbitmq_tests.py
38
39
40
41
42
43
44
45
46
47
@override
def teardown_method(self):
    """
    Tears down the RabbitMQ connection after each test method.
    """
    try:
        self._connection.close()
    finally:
        super().teardown_method()
        super().teardown_method()