Source code for action_triggers.message_broker.kafka

"""Module to support sending messages to Kafka."""

import typing as _t

from action_triggers.config_required_fields import HasField
from action_triggers.message_broker.base import BrokerBase, ConnectionBase
from action_triggers.message_broker.enums import BrokerType
from action_triggers.utils.module_import import MissingImportWrapper

try:
    from aiokafka import AIOKafkaProducer  # type: ignore[import-untyped]
except ImportError:  # pragma: no cover
    AIOKafkaProducer = MissingImportWrapper("AIOKafkaProducer")


[docs] class KafkaConnection(ConnectionBase): """Connection class for Kafka.""" required_conn_detail_fields = (HasField("bootstrap_servers", str),) required_params_fields = (HasField("topic", str),)
[docs] async def connect(self) -> None: self.conn = AIOKafkaProducer(**self.conn_details) await self.conn.start()
[docs] async def close(self) -> None: if self.conn: await self.conn.stop() self.conn = None
[docs] class KafkaBroker(BrokerBase): """Broker class for Kafka.""" broker_type = BrokerType.KAFKA conn_class = KafkaConnection def __init__( self, broker_key: str, conn_params: _t.Union[dict, None], params: _t.Union[dict, None], **kwargs, ): super().__init__(broker_key, conn_params, params, **kwargs) self.topic = self.params.get("topic") async def _send_message_impl(self, conn: _t.Any, message: str) -> None: """Send a message to the Kafka broker. :param conn: The connection to the broker. :param message: The message to send. """ await conn.conn.send_and_wait(self.topic, message.encode())