Перейти к содержанию

MqttClientService

Транспортный слой: персистентное MQTT-соединение к брокеру Sber, цикл реконнекта с экспоненциальным backoff, примитивы publish / subscribe. Извлечён из SberBridge в рамках рефакторинга v1.25.1 для изоляции транспортных забот от оркестрации моста (SRP).

Сервис управляется через SberMqttCredentials (value object с учётными данными) и MqttServiceHooks (коллбэки для on_message / on_connected / on_disconnected). Не знает о сущностях, командах или состоянии HA — вся высокоуровневая логика инжектируется через хуки.

Async MQTT transport layer for the Sber Smart Home bridge.

Owns the persistent connection to the Sber MQTT broker, the reconnect loop with exponential backoff, the topic subscriptions and the raw publish operations. Extracted from :class:SberBridge to isolate transport concerns from bridge orchestration (SRP).

The service is driven by a SberMqttCredentials value object and a MqttServiceHooks struct of callbacks, so it does NOT know about entities, commands or HA state. All higher-level logic (initial publish, ack-guard, message routing) is injected via hooks.

SberMqttCredentials dataclass

SberMqttCredentials(login, password, broker, port, verify_ssl)

Connection credentials for the Sber MQTT broker.

MqttServiceHooks dataclass

MqttServiceHooks(on_message, on_connected, on_disconnected)

Callbacks invoked by :class:MqttClientService.

Attributes:

Name Type Description
on_message Callable[[str, bytes], Awaitable[None]]

Invoked for every incoming MQTT message (topic, payload).

on_connected Callable[[Client], Awaitable[None]]

Invoked once per successful handshake; used by the bridge to perform initial publish + subscription setup.

on_disconnected Callable[[Exception, bool], Awaitable[bool]]

Invoked after an MQTT / network error; returns True to continue reconnect loop, False to stop.

get_connected_since Callable[[Exception, bool], Awaitable[bool]]

Stats hook — called when the connection is established so callers can tag the timestamp.

MqttClientService

MqttClientService(*, hass, credentials, hooks, reconnect_min, reconnect_max)

Async MQTT transport with exponential backoff reconnect.

Typical usage (inside SberBridge.async_start)::

self._mqtt = MqttClientService(
    hass=hass,
    credentials=SberMqttCredentials(...),
    hooks=MqttServiceHooks(
        on_message=self._handle_mqtt_message,
        on_connected=self._handle_connected,
        on_disconnected=self._handle_disconnect,
    ),
    reconnect_min=reconnect_min,
    reconnect_max=reconnect_max,
)
self._connection_task = hass.async_create_task(
    self._mqtt.run(), eager_start=True,
)

Initialize the service.

Parameters:

Name Type Description Default
hass HomeAssistant

Home Assistant core instance (used for executor offload).

required
credentials SberMqttCredentials

Broker connection credentials.

required
hooks MqttServiceHooks

Higher-level callbacks injected by the bridge.

required
reconnect_min int

Initial reconnect backoff in seconds.

required
reconnect_max int

Upper bound for exponential backoff in seconds.

required
Source code in custom_components/sber_mqtt_bridge/mqtt_client_service.py
def __init__(
    self,
    *,
    hass: HomeAssistant,
    credentials: SberMqttCredentials,
    hooks: MqttServiceHooks,
    reconnect_min: int,
    reconnect_max: int,
) -> None:
    """Initialize the service.

    Args:
        hass: Home Assistant core instance (used for executor offload).
        credentials: Broker connection credentials.
        hooks: Higher-level callbacks injected by the bridge.
        reconnect_min: Initial reconnect backoff in seconds.
        reconnect_max: Upper bound for exponential backoff in seconds.
    """
    self._hass = hass
    self._credentials = credentials
    self._hooks = hooks
    self._reconnect_min = reconnect_min
    self._reconnect_max = reconnect_max
    self._reconnect_interval = reconnect_min

    self._client: aiomqtt.Client | None = None
    self._connected = False
    self._running = False

client property

client

Return the current aiomqtt.Client or None when disconnected.

is_connected property

is_connected

Return True while a live session is active.

reconnect_interval property

reconnect_interval

Return the current exponential-backoff delay (read-only).

update_backoff_limits

update_backoff_limits(reconnect_min, reconnect_max)

Update exponential-backoff bounds at runtime.

The new minimum takes effect on the next connect; the new maximum clamps subsequent backoff steps immediately.

Source code in custom_components/sber_mqtt_bridge/mqtt_client_service.py
def update_backoff_limits(self, reconnect_min: int, reconnect_max: int) -> None:
    """Update exponential-backoff bounds at runtime.

    The new minimum takes effect on the next connect; the new
    maximum clamps subsequent backoff steps immediately.
    """
    self._reconnect_min = reconnect_min
    self._reconnect_max = reconnect_max

update_verify_ssl

update_verify_ssl(verify_ssl)

Update the verify_ssl flag for the next reconnect.

Source code in custom_components/sber_mqtt_bridge/mqtt_client_service.py
def update_verify_ssl(self, verify_ssl: bool) -> None:
    """Update the ``verify_ssl`` flag for the next reconnect."""
    self._credentials = SberMqttCredentials(
        login=self._credentials.login,
        password=self._credentials.password,
        broker=self._credentials.broker,
        port=self._credentials.port,
        verify_ssl=verify_ssl,
    )

run async

run()

Maintain a persistent MQTT connection until stop() is called.

On successful handshake, invokes hooks.on_connected(client) and then blocks on the inbound message stream, delegating each message to hooks.on_message. On error, invokes hooks.on_disconnected and applies exponential backoff before retrying.

Source code in custom_components/sber_mqtt_bridge/mqtt_client_service.py
async def run(self) -> None:
    """Maintain a persistent MQTT connection until ``stop()`` is called.

    On successful handshake, invokes ``hooks.on_connected(client)`` and
    then blocks on the inbound message stream, delegating each message
    to ``hooks.on_message``.  On error, invokes ``hooks.on_disconnected``
    and applies exponential backoff before retrying.
    """
    from .config_flow import create_ssl_context

    self._running = True
    ssl_context = await self._hass.async_add_executor_job(create_ssl_context, self._credentials.verify_ssl)
    while self._running:
        try:
            async with self._build_client(ssl_context) as client:
                self._client = client
                self._connected = True
                self._reconnect_interval = self._reconnect_min
                await self._hooks.on_connected(client)
                await self._consume_messages(client)
        except aiomqtt.MqttError as err:
            if not await self._after_error(err, unexpected=False):
                break
        except asyncio.CancelledError:
            break
        except (OSError, ValueError, RuntimeError) as err:
            if not await self._after_error(err, unexpected=True):
                break

    self._client = None
    self._connected = False

stop async

stop()

Request the reconnect loop to exit at the next opportunity.

Source code in custom_components/sber_mqtt_bridge/mqtt_client_service.py
async def stop(self) -> None:
    """Request the reconnect loop to exit at the next opportunity."""
    self._running = False

publish async

publish(topic, payload)

Publish a raw payload to the given topic.

Raises:

Type Description
RuntimeError

If called while disconnected.

MqttError

Propagated on transport errors.

Source code in custom_components/sber_mqtt_bridge/mqtt_client_service.py
async def publish(self, topic: str, payload: str | bytes) -> None:
    """Publish a raw payload to the given topic.

    Raises:
        RuntimeError: If called while disconnected.
        aiomqtt.MqttError: Propagated on transport errors.
    """
    client = self._client
    if not self._connected or client is None:
        raise RuntimeError("Not connected to MQTT")
    await client.publish(topic, payload)

subscribe async

subscribe(topic_pattern)

Subscribe to a topic / topic pattern on the active session.

Source code in custom_components/sber_mqtt_bridge/mqtt_client_service.py
async def subscribe(self, topic_pattern: str) -> None:
    """Subscribe to a topic / topic pattern on the active session."""
    client = self._client
    if not self._connected or client is None:
        raise RuntimeError("Not connected to MQTT")
    await client.subscribe(topic_pattern)