Consumer

The Consumer is the main part of RabbitLeap framework. It’s responsible for connecting to RabbitMQ, besides, receiving, acknowledging, and rejecting messages. It handles connection failures and automatically reconnects to RabbitMQ. The consumer consumes a share or exclusive RabbitMQ queue.

When the Consumer receives a message from RabbitMQ, it packages the message properties, payload, and delivery information in an Envelope object, preparing it for handling. The envelope then, is routed to its handler by the Router, which the consumer relies on to route envelopes to their handlers.

The consumer can be configured with a retry policy. The RetryPolicy defines how to do handling retries incase a timeout or an error happens.

Consumer

class rabbitleap.consumer.Consumer(amqp_url, queue_name, durable=True, exclusive=False, dlx_name=None, auto_reconnect=True, auto_reconnect_delay=3)[source]

Bases: rabbitleap.routing.RuleRouter

RabbitMQ Consumer.

Public Methods:

start(): start consumer.

stop(): stop consumer.

restart(): restart consumer.

abort(): reject message.

skip(): skip message handling.

error(): raise HandlingError exception to report a handling error.

add_exchange_bind(): add exchange bind to the bindings list.

add_rule(): add a routing rule to the routing rules list.

set_default_rule(): set default routing rule to catch all unmatched messages

add_handler(): add handler, it creates a routing rule then add it to the routing rules list.

set_default_handler(): set default handler, it creates a rule then set it as default routing rule

set_retry_policy(): set a retry policy

unset_retry_policy(): un-set retry policy

Consumer.start()[source]

Start consumer.

Consumer.stop()[source]

Stop consumer.

Consumer.restart()[source]

Restart consumer.

Close the connection and reconnect again.

Consumer.abort(reason=None)[source]

Abort handling the message.

This can be called during pre_handle() or handle() to abort handling. It raises AbortHandling exception. The exception is handled by the consumer, and causes the consumer to reject the message.

Raise:AbortHandling when called

NOTE: when called inside the handler, the handler MUST re-raise AbortHandling exception to the consumer, in case the exception is handled inside.

Parameters:reason (str) – Reason for aborting handling the message
Consumer.skip(reason=None)[source]

Skip handling the message.

This can be called during pre_handle() or handle() to skip handling. It raises SkipHandling exception. The exception is handled by the consumer, and causes the consumer to ack and skip the message.

Raise:SkipHandling when called

NOTE: when called inside the handler, the handler should re-raise SkipHandling exception to the consumer, in case the exception is handled inside.

Parameters:reason (str) – Reason for skipping handling the message
Consumer.error(error_msg=None)[source]

Raise HandlingError exception.

This method can be called inside the handler or invoked by the consumer in case of an error happened while handling the message. This method raises HandlingError which is handled by the consumer. The consumer will retry handling the message if a retry policy is set or reject the message incase no retry policy is set. When calling this method inside the handler, it should be called during pre_handle() or handle().

Raise:HandlingError when called

NOTE: when called inside the handler, the handler should re-raise HandlingError exception to the consumer, in case the exception is handled inside.

Parameters:error_msg (str) – Error message
Consumer.add_exchange_bind(exchange_name, routing_key, declare_exchange=False, declare_kwargs=None)[source]

Add exchange binding to the bindings list

NOTE: Actual exchange binding is happening during the consumer start up, when the connection with RabbitMQ established. Invoking this method after the connection is already established won’t take affect until the consumer re-establishes the connection. The method restart() can be called to force the consumer to disconnect and reconnect again, doing exchange binding as part of that process.

Raise:

AssertionError if declare_exchange is True and declare_kwargs is None or ‘type’ is not in declare_kwargs

Parameters:
  • exchange_name (str) – name of the exchange to bind to.
  • routing_key (str) – binding routing key.
  • declare_exchange (bool) – should declare exchange before binding.
  • declare_kwargs (dict) – is exchange_declare() arguments.

declare_kwargs dict:

'exchange_type': `required`,
'durable': `optional`, default to ``False``.
'auto_delete': `optional`, default to ``False``
'internal': `optional`, default to ``False``
`arguments`: `optional` additional exchange declaration arguments
Consumer.add_rule(rule)

Add a routing rule to the routing rules list.

A routing rule is added to the end of routing rules list, just before the default rule which is always the last one.

A routing rule is an instance of the class Rule. It has 3 fields, a Matcher, target, and target arguments field. The matcher is used to determines whether the target can handler the envelope. The target can be a Handler class or a Router instance (sub-router). The target arguments is a dictionary passed to the newly created handler for initialization.

Since the rules of the Rule Router are checked sequentially in the order they added, more specific rules should be added first, then generic ones later.

Parameters:rule (Rule) – routing rule instance.
Consumer.set_default_rule(rule)

Set default rule.

Default rule, when set, it catches all unroutable envelopes.

Parameters:rule (Rule) – default routing rule which catches all unmatched messages. None will unset default rule
Consumer.add_handler(matcher_or_pattern, target, target_kwargs=None)[source]

Construct and add a routing rule for the provided handler.

Handlers are added as routing rules. A routing rule (Rule) is an object that contains a matcher (Matcher) instance, target (Handler subclass or a Router instance), and target kwargs. A routing rule is constructed and added routing rules list.

matcher is an instance of Matcher (can be a str explained later) which calling its match() method, passing Envelope instance, returns True or False indicating match or no match.

target can be a Handler subclass or a Router instance. When a Router instance is provided as a target, it will act as a sub-router which has its own routing logic. This way, a chain of routers can be constructed.

target_kwargs is a dict passed to the handler initialize() hook method.

When finding a handler, the Rule Router (which the consumer is based on) goes through the list of rules sequentially in the order they were added, invoking the matcher’s match method of the each rule.

In case the router finds a match whose target is a router (sub-router) instance, its find_handler() is invoked it find handler

In case, the match target is a subclass of Handler, the router creates a handler instance, invokes its initialize() method passing the handler kwargs, then returns it.

The router stops upon first match, returning the handler to the consumer.

If a default_handler or default_rule is set, a default rule is added to the end of the routing rules list which will catch all unmatched messages.

The router returns None when there is no match and no default_rule is set.

Since the rules of the Rule Router (which the consumer is based on) are checked sequentially, more specific handlers should be added first, then generic ones later.

When passed matcher is a string, the default matcher MessageTypeMatches is constructed and the passed string is its message type regular expression string

Parameters:
  • matcher (Matcher|str) – a Matcher instance used to determin the match or a pattern string used for the default matcher type MessageTypeMatches as its message type regx pattern.
  • target (Type[Handler]|Router) – a subclass of Handler or a Router instance.
  • target_kwargs (dict) – a dict of kwargs that are passed to handler initialize() method. Only used when the target is a Handler subclass
Consumer.set_default_handler(default_target, default_target_kwargs=None)[source]

Construct and add default routing rule for the given target

This method constructs a routing rule for the given target and pass it to set_default_rule().

Parameters:
  • default_target (Type[Handler]) – Default target, which will catch all unmatched message. “None” means unset default target, unmached messages will be sent to dlx.
  • default_target_kwargs (dict) – a dict of kwargs that are passed to handler initialize() method. Only used when the target is a Handler subclass
Consumer.set_retry_policy(retry_policy)[source]

Set retry policy.

Retry policy can be an instance of any RetryPolicy subclass.

Parameters:retry_policy (RetryPolicy) – an instance of a retry policy
Consumer.unset_retry_policy()[source]

Unset retry policy.