Consumer
The Consumer is the main part of the RabbitLeap framework. It is responsible for connecting to RabbitMQ and for receiving, acknowledging, and rejecting messages. It handles connection failures and automatically reconnects to RabbitMQ. The consumer consumes a shared or exclusive RabbitMQ queue.
When the Consumer receives a message from RabbitMQ, it packages the message properties, payload, and delivery information in an Envelope object, preparing it for handling. The envelope is then routed to its handler by the Router, which the consumer relies on to route envelopes to their handlers.
The consumer can be configured with a retry policy. The RetryPolicy defines how handling is retried in case a timeout or an error happens.
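The receive, package, and route flow described above can be pictured with a small self-contained model. This is only a sketch: ToyEnvelope, toy_router, and consume_one are hypothetical names invented for illustration and are not RabbitLeap classes or functions.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ToyEnvelope:
    """Hypothetical stand-in for an envelope: bundles the three parts of a delivery."""
    properties: dict      # message properties (e.g. message type, headers)
    payload: bytes        # message body
    delivery_info: dict   # delivery details (e.g. routing key)


def toy_router(envelope: ToyEnvelope) -> Optional[Callable[[ToyEnvelope], str]]:
    """Hypothetical router: choose a handler from the message type property."""
    if envelope.properties.get("type") == "order.created":
        return lambda env: "handled " + env.payload.decode()
    return None  # no handler found for this envelope


def consume_one(properties: dict, payload: bytes, delivery_info: dict) -> str:
    """Package a delivery into an envelope, then route it to a handler."""
    envelope = ToyEnvelope(properties, payload, delivery_info)
    handler = toy_router(envelope)
    if handler is None:
        return "unroutable"
    return handler(envelope)
```

The model keeps packaging and routing separate, mirroring the split between the consumer and its router in the text.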
Consumer
-
class rabbitleap.consumer.Consumer(amqp_url, queue_name, durable=True, exclusive=False, dlx_name=None, auto_reconnect=True, auto_reconnect_delay=3)
Bases: rabbitleap.routing.RuleRouter
RabbitMQ Consumer.
Public Methods:
- start(): start the consumer.
- stop(): stop the consumer.
- restart(): restart the consumer.
- abort(): reject the message.
- skip(): skip message handling.
- error(): raise a HandlingError exception to report a handling error.
- add_exchange_bind(): add an exchange binding to the bindings list.
- add_rule(): add a routing rule to the routing rules list.
- set_default_rule(): set the default routing rule to catch all unmatched messages.
- add_handler(): add a handler; it creates a routing rule and adds it to the routing rules list.
- set_default_handler(): set the default handler; it creates a rule and sets it as the default routing rule.
- set_retry_policy(): set a retry policy.
- unset_retry_policy(): unset the retry policy.
-
Consumer.start()
Start the consumer.
-
Consumer.stop()
Stop the consumer.
-
Consumer.restart()
Restart the consumer.
Close the connection and reconnect.
-
Consumer.abort(reason=None)
Abort handling the message.
This can be called during pre_handle() or handle() to abort handling. It raises an AbortHandling exception. The exception is handled by the consumer and causes the consumer to reject the message.
Raise: AbortHandling when called.
NOTE: when called inside the handler, the handler MUST re-raise the AbortHandling exception to the consumer if the handler catches it internally.
Parameters: reason (str) – Reason for aborting handling the message.
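The re-raise requirement in the note above can be illustrated with a standalone sketch. The AbortHandling class, abort(), handle(), and consume() below are local stand-ins written for this example; they imitate the described behavior and are not the library's code.

```python
class AbortHandling(Exception):
    """Local stand-in for the exception named in the text."""


def abort(reason=None):
    """Stand-in for the consumer's abort(): always raises AbortHandling."""
    raise AbortHandling(reason)


def handle(payload):
    """Handler body that catches broad exceptions but lets AbortHandling through."""
    try:
        if not payload:
            abort("empty payload")
        return payload.upper()
    except AbortHandling:
        raise  # propagate so the consumer can reject the message
    except Exception:
        return "recovered"  # other failures are dealt with locally


def consume(payload):
    """Toy consumer step: reject on AbortHandling, otherwise ack."""
    try:
        handle(payload)
        return "ack"
    except AbortHandling as exc:
        return f"reject: {exc}"
```

Without the explicit `except AbortHandling: raise` clause, the broad `except Exception` would swallow the abort and the toy consumer would ack the message instead of rejecting it.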
-
Consumer.skip(reason=None)
Skip handling the message.
This can be called during pre_handle() or handle() to skip handling. It raises a SkipHandling exception. The exception is handled by the consumer and causes the consumer to ack and skip the message.
Raise: SkipHandling when called.
NOTE: when called inside the handler, the handler should re-raise the SkipHandling exception to the consumer if the handler catches it internally.
Parameters: reason (str) – Reason for skipping handling the message.
-
Consumer.error(error_msg=None)
Raise a HandlingError exception.
This method can be called inside the handler, or invoked by the consumer when an error happens while handling the message. It raises HandlingError, which is handled by the consumer. The consumer retries handling the message if a retry policy is set, or rejects the message if no retry policy is set. When calling this method inside the handler, it should be called during pre_handle() or handle().
Raise: HandlingError when called.
NOTE: when called inside the handler, the handler should re-raise the HandlingError exception to the consumer if the handler catches it internally.
Parameters: error_msg (str) – Error message.
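The outcomes described for abort(), skip(), and error() can be summarized in a small decision function. This is a sketch under stated assumptions: the three exception classes are local stand-ins, decide() and raiser() are hypothetical helpers, and acknowledging a message after a successful handler run is assumed rather than taken from the text.

```python
class AbortHandling(Exception):
    """Local stand-in: handling was aborted."""


class SkipHandling(Exception):
    """Local stand-in: handling was skipped."""


class HandlingError(Exception):
    """Local stand-in: handling failed with an error."""


def decide(handler, has_retry_policy):
    """Run a handler and return the action a consumer would take (toy model)."""
    try:
        handler()
    except AbortHandling:
        return "reject"
    except SkipHandling:
        return "ack"  # the message is acked and skipped
    except HandlingError:
        # Retry only when a retry policy is set; otherwise reject.
        return "retry" if has_retry_policy else "reject"
    return "ack"  # assumption: a successful run is acknowledged


def raiser(exc):
    """Build a handler that raises the given exception (for demonstration)."""
    def _handler():
        raise exc
    return _handler
```

For example, `decide(raiser(HandlingError("boom")), has_retry_policy=True)` yields "retry", while the same call with `has_retry_policy=False` yields "reject".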
-
Consumer.add_exchange_bind(exchange_name, routing_key, declare_exchange=False, declare_kwargs=None)
Add an exchange binding to the bindings list.
NOTE: The actual exchange binding happens during consumer start-up, when the connection with RabbitMQ is established. Invoking this method after the connection is already established won’t take effect until the consumer re-establishes the connection. The method restart() can be called to force the consumer to disconnect and reconnect, performing the exchange binding as part of that process.
Raise: AssertionError if declare_exchange is True and declare_kwargs is None or ‘type’ is not in declare_kwargs.
Parameters:
- exchange_name (str) – name of the exchange to bind to.
- routing_key (str) – binding routing key.
- declare_exchange (bool) – whether to declare the exchange before binding.
- declare_kwargs (dict) – exchange_declare() arguments.
declare_kwargs dict:
- 'exchange_type': required.
- 'durable': optional, defaults to False.
- 'auto_delete': optional, defaults to False.
- 'internal': optional, defaults to False.
- 'arguments': optional, additional exchange declaration arguments.
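The deferred-binding behavior in the note above (bindings are recorded first and only applied when a connection is established) can be modeled as follows. ToyConsumer is a hypothetical class written for this sketch; it talks to no broker and does not reproduce the library's internals.

```python
class ToyConsumer:
    """Hypothetical model: bindings are recorded, then applied on connect."""

    def __init__(self):
        self.bindings = []    # recorded (exchange_name, routing_key) pairs
        self.applied = []     # bindings applied on the current connection
        self.connected = False

    def add_exchange_bind(self, exchange_name, routing_key):
        # Only record the binding; nothing is applied at this point.
        self.bindings.append((exchange_name, routing_key))

    def connect(self):
        # On (re)connection, every recorded binding is applied.
        self.connected = True
        self.applied = list(self.bindings)

    def restart(self):
        # Disconnect and reconnect, which re-applies the bindings list.
        self.connected = False
        self.applied = []
        self.connect()


consumer = ToyConsumer()
consumer.add_exchange_bind("orders", "order.*")
consumer.connect()
consumer.add_exchange_bind("audit", "#")   # recorded, but not yet applied
applied_before_restart = list(consumer.applied)
consumer.restart()                          # now both bindings are applied
```

In this model a binding added after connect() stays pending until restart() is called, which is the situation the note warns about.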
-
Consumer.add_rule(rule)
Add a routing rule to the routing rules list.
A routing rule is added to the end of the routing rules list, just before the default rule, which is always the last one.
A routing rule is an instance of the class Rule. It has 3 fields: a Matcher, a target, and a target arguments field. The matcher is used to determine whether the target can handle the envelope. The target can be a Handler class or a Router instance (sub-router). The target arguments field is a dictionary passed to the newly created handler for initialization.
Since the rules of the Rule Router are checked sequentially in the order they were added, more specific rules should be added first, then generic ones later.
Parameters: rule (Rule) – routing rule instance.
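The ordering advice above follows from first-match semantics, which the sketch below demonstrates. ToyRule and ToyRuleRouter are hypothetical classes for illustration only; envelopes are reduced to plain message-type strings and targets to handler names.

```python
class ToyRule:
    """Hypothetical rule: a matcher callable plus a target."""

    def __init__(self, matcher, target):
        self.matcher = matcher  # callable: envelope -> bool
        self.target = target


class ToyRuleRouter:
    """Hypothetical router that checks rules in insertion order."""

    def __init__(self):
        self.rules = []
        self.default_rule = None

    def add_rule(self, rule):
        self.rules.append(rule)

    def set_default_rule(self, rule):
        self.default_rule = rule

    def find_target(self, envelope):
        # The default rule, when set, is always checked last.
        candidates = self.rules + ([self.default_rule] if self.default_rule else [])
        for rule in candidates:
            if rule.matcher(envelope):
                return rule.target  # stop at the first match
        return None


router = ToyRuleRouter()
# Specific rule first, generic rule second.
router.add_rule(ToyRule(lambda e: e == "order.created", "OrderCreatedHandler"))
router.add_rule(ToyRule(lambda e: e.startswith("order."), "GenericOrderHandler"))
```

If the two add_rule() calls were swapped, the generic rule would match "order.created" first and the specific handler would never be reached.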
-
Consumer.set_default_rule(rule)
Set the default rule.
When set, the default rule catches all unroutable envelopes.
Parameters: rule (Rule) – default routing rule, which catches all unmatched messages. None unsets the default rule.
-
Consumer.add_handler(matcher_or_pattern, target, target_kwargs=None)
Construct and add a routing rule for the provided handler.
Handlers are added as routing rules. A routing rule (Rule) is an object that contains a matcher (Matcher) instance, a target (a Handler subclass or a Router instance), and target kwargs. A routing rule is constructed and added to the routing rules list.
matcher is an instance of Matcher (or a str, as explained later). Calling its match() method with an Envelope instance returns True or False, indicating a match or no match.
target can be a Handler subclass or a Router instance. When a Router instance is provided as a target, it acts as a sub-router with its own routing logic. This way, a chain of routers can be constructed.
target_kwargs is a dict passed to the handler initialize() hook method.
When finding a handler, the Rule Router (which the consumer is based on) goes through the list of rules sequentially in the order they were added, invoking the match method of each rule’s matcher.
If the router finds a match whose target is a router (sub-router) instance, that router’s find_handler() is invoked to find the handler.
If the matched target is a subclass of Handler, the router creates a handler instance, invokes its initialize() method with the handler kwargs, then returns it.
The router stops upon the first match, returning the handler to the consumer.
If a default_handler or default_rule is set, a default rule is added to the end of the routing rules list, which will catch all unmatched messages.
The router returns None when there is no match and no default_rule is set.
Since the rules of the Rule Router (which the consumer is based on) are checked sequentially, more specific handlers should be added first, then generic ones later.
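The lookup steps above (delegate to a sub-router, or instantiate and initialize a handler) can be sketched with a toy model. ToyHandler and ToyRouter are hypothetical classes for illustration; envelopes are reduced to message-type strings, and unpacking target_kwargs as keyword arguments to initialize() is an assumption of this sketch.

```python
class ToyHandler:
    """Hypothetical handler with an initialize() hook."""

    def __init__(self, envelope):
        self.envelope = envelope
        self.options = {}

    def initialize(self, **kwargs):
        self.options = kwargs


class ToyRouter:
    """Hypothetical router whose targets are handler classes or sub-routers."""

    def __init__(self):
        self.rules = []  # (matcher, target, target_kwargs) tuples

    def add_handler(self, matcher, target, target_kwargs=None):
        self.rules.append((matcher, target, target_kwargs or {}))

    def find_handler(self, envelope):
        for matcher, target, kwargs in self.rules:
            if not matcher(envelope):
                continue
            if isinstance(target, ToyRouter):
                # Sub-router: delegate the lookup to its own rules.
                return target.find_handler(envelope)
            handler = target(envelope)       # create the handler instance
            handler.initialize(**kwargs)     # pass the target kwargs
            return handler
        return None  # no rule matched


billing = ToyRouter()
billing.add_handler(lambda e: e.endswith(".paid"), ToyHandler, {"ledger": "main"})

root = ToyRouter()
root.add_handler(lambda e: e.startswith("invoice."), billing)  # chain of routers

found = root.find_handler("invoice.paid")
```

Because the toy router stops at the first matching rule, an envelope that matches the sub-router's rule in `root` but nothing inside `billing` yields None instead of falling through to later rules.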
When the passed matcher is a string, the default matcher MessageTypeMatches is constructed with the passed string as its message type regular expression.
Parameters:
- matcher (Matcher|str) – a Matcher instance used to determine the match, or a pattern string used as the message type regex pattern for the default matcher type MessageTypeMatches.
- target (Type[Handler]|Router) – a subclass of Handler or a Router instance.
- target_kwargs (dict) – a dict of kwargs that are passed to the handler initialize() method. Only used when the target is a Handler subclass.
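The string-or-matcher convenience described above can be sketched as a small wrapping step. ToyMessageTypeMatcher and to_matcher are hypothetical names; the toy matcher receives a message-type string rather than an Envelope, and anchoring with re.match is an assumption, since the text does not say how the real MessageTypeMatches applies its pattern.

```python
import re


class ToyMessageTypeMatcher:
    """Hypothetical matcher: tests a message type against a regex pattern."""

    def __init__(self, pattern):
        self.regex = re.compile(pattern)

    def match(self, message_type):
        # Assumption: match from the start of the string via re.match.
        return self.regex.match(message_type) is not None


def to_matcher(matcher_or_pattern):
    """Wrap a plain string in the toy default matcher; pass matchers through."""
    if isinstance(matcher_or_pattern, str):
        return ToyMessageTypeMatcher(matcher_or_pattern)
    return matcher_or_pattern


order_matcher = to_matcher(r"order\.(created|updated)")
```

Accepting either form keeps the common case (matching on message type) short while still allowing a custom matcher object where more control is needed.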
-
Consumer.set_default_handler(default_target, default_target_kwargs=None)
Construct and add a default routing rule for the given target.
This method constructs a routing rule for the given target and passes it to set_default_rule().
Parameters:
- default_target (Type[Handler]) – default target, which will catch all unmatched messages. None unsets the default target; unmatched messages will then be sent to the DLX.
- default_target_kwargs (dict) – a dict of kwargs that are passed to the handler initialize() method. Only used when the target is a Handler subclass.
-
Consumer.set_retry_policy(retry_policy)
Set the retry policy.
The retry policy can be an instance of any RetryPolicy subclass.
Parameters: retry_policy (RetryPolicy) – an instance of a retry policy.
-
Consumer.unset_retry_policy()
Unset the retry policy.