Consumer
The Consumer is the main part of the RabbitLeap framework. It is responsible
for connecting to RabbitMQ and for receiving, acknowledging, and rejecting messages.
It handles connection failures and automatically reconnects to RabbitMQ. The consumer
consumes from a shared or exclusive RabbitMQ queue.
When the Consumer receives a message from RabbitMQ, it packages the message
properties, payload, and delivery information in an Envelope object,
preparing it for handling. The envelope is then routed to its handler by the
Router, which the consumer relies on to route envelopes to their handlers.
The consumer can be configured with a retry policy. The RetryPolicy defines
how handling is retried in case a timeout or an error happens.
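The receive, package, and route flow described in this overview can be pictured with a small stand-alone sketch. Everything in it (SketchEnvelope, HANDLERS, route, handle_order) is a hypothetical stand-in written for illustration, not a RabbitLeap class or function; the real Envelope and Router may expose different attributes and methods.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class SketchEnvelope:
    """Hypothetical envelope: message properties, payload, delivery info."""
    properties: Dict[str, str]
    payload: bytes
    delivery_info: Dict[str, str]


def handle_order(envelope: SketchEnvelope) -> str:
    """Hypothetical handler: decode the payload and tag it."""
    return "order:" + envelope.payload.decode("utf-8")


# Hypothetical routing table keyed by the message's "type" property.
HANDLERS: Dict[str, Callable[[SketchEnvelope], str]] = {
    "order.created": handle_order,
}


def route(envelope: SketchEnvelope) -> Optional[Callable[[SketchEnvelope], str]]:
    """Return the handler for the envelope, or None when nothing matches."""
    return HANDLERS.get(envelope.properties.get("type", ""))


# Package an incoming delivery, then route it to its handler.
envelope = SketchEnvelope(
    properties={"type": "order.created"},
    payload=b"42",
    delivery_info={"routing_key": "orders"},
)
handler = route(envelope)
result = handler(envelope) if handler else None
```

The sketch routes with a plain dictionary lookup only to keep it short; the rule-based routing the consumer actually relies on is described under add_rule() and add_handler().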
Consumer
-
class rabbitleap.consumer.Consumer(amqp_url, queue_name, durable=True, exclusive=False, dlx_name=None, auto_reconnect=True, auto_reconnect_delay=3)
Bases: rabbitleap.routing.RuleRouter
RabbitMQ Consumer.
Public Methods:
- start(): start the consumer.
- stop(): stop the consumer.
- restart(): restart the consumer.
- abort(): reject the message.
- skip(): skip message handling.
- error(): raise a HandlingError exception to report a handling error.
- add_exchange_bind(): add an exchange binding to the bindings list.
- add_rule(): add a routing rule to the routing rules list.
- set_default_rule(): set the default routing rule, which catches all unmatched messages.
- add_handler(): add a handler; this creates a routing rule and adds it to the routing rules list.
- set_default_handler(): set the default handler; this creates a rule and sets it as the default routing rule.
- set_retry_policy(): set a retry policy.
- unset_retry_policy(): unset the retry policy.
-
Consumer.start()
Start the consumer.
-
Consumer.stop()
Stop the consumer.
-
Consumer.restart()
Restart the consumer: close the connection, then reconnect.
-
Consumer.abort(reason=None)
Abort handling the message.
This can be called during pre_handle() or handle() to abort handling. It raises an AbortHandling exception. The exception is handled by the consumer and causes the consumer to reject the message.
Raise: AbortHandling when called.
NOTE: when called inside the handler, the handler MUST re-raise the AbortHandling exception to the consumer if the handler catches it internally.
Parameters: reason (str) – reason for aborting handling of the message.
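The re-raise requirement in the NOTE matters most when a handler wraps its work in a broad try/except. The following is a minimal sketch of that pattern, not RabbitLeap code: AbortHandling here is a locally defined stand-in for the library's exception, and abort(), handle(), and consume() are hypothetical free functions that only mimic the documented behaviour.

```python
class AbortHandling(Exception):
    """Local stand-in for the exception that abort() is documented to raise."""


def abort(reason=None):
    # Mirrors the documented behaviour: abort() raises AbortHandling.
    raise AbortHandling(reason)


def handle(payload):
    """Hypothetical handler body that catches errors broadly."""
    try:
        if not payload:
            abort("empty payload")
        return payload.upper()
    except AbortHandling:
        # Re-raise so the consumer sees the exception and rejects the message.
        raise
    except Exception:
        # Other errors could be logged or translated here.
        return None


def consume(payload):
    """Stand-in for the consumer: reject on AbortHandling, ack otherwise."""
    try:
        handle(payload)
        return "ack"
    except AbortHandling:
        return "reject"
```

Without the dedicated except AbortHandling clause, the broad except Exception would swallow the abort, and this stand-in consumer would acknowledge a message that was meant to be rejected.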
-
Consumer.skip(reason=None)
Skip handling the message.
This can be called during pre_handle() or handle() to skip handling. It raises a SkipHandling exception. The exception is handled by the consumer and causes the consumer to ack and skip the message.
Raise: SkipHandling when called.
NOTE: when called inside the handler, the handler should re-raise the SkipHandling exception to the consumer if the handler catches it internally.
Parameters: reason (str) – reason for skipping handling of the message.
-
Consumer.error(error_msg=None)
Raise a HandlingError exception.
This method can be called inside the handler, or invoked by the consumer when an error happens while handling the message. It raises HandlingError, which is handled by the consumer. The consumer retries handling the message if a retry policy is set, or rejects the message if no retry policy is set. When called inside the handler, it should be called during pre_handle() or handle().
Raise: HandlingError when called.
NOTE: when called inside the handler, the handler should re-raise the HandlingError exception to the consumer if the handler catches it internally.
Parameters: error_msg (str) – error message.
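The retry-or-reject decision described above can be sketched as a small loop. This is an illustrative model under stated assumptions, not the consumer's implementation: HandlingError is a local stand-in, process() is a hypothetical function, and the max_retries argument is an invented, much simplified substitute for a RetryPolicy (None models "no retry policy set").

```python
class HandlingError(Exception):
    """Local stand-in for the documented HandlingError exception."""


def process(handler, payload, max_retries=None):
    """Hypothetical consumer loop returning (outcome, attempts).

    max_retries=None models "no retry policy set"; an int models a very
    simple policy that allows that many retries after the first attempt.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            handler(payload)
            return ("ack", attempts)
        except HandlingError:
            if max_retries is None or attempts > max_retries:
                return ("reject", attempts)
            # Retries remain: loop and handle the message again.


def make_flaky(failures):
    """Build a handler that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    def handler(payload):
        state["calls"] += 1
        if state["calls"] <= failures:
            raise HandlingError("temporary failure")

    return handler


def always_fail(payload):
    raise HandlingError("permanent failure")
```

For example, process(make_flaky(2), b"x", max_retries=3) ends in an ack on the third attempt, while process(always_fail, b"x") rejects after a single attempt because no retries are allowed.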
-
Consumer.add_exchange_bind(exchange_name, routing_key, declare_exchange=False, declare_kwargs=None)
Add an exchange binding to the bindings list.
NOTE: the actual exchange binding happens during consumer start-up, when the connection with RabbitMQ is established. Invoking this method after the connection is already established won’t take effect until the consumer re-establishes the connection. The method restart() can be called to force the consumer to disconnect and reconnect, performing the exchange binding as part of that process.
Raise: AssertionError if declare_exchange is True and declare_kwargs is None or ‘type’ is not in declare_kwargs.
Parameters:
- exchange_name (str) – name of the exchange to bind to.
- routing_key (str) – binding routing key.
- declare_exchange (bool) – whether to declare the exchange before binding.
- declare_kwargs (dict) – exchange_declare() arguments.
declare_kwargs dict:
- 'exchange_type': required.
- 'durable': optional, defaults to False.
- 'auto_delete': optional, defaults to False.
- 'internal': optional, defaults to False.
- 'arguments': optional, additional exchange declaration arguments.
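The timing described in the NOTE (bindings are recorded first and only applied when the connection is established) can be modelled with a short sketch. SketchConsumer and its connect() method and applied attribute are hypothetical names invented for this illustration; they do not reflect how RabbitLeap stores or applies bindings internally.

```python
class SketchConsumer:
    """Hypothetical model: bindings are recorded, then applied on connect."""

    def __init__(self):
        self.bindings = []   # bindings requested so far
        self.applied = []    # bindings applied by the most recent connect
        self.connected = False

    def add_exchange_bind(self, exchange_name, routing_key):
        # Only records the binding; nothing is sent to the broker here.
        self.bindings.append((exchange_name, routing_key))

    def connect(self):
        # Binding happens as part of establishing the connection.
        self.applied = list(self.bindings)
        self.connected = True

    def restart(self):
        # Disconnect and reconnect, which re-applies every recorded binding.
        self.connected = False
        self.connect()


consumer = SketchConsumer()
consumer.add_exchange_bind("orders", "order.*")
consumer.connect()

# Added after the connection exists: recorded, but not yet applied.
consumer.add_exchange_bind("audit", "#")
applied_before_restart = list(consumer.applied)

consumer.restart()
applied_after_restart = list(consumer.applied)
```

In this model the "audit" binding is missing from applied_before_restart and appears only in applied_after_restart, which is the situation the NOTE warns about.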
-
Consumer.add_rule(rule)
Add a routing rule to the routing rules list.
A routing rule is added to the end of the routing rules list, just before the default rule, which is always the last one.
A routing rule is an instance of the class Rule. It has three fields: a Matcher, a target, and a target arguments field. The matcher is used to determine whether the target can handle the envelope. The target can be a Handler class or a Router instance (sub-router). The target arguments field is a dictionary passed to the newly created handler for initialization.
Since the rules of the Rule Router are checked sequentially in the order they were added, more specific rules should be added first, then generic ones later.
Parameters: rule (Rule) – routing rule instance.
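The effect of rule order can be seen in a small stand-alone model. This is a sketch under simplifying assumptions, not the Rule Router itself: SketchRuleRouter and find_target() are hypothetical, a rule is reduced to a (matcher function, target name) pair, and matching is done on a message-type string instead of an Envelope.

```python
from typing import Callable, List, Optional, Tuple

# Simplified rule: (matcher function, target name).
SketchRule = Tuple[Callable[[str], bool], str]


class SketchRuleRouter:
    """Hypothetical router: rules checked in order, default rule last."""

    def __init__(self) -> None:
        self.rules: List[SketchRule] = []
        self.default_rule: Optional[SketchRule] = None

    def add_rule(self, rule: SketchRule) -> None:
        self.rules.append(rule)

    def set_default_rule(self, rule: Optional[SketchRule]) -> None:
        self.default_rule = rule

    def find_target(self, message_type: str) -> Optional[str]:
        candidates = list(self.rules)
        if self.default_rule is not None:
            candidates.append(self.default_rule)  # always checked last
        for matcher, target in candidates:
            if matcher(message_type):
                return target  # first match wins
        return None


router = SketchRuleRouter()
# Specific rule first, generic rule second.
router.add_rule((lambda t: t == "order.created", "CreatedHandler"))
router.add_rule((lambda t: t.startswith("order."), "OrderHandler"))
router.set_default_rule((lambda t: True, "FallbackHandler"))
```

If the two add_rule() calls were swapped, the generic "order." rule would match first and "CreatedHandler" would never be selected.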
-
Consumer.set_default_rule(rule)
Set the default rule.
The default rule, when set, catches all unroutable envelopes.
Parameters: rule (Rule) – default routing rule, which catches all unmatched messages. None unsets the default rule.
-
Consumer.add_handler(matcher_or_pattern, target, target_kwargs=None)
Construct and add a routing rule for the provided handler.
Handlers are added as routing rules. A routing rule (Rule) is an object that contains a matcher (Matcher) instance, a target (a Handler subclass or a Router instance), and target kwargs. A routing rule is constructed and added to the routing rules list.
matcher is an instance of Matcher (it can also be a str, as explained later). Calling its match() method with an Envelope instance returns True or False, indicating a match or no match.
target can be a Handler subclass or a Router instance. When a Router instance is provided as a target, it acts as a sub-router with its own routing logic. This way, a chain of routers can be constructed.
target_kwargs is a dict passed to the handler’s initialize() hook method.
When finding a handler, the Rule Router (which the consumer is based on) goes through the list of rules sequentially in the order they were added, invoking the match method of each rule’s matcher.
If the router finds a match whose target is a Router (sub-router) instance, the sub-router’s find_handler() is invoked to find the handler.
If the matched target is a subclass of Handler, the router creates a handler instance, invokes its initialize() method with the handler kwargs, then returns it.
The router stops upon the first match, returning the handler to the consumer.
If a default_handler or default_rule is set, a default rule is added to the end of the routing rules list, where it catches all unmatched messages.
The router returns None when there is no match and no default_rule is set.
Since the rules of the Rule Router are checked sequentially, more specific handlers should be added first, then generic ones later.
When the passed matcher is a string, the default matcher MessageTypeMatches is constructed, with the passed string as its message type regular expression.
Parameters:
- matcher_or_pattern (Matcher|str) – a Matcher instance used to determine the match, or a pattern string used as the message type regex pattern of the default matcher type MessageTypeMatches.
- target (Type[Handler]|Router) – a subclass of Handler or a Router instance.
- target_kwargs (dict) – a dict of kwargs passed to the handler’s initialize() method. Only used when the target is a Handler subclass.
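The lookup steps above (string pattern wrapped in a default matcher, sub-router delegation, handler creation followed by initialize()) can be traced in a stand-alone sketch. All class names here are hypothetical stand-ins, not RabbitLeap classes. Two simplifications are assumptions of the sketch: matching is done on a message-type string instead of an Envelope, and the pattern is applied with re.match, which may differ from how MessageTypeMatches applies its pattern.

```python
import re


class SketchTypeMatcher:
    """Stand-in for a message-type matcher; re.match is an assumption."""

    def __init__(self, pattern):
        self._regex = re.compile(pattern)

    def match(self, message_type):
        return self._regex.match(message_type) is not None


class SketchHandler:
    """Stand-in handler whose initialize() hook receives the target kwargs."""

    def __init__(self):
        self.options = {}

    def initialize(self, **kwargs):
        self.options = kwargs


class SketchRouter:
    """Hypothetical rule router supporting handlers and sub-routers."""

    def __init__(self):
        self._rules = []

    def add_handler(self, matcher_or_pattern, target, target_kwargs=None):
        # A plain string is wrapped in the default matcher.
        if isinstance(matcher_or_pattern, str):
            matcher = SketchTypeMatcher(matcher_or_pattern)
        else:
            matcher = matcher_or_pattern
        self._rules.append((matcher, target, target_kwargs or {}))

    def find_handler(self, message_type):
        for matcher, target, kwargs in self._rules:
            if not matcher.match(message_type):
                continue
            if isinstance(target, SketchRouter):
                # Sub-router: delegate the lookup (this sketch stops here
                # even if the sub-router finds nothing).
                return target.find_handler(message_type)
            handler = target()            # create the handler instance
            handler.initialize(**kwargs)  # pass the target kwargs
            return handler
        return None


billing = SketchRouter()
billing.add_handler(r"billing\.invoice", SketchHandler, {"queue": "invoices"})

root = SketchRouter()
root.add_handler(r"billing\..*", billing)                   # sub-router
root.add_handler(r".*", SketchHandler, {"queue": "misc"})   # generic, last

invoice_handler = root.find_handler("billing.invoice")
other_handler = root.find_handler("user.created")
```

Here "billing.invoice" is resolved by the sub-router, while "user.created" falls through to the generic rule added last.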
-
Consumer.set_default_handler(default_target, default_target_kwargs=None)
Construct and add the default routing rule for the given target.
This method constructs a routing rule for the given target and passes it to set_default_rule().
Parameters:
- default_target (Type[Handler]) – default target, which catches all unmatched messages. None unsets the default target; unmatched messages are then sent to the dlx.
- default_target_kwargs (dict) – a dict of kwargs passed to the handler’s initialize() method. Only used when the target is a Handler subclass.
-
Consumer.set_retry_policy(retry_policy)
Set the retry policy.
The retry policy can be an instance of any RetryPolicy subclass.
Parameters: retry_policy (RetryPolicy) – an instance of a retry policy.
-
Consumer.unset_retry_policy()
Unset the retry policy.