rabbitleap package¶
Submodules¶
rabbitleap.consumer module¶
The Consumer is the main part of the RabbitLeap framework. It is responsible
for connecting to RabbitMQ and for receiving, acknowledging, and rejecting messages.
It handles connection failures and automatically reconnects to RabbitMQ. The consumer
consumes a shared or exclusive RabbitMQ queue.
When the Consumer receives a message from RabbitMQ, it packages the message
properties, payload, and delivery information in an Envelope object,
preparing it for handling. The envelope is then routed to its handler by the
Router, which the consumer relies on to route envelopes to their handlers.
The consumer can be configured with a retry policy. The RetryPolicy defines
how handling is retried in case a timeout or an error happens.
-
class
rabbitleap.consumer.
Consumer
(amqp_url, queue_name, durable=True, exclusive=False, dlx_name=None, auto_reconnect=True, auto_reconnect_delay=3)[source]¶ Bases:
rabbitleap.routing.RuleRouter
RabbitMQ Consumer.
Public Methods:
- start(): start consumer.
- stop(): stop consumer.
- restart(): restart consumer.
- abort(): reject message.
- skip(): skip message handling.
- error(): raise HandlingError exception to report a handling error.
- add_exchange_bind(): add exchange bind to the bindings list.
- add_rule(): add a routing rule to the routing rules list.
- set_default_rule(): set default routing rule to catch all unmatched messages.
- add_handler(): add handler; it creates a routing rule, then adds it to the routing rules list.
- set_default_handler(): set default handler; it creates a rule, then sets it as the default routing rule.
- set_retry_policy(): set a retry policy.
- unset_retry_policy(): unset retry policy.
-
abort
(reason=None)[source]¶ Abort handling the message.
This can be called during pre_handle() or handle() to abort handling. It raises an AbortHandling exception. The exception is handled by the consumer, and causes the consumer to reject the message.
Raise: AbortHandling when called.
NOTE: when called inside the handler, the handler MUST re-raise the AbortHandling exception to the consumer if it catches the exception itself.
Parameters: reason (str) – Reason for aborting handling the message
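The re-raise requirement in the NOTE above can be illustrated with a minimal, self-contained sketch. It does not import RabbitLeap: the local AbortHandling class, the abort() function, and the handle() function are stand-ins written for this example, assuming only the documented behaviour that abort() raises AbortHandling.

```python
class AbortHandling(Exception):
    """Local stand-in for rabbitleap.exceptions.AbortHandling."""


def abort(reason=None):
    # Mimics the documented behaviour: abort() raises AbortHandling.
    raise AbortHandling(reason)


def handle(payload, log):
    """Hypothetical handler body that guards its work with a broad except."""
    try:
        if not payload:
            abort("empty payload")
        log.append("processed")
    except AbortHandling:
        # Control-flow exception: let it propagate to the consumer untouched.
        raise
    except Exception as exc:
        # Genuine failures can be logged or translated here.
        log.append("failed: {}".format(exc))


log = []
try:
    handle("", log)
    outcome = "handled"
except AbortHandling as exc:
    # This is the point where the real consumer would reject the message.
    outcome = "rejected: {}".format(exc)
```

Without the dedicated `except AbortHandling: raise` clause, the broad `except Exception` would swallow the abort and the consumer would never see it.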
-
add_exchange_bind
(exchange_name, routing_key, declare_exchange=False, declare_kwargs=None)[source]¶ Add exchange binding to the bindings list
NOTE: The actual exchange binding happens during consumer start-up, once the connection with RabbitMQ is established. Invoking this method after the connection is already established won’t take effect until the consumer re-establishes the connection. The method restart() can be called to force the consumer to disconnect and reconnect, doing the exchange binding as part of that process.
Raise: AssertionError if declare_exchange is True and declare_kwargs is None or ‘type’ is not in declare_kwargs
Parameters:
- exchange_name (str) – name of the exchange to bind to.
- routing_key (str) – binding routing key.
- declare_exchange (bool) – whether to declare the exchange before binding.
- declare_kwargs (dict) – exchange_declare() arguments.
declare_kwargs dict:
- 'exchange_type': required.
- 'durable': optional, defaults to False.
- 'auto_delete': optional, defaults to False.
- 'internal': optional, defaults to False.
- 'arguments': optional, additional exchange declaration arguments.
-
add_handler
(matcher_or_pattern, target, target_kwargs=None)[source]¶ Construct and add a routing rule for the provided handler.
Handlers are added as routing rules. A routing rule (Rule) is an object that contains a matcher (Matcher) instance, a target (Handler subclass or a Router instance), and target kwargs. A routing rule is constructed and added to the routing rules list.
matcher is an instance of Matcher (it can also be a str, explained later) whose match() method, when passed an Envelope instance, returns True or False, indicating match or no match.
target can be a Handler subclass or a Router instance. When a Router instance is provided as a target, it acts as a sub-router which has its own routing logic. This way, a chain of routers can be constructed.
target_kwargs is a dict passed to the handler’s initialize() hook method.
When finding a handler, the Rule Router (which the consumer is based on) goes through the list of rules sequentially in the order they were added, invoking the match method of each rule’s matcher.
In case the router finds a match whose target is a router (sub-router) instance, the sub-router’s find_handler() is invoked to find the handler.
In case the matched target is a subclass of Handler, the router creates a handler instance, invokes its initialize() method passing the handler kwargs, then returns it.
The router stops upon the first match, returning the handler to the consumer.
If a default_handler or default_rule is set, a default rule is added to the end of the routing rules list, which will catch all unmatched messages.
The router returns None when there is no match and no default_rule is set.
Since the rules of the Rule Router are checked sequentially, more specific handlers should be added first, then generic ones later.
When the passed matcher is a string, the default matcher MessageTypeMatches is constructed with the passed string as its message type regular expression.
Parameters:
- matcher (Matcher|str) – a Matcher instance used to determine the match, or a pattern string used by the default matcher type MessageTypeMatches as its message type regex pattern.
- target (Type[Handler]|Router) – a subclass of Handler or a Router instance.
- target_kwargs (dict) – a dict of kwargs that are passed to the handler initialize() method. Only used when the target is a Handler subclass.
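The first-match-wins behaviour described for add_handler() can be sketched without RabbitLeap itself. Everything below is a simplified stand-in written for illustration: MiniRuleRouter, the two handler classes, and the message type strings are hypothetical, and the matcher assumes a regular expression search on the envelope's type.

```python
import re


class Envelope:
    """Stand-in envelope exposing only the message type."""

    def __init__(self, message_type):
        self.type = message_type


class MessageTypeMatches:
    """Stand-in matcher: regex search on the envelope's message type."""

    def __init__(self, pattern):
        self.pattern = re.compile(pattern)

    def match(self, envelope):
        return self.pattern.search(envelope.type or "") is not None


class OrderCreatedHandler:
    def __init__(self, envelope):
        self.envelope = envelope

    def initialize(self, **kwargs):
        self.options = kwargs


class AnyOrderHandler(OrderCreatedHandler):
    pass


class MiniRuleRouter:
    """Hypothetical, much simplified rule router."""

    def __init__(self):
        self.rules = []

    def add_handler(self, matcher_or_pattern, target, target_kwargs=None):
        matcher = matcher_or_pattern
        if isinstance(matcher, str):
            # A string is promoted to the default message-type matcher.
            matcher = MessageTypeMatches(matcher)
        self.rules.append((matcher, target, target_kwargs or {}))

    def find_handler(self, envelope):
        for matcher, target, kwargs in self.rules:
            if matcher.match(envelope):
                handler = target(envelope)
                handler.initialize(**kwargs)
                return handler  # first match wins
        return None  # unroutable and no default rule


router = MiniRuleRouter()
# Specific rule first, generic rule second.
router.add_handler(r"^order\.created$", OrderCreatedHandler, {"notify": True})
router.add_handler(r"^order\.", AnyOrderHandler)

created = router.find_handler(Envelope("order.created"))
cancelled = router.find_handler(Envelope("order.cancelled"))
unknown = router.find_handler(Envelope("invoice.paid"))
```

If the two add_handler() calls were swapped, the generic `^order\.` rule would also capture "order.created" messages, which is why specific rules go first.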
-
error
(error_msg=None)[source]¶ Raise HandlingError exception.
This method can be called inside the handler, or invoked by the consumer when an error happens while handling the message. It raises HandlingError, which is handled by the consumer. The consumer retries handling the message if a retry policy is set, or rejects the message if no retry policy is set. When calling this method inside the handler, it should be called during pre_handle() or handle().
Raise: HandlingError when called.
NOTE: when called inside the handler, the handler should re-raise the HandlingError exception to the consumer if it catches the exception itself.
Parameters: error_msg (str) – Error message
-
set_default_handler
(default_target, default_target_kwargs=None)[source]¶ Construct and add a default routing rule for the given target.
This method constructs a routing rule for the given target and passes it to set_default_rule().
Parameters:
- default_target (Type[Handler]) – Default target, which will catch all unmatched messages. None unsets the default target; unmatched messages will then be sent to the dlx.
- default_target_kwargs (dict) – a dict of kwargs that are passed to the handler initialize() method. Only used when the target is a Handler subclass.
-
set_retry_policy
(retry_policy)[source]¶ Set retry policy.
Retry policy can be an instance of any RetryPolicy subclass.
Parameters: retry_policy (RetryPolicy) – an instance of a retry policy
-
skip
(reason=None)[source]¶ Skip handling the message.
This can be called during pre_handle() or handle() to skip handling. It raises a SkipHandling exception. The exception is handled by the consumer, and causes the consumer to ack and skip the message.
Raise: SkipHandling when called.
NOTE: when called inside the handler, the handler should re-raise the SkipHandling exception to the consumer if it catches the exception itself.
Parameters: reason (str) – Reason for skipping handling the message
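The three control-flow exceptions lead to different outcomes for the message. The sketch below summarizes them under stated assumptions: the exception classes are local stand-ins, settle() and raiser() are hypothetical helpers, and acknowledging after a successful handle() is assumed rather than taken from the text above.

```python
class AbortHandling(Exception):
    pass


class SkipHandling(Exception):
    pass


class HandlingError(Exception):
    pass


def settle(handle, has_retry_policy):
    """Run a handler callable and name the action a consumer would take."""
    try:
        handle()
    except AbortHandling:
        return "reject"
    except SkipHandling:
        return "ack"
    except HandlingError:
        # With a retry policy the message is handed to it, otherwise rejected.
        return "retry" if has_retry_policy else "reject"
    return "ack"  # assumption: a successfully handled message is acknowledged


def raiser(exc):
    """Build a handler callable that raises the given exception."""
    def _handle():
        raise exc
    return _handle


outcomes = {
    "abort": settle(raiser(AbortHandling("bad input")), has_retry_policy=True),
    "skip": settle(raiser(SkipHandling("duplicate")), has_retry_policy=True),
    "error_with_policy": settle(raiser(HandlingError("timeout")), has_retry_policy=True),
    "error_without_policy": settle(raiser(HandlingError("timeout")), has_retry_policy=False),
    "success": settle(lambda: None, has_retry_policy=False),
}
```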
-
rabbitleap.envelope module¶
-
class
rabbitleap.envelope.
Envelope
(properties, payload, delivery_info)[source]¶ Bases:
object
Message envelope
Message properties, payload, and delivery information are all contained in this Envelope class. Envelope is what is passed to the handler instance for handling.
-
app_id
¶ Return message app id.
This is a shortcut for self.properties.app_id.
-
cluster_id
¶ Return message cluster id.
This is a shortcut for self.properties.cluster_id.
-
consumer_tag
¶ Return message consumer tag.
This is a shortcut for self.delivery_info.consumer_tag.
-
content_encoding
¶ Return message content encoding.
This is a shortcut for self.properties.content_encoding.
-
content_type
¶ Return message content type.
This is a shortcut for self.properties.content_type.
-
correlation_id
¶ Return message correlation id.
This is a shortcut for self.properties.correlation_id.
-
delivery_mode
¶ Return message delivery mode.
This is a shortcut for self.properties.delivery_mode.
-
delivery_tag
¶ Return message delivery tag.
This is a shortcut for self.delivery_info.delivery_tag.
-
exchange
¶ Return message exchange.
This is a shortcut for self.delivery_info.exchange.
-
expiration
¶ Return message expiration.
This is a shortcut for self.properties.expiration.
-
headers
¶ Return message headers.
This is a shortcut for self.properties.headers.
-
message_id
¶ Return message id.
This is a shortcut for self.properties.message_id.
-
priority
¶ Return message priority.
This is a shortcut for self.properties.priority.
-
redelivered
¶ Return message redelivered flag.
This is a shortcut for self.delivery_info.redelivered.
-
reply_to
¶ Return message reply_to.
This is a shortcut for self.properties.reply_to.
-
routing_key
¶ Return message routing key.
This is a shortcut for self.delivery_info.routing_key.
-
timestamp
¶ Return message timestamp.
This is a shortcut for self.properties.timestamp.
-
type
¶ Return message type.
This is a shortcut for self.properties.type.
-
user_id
¶ Return user id.
This is a shortcut for self.properties.user_id.
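The shortcut attributes listed above all follow the same delegation pattern. A minimal sketch of that pattern, using a hypothetical EnvelopeSketch class and SimpleNamespace objects in place of the real properties and delivery information, could look like this:

```python
from types import SimpleNamespace


class EnvelopeSketch:
    """Hypothetical stand-in showing how shortcut properties delegate."""

    def __init__(self, properties, payload, delivery_info):
        self.properties = properties
        self.payload = payload
        self.delivery_info = delivery_info

    @property
    def type(self):
        # Shortcut for self.properties.type
        return self.properties.type

    @property
    def routing_key(self):
        # Shortcut for self.delivery_info.routing_key
        return self.delivery_info.routing_key

    @property
    def redelivered(self):
        # Shortcut for self.delivery_info.redelivered
        return self.delivery_info.redelivered


envelope = EnvelopeSketch(
    properties=SimpleNamespace(type="order.created", message_id="m-1"),
    payload=b'{"id": 7}',
    delivery_info=SimpleNamespace(routing_key="orders", redelivered=False),
)
```

Message-level attributes come from `properties`, while broker-level attributes such as the routing key and the redelivered flag come from `delivery_info`.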
-
rabbitleap.exceptions module¶
-
exception
rabbitleap.exceptions.
AbortHandling
(reason=None)[source]¶ Bases:
Exception
This exception is raised when the abort() method is called.
This exception is handled by the consumer to abort handling the message and reject it.
-
exception
rabbitleap.exceptions.
HandlingError
(error_msg=None)[source]¶ Bases:
Exception
This exception is raised when the error() method is called upon an error in handling a message. The exception is handled by the consumer, which retries handling the message if a retry policy is set, or rejects it otherwise.
rabbitleap.handling module¶
Handlers are what actually consume message envelopes. When the router routes an
envelope and returns a handler instance to the Consumer, the Consumer
executes the handler by invoking its pre_handle(), handle(), and post_handle()
methods, in that order.
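The call order described above can be sketched as follows. The Handler class here is a local stand-in that only mirrors the documented hook names, AuditHandler is a hypothetical subclass, and execute() is an assumed simplification of what the consumer does once the router has returned a handler.

```python
class Handler:
    """Stand-in base class mirroring the documented hook names."""

    def __init__(self, envelope):
        self.envelope = envelope

    def pre_handle(self):
        pass

    def handle(self):
        raise NotImplementedError("subclasses must implement handle()")

    def post_handle(self):
        pass


class AuditHandler(Handler):
    """Hypothetical handler that records the order of the calls it receives."""

    def __init__(self, envelope, calls):
        super().__init__(envelope)
        self.calls = calls

    def pre_handle(self):
        self.calls.append("pre_handle")

    def handle(self):
        self.calls.append("handle:" + self.envelope)

    def post_handle(self):
        self.calls.append("post_handle")


def execute(handler):
    """Invoke the three hooks in the documented order."""
    handler.pre_handle()
    handler.handle()
    handler.post_handle()


calls = []
execute(AuditHandler("hello", calls))
```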
-
class
rabbitleap.handling.
Handler
(envelope, **kwargs)[source]¶ Bases:
object
Base class for envelope handlers.
envelope is a reference to the given message envelope.
Subclasses MUST implement the handle() method.
Methods:
- initialize(): initialization hook used to initialize the handler with kwargs.
- pre_handle(): pre-handling method invoked before handle().
- handle(): to be implemented by the subclasses for the actual handling logic.
- post_handle(): post-handling method invoked after handle().
-
handle
()[source]¶ Handle message envelope.
This method is invoked by the Consumer after invoking pre_handle(). The actual envelope handling should happen in this method.
Handler subclasses MUST implement this method.
-
initialize
(**kwargs)[source]¶ Initialize handler.
This method is an initialization hook for the handler.
-
-
class
rabbitleap.handling.
MessageHandler
(consumer, envelope, **kwargs)[source]¶ Bases:
rabbitleap.handling.Handler
Message handler.
This class extends the Handler class with methods used to reject and skip envelopes, and to report handling errors. It holds a reference to the Consumer instance which does the execution.
Methods:
- initialize(): initialization hook used to initialize the handler with kwargs.
- pre_handle(): pre-handling method invoked before handle().
- handle(): overridden by the subclass implementing the handling logic.
- post_handle(): post-handling method invoked after handle().
- error(): a shortcut for Consumer.error() to raise HandlingError exception.
- abort(): a shortcut for Consumer.abort() to reject message.
- skip(): a shortcut for Consumer.skip() to skip message handling.
-
abort
(reason=None)[source]¶ Abort handling the message.
This method is a shortcut for Consumer.abort().
NOTE: when called inside the handler, the handler should re-raise the AbortHandling exception to the consumer if the exception is handled inside it.
Parameters: reason (str) – Reason for aborting handling the message.
-
channel
¶ Shortcut for self.consumer.channel.
-
error
(error_msg=None)[source]¶ Raise HandlingError exception.
This method is a shortcut for Consumer.error().
Raise: HandlingError when called.
NOTE: when called inside the handler, the handler should re-raise the HandlingError exception to the consumer if the exception is handled inside it.
Parameters: error_msg (str) – Error message.
-
skip
(reason=None)[source]¶ Skip handling the message.
This method is a shortcut for Consumer.skip().
NOTE: when called inside the handler, the handler should re-raise the SkipHandling exception to the consumer if the exception is handled inside it.
Parameters: reason (str) – Reason for skipping handling the message.
-
rabbitleap.retry_policies module¶
Nothing is perfect; errors and timeouts may happen, and when such failures happen, the
consumer has to decide what to do about them. By default, the consumer rejects the
envelope (RabbitMQ message) when a failure happens. However, errors and timeouts,
unless caused by a software bug, are usually solved with retries. Just like
routing, the consumer doesn’t make the retry decision itself; it delegates
it to a retry policy. The retry policy defines how the retry is performed. Retries
usually happen with back-offs to avoid worsening the situation by hammering other
services with more requests, especially if it was a timeout issue. The consumer can be
configured to use a retry policy by calling Consumer.set_retry_policy(), passing
an instance of RetryPolicy. When a retry policy is set, the consumer doesn’t
reject messages; instead, it sends them to the retry policy to deal with the
situation by invoking the RetryPolicy.retry() method. Based on its implementation,
the retry policy decides how to do retries.
There are 4 different retry policies available:
- UnlimitedRetriesPolicy, Unlimited retries policy
- LimitedRetriesPolicy, Limited retries policy
- FixedDelayUnlimitedRetriesPolicy, Fixed delay unlimited retries policy
- FixedDelayLimitedRetriesPolicy, Fixed delay limited retries policy
Custom retry policies can be created by subclassing the base class RetryPolicy.
-
class
rabbitleap.retry_policies.
BaseRetryPolicy
(consumer, retry_queue_suffix='retry', **kwargs)[source]¶ Bases:
rabbitleap.retry_policies.RetryPolicy
Base retry policy class for UnlimitedRetriesPolicy and LimitedRetriesPolicy.
It implements getting the message death count and retry queue creation.
-
declare_retry_queue
(delay)[source]¶ Declare a retry queue for the provided delay.
Each distinct delay has its own queue, where all retry messages with the same delay are sent until they expire and get sent back to the original queue for a handling retry. The queue is declared with a TTL and is automatically deleted. The queue TTL is equal to the provided delay. The retry queue’s dead letter exchange is the (default) direct exchange, and the dead letter routing key is the name of the original queue the messages came from. When messages reach their TTL, they are sent back to the original queue for a handling retry.
The retry queue is redeclared before every new message is sent to it. Redeclaration resets the queue’s TTL, preventing it from being destroyed.
Parameters: delay (int) – Retry delay in seconds
Returns: retry queue name
Return type: str
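The queue declaration described above maps onto standard RabbitMQ queue arguments. The sketch below only builds the name and the arguments dictionary; it does not talk to a broker. The function name, the queue naming scheme, and the expiry margin are assumptions made for illustration, since the text does not state which exact values the library uses.

```python
def retry_queue_declaration(original_queue, delay, suffix="retry"):
    """Build a (name, arguments) pair for a per-delay retry queue.

    The naming scheme and the x-expires margin are assumptions of this sketch.
    """
    delay_ms = int(delay * 1000)
    name = "{}.{}.{}".format(original_queue, suffix, delay_ms)
    arguments = {
        # Messages wait in the queue for `delay` seconds, then are dead-lettered.
        "x-message-ttl": delay_ms,
        # "" is RabbitMQ's default (direct) exchange, which routes by queue name.
        "x-dead-letter-exchange": "",
        "x-dead-letter-routing-key": original_queue,
        # Unused queues are removed by the broker; redeclaring renews the lease.
        "x-expires": delay_ms * 2,
    }
    return name, arguments


queue_name, queue_arguments = retry_queue_declaration("orders", 5)
```

Because the dead letter exchange is the default exchange and the routing key is the original queue name, expired messages land back in the queue they came from.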
-
-
class
rabbitleap.retry_policies.
FixedDelayLimitedRetriesPolicy
(consumer, delay, retries_limit, retry_queue_suffix='retry', **kwargs)[source]¶ Bases:
rabbitleap.retry_policies.LimitedRetriesPolicy
Fixed delay limited retries policy.
This is an implementation of RetryPolicy which uses a fixed backoff delay and a limited number of retries.
- consumer: consumer instance.
- delay: retry delay in seconds.
- retries_limit: retries limit count.
- retry_queue_suffix: suffix str used when naming retry queues.
-
class
rabbitleap.retry_policies.
FixedDelayUnlimitedRetriesPolicy
(consumer, delay, retry_queue_suffix='retry', **kwargs)[source]¶ Bases:
rabbitleap.retry_policies.UnlimitedRetriesPolicy
Fixed delay unlimited retries policy.
This is an implementation of RetryPolicy which uses a fixed backoff delay and unlimited retries.
- consumer: consumer instance.
- delay: retry delay in seconds.
- retry_queue_suffix: suffix str used when naming retry queues.
-
class
rabbitleap.retry_policies.
LimitedRetriesPolicy
(consumer, retry_delays, retry_queue_suffix='retry', **kwargs)[source]¶ Bases:
rabbitleap.retry_policies.BaseRetryPolicy
Limited Retries Policy.
This is an implementation of RetryPolicy which uses incremental backoff and a limited number of retries.
- consumer: message consumer instance.
- retry_delays: immutable list of retry backoff delays in seconds. The message is sent to the dlx when this list is exhausted, e.g. (1, 5, 10, 60, 5 * 60).
- retry_queue_suffix: suffix str used when naming retry queues.
-
retry
(envelope)[source]¶ Send message to retry queue to retry handling it later.
Death count is calculated by examining the ‘x-death’ header. Based on the death count, the message is sent to a retry queue, where it waits until it expires and gets sent back to the original queue for a handling retry.
The death count is used as an index into the retry_delays list, where each item represents a retry delay in seconds.
The message is rejected if the death count exceeds the length of the retry_delays list.
Parameters: envelope (Envelope) – Message envelope
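The indexing logic can be sketched as two small functions. RabbitMQ stores dead-lettering history in the x-death header as a list of entries, each with a count field; how the library aggregates those entries is not stated above, so summing all counts is an assumption, as are the function names. The sketch also treats a death count equal to the list length as exhausted, since that value is no longer a valid index.

```python
def death_count(headers):
    """Sum the 'count' fields of the x-death entries (0 if never dead-lettered)."""
    entries = (headers or {}).get("x-death") or []
    return sum(int(entry.get("count", 1)) for entry in entries)


def pick_delay(headers, retry_delays):
    """Return the next retry delay in seconds, or None when retries are exhausted."""
    count = death_count(headers)
    if count >= len(retry_delays):
        return None  # the caller would reject the message at this point
    return retry_delays[count]


delays = (1, 5, 10)
first_failure = pick_delay({}, delays)
second_failure = pick_delay(
    {"x-death": [{"queue": "orders.retry.1000", "reason": "expired", "count": 1}]},
    delays,
)
exhausted = pick_delay(
    {"x-death": [
        {"queue": "orders.retry.1000", "reason": "expired", "count": 1},
        {"queue": "orders.retry.5000", "reason": "expired", "count": 2},
    ]},
    delays,
)
```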
-
-
class
rabbitleap.retry_policies.
RetryPolicy
(**kwargs)[source]¶ Bases:
object
Base class for retry policies.
Subclasses MUST implement
retry()
method.
-
class
rabbitleap.retry_policies.
UnlimitedRetriesPolicy
(consumer, initial_delay, max_delay, delay_incremented_by, retry_queue_suffix='retry', **kwargs)[source]¶ Bases:
rabbitleap.retry_policies.BaseRetryPolicy
Unlimited Retries Policy.
This is an implementation of RetryPolicy which uses incremental backoff and unlimited retries.
- initial_delay: the initial/first backoff delay in seconds.
- delay_incremented_by: the number of seconds the backoff is incremented by after each death.
- max_delay: the final/maximum backoff delay in seconds, which should not be exceeded.
-
retry
(envelope)[source]¶ Send message to retry queue to retry handling it later.
Death count is calculated by examining the ‘x-death’ header. Based on the death count, the message is sent to a retry queue, where it waits until it expires and gets sent back to the original queue for a handling retry.
Parameters: envelope (Envelope) – Message envelope
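One plausible reading of the three parameters is a linear backoff capped at max_delay. The formula below is an assumption derived from the parameter descriptions, not taken from the library source, and backoff_delay() is a hypothetical helper.

```python
def backoff_delay(death_count, initial_delay, max_delay, delay_incremented_by):
    """Linear backoff: grow by a fixed step per death, capped at max_delay."""
    delay = initial_delay + death_count * delay_incremented_by
    return min(delay, max_delay)


# Delays for the first five failures of the same message.
schedule = [
    backoff_delay(n, initial_delay=1, max_delay=10, delay_incremented_by=4)
    for n in range(5)
]

# A fixed delay can be seen as the special case with a zero increment.
fixed = [backoff_delay(n, initial_delay=3, max_delay=3, delay_incremented_by=0)
         for n in range(3)]
```

Under this reading the retries never stop; only the delay stops growing once it reaches max_delay.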
-
rabbitleap.routing module¶
When the Consumer receives a message from RabbitMQ, it prepares an
Envelope object of that message for the handler. However, the prepared
envelope needs to be routed to its handler, where it’s actually consumed.
The envelope may be routed based on the message type, payload, or any other criteria;
it depends on the routing logic. For this reason, the consumer doesn’t make the
routing decisions itself; it delegates the routing to a router. The router sits
between the consumer and handlers. Its responsibility is to route each incoming
envelope to its handler, returning the handler to the consumer for execution.
-
class
rabbitleap.routing.
AnyMatches
[source]¶ Bases:
rabbitleap.routing.Matcher
Match all messages matcher
This matcher matches everything. It always returns True. It’s used in RuleRouter when a default rule is set to catch all unroutable envelopes.
-
class
rabbitleap.routing.
Matcher
[source]¶ Bases:
object
Base class for matchers
This is the base class for matchers. A Matcher is used by a Rule to check whether its target can handle the given Envelope or not.
Subclasses MUST implement the match() method.
-
match
(envelope)[source]¶ Return True or False indicating whether the target can handle the message.
This method accepts an Envelope object as an argument and returns a boolean, indicating whether the target can handle the envelope or not.
Subclasses MUST implement this method.
Parameters: envelope (Envelope) – Message envelope
Returns: can handle or not
Return type: bool
-
-
class
rabbitleap.routing.
MessageTypeMatches
(message_type_pattern)[source]¶ Bases:
rabbitleap.routing.Matcher
Match messages based on message type matcher
This matcher matches based on the message type.
The message type is provided as a regular expression string or a compiled re.Pattern.
The matcher returns True when it finds a match in the message type, or False otherwise.
-
match
(envelope)[source]¶ Return True or False indicating whether the target can handle the message.
This method accepts an Envelope object as an argument and returns a boolean, indicating whether the target can handle the envelope or not.
Subclasses MUST implement this method.
Parameters: envelope (Envelope) – Message envelope
Returns: can handle or not
Return type: bool
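A matcher that accepts either a pattern string or a compiled pattern can be sketched with the standard re module. TypePatternMatcher and match_type() are hypothetical names, and using search() rather than a full match is an assumption based on the phrase "finds a match in the message type".

```python
import re


class TypePatternMatcher:
    """Hypothetical stand-in for a message-type matcher."""

    def __init__(self, message_type_pattern):
        # re.compile() returns an already compiled pattern unchanged.
        self.pattern = re.compile(message_type_pattern)

    def match_type(self, message_type):
        # search() finds the pattern anywhere in the type; anchor with ^ and $
        # when the whole type must match.
        return self.pattern.search(message_type or "") is not None


loose = TypePatternMatcher("order")
strict = TypePatternMatcher(re.compile(r"^order\.(created|updated)$"))

loose_hit = loose.match_type("preorder.created")
strict_hit = strict.match_type("order.created")
strict_miss = strict.match_type("preorder.created")
```

With search semantics, an unanchored pattern such as "order" also matches "preorder.created", so anchoring is worth considering when message types share prefixes or substrings.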
-
-
class
rabbitleap.routing.
NoneMatches
[source]¶ Bases:
rabbitleap.routing.Matcher
Match nothing matcher
This matcher matches nothing. It always returns
False
.
-
class
rabbitleap.routing.
Router
(**kwargs)[source]¶ Bases:
object
Base class for routers.
Subclasses MUST implement
find_handler()
-
class
rabbitleap.routing.
Rule
(matcher, target, target_kwargs=None)[source]¶ Bases:
object
A matching rule.
A rule (routing rule) is an object that links a matcher (Matcher) instance to a target.
The matcher is used to determine whether the target can handle the envelope. The target can be a Handler class or a Router instance (sub-router). The target arguments are a dictionary passed to the newly created handler for initialization.
-
class
rabbitleap.routing.
RuleRouter
(consumer, default_rule=None)[source]¶ Bases:
rabbitleap.routing.Router
Rule router.
Rule Router is an implementation of the base class Router. It uses routing rules to route envelopes to handlers. The rule router maintains a list of routing rules, through which it goes sequentially to find a handler for a given envelope. Rules added to the router are appended to the end of its rules list, and since the router goes through the routing rules sequentially in the order they’re added, more specific rules should be added first, then the general ones later.
A routing rule is an instance of the class Rule. It has 3 fields: a Matcher, a target, and target arguments. The matcher is used to determine whether the target can handle the envelope. The target can be a Handler class or a Router instance (sub-router). The target arguments are a dictionary passed to the newly created handler for initialization.
When the target is a Handler class, the router creates an instance of it, then returns the instance to the caller. However, if the target is a Router instance, it acts as a sub-router (child router). The parent router delegates finding the handler to the child router. The sub-router doesn’t have to be of the same type; it can be any Router implementation. Chained routers let one router delegate the routing to the next one.
The rule router returns None when the given envelope is unroutable (no handler can handle it). However, the rule router may be configured with a default routing rule which catches all unroutable envelopes; see set_default_rule().
The rule router always creates a new handler instance when its find_handler() is called, even for the same envelope, except when the rule’s target is a Router instance, which may have a different implementation and may not return a new instance.
The Consumer itself is a Rule Router. It adds the logic needed to communicate with RabbitMQ.
-
add_rule
(rule)[source]¶ Add a routing rule to the routing rules list.
A routing rule is added to the end of the routing rules list, just before the default rule, which is always the last one.
A routing rule is an instance of the class Rule. It has 3 fields: a Matcher, a target, and target arguments. The matcher is used to determine whether the target can handle the envelope. The target can be a Handler class or a Router instance (sub-router). The target arguments are a dictionary passed to the newly created handler for initialization.
Since the rules of the Rule Router are checked sequentially in the order they were added, more specific rules should be added first, then generic ones later.
Parameters: rule (Rule) – routing rule instance.
-
find_handler
(envelope)[source]¶ Find and return a handler
The router goes through the routing rules list sequentially to find a handler for the given envelope.
When the target in the matched Rule is a Handler class, an instance of it is created and returned. However, if the target is a Router instance, it acts as a sub-router. The sub-router’s find_handler() is invoked to get a Handler instance.
None is returned when the given envelope is unroutable (no handler can handle it). However, if a default rule is set, its handler instance is returned.
NOTE: The router always creates a new handler instance for each find handler call, even for the same message.
Parameters: envelope (Envelope) – Message envelope
Return Handler: Handler instance
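Sub-router delegation and the default rule can be sketched together. All classes below are simplified stand-ins written for this example: matchers are plain callables instead of Matcher instances, SketchRuleRouter is hypothetical, and treating the sub-router's answer as final (even when it is None) is an assumption about behaviour the text does not spell out.

```python
from types import SimpleNamespace


class Router:
    def find_handler(self, envelope):
        raise NotImplementedError


class Rule:
    def __init__(self, matcher, target, target_kwargs=None):
        self.matcher = matcher  # here: any callable(envelope) -> bool
        self.target = target
        self.target_kwargs = target_kwargs or {}


class SketchRuleRouter(Router):
    def __init__(self, default_rule=None):
        self.rules = []
        self.default_rule = default_rule

    def add_rule(self, rule):
        self.rules.append(rule)

    def find_handler(self, envelope):
        rules = list(self.rules)
        if self.default_rule is not None:
            rules.append(self.default_rule)  # the default rule is always last
        for rule in rules:
            if not rule.matcher(envelope):
                continue
            if isinstance(rule.target, Router):
                # Assumption: the sub-router's answer is final, even if None.
                return rule.target.find_handler(envelope)
            # A fresh handler instance is created on every call.
            return rule.target(envelope, **rule.target_kwargs)
        return None


class Handler:
    def __init__(self, envelope, **kwargs):
        self.envelope = envelope
        self.kwargs = kwargs


class BillingHandler(Handler):
    pass


class FallbackHandler(Handler):
    pass


billing = SketchRuleRouter()
billing.add_rule(Rule(lambda e: e.type == "billing.invoice", BillingHandler))

root = SketchRuleRouter(default_rule=Rule(lambda e: True, FallbackHandler))
root.add_rule(Rule(lambda e: e.type.startswith("billing."), billing))

invoice = SimpleNamespace(type="billing.invoice")
first = root.find_handler(invoice)
second = root.find_handler(invoice)
other = root.find_handler(SimpleNamespace(type="audit.login"))
refund = root.find_handler(SimpleNamespace(type="billing.refund"))
```

In this sketch "billing.refund" is claimed by the sub-router rule but the sub-router has no rule for it, so the result is None rather than the root's fallback handler; giving the sub-router its own default rule would avoid that.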
-