Message Broker Modules

class action_triggers.message_broker.base.BrokerBase(broker_key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]

Bases: ABC

Base class for a message broker. This class should be subclassed to implement the specific message broker.

Parameters:
  • conn_details – The connection parameters to use for establishing the connection.

  • params – Additional parameters to use for the message broker.

  • kwargs – Additional keyword arguments to pass to the subclass.

abstract property conn_class: Type[ConnectionBase]

The connection class to use for establishing a connection to the message broker.

async send_message(message: str) None[source]

Starts a connection with the message broker and sends a message.

Parameters:

message – The message to send to the message broker.

class action_triggers.message_broker.base.ConnectionBase(config: dict, conn_details: dict, params: dict)[source]

Bases: ABC

Base class for establishing a connection to a message broker. This class provides the capability to marry the configuration provided in the settings with the connection details and parameters provided by the user. However, this enforces a one-sided relationship where the user cannot overwrite the base configuration as defined in the settings as the base configuration always takes precedence.

Parameters:
  • config – The configuration for the message broker as defined in settings.ACTION_TRIGGERS[“brokers”].

  • conn_details – Additional connection parameters to use for establishing the connection provided by the user.

  • params – Additional parameters to use for the message broker provided by the user.

abstract async close() None[source]

Close the connection to the message broker.

property conn_details: dict

Lazy load the the merged connection details. When merging the connection details, the user provided base connection details take precedence over the base configuration.

Returns:

The merged connection details.

abstract async connect() None[source]

Establish a connection to the message broker.

property params: dict

Lazy load the the merged parameters. When merging the parameters, the user provided base parameters take precedence over the base configuration.

Returns:

The merged parameters.

abstract property required_conn_detail_fields: Sequence[RequiredFieldBase]

The required connection detail fields that must be provided by the user.

abstract property required_params_fields: Sequence[RequiredFieldBase]

The required parameters fields that must be provided by the user.

validate() None[source]

Validate the connection parameters. Raise an exception if invalid.

validate_connection_details_not_overwritten() None[source]

Validate that the base connection details are not overwritten.

validate_params_not_overwritten() None[source]

Validate that the base parameters are not overwritten.

validate_required_conn_details() None[source]

Validate that the required connection details are present.

static validate_required_keys(required_fields: Sequence[RequiredFieldBase], settings_context: Dict[str, Any], err_fn: Callable[[str, str], None]) None[source]

Validate that the required keys are present in the connection details and parameters.

Parameters:
  • required_fields – The required fields to check.

  • settings_context – The context to check.

  • err_fn – The function to call if the required fields are not

validate_required_params() None[source]

Validate that the required parameters are present.

action_triggers.message_broker.broker.get_broker_class(broker_name: str) Type[BrokerBase][source]

Get the broker class based on the broker name.

Parameters:

broker_name – The name of the broker.

Returns:

The broker class.

Raises:

ValueError – If the broker name is invalid.

class action_triggers.message_broker.enums.BrokerType(value)[source]

Bases: Enum

Represents the types of brokers supported by the application.

AWS_SNS = 'aws_sns'
AWS_SQS = 'aws_sqs'
KAFKA = 'kafka'
RABBITMQ = 'rabbitmq'
REDIS = 'redis'

Contains the error class for generic message broker errors relating to the connection and parameters.

class action_triggers.message_broker.error.MessageBrokerError[source]

Bases: ErrorBase

A class for storing errors for a message broker.

add_connection_params_error(key: str, message: str)
add_params_error(key: str, message: str)
connection_params

A class for storing errors for a specific field.

Parameters:

field_name – The name of the field.

error_class

alias of ConnectionValidationError

params

A class for storing errors for a specific field.

Parameters:

field_name – The name of the field.

exception action_triggers.message_broker.exceptions.ConnectionValidationError(message: dict)[source]

Bases: RuntimeError

Exception raised when connection parameters are invalid.

as_dict() dict[source]

Return the error message as a dictionary.

as_json() str[source]

Return the error message as a JSON string.

Module to support sending messages to Kafka.

class action_triggers.message_broker.kafka.KafkaBroker(broker_key: str, conn_params: dict | None, params: dict | None, **kwargs)[source]

Bases: BrokerBase

Broker class for Kafka.

broker_type = 'kafka'
conn_class

alias of KafkaConnection

class action_triggers.message_broker.kafka.KafkaConnection(config: dict, conn_details: dict, params: dict)[source]

Bases: ConnectionBase

Connection class for Kafka.

async close() None[source]

Close the connection to the message broker.

async connect() None[source]

Establish a connection to the message broker.

required_conn_detail_fields = (HasField('bootstrap_servers'),)
required_params_fields = (HasField('topic'),)

Module to support sending messages to RabbitMQ.

class action_triggers.message_broker.rabbitmq.RabbitMQBroker(broker_key: str, conn_params: dict | None, params: dict | None, **kwargs)[source]

Bases: BrokerBase

Broker class for RabbitMQ.

Parameters:
  • broker_key – The key for the broker (must existing in settings.ACTION_TRIGGERS).

  • conn_params – The connection parameters to use for establishing the connection.

  • params – Additional parameters to use for the message broker.

  • kwargs – Additional keyword arguments to pass to the subclass.

broker_type = 'rabbitmq'
conn_class

alias of RabbitMQConnection

class action_triggers.message_broker.rabbitmq.RabbitMQConnection(config: dict, conn_details: dict, params: dict)[source]

Bases: ConnectionBase

Connection class for RabbitMQ.

async close() None[source]

Close the connection to the message broker.

async connect() None[source]

Establish a connection to the message broker.

required_conn_detail_fields = ()
required_params_fields = (HasField('queue'),)
validate() None[source]

Validate the connection parameters. Raise an exception if invalid.

Module to support sending messages to Redis.

class action_triggers.message_broker.redis.RedisBroker(broker_key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]

Bases: BrokerBase

Broker class for Redis.

Parameters:
  • broker_key – The key for the broker (must existing in settings.ACTION_TRIGGERS).

  • conn_params – The connection parameters to use for establishing the connection.

  • params – Additional parameters to use for the message broker.

  • kwargs – Additional keyword arguments to pass to the subclass.

broker_type = 'redis'
conn_class

alias of RedisConnection

class action_triggers.message_broker.redis.RedisConnection(config: dict, conn_details: dict, params: dict)[source]

Bases: ConnectionBase

Connection class for Redis.

async close() None[source]

Close the connection to the Redis server.

async connect() None[source]

Connect to the Redis server.

required_conn_detail_fields = (HasAtLeastOneOffField('url, host'),)
required_params_fields = (HasField('channel'),)

Module to support sending messages to AWS SQS.

class action_triggers.message_broker.aws_sqs.AwsSqsBroker(broker_key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]

Bases: BrokerBase

Broker class for AWS SQS.

Parameters:
  • broker_key – The key for the broker (must exist in the settings.ACTION_TRIGGERS[“brokers”] dictionary)).

  • conn_params – The connection parameters to use for establishing the connection to the broker.

broker_type = 'aws_sqs'
conn_class

alias of AwsSqsConnection

class action_triggers.message_broker.aws_sqs.AwsSqsConnection(config: dict, conn_details: dict, params: dict)[source]

Bases: ConnectionBase

Connection class for AWS SQS.

async close() None[source]

Close the connection to the AWS SQS service.

async connect() None[source]

Connect to the AWS SQS service.

get_queue_url() str[source]

Get the queue URL from the parameters or fetch it from AWS using the queue name.

Returns:

The queue URL.

required_conn_detail_fields = (HasField('endpoint_url'),)
required_params_fields = (HasAtLeastOneOffField('queue_url, queue_name'),)

Module to support sending messages to AWS SNS.

class action_triggers.message_broker.aws_sns.AwsSnsBroker(broker_key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]

Bases: BrokerBase

Broker class for AWS SQS.

Parameters:
  • broker_key – The key for the broker (must exist in the settings.ACTION_TRIGGERS[“brokers”] dictionary)).

  • conn_params – The connection parameters to use for establishing the connection to the broker.

broker_type = 'aws_sns'
conn_class

alias of AwsSnsConnection

class action_triggers.message_broker.aws_sns.AwsSnsConnection(config: dict, conn_details: dict, params: dict)[source]

Bases: ConnectionBase

Connection class for AWS SNS.

async close() None[source]

Close the connection to the AWS SNS service.

async connect() None[source]

Connect to the AWS SQS service.

required_conn_detail_fields = (HasField('endpoint_url'),)
required_params_fields = (HasField('topic_arn'),)