Message Broker Modules
- class action_triggers.message_broker.base.BrokerBase(broker_key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]
Bases:
ABC
Base class for a message broker. This class should be subclassed to implement the specific message broker.
- Parameters:
broker_key – The key for the broker (must exist in settings.ACTION_TRIGGERS).
conn_details – The connection parameters to use for establishing the connection.
params – Additional parameters to use for the message broker.
kwargs – Additional keyword arguments to pass to the subclass.
- abstract property conn_class: Type[ConnectionBase]
The connection class to use for establishing a connection to the message broker.
- class action_triggers.message_broker.base.ConnectionBase(config: dict, conn_details: dict, params: dict)[source]
Bases:
ABC
Base class for establishing a connection to a message broker. This class merges the configuration defined in the settings with the connection details and parameters provided by the user. The relationship is one-sided: the base configuration always takes precedence, so the user cannot overwrite values defined in the settings.
- Parameters:
config – The configuration for the message broker as defined in settings.ACTION_TRIGGERS["brokers"].
conn_details – Additional connection parameters to use for establishing the connection provided by the user.
params – Additional parameters to use for the message broker provided by the user.
- property conn_details: dict
Lazily load the merged connection details. When merging, the base configuration from the settings takes precedence over the user-provided connection details.
- Returns:
The merged connection details.
- property params: dict
Lazily load the merged parameters. When merging, the base configuration from the settings takes precedence over the user-provided parameters.
- Returns:
The merged parameters.
- abstract property required_conn_detail_fields: Sequence[RequiredFieldBase]
The required connection detail fields that must be provided by the user.
- abstract property required_params_fields: Sequence[RequiredFieldBase]
The required parameters fields that must be provided by the user.
- validate_connection_details_not_overwritten() → None [source]
Validate that the base connection details are not overwritten.
- validate_params_not_overwritten() → None [source]
Validate that the base parameters are not overwritten.
- validate_required_conn_details() → None [source]
Validate that the required connection details are present.
- static validate_required_keys(required_fields: Sequence[RequiredFieldBase], settings_context: Dict[str, Any], err_fn: Callable[[str, str], None]) → None [source]
Validate that the required keys are present in the connection details and parameters.
- Parameters:
required_fields – The required fields to check.
settings_context – The context to check.
err_fn – The function to call if the required fields are not present.
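The precedence rule in the ConnectionBase class description can be illustrated with a minimal sketch. This is not the library's implementation; `merge_with_base` and `find_overwritten_keys` are hypothetical helpers, and the example values are made up. It only assumes that the base configuration wins on conflict, as the class description states.

```python
from typing import Any, Dict, List


def merge_with_base(base: Dict[str, Any], user: Dict[str, Any]) -> Dict[str, Any]:
    """Merge user-supplied values with the base configuration.

    Keys from ``base`` are unpacked last, so they win on conflict.
    """
    return {**user, **base}


def find_overwritten_keys(base: Dict[str, Any], user: Dict[str, Any]) -> List[str]:
    """Return the keys the user tried to redefine with a different value."""
    return sorted(key for key in user if key in base and user[key] != base[key])


# Hypothetical values: the base comes from settings, the rest from the user.
base_conn = {"host": "broker.internal", "port": 5672}
user_conn = {"host": "elsewhere.example", "virtual_host": "/"}

merged = merge_with_base(base_conn, user_conn)
overwritten = find_overwritten_keys(base_conn, user_conn)
```

Here `merged` keeps the base `host` and `port` and gains the user's `virtual_host`, while `overwritten` lists `host` as a key a validation step could report.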
- action_triggers.message_broker.broker.get_broker_class(broker_name: str) → Type[BrokerBase] [source]
Get the broker class based on the broker name.
- Parameters:
broker_name – The name of the broker.
- Returns:
The broker class.
- Raises:
ValueError – If the broker name is invalid.
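A lookup of this kind can be sketched as a name-to-class mapping that raises ValueError for unknown names. The registry, the placeholder classes, and `lookup_broker_class` below are hypothetical stand-ins, not the library's code.

```python
from typing import Dict


class FakeKafkaBroker:
    """Placeholder standing in for a real broker class."""


class FakeRabbitMQBroker:
    """Placeholder standing in for a real broker class."""


# Hypothetical registry mapping broker names to broker classes.
_REGISTRY: Dict[str, type] = {
    "kafka": FakeKafkaBroker,
    "rabbitmq": FakeRabbitMQBroker,
}


def lookup_broker_class(broker_name: str) -> type:
    """Return the class registered under ``broker_name``.

    Raises ValueError when the name is not registered.
    """
    try:
        return _REGISTRY[broker_name]
    except KeyError:
        raise ValueError(f"Unknown broker name: {broker_name!r}") from None
```

Translating the KeyError into a ValueError keeps the caller-facing contract the same as the one documented above.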
- class action_triggers.message_broker.enums.BrokerType(value)[source]
Bases:
Enum
Represents the types of brokers supported by the application.
- AWS_SNS = 'aws_sns'
- AWS_SQS = 'aws_sqs'
- KAFKA = 'kafka'
- RABBITMQ = 'rabbitmq'
- REDIS = 'redis'
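Because BrokerType is a standard Enum, a member can be looked up by its string value. The sketch below redefines the enum locally so that it runs on its own; `is_supported` is a hypothetical helper.

```python
from enum import Enum


class BrokerType(Enum):
    """Local copy of the documented members, for illustration only."""

    AWS_SNS = "aws_sns"
    AWS_SQS = "aws_sqs"
    KAFKA = "kafka"
    RABBITMQ = "rabbitmq"
    REDIS = "redis"


def is_supported(name: str) -> bool:
    """Return True when ``name`` matches the value of a member."""
    try:
        BrokerType(name)
    except ValueError:
        return False
    return True


by_value = BrokerType("kafka")  # lookup by value returns the member
all_values = [member.value for member in BrokerType]
```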
Contains the error class for generic message broker errors relating to the connection and parameters.
- class action_triggers.message_broker.error.MessageBrokerError[source]
Bases:
ErrorBase
A class for storing errors for a message broker.
- add_connection_params_error(key: str, message: str)
- add_params_error(key: str, message: str)
- connection_params
A class for storing errors for a specific field.
- Parameters:
field_name – The name of the field.
- error_class
alias of
ConnectionValidationError
- params
A class for storing errors for a specific field.
- Parameters:
field_name – The name of the field.
- exception action_triggers.message_broker.exceptions.ConnectionValidationError(message: dict)[source]
Bases:
RuntimeError
Exception raised when connection parameters are invalid.
Module to support sending messages to Kafka.
- class action_triggers.message_broker.kafka.KafkaBroker(broker_key: str, conn_params: dict | None, params: dict | None, **kwargs)[source]
Bases:
BrokerBase
Broker class for Kafka.
- broker_type = 'kafka'
- conn_class
alias of
KafkaConnection
- class action_triggers.message_broker.kafka.KafkaConnection(config: dict, conn_details: dict, params: dict)[source]
Bases:
ConnectionBase
Connection class for Kafka.
- required_conn_detail_fields = (HasField('bootstrap_servers'),)
- required_params_fields = (HasField('topic'),)
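A settings entry that satisfies the two Kafka requirements might look like the fragment below. The entry name `kafka_example` and the key names `broker_type`, `conn_details` and `params` are assumptions made for illustration; only `bootstrap_servers` and `topic` come from the required fields listed above.

```python
# Hypothetical Django settings fragment; key names are assumptions.
ACTION_TRIGGERS = {
    "brokers": {
        "kafka_example": {
            "broker_type": "kafka",
            "conn_details": {"bootstrap_servers": "localhost:9092"},
            "params": {"topic": "example-topic"},
        },
    },
}

broker_config = ACTION_TRIGGERS["brokers"]["kafka_example"]

# Collect any required field that is absent from its section.
missing = [
    name
    for section, name in (("conn_details", "bootstrap_servers"), ("params", "topic"))
    if name not in broker_config[section]
]
```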
Module to support sending messages to RabbitMQ.
- class action_triggers.message_broker.rabbitmq.RabbitMQBroker(broker_key: str, conn_params: dict | None, params: dict | None, **kwargs)[source]
Bases:
BrokerBase
Broker class for RabbitMQ.
- Parameters:
broker_key – The key for the broker (must exist in settings.ACTION_TRIGGERS).
conn_params – The connection parameters to use for establishing the connection.
params – Additional parameters to use for the message broker.
kwargs – Additional keyword arguments to pass to the subclass.
- broker_type = 'rabbitmq'
- conn_class
alias of
RabbitMQConnection
- class action_triggers.message_broker.rabbitmq.RabbitMQConnection(config: dict, conn_details: dict, params: dict)[source]
Bases:
ConnectionBase
Connection class for RabbitMQ.
- required_conn_detail_fields = ()
- required_params_fields = (HasField('queue'),)
Module to support sending messages to Redis.
- class action_triggers.message_broker.redis.RedisBroker(broker_key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]
Bases:
BrokerBase
Broker class for Redis.
- Parameters:
broker_key – The key for the broker (must exist in settings.ACTION_TRIGGERS).
conn_details – The connection parameters to use for establishing the connection.
params – Additional parameters to use for the message broker.
kwargs – Additional keyword arguments to pass to the subclass.
- broker_type = 'redis'
- conn_class
alias of
RedisConnection
- class action_triggers.message_broker.redis.RedisConnection(config: dict, conn_details: dict, params: dict)[source]
Bases:
ConnectionBase
Connection class for Redis.
- required_conn_detail_fields = (HasAtLeastOneOffField('url, host'),)
- required_params_fields = (HasField('channel'),)
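The "at least one of" requirement on `url` and `host` can be sketched with a small checker and an error callback of the `(key, message)` shape used by `validate_required_keys`. `AtLeastOneOf` and `validate` are hypothetical stand-ins; the real field classes may expose a different interface.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple


class AtLeastOneOf:
    """Hypothetical requirement: one or more of the fields must be present."""

    def __init__(self, *fields: str) -> None:
        self.fields = fields

    def check(self, context: Dict[str, Any]) -> bool:
        return any(field in context for field in self.fields)


def validate(
    required: Sequence[AtLeastOneOf],
    context: Dict[str, Any],
    err_fn: Callable[[str, str], None],
) -> None:
    """Call ``err_fn(key, message)`` for every unmet requirement."""
    for requirement in required:
        if not requirement.check(context):
            key = ", ".join(requirement.fields)
            err_fn(key, "At least one of these fields must be provided.")


errors: List[Tuple[str, str]] = []
required = (AtLeastOneOf("url", "host"),)

# "host" is present, so nothing is recorded.
validate(required, {"host": "localhost"}, lambda k, m: errors.append((k, m)))
errors_after_valid = list(errors)

# Neither "url" nor "host" is present, so one error is recorded.
validate(required, {"port": 6379}, lambda k, m: errors.append((k, m)))
```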
Module to support sending messages to AWS SQS.
- class action_triggers.message_broker.aws_sqs.AwsSqsBroker(broker_key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]
Bases:
BrokerBase
Broker class for AWS SQS.
- Parameters:
broker_key – The key for the broker (must exist in the settings.ACTION_TRIGGERS["brokers"] dictionary).
conn_details – The connection parameters to use for establishing the connection to the broker.
- broker_type = 'aws_sqs'
- conn_class
alias of
AwsSqsConnection
- class action_triggers.message_broker.aws_sqs.AwsSqsConnection(config: dict, conn_details: dict, params: dict)[source]
Bases:
ConnectionBase
Connection class for AWS SQS.
- get_queue_url() → str [source]
Get the queue URL from the parameters or fetch it from AWS using the queue name.
- Returns:
The queue URL.
- required_conn_detail_fields = (HasField('endpoint_url'),)
- required_params_fields = (HasAtLeastOneOffField('queue_url, queue_name'),)
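The fallback described for `get_queue_url` can be sketched as follows: use `queue_url` when it is given, otherwise resolve the URL from `queue_name`. `resolve_queue_url` and `StubSqsClient` are hypothetical; the stub mimics the shape of the boto3 SQS client's `get_queue_url(QueueName=...)` response so that the example runs without AWS access.

```python
from typing import Any, Dict


def resolve_queue_url(params: Dict[str, Any], sqs_client: Any) -> str:
    """Return the queue URL from ``params`` or look it up by queue name."""
    if params.get("queue_url"):
        return params["queue_url"]
    response = sqs_client.get_queue_url(QueueName=params["queue_name"])
    return response["QueueUrl"]


class StubSqsClient:
    """Stand-in for an SQS client; records how often it is called."""

    def __init__(self) -> None:
        self.calls = 0

    def get_queue_url(self, QueueName: str) -> Dict[str, str]:
        self.calls += 1
        return {"QueueUrl": f"https://sqs.example.invalid/000000000000/{QueueName}"}


client = StubSqsClient()
direct = resolve_queue_url({"queue_url": "https://sqs.example.invalid/1/direct"}, client)
looked_up = resolve_queue_url({"queue_name": "orders"}, client)
```

Passing the client in as an argument keeps the lookup easy to test, since a stub can replace the real client.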
Module to support sending messages to AWS SNS.
- class action_triggers.message_broker.aws_sns.AwsSnsBroker(broker_key: str, conn_details: dict | None, params: dict | None, **kwargs)[source]
Bases:
BrokerBase
Broker class for AWS SNS.
- Parameters:
broker_key – The key for the broker (must exist in the settings.ACTION_TRIGGERS["brokers"] dictionary).
conn_details – The connection parameters to use for establishing the connection to the broker.
- broker_type = 'aws_sns'
- conn_class
alias of
AwsSnsConnection
- class action_triggers.message_broker.aws_sns.AwsSnsConnection(config: dict, conn_details: dict, params: dict)[source]
Bases:
ConnectionBase
Connection class for AWS SNS.
- required_conn_detail_fields = (HasField('endpoint_url'),)
- required_params_fields = (HasField('topic_arn'),)