SberBridge

Integration core: manages the MQTT connection to Sber Cloud, dispatches commands, and tracks the state of HA devices.

Sber Smart Home MQTT Bridge - core bridge logic.

Manages:

- Async MQTT connection to the Sber cloud broker (aiomqtt)
- HA state change listening and publishing to Sber
- Sber command reception and forwarding to HA services
- Connection health monitoring and device acknowledgment tracking

RECONNECT_INTERVAL_MIN module-attribute

RECONNECT_INTERVAL_MIN = SETTINGS_DEFAULTS[CONF_RECONNECT_MIN]

Default minimum seconds to wait before reconnecting after an MQTT connection loss.

RECONNECT_INTERVAL_MAX module-attribute

RECONNECT_INTERVAL_MAX = SETTINGS_DEFAULTS[CONF_RECONNECT_MAX]

Default maximum seconds to wait between reconnect attempts (5 minutes); the exponential backoff delay is capped at this value.
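
How the two reconnect bounds interact can be sketched as follows. This is only an illustration: it assumes the delay doubles after each failed attempt and that the minimum is 5 seconds. Neither the growth factor nor the minimum value is stated above (only the 5-minute maximum is), and `backoff_delays` is a hypothetical helper, not part of the bridge.

```python
from itertools import islice
from typing import Iterator

RECONNECT_MIN = 5.0    # assumed value, for illustration only
RECONNECT_MAX = 300.0  # 5 minutes, as documented above


def backoff_delays(minimum: float, maximum: float, factor: float = 2.0) -> Iterator[float]:
    """Yield reconnect delays that grow by ``factor`` and are capped at ``maximum``."""
    delay = minimum
    while True:
        yield delay
        delay = min(delay * factor, maximum)


# The first eight delays under these assumptions.
first_eight = list(islice(backoff_delays(RECONNECT_MIN, RECONNECT_MAX), 8))
```

With these assumed values the delay reaches the cap on the seventh attempt and stays there.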

MAX_MQTT_PAYLOAD_SIZE module-attribute

MAX_MQTT_PAYLOAD_SIZE = SETTINGS_DEFAULTS[CONF_MAX_MQTT_PAYLOAD]

Default maximum MQTT payload size in bytes (1 MB) to prevent DoS from oversized messages.
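
A size guard of this kind is typically applied before the payload is decoded or parsed. The sketch below assumes that "1 MB" means 1 MiB (1024 * 1024 bytes); the exact constant and the helper name `payload_within_limit` are assumptions made for illustration.

```python
MAX_PAYLOAD_BYTES = 1024 * 1024  # assumes "1 MB" means 1 MiB


def payload_within_limit(payload: bytes, limit: int = MAX_PAYLOAD_BYTES) -> bool:
    """Return True when the payload is small enough to be parsed safely."""
    return len(payload) <= limit


small_ok = payload_within_limit(b'{"devices": {}}')
oversized_ok = payload_within_limit(b"x" * (MAX_PAYLOAD_BYTES + 1))
```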

RECONNECT_GRACE_TIMEOUT module-attribute

RECONNECT_GRACE_TIMEOUT = 30.0

Maximum seconds to wait for Sber acknowledgment after (re)connect.

After a reconnect, the bridge publishes HA states and waits for Sber to acknowledge them (via status_request or config_request) before accepting commands. This timeout is a fallback in case Sber never sends a request.
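
The "wait for an acknowledgment, but give up after a timeout" behaviour can be sketched with an `asyncio.Event`. This is not the bridge's actual guard (that logic is delegated to `AckAudit`, which is not shown here); `wait_for_ack` is a hypothetical helper and the 0.05-second timeout is chosen only to keep the demo fast.

```python
import asyncio


async def wait_for_ack(ack: asyncio.Event, grace_timeout: float) -> bool:
    """Return True if the ack arrived in time, False if the fallback timeout fired."""
    try:
        await asyncio.wait_for(ack.wait(), timeout=grace_timeout)
    except asyncio.TimeoutError:
        return False
    return True


async def demo() -> tuple:
    acked = asyncio.Event()
    acked.set()               # Sber already sent a status/config request
    silent = asyncio.Event()  # Sber never sends a request
    return (
        await wait_for_ack(acked, grace_timeout=0.05),
        await wait_for_ack(silent, grace_timeout=0.05),
    )


results = asyncio.run(demo())
```

In both cases the caller can proceed afterwards; the return value only records whether the acknowledgment or the fallback ended the wait.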

BridgeStats dataclass

BridgeStats(connected_since=None, messages_received=0, messages_sent=0, commands_received=0, config_requests=0, status_requests=0, errors_from_sber=0, publish_errors=0, last_message_time=None, reconnect_count=0, acknowledged_entities=set(), last_error_detail='', last_ack_time=None, validation_failures=list())

Connection statistics and health metrics for the Sber MQTT bridge.

connected_since class-attribute instance-attribute

connected_since = None

Timestamp when the current connection was established.

messages_received class-attribute instance-attribute

messages_received = 0

Total MQTT messages received from Sber.

messages_sent class-attribute instance-attribute

messages_sent = 0

Total MQTT messages published to Sber.

commands_received class-attribute instance-attribute

commands_received = 0

Total Sber commands processed.

config_requests class-attribute instance-attribute

config_requests = 0

Total config requests received from Sber.

status_requests class-attribute instance-attribute

status_requests = 0

Total status requests received from Sber.

errors_from_sber class-attribute instance-attribute

errors_from_sber = 0

Total error messages received from Sber.

publish_errors class-attribute instance-attribute

publish_errors = 0

Total failed publish attempts.

last_message_time class-attribute instance-attribute

last_message_time = None

Timestamp of the last message received.

reconnect_count class-attribute instance-attribute

reconnect_count = 0

Total number of reconnections since startup.

acknowledged_entities class-attribute instance-attribute

acknowledged_entities = field(default_factory=set)

Entity IDs that Sber has acknowledged (via status_request or command).

last_error_detail class-attribute instance-attribute

last_error_detail = ''

Human-readable detail of the last error message from Sber cloud.

last_ack_time class-attribute instance-attribute

last_ack_time = None

Timestamp (monotonic) of the last Sber acknowledgment received.

validation_failures class-attribute instance-attribute

validation_failures = field(default_factory=list)

Entity IDs that failed Pydantic validation and were excluded from the last config.

as_dict

as_dict()

Return stats as a serializable dict.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
def as_dict(self) -> dict:
    """Return stats as a serializable dict."""
    now = time.monotonic()
    return {
        "connected_since": self.connected_since,
        "connection_uptime_seconds": (round(now - self.connected_since, 1) if self.connected_since else None),
        "messages_received": self.messages_received,
        "messages_sent": self.messages_sent,
        "commands_received": self.commands_received,
        "config_requests": self.config_requests,
        "status_requests": self.status_requests,
        "errors_from_sber": self.errors_from_sber,
        "publish_errors": self.publish_errors,
        "reconnect_count": self.reconnect_count,
        "acknowledged_entities": sorted(self.acknowledged_entities),
        "last_error_detail": self.last_error_detail,
        "validation_failures": list(self.validation_failures),
    }
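
Two details of `as_dict` are worth illustrating: the uptime is derived from a monotonic clock, and the entity set is converted to a sorted list because a `set` cannot be JSON-encoded. The sketch below uses `MiniStats`, a hypothetical trimmed-down stand-in for `BridgeStats` with only two fields.

```python
from __future__ import annotations

import json
import time
from dataclasses import dataclass, field


@dataclass
class MiniStats:
    """Hypothetical, trimmed-down stand-in for BridgeStats."""

    connected_since: float | None = None
    acknowledged_entities: set[str] = field(default_factory=set)

    def as_dict(self) -> dict:
        now = time.monotonic()
        uptime = None
        if self.connected_since is not None:
            uptime = round(now - self.connected_since, 1)
        return {
            "connection_uptime_seconds": uptime,
            # A set is not JSON-serializable; a sorted list is, and its order is stable.
            "acknowledged_entities": sorted(self.acknowledged_entities),
        }


stats = MiniStats(connected_since=time.monotonic(), acknowledged_entities={"light.b", "light.a"})
encoded = json.dumps(stats.as_dict())
```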

SberBridge

SberBridge(hass, entry)

Bridge between Home Assistant and Sber Smart Home MQTT cloud.

Initialize the bridge.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Initialize the bridge."""
    self._hass = hass
    self._entry = entry

    self._login: str = entry.data[CONF_SBER_LOGIN]
    self._password: str = entry.data[CONF_SBER_PASSWORD]
    self._broker: str = entry.data[CONF_SBER_BROKER]
    self._port: int = entry.data[CONF_SBER_PORT]
    self._verify_ssl: bool = entry.options.get(CONF_SBER_VERIFY_SSL, entry.data.get(CONF_SBER_VERIFY_SSL, True))

    self._root_topic = f"{SBER_TOPIC_PREFIX}/{self._login}"
    self._down_topic = f"{self._root_topic}/down"

    self._ha_instance_id_prefix: str = ""
    """Cached 8-char prefix of HA instance UUID; populated in ``async_start``."""
    self._entities: dict[str, BaseEntity] = {}
    self._enabled_entity_ids: list[str] = []
    # Persisted redefinitions delegated to RedefinitionsStore (v1.38.4).
    self._redef_store = RedefinitionsStore(self)
    self._entity_links: dict[str, dict[str, str]] = {}
    """Primary entity → {role: linked_entity_id}."""
    self._linked_reverse: dict[str, tuple[str, str]] = {}
    """Linked entity_id → (primary_entity_id, role)."""
    self._entity_loader = SberEntityLoader(hass, entry)

    self._mqtt_client: aiomqtt.Client | None = None
    self._connection_task: asyncio.Task | None = None
    self._running = False
    self._connected = False

    # Configurable operational settings loaded from ``config_entry.options``.
    # All defaults live in ``SETTINGS_DEFAULTS`` (const.py) — this avoids
    # scattered ``opts.get(key, hardcoded_default)`` calls and keeps the
    # canonical values in exactly one place (DRY).
    self._load_settings_from_options(entry.options)

    self._unsub_lifecycle_listeners: list[Callable] = []

    self._stats = BridgeStats()

    # Publish coordinator owns the three Sber publish flows and the
    # last-config timestamp; bridge keeps thin delegators below.
    self._publisher = SberPublisher(self)

    # Gate: delay initial MQTT publish until HA is fully started so that
    # entity states (and therefore Sber features) are fully populated.
    self._ha_ready = asyncio.Event()

    # HA → Sber event forwarder: owns state-change subscription + debouncing
    self._state_forwarder = HaStateForwarder(
        hass=hass,
        debounce_delay=self._debounce_delay,
        get_entities=lambda: self._entities,
        get_linked_reverse=lambda: self._linked_reverse,
        on_publish_states=self._publish_states,
        on_republish_config=self._publish_config,
        create_safe_task=self._create_safe_task,
        on_trace_state_change=self._trace_on_state_change,
    )

    # Sber protocol command dispatcher (commands, status/config request, etc.)
    self._command_dispatcher = SberCommandDispatcher(self)

    # MQTT transport service: owns reconnect loop + publish + subscribe
    self._mqtt_service = MqttClientService(
        hass=hass,
        credentials=SberMqttCredentials(
            login=self._login,
            password=self._password,
            broker=self._broker,
            port=self._port,
            verify_ssl=self._verify_ssl,
        ),
        hooks=MqttServiceHooks(
            on_message=self._handle_mqtt_message,
            on_connected=self._handle_mqtt_connected,
            on_disconnected=self._handle_mqtt_disconnected,
        ),
        reconnect_min=self._reconnect_min,
        reconnect_max=self._reconnect_max,
    )

    # Ack audit owns the reconnect guard AND the silent-rejection
    # scheduler in one place — see ``ack_audit.py`` for the rationale.
    from .ack_audit import AckAudit

    self._ack_audit = AckAudit(
        hass,
        grace_timeout=RECONNECT_GRACE_TIMEOUT,
        audit_delay=self._ack_audit_delay,
        on_audit=self._run_ack_audit,
    )

    # Delayed confirm tasks per entity (dedup: cancel previous on new command)
    self._confirm_tasks: dict[str, asyncio.Task] = {}

    # DevTools collector aggregate (message log, traces, diff, validation).
    self._devtools = DevToolsHub(message_log_size=self._message_log_size)

is_connected property

is_connected

Return True if connected to Sber MQTT.

config_entry property

config_entry

Return the bridge's owning HA config entry (read-only access).

connection_phase property

connection_phase

Return the current connection lifecycle phase.

Phases

- starting: HA not fully loaded, waiting for integrations.
- connecting: MQTT connection in progress.
- awaiting_ack: connected, published config, waiting for Sber to acknowledge.
- ready: fully operational, accepting commands.
- disconnected: not connected to MQTT broker.
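
One way to read the phase list is as a function of a few boolean flags. The sketch below is an assumption about how such flags could map to the documented phase names; the flag names and their precedence are hypothetical and are not taken from the bridge's implementation.

```python
def connection_phase(*, ha_ready: bool, running: bool, connected: bool, acknowledged: bool) -> str:
    """Map four hypothetical flags onto the documented phase names."""
    if not ha_ready:
        return "starting"
    if connected:
        return "ready" if acknowledged else "awaiting_ack"
    # Assumption: while the reconnect loop is running, a missing
    # connection means "connecting" rather than "disconnected".
    return "connecting" if running else "disconnected"


phase = connection_phase(ha_ready=True, running=True, connected=True, acknowledged=False)
```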

entities_count property

entities_count

Return the number of loaded Sber entities.

entities property

entities

Return the dict of loaded Sber entities (read-only view).

enabled_entity_ids property

enabled_entity_ids

Return a copy of the enabled entity ID list.

redefinitions property

redefinitions

Return a copy of the entity redefinitions mapping.

entity_links property

entity_links

Return the current entity link map.

linked_entity_ids property

linked_entity_ids

Return set of all linked entity IDs (not primary).

stats property

stats

Return bridge statistics as a serializable dict.

ha_serial_prefix property

ha_serial_prefix

Return active per-HA serial prefix, or None when feature is off.

unacknowledged_entities property

unacknowledged_entities

Return entity IDs that were published but not yet acknowledged by Sber.

message_log property

message_log

Return the DevTools outbound-message ring buffer (delegates to hub).

trace_collector property

trace_collector

Return the correlation-trace collector (delegates to hub).

diff_collector property

diff_collector

Return the state-diff collector (delegates to hub).

validation_collector property

validation_collector

Return the schema-validation collector (delegates to hub).

async_update_redefinition async

async_update_redefinition(entity_id, fields)

Merge redefinition fields for an entity and trigger config republish.

Public API for frontend / WebSocket handlers to update a device's Sber-side name / room / home without reaching into private state. Delegates data mutation and debounced persistence to RedefinitionsStore.async_update.

Parameters:

- entity_id (str, required): Target Sber entity identifier (must exist in the bridge).
- fields (dict[str, str | None], required): Partial mapping with any of name / room / home. An empty string or None for a key removes that field.

Returns:

- dict[str, str]: Resulting redefinitions dict for the entity after merge.

Raises:

- KeyError: If entity_id is not loaded in the bridge.
- HomeAssistantError: If the follow-up config publish fails.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
async def async_update_redefinition(self, entity_id: str, fields: dict[str, str | None]) -> dict[str, str]:
    """Merge redefinition fields for an entity and trigger config republish.

    Public API for frontend / WebSocket handlers to update a device's
    Sber-side name / room / home without reaching into private state.
    Delegates data mutation and debounced persistence to
    :meth:`RedefinitionsStore.async_update`.

    Args:
        entity_id: Target Sber entity identifier (must exist in the bridge).
        fields: Partial mapping with any of ``name`` / ``room`` / ``home``.
            An empty string or ``None`` for a key removes that field.

    Returns:
        Resulting redefinitions dict for the entity after merge.

    Raises:
        KeyError: If ``entity_id`` is not loaded in the bridge.
        HomeAssistantError: If the follow-up config publish fails.
    """
    if entity_id not in self._entities:
        raise KeyError(entity_id)
    existing = await self._redef_store.async_update(entity_id, fields)
    await self._publish_config()
    return existing
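
The merge rule described for `fields` (a non-empty value sets the key, an empty string or `None` removes it) can be sketched as a pure function. `merge_redefinition` is a hypothetical helper; the real mutation lives in `RedefinitionsStore.async_update`, which is not shown here, and ignoring unknown keys is an assumption.

```python
from __future__ import annotations

ALLOWED_FIELDS = {"name", "room", "home"}


def merge_redefinition(existing: dict[str, str], fields: dict[str, str | None]) -> dict[str, str]:
    """Return a new dict with ``fields`` merged in; empty or None values delete the key."""
    merged = dict(existing)
    for key, value in fields.items():
        if key not in ALLOWED_FIELDS:
            continue  # assumption: unknown keys are ignored
        if value:
            merged[key] = value
        else:
            merged.pop(key, None)
    return merged


result = merge_redefinition(
    {"name": "Lamp", "room": "Hall"},
    {"room": "", "home": "Dacha", "name": None},
)
```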

async_republish_config async

async_republish_config()

Public wrapper for forcing a device config republish to Sber.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
async def async_republish_config(self) -> None:
    """Public wrapper for forcing a device config republish to Sber."""
    await self._publish_config()

clear_message_log

clear_message_log()

Clear the DevTools message log (delegates to hub).

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
def clear_message_log(self) -> None:
    """Clear the DevTools message log (delegates to hub)."""
    self._devtools.clear_message_log()

apply_settings

apply_settings(options)

Apply changed operational settings without full bridge restart.

Settings that take effect immediately: debounce_delay, max_mqtt_payload_size, message_log_size. Settings that take effect on next reconnect: reconnect_min, reconnect_max, verify_ssl.

Parameters:

- options (dict, required): Config entry options dict.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
def apply_settings(self, options: dict) -> None:
    """Apply changed operational settings without full bridge restart.

    Settings that take effect immediately: debounce_delay, max_mqtt_payload_size,
    message_log_size.
    Settings that take effect on next reconnect: reconnect_min, reconnect_max, verify_ssl.

    Args:
        options: Config entry options dict.
    """
    self._load_settings_from_options(options)
    self._state_forwarder.set_debounce_delay(self._debounce_delay)
    self._mqtt_service.update_backoff_limits(self._reconnect_min, self._reconnect_max)
    self._mqtt_service.update_verify_ssl(self._verify_ssl)
    self._devtools.resize(self._message_log_size)

    _LOGGER.info(
        "Bridge settings applied (debounce=%.2fs, log=%d)",
        self._debounce_delay,
        self._message_log_size,
    )

async_publish_raw async

async_publish_raw(payload, target)

Publish arbitrary JSON payload to Sber MQTT for debugging.

Parameters:

- payload (str, required): Raw JSON string to publish.
- target (str, required): Topic suffix, either "config" or "status".

Raises:

- RuntimeError: If not connected to MQTT broker.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
async def async_publish_raw(self, payload: str, target: str) -> None:
    """Publish arbitrary JSON payload to Sber MQTT for debugging.

    Args:
        payload: Raw JSON string to publish.
        target: Topic suffix — either "config" or "status".

    Raises:
        RuntimeError: If not connected to MQTT broker.
    """
    if not self._connected or self._mqtt_client is None:
        msg = "Not connected to MQTT"
        raise RuntimeError(msg)

    topic = f"{self._root_topic}/up/{target}"
    await self._mqtt_client.publish(topic, payload)
    self._stats.messages_sent += 1
    self._log_message("out", topic, payload)
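
The method above builds the topic as `{root}/up/{target}` and does not check `target` itself. A caller that wants to reject anything other than the two documented suffixes could validate first, as in this sketch. `build_up_topic` is a hypothetical helper, and `sbdev/my_login` is a made-up root topic (the `sbdev` prefix is taken from the topic example elsewhere on this page).

```python
ALLOWED_TARGETS = ("config", "status")


def build_up_topic(root_topic: str, target: str) -> str:
    """Build ``{root}/up/{target}``, rejecting targets other than config/status."""
    if target not in ALLOWED_TARGETS:
        raise ValueError(f"unsupported target: {target!r}")
    return f"{root_topic}/up/{target}"


topic = build_up_topic("sbdev/my_login", "status")
```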

async_inject_sber_message async

async_inject_sber_message(topic, payload, *, mark_replay=True)

Feed a synthetic message into the dispatcher as if Sber sent it.

Used by DevTools Replay / Inject. Takes a topic (either the full sbdev/.../down/commands or a bare suffix such as commands) and runs it through the normal inbound pipeline (SberCommandDispatcher, correlation trace, state diff, ack audit) without going through the MQTT broker. Because there is no network round-trip, an injected command is processed even when the bridge is offline, which is useful when debugging.

Parameters:

- topic (str, required): Either the full MQTT topic as it would arrive from Sber cloud, or just the last segment (suffix), which is automatically expanded to {root}/down/{suffix}.
- payload (str | bytes, required): Raw JSON body. Bytes pass through as-is; strings are UTF-8 encoded to match the real on-wire shape.
- mark_replay (bool, default True): When True (default), the DevTools message log records the direction as "replay" instead of "in" so the UI can visually distinguish synthetic traffic from real Sber commands. Set False to make the injection indistinguishable from real MQTT input (e.g. reproducing a bug for a screenshot).

Returns:

- dict[str, Any]: Dict with {"topic": str, "handled": bool, "suffix": str}. handled is False only when no dispatcher was registered for the given suffix (unknown topic).

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
async def async_inject_sber_message(
    self,
    topic: str,
    payload: str | bytes,
    *,
    mark_replay: bool = True,
) -> dict[str, Any]:
    """Feed a synthetic message into the dispatcher as if Sber sent it.

    Used by DevTools Replay / Inject: takes a topic (full
    ``sbdev/.../down/commands`` or a bare suffix like ``commands``)
    and runs it through the normal inbound pipeline —
    :class:`SberCommandDispatcher`, correlation trace, state diff,
    ack audit — without going through the MQTT broker.  No network
    round-trip means an injected command flows even when the bridge
    is offline, which is exactly what users want when debugging.

    Args:
        topic: Either the full MQTT topic as it would arrive from
            Sber cloud, or just the last segment (suffix) which is
            automatically expanded to ``{root}/down/{suffix}``.
        payload: Raw JSON body.  Bytes pass through as-is; strings
            are UTF-8 encoded to match the real on-wire shape.
        mark_replay: When True (default), the DevTools message log
            records the direction as ``"replay"`` instead of
            ``"in"`` so the UI can visually distinguish synthetic
            traffic from real Sber commands.  Set False to make
            the injection indistinguishable from real MQTT input
            (e.g. reproducing a bug for screenshot).

    Returns:
        Dict with ``{"topic": str, "handled": bool, "suffix": str}``.
        ``handled`` is False only when no dispatcher was registered
        for the given suffix (unknown topic).
    """
    full_topic = topic if "/" in topic else f"{self._down_topic}/{topic}"
    body = payload.encode("utf-8") if isinstance(payload, str) else payload

    # Route through the dispatch table used by the real MQTT handler.
    suffix = full_topic.rsplit("/", 1)[-1]
    decoded = body.decode("utf-8", errors="replace")
    self._log_message("replay" if mark_replay else "in", full_topic, decoded)

    if full_topic == SBER_GLOBAL_CONFIG_TOPIC:
        self._handle_global_config(body)
        return {"topic": full_topic, "handled": True, "suffix": "(global_config)"}

    handler = self._mqtt_dispatch.get(suffix)
    if handler is None:
        _LOGGER.warning("Inject: unhandled topic suffix %r", suffix)
        return {"topic": full_topic, "handled": False, "suffix": suffix}

    await handler(body)
    return {"topic": full_topic, "handled": True, "suffix": suffix}
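
The topic handling at the start of the method can be isolated into a small pure function, which makes the two accepted input forms easy to see. `resolve_topic` is a hypothetical helper that mirrors the first lines of the listing above; the root topic `sbdev/my_login` is a made-up example.

```python
def resolve_topic(topic: str, down_topic: str) -> tuple:
    """Return ``(full_topic, suffix)`` for either a full topic or a bare suffix."""
    # Anything containing "/" is treated as a full topic and left untouched.
    full_topic = topic if "/" in topic else f"{down_topic}/{topic}"
    suffix = full_topic.rsplit("/", 1)[-1]
    return full_topic, suffix


from_suffix = resolve_topic("commands", "sbdev/my_login/down")
from_full = resolve_topic("sbdev/my_login/down/status_request", "sbdev/my_login/down")
```

Because the presence of `/` is the only test, a bare suffix can never contain a slash; such input is always interpreted as a full topic.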

subscribe_messages

subscribe_messages(callback_fn)

Subscribe to new MQTT messages in real time (delegates to hub).

Parameters:

- callback_fn (Callable[[dict], None], required): Called with each new message dict.

Returns:

- Callable[[], None]: Unsubscribe callable.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
def subscribe_messages(self, callback_fn: Callable[[dict], None]) -> Callable[[], None]:
    """Subscribe to new MQTT messages in real time (delegates to hub).

    Args:
        callback_fn: Called with each new message dict.

    Returns:
        Unsubscribe callable.
    """
    return self._devtools.subscribe_messages(callback_fn)
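
The "subscribe returns an unsubscribe callable" contract can be sketched with a minimal hub. `MessageHub` and its `emit` method are hypothetical stand-ins (the real `DevToolsHub` is not shown here), and the message dict keys are made up for the example.

```python
from typing import Callable, Dict, List


class MessageHub:
    """Hypothetical minimal hub; the real DevToolsHub is not shown here."""

    def __init__(self) -> None:
        self._subscribers: List[Callable[[dict], None]] = []

    def subscribe_messages(self, callback_fn: Callable[[dict], None]) -> Callable[[], None]:
        self._subscribers.append(callback_fn)

        def unsubscribe() -> None:
            # Safe to call more than once.
            if callback_fn in self._subscribers:
                self._subscribers.remove(callback_fn)

        return unsubscribe

    def emit(self, message: dict) -> None:
        # Iterate over a copy so callbacks may unsubscribe during delivery.
        for subscriber in list(self._subscribers):
            subscriber(message)


hub = MessageHub()
seen: List[Dict] = []
unsubscribe = hub.subscribe_messages(seen.append)
hub.emit({"direction": "in", "topic": "commands"})
unsubscribe()
hub.emit({"direction": "out", "topic": "status"})  # no longer delivered
```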

async_start async

async_start()

Start the bridge: load entities, subscribe to HA events, connect MQTT.

HA state events are subscribed immediately (independent of MQTT connectivity) so that no state changes are lost while waiting for the first connection. MQTT connection is established in a background task with exponential backoff.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
async def async_start(self) -> None:
    """Start the bridge: load entities, subscribe to HA events, connect MQTT.

    HA state events are subscribed immediately (independent of MQTT connectivity)
    so that no state changes are lost while waiting for the first connection.
    MQTT connection is established in a background task with exponential backoff.
    """
    self._running = True
    # Cache the HA instance UUID prefix so the publish hot-path stays sync.
    # Used for the per-HA ``ha_serial_number`` loop-detection marker.
    from homeassistant.helpers import instance_id

    full_uuid = await instance_id.async_get(self._hass)
    self._ha_instance_id_prefix: str = full_uuid[:8]
    self._load_exposed_entities()
    self._subscribe_ha_events()
    self._connection_task = asyncio.create_task(self._mqtt_connection_loop())

    # If HA is already running (e.g. integration reload), entities are
    # fully available — mark ready immediately.  Otherwise, wait for
    # EVENT_HOMEASSISTANT_STARTED to reload entities with real states.
    if self._hass.is_running:
        _LOGGER.debug("HA already running — entities loaded, marking ready")
        self._ha_ready.set()
    else:
        self._unsub_lifecycle_listeners.append(
            self._hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, self._on_homeassistant_started)
        )

async_stop async

async_stop()

Stop the bridge: disconnect MQTT, unsubscribe from HA events.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
async def async_stop(self) -> None:
    """Stop the bridge: disconnect MQTT, unsubscribe from HA events."""
    self._running = False

    # HA state-change listeners + debounced publish live in the forwarder
    self._state_forwarder.unsubscribe_all()

    for unsub in self._unsub_lifecycle_listeners:
        unsub()
    self._unsub_lifecycle_listeners.clear()

    # Cancel any pending ack-audit timer so it can't fire after shutdown
    self._ack_audit.cancel()

    # Stop the MQTT service reconnect loop
    await self._mqtt_service.stop()

    if self._connection_task:
        self._connection_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._connection_task
        self._connection_task = None

    self._connected = False
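
The cancel-then-await idiom used for `_connection_task` can be tried in isolation. In this sketch `stop_task` is a hypothetical helper and the long `asyncio.sleep` merely stands in for the connection loop.

```python
import asyncio
import contextlib


async def stop_task(task: asyncio.Task) -> None:
    """Cancel ``task`` and wait until the cancellation has been processed."""
    task.cancel()
    # Awaiting a cancelled task raises CancelledError; suppress it so that
    # shutdown continues normally.
    with contextlib.suppress(asyncio.CancelledError):
        await task


async def demo() -> bool:
    task = asyncio.create_task(asyncio.sleep(3600))  # stands in for the connection loop
    await asyncio.sleep(0)  # give the task a chance to start
    await stop_task(task)
    return task.cancelled()


was_cancelled = asyncio.run(demo())
```

Awaiting the task after `cancel()` ensures its cleanup has finished before the caller moves on.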

refresh_repair_issues

refresh_repair_issues()

Recompute HA repair issues without awaiting.

Wraps check_and_create_issues in a safe background task so callers (notably the command dispatcher) can fire-and-forget after an ack arrives. No-op when HA is not yet running, so it does not fight the early-startup grace window in _load_exposed_entities.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
@callback
def refresh_repair_issues(self) -> None:
    """Recompute HA repair issues without awaiting.

    Wraps :func:`check_and_create_issues` in a safe background task so
    callers (notably the command dispatcher) can fire-and-forget after
    an ack arrives.  No-op when HA is not yet running so we don't fight
    the early-startup grace window in :meth:`_load_exposed_entities`.
    """
    if not self._hass.is_running:
        return
    self._create_safe_task(
        check_and_create_issues(self._hass, self),
        name="refresh_repair_issues",
    )

async_publish_entity_status async

async_publish_entity_status(entity_id)

Publish the current state of a single entity to Sber cloud.

Parameters:

- entity_id (str, required): HA entity identifier.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
async def async_publish_entity_status(self, entity_id: str) -> None:
    """Publish the current state of a single entity to Sber cloud.

    Args:
        entity_id: HA entity identifier.
    """
    await self._publish_states([entity_id])