Message Broker Modules

class action_triggers.message_broker.base.BrokerBase(broker_key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]

Bases: ABC

Base class for a message broker. This class should be subclassed to implement the specific message broker.

  • conn_details – The connection parameters to use for establishing the connection.

  • params – Additional parameters to use for the message broker.

  • kwargs – Additional keyword arguments to pass to the subclass.

abstract property conn_class: Type[ConnectionBase]

The connection class to use for establishing a connection to the message broker.

async send_message(message: str) None[source]

Starts a connection with the message broker and sends a message.


message – The message to send to the message broker.

class action_triggers.message_broker.base.ConnectionBase(config: dict, conn_details: dict, params: dict)[source]

Bases: ABC

Base class for establishing a connection to a message broker. This class provides the capability to marry the configuration provided in the settings with the connection details and parameters provided by the user. However, this enforces a one-sided relationship where the user cannot overwrite the base configuration as defined in the settings as the base configuration always takes precedence.

  • config – The configuration for the message broker as defined in settings.ACTION_TRIGGERS[“brokers”].

  • conn_details – Additional connection parameters to use for establishing the connection provided by the user.

  • params – Additional parameters to use for the message broker provided by the user.

abstract async close() None[source]

Close the connection to the message broker.

property conn_details: dict

Lazy load the the merged connection details. When merging the connection details, the user provided base connection details take precedence over the base configuration.


The merged connection details.

abstract async connect() None[source]

Establish a connection to the message broker.

property params: dict

Lazy load the the merged parameters. When merging the parameters, the user provided base parameters take precedence over the base configuration.


The merged parameters.

abstract property required_conn_detail_fields: Sequence[RequiredFieldBase]

The required connection detail fields that must be provided by the user.

abstract property required_params_fields: Sequence[RequiredFieldBase]

The required parameters fields that must be provided by the user.

validate() None[source]

Validate the connection parameters. Raise an exception if invalid.

validate_connection_details_not_overwritten() None[source]

Validate that the base connection details are not overwritten.

validate_params_not_overwritten() None[source]

Validate that the base parameters are not overwritten.

validate_required_conn_details() None[source]

Validate that the required connection details are present.

static validate_required_keys(required_fields: Sequence[RequiredFieldBase], settings_context: Dict[str, Any], err_fn: Callable[[str, str], None]) None[source]

Validate that the required keys are present in the connection details and parameters.

  • required_fields – The required fields to check.

  • settings_context – The context to check.

  • err_fn – The function to call if the required fields are not

validate_required_params() None[source]

Validate that the required parameters are present. str) Type[BrokerBase][source]

Get the broker class based on the broker name.


broker_name – The name of the broker.


The broker class.


ValueError – If the broker name is invalid.

class action_triggers.message_broker.enums.BrokerType(value)[source]

Bases: Enum

Represents the types of brokers supported by the application.

AWS_SNS = 'aws_sns'
AWS_SQS = 'aws_sqs'
KAFKA = 'kafka'
RABBITMQ = 'rabbitmq'
REDIS = 'redis'

Contains the error class for generic message broker errors relating to the connection and parameters.

class action_triggers.message_broker.error.MessageBrokerError[source]

Bases: ErrorBase

A class for storing errors for a message broker.

add_connection_params_error(key: str, message: str)
add_params_error(key: str, message: str)

A class for storing errors for a specific field.


field_name – The name of the field.


alias of ConnectionValidationError


A class for storing errors for a specific field.


field_name – The name of the field.

exception action_triggers.message_broker.exceptions.ConnectionValidationError(message: dict)[source]

Bases: RuntimeError

Exception raised when connection parameters are invalid.

as_dict() dict[source]

Return the error message as a dictionary.

as_json() str[source]

Return the error message as a JSON string.

Module to support sending messages to Kafka.

class action_triggers.message_broker.kafka.KafkaBroker(broker_key: str, conn_params: dict | None, params: dict | None, **kwargs)[source]

Bases: BrokerBase

Broker class for Kafka.

broker_type = 'kafka'

alias of KafkaConnection

class action_triggers.message_broker.kafka.KafkaConnection(config: dict, conn_details: dict, params: dict)[source]

Bases: ConnectionBase

Connection class for Kafka.

async close() None[source]

Close the connection to the message broker.

async connect() None[source]

Establish a connection to the message broker.

required_conn_detail_fields = (HasField('bootstrap_servers'),)
required_params_fields = (HasField('topic'),)

Module to support sending messages to RabbitMQ.

class action_triggers.message_broker.rabbitmq.RabbitMQBroker(broker_key: str, conn_params: dict | None, params: dict | None, **kwargs)[source]

Bases: BrokerBase

Broker class for RabbitMQ.

  • broker_key – The key for the broker (must existing in settings.ACTION_TRIGGERS).

  • conn_params – The connection parameters to use for establishing the connection.

  • params – Additional parameters to use for the message broker.

  • kwargs – Additional keyword arguments to pass to the subclass.

broker_type = 'rabbitmq'

alias of RabbitMQConnection

class action_triggers.message_broker.rabbitmq.RabbitMQConnection(config: dict, conn_details: dict, params: dict)[source]

Bases: ConnectionBase

Connection class for RabbitMQ.

async close() None[source]

Close the connection to the message broker.

async connect() None[source]

Establish a connection to the message broker.

required_conn_detail_fields = ()
required_params_fields = (HasField('queue'),)
validate() None[source]

Validate the connection parameters. Raise an exception if invalid.

Module to support sending messages to Redis.

class action_triggers.message_broker.redis.RedisBroker(broker_key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]

Bases: BrokerBase

Broker class for Redis.

  • broker_key – The key for the broker (must existing in settings.ACTION_TRIGGERS).

  • conn_params – The connection parameters to use for establishing the connection.

  • params – Additional parameters to use for the message broker.

  • kwargs – Additional keyword arguments to pass to the subclass.

broker_type = 'redis'

alias of RedisConnection

class action_triggers.message_broker.redis.RedisConnection(config: dict, conn_details: dict, params: dict)[source]

Bases: ConnectionBase

Connection class for Redis.

async close() None[source]

Close the connection to the Redis server.

async connect() None[source]

Connect to the Redis server.

required_conn_detail_fields = (HasAtLeastOneOffField('url, host'),)
required_params_fields = (HasField('channel'),)

Module to support sending messages to AWS SQS.

class action_triggers.message_broker.aws_sqs.AwsSqsBroker(broker_key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]

Bases: BrokerBase

Broker class for AWS SQS.

  • broker_key – The key for the broker (must exist in the settings.ACTION_TRIGGERS[“brokers”] dictionary)).

  • conn_params – The connection parameters to use for establishing the connection to the broker.

broker_type = 'aws_sqs'

alias of AwsSqsConnection

class action_triggers.message_broker.aws_sqs.AwsSqsConnection(config: dict, conn_details: dict, params: dict)[source]

Bases: ConnectionBase

Connection class for AWS SQS.

async close() None[source]

Close the connection to the AWS SQS service.

async connect() None[source]

Connect to the AWS SQS service.

get_queue_url() str[source]

Get the queue URL from the parameters or fetch it from AWS using the queue name.


The queue URL.

required_conn_detail_fields = (HasField('endpoint_url'),)
required_params_fields = (HasAtLeastOneOffField('queue_url, queue_name'),)

Module to support sending messages to AWS SNS.

class action_triggers.message_broker.aws_sns.AwsSnsBroker(broker_key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]

Bases: BrokerBase

Broker class for AWS SQS.

  • broker_key – The key for the broker (must exist in the settings.ACTION_TRIGGERS[“brokers”] dictionary)).

  • conn_params – The connection parameters to use for establishing the connection to the broker.

broker_type = 'aws_sns'

alias of AwsSnsConnection

class action_triggers.message_broker.aws_sns.AwsSnsConnection(config: dict, conn_details: dict, params: dict)[source]

Bases: ConnectionBase

Connection class for AWS SNS.

async close() None[source]

Close the connection to the AWS SNS service.

async connect() None[source]

Connect to the AWS SQS service.

required_conn_detail_fields = (HasField('endpoint_url'),)
required_params_fields = (HasField('topic_arn'),)