# Source code for sir.amqp.message

#!/usr/bin/env python
# coding: utf-8
# Copyright (c) 2014 Wieland Hoffmann
# License: MIT, see LICENSE for details
"""
This module contains functions and classes to parse and represent the content
of an AMQP message.
"""
from sir.trigger_generation.sql_generator import MSG_JSON_TABLE_NAME_KEY, MSG_JSON_OPERATION_TYPE
from enum import Enum
import ujson

# The closed set of message kinds this module can parse. The functional Enum
# API numbers the members in the order they are listed, starting at 1.
MESSAGE_TYPES = Enum("MESSAGE_TYPES", ["delete", "index"])

# Lookup table from the name of an AMQP queue to the type of the messages
# that are read from that queue.
QUEUE_TO_TYPE = {
    "search.delete": MESSAGE_TYPES["delete"],
    "search.index": MESSAGE_TYPES["index"],
}


class Message(object):
    """
    A parsed message from AMQP.
    """

    def __init__(self, message_type, table_name, columns, operation):
        """
        Construct a new message object.

        A message contains a set of columns (dict) which can be used to
        determine which row has been updated. In case of messages from the
        `index` queue it will be a set of PK columns, and `gid` column for
        `delete` queue messages.

        :param message_type: Type of the message. A member of
                             :class:`MESSAGE_TYPES`.
        :param str table_name: Name of the table the message is associated
                               with.
        :param dict columns: Dictionary mapping columns of the table to their
                             values.
        :param operation: Operation type carried by the message.
                          :meth:`from_amqp_message` passes an empty string
                          when the message body does not specify one.
        """
        self.message_type = message_type
        self.table_name = table_name
        self.columns = columns
        self.operation = operation

    @classmethod
    def from_amqp_message(cls, queue_name, amqp_message):
        """
        Parses an AMQP message.

        :param str queue_name: Name of the queue where the message originated
                               from.
        :param amqp.basic_message.Message amqp_message: Message object from
                                                        the queue.
        :rtype: :class:`sir.amqp.message.Message`
        :raises ValueError: If ``queue_name`` is not a key of
                            ``QUEUE_TO_TYPE``.
        :raises InvalidMessageContentException: If the body is not a JSON
            object, the table name is missing or empty, or no reference
            values remain once the table name and operation type have been
            removed.
        """
        if queue_name not in QUEUE_TO_TYPE:
            raise ValueError("Unknown queue: %s" % queue_name)
        message_type = QUEUE_TO_TYPE[queue_name]

        try:
            data = ujson.loads(amqp_message.body)
        except ValueError as e:
            raise InvalidMessageContentException(
                "Invalid message format (expected JSON): %s" % e)

        # Valid JSON is not necessarily an object; a list or a scalar has no
        # keys to extract, so report it as bad content rather than letting
        # the ``pop`` calls below fail with an unrelated error.
        if not isinstance(data, dict):
            raise InvalidMessageContentException(
                "Invalid message format (expected JSON object)")

        table_name = data.pop(MSG_JSON_TABLE_NAME_KEY, None)
        if not table_name:
            raise InvalidMessageContentException("Table name is missing")

        # Remove the operation type before validating what is left, so that
        # a message consisting only of metadata is not mistaken for one that
        # carries reference values.
        operation = data.pop(MSG_JSON_OPERATION_TYPE, "")

        # With the metadata removed only the reference values should be left.
        if not data:
            # For the `index` queue the data will be a set of PKs, and for
            # `delete` queue it will be a GID value.
            raise InvalidMessageContentException(
                "Reference values are not specified")

        return cls(message_type, table_name, data, operation)


class InvalidMessageContentException(ValueError):
    """
    Exception indicating an error with the content of an AMQP message.

    It subclasses :class:`ValueError`, so handlers that catch ``ValueError``
    also receive it.
    """