AMQP

sir.amqp.setup.setup_rabbitmq(args)[source]

Set up the AMQP server.

Parameters:args – will be ignored
sir.amqp.handler.callback_wrapper(f)[source]

Common wrapper for a message callback function that provides basic sanity checking for messages and provides exception handling for a function it wraps.

The following wrapper function is returned:

sir.amqp.handler.wrapper(self, msg, queue)
Parameters:

Calls f with self and an instance of Message. If an exception gets raised by f, it will be caught and the message will be rejected and sent to the search.failed queue (cf. Queues). Then the exception will not be reraised.

If no exception is raised, the message will be acknowledged.

sir.amqp.handler.watch(args)[source]

Watch AMQP queues for messages.

Parameters:entity_type ([str]) – Entity types to watch.
class sir.amqp.handler.Handler(entities)[source]

Bases: object

This class is used to provide callbacks for AMQP messages and access to Solr cores.

ack_message(msg, *args, **kwargs)[source]
connect_to_rabbitmq(reconnect=False)[source]
delete_callback(msg, queue)[source]

Callback for processing delete messages.

Messages for deletion have the following format:

<table name>, <id or gid>

First value is a table name for an entity that has been deleted. Second is GID or ID of the row in that table. For example:

{“_table”: “release”, “gid”: “90d7709d-feba-47e6-a2d1-8770da3c3d9c”}

This callback function is expected to receive messages only from entity tables all of which have a gid column on them except the ones in _ID_DELETE_TABLE_NAMES which are deleted via their id.

Parameters:parsed_message (sir.amqp.message.Message) – Message parsed by the callback_wrapper.
index_callback(msg, queue)[source]

Callback for processing index messages.

Messages for indexing have the following format:

<table name>, keys{<column name>, <value>}

First value is a table name, followed by primary key values for that table. These are then used to lookup values that need to be updated. For example:

{“_table”: “artist_credit_name”, “position”: 0, “artist_credit”: 1}

In this handler we are doing a selection with joins which follow a “path” from a table that the trigger was received from to an entity (later “core”, https://wiki.apache.org/solr/SolrTerminology). To know which data to retrieve we are using PK(s) of a table that was updated. update_map provides us with a view of dependencies between entities (cores) and all the tables. So if data in some table has been updated, we know which entities store this data in the index and need to be refreshed.

Parameters:parsed_message (sir.amqp.message.Message) – Message parsed by the callback_wrapper.
process_messages()[source]
reject_message(msg, *args, **kwargs)[source]
requeue_message(msg, *args, **kwargs)[source]
sir.amqp.handler._DEFAULT_MB_RETRIES = 4

The number of times we’ll try to process a message.

sir.amqp.handler._RETRY_WAIT_SECS = 30

The number of seconds between each connection attempt to the AMQP server.

This module contains functions and classes to parse and represent the content of an AMQP message.

exception sir.amqp.message.InvalidMessageContentException[source]

Bases: exceptions.ValueError

Exception indicating an error with the content of an AMQP message.

class sir.amqp.message.MESSAGE_TYPES

Bases: enum.Enum

delete = 1
index = 2
class sir.amqp.message.Message(message_type, table_name, columns, operation)[source]

Bases: object

A parsed message from AMQP.

Construct a new message object.

A message contains a set of columns (dict) which can be used to determine which row has been updated. In case of messages from the index queue it will be a set of PK columns, and gid column for delete queue messages.

Parameters:
  • message_type – Type of the message. A member of MESSAGE_TYPES.
  • table_name (str) – Name of the table the message is associated with.
  • columns (dict) – Dictionary mapping columns of the table to their values.
classmethod from_amqp_message(queue_name, amqp_message)[source]

Parses an AMQP message.

Parameters:
  • queue_name (str) – Name of the queue where the message originated from.
  • amqp_message (amqp.basic_message.Message) – Message object from the queue.
Return type:

sir.amqp.message.Message