Перейти к содержанию

SberCommandDispatcher

Интерпретация входящих Sber MQTT-сообщений и диспатч side-эффектов. Владеет обработчиками handle_command, handle_status_request, handle_config_request, handle_error, handle_change_group, handle_rename_device, handle_global_config.

Извлечён из SberBridge в v1.25.1 для изоляции Sber-протокольной логики от транспорта и HA state forwarding (SRP). Держит ссылку на родительский SberBridge, так как ряд handler'ов мутирует состояние моста (entities, redefinitions, acknowledgements) и инициирует publish.

Sber MQTT command dispatcher.

Handles commands, status/config requests, errors, change_group and rename_device messages from the Sber cloud. Extracted from :class:SberBridge to isolate Sber-protocol command interpretation from transport and HA state forwarding (SRP).

The dispatcher holds a reference to its parent :class:SberBridge because several command handlers need to mutate bridge state (entities, redefinitions, acknowledgements) and invoke publish operations. The coupling is explicit and one-way.

BridgeCommandContext

Bases: Protocol

Narrow interface for SberCommandDispatcher's bridge dependency.

Exposes the concrete collaborators owned by SberBridge — the dispatcher reaches publish, redefinitions, and DevTools flows through them, not through bridge-proxied wrapper methods.

SberCommandDispatcher

SberCommandDispatcher(bridge)

Interprets incoming Sber MQTT payloads and dispatches side effects.

Each handle_* method corresponds to one topic suffix in the Sber down/* namespace. The bridge's _mqtt_dispatch table routes incoming messages to the matching handler.

Initialize the dispatcher bound to a bridge context.

Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
def __init__(self, bridge: BridgeCommandContext) -> None:
    """Initialize the dispatcher bound to a bridge context."""
    self._bridge = bridge

handle_command async

handle_command(payload)

Handle a command from Sber cloud → execute HA service.

During the reconnect grace period, commands are rejected and current HA states are re-published so that Sber cloud accepts HA as the authoritative source of truth.

Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
async def handle_command(self, payload: bytes) -> None:
    """Handle a command from Sber cloud → execute HA service.

    During the reconnect grace period, commands are rejected and
    current HA states are re-published so that Sber cloud accepts
    HA as the authoritative source of truth.
    """
    bridge = self._bridge
    data = parse_sber_command(payload)
    bridge._stats.commands_received += 1
    devices = data.get("devices", {})

    if await self._handle_reconnect_grace(devices):
        return

    _LOGGER.debug("Sber command for %d device(s): %s", len(devices), list(devices.keys()))

    context = Context()
    self._open_command_trace(devices, context)

    update_state_ids: list[str] = []
    for entity_id, cmd_data in devices.items():
        if await self._process_one_entity(entity_id, cmd_data, context):
            update_state_ids.append(entity_id)

    commanded_ids = [eid for eid in devices if eid in bridge._entities]

    if update_state_ids:
        await bridge._publisher.publish_states(update_state_ids, force=True)

    # Immediate echo ack: publish the received command states back to
    # Sber within milliseconds so its ack timer does not expire before
    # HA propagates the real state change.  Required for integrations
    # that delay/omit ``state_changed`` events on no-op commands (e.g.
    # HA WLED integration with WLED 16.0.0 — see GitHub issue #35 and
    # HA core issue #170435).
    if commanded_ids:
        await bridge._publisher.publish_command_echo(devices)

    self._schedule_confirms(commanded_ids)

    # Receiving any command is positive evidence that Sber accepted at
    # least one entity — re-evaluate the silent-rejection issue so a
    # stale repair tile clears as soon as the user activates the device.
    self._refresh_repair_issues()

handle_status_request async

handle_status_request(payload)

Handle a status request from Sber cloud.

If Sber asks about entities not in our current set, automatically re-publishes the device config so Sber is aware of the correct list. A status_request also counts as Sber acknowledgment.

Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
async def handle_status_request(self, payload: bytes) -> None:
    """Handle a status request from Sber cloud.

    If Sber asks about entities not in our current set, automatically
    re-publishes the device config so Sber is aware of the correct list.
    A status_request also counts as Sber acknowledgment.
    """
    bridge = self._bridge
    requested_ids = parse_sber_status_request(payload)
    bridge._stats.status_requests += 1

    bridge._ack_audit.acknowledge()

    if requested_ids:
        unknown = [eid for eid in requested_ids if eid not in bridge._entities and eid != "root"]
        if unknown:
            _LOGGER.info(
                "Sber asked about unknown entities, re-publishing config: %s",
                unknown,
            )
            await bridge._publisher.publish_config()

    if requested_ids:
        for eid in requested_ids:
            bridge._stats.acknowledged_entities.add(eid)
        _LOGGER.info(
            "Sber status request for %d specific entities: %s",
            len(requested_ids),
            requested_ids,
        )
    else:
        bridge._stats.acknowledged_entities.update(bridge._enabled_entity_ids)
        _LOGGER.info(
            "Sber status request for ALL entities (%d)",
            len(bridge._enabled_entity_ids),
        )

    await bridge._publisher.publish_states(requested_ids if requested_ids else None, force=True)

    # status_request is the strongest single ack signal we get from
    # Sber (it explicitly enumerates accepted entities or asks for
    # the whole set).  Refresh the repair issues so the silent-
    # rejection tile clears in real time, not only on next reload.
    self._refresh_repair_issues()

handle_config_request async

handle_config_request()

Handle config request from Sber cloud — send device list.

Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
async def handle_config_request(self) -> None:
    """Handle config request from Sber cloud — send device list."""
    bridge = self._bridge
    bridge._stats.config_requests += 1
    bridge._ack_audit.acknowledge()
    _LOGGER.info(
        "Sber config request received (will publish %d entities)",
        len(bridge._enabled_entity_ids),
    )
    await bridge._publisher.publish_config()

handle_error

handle_error(payload)

Handle error message from Sber cloud.

Parses the error payload, stores the detail in stats for repair issue creation, and logs the error.

Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
def handle_error(self, payload: bytes) -> None:
    """Handle error message from Sber cloud.

    Parses the error payload, stores the detail in stats for repair
    issue creation, and logs the error.
    """
    bridge = self._bridge
    bridge._stats.errors_from_sber += 1
    try:
        error_data = json.loads(payload)
        detail = json.dumps(error_data, ensure_ascii=False)
        bridge._stats.last_error_detail = detail[:500]
        _LOGGER.warning(
            "Sber error (#%d): %s",
            bridge._stats.errors_from_sber,
            detail,
        )
    except (json.JSONDecodeError, TypeError):
        raw = payload.decode(errors="replace")[:500]
        bridge._stats.last_error_detail = raw
        _LOGGER.warning(
            "Sber error (#%d, raw): %s",
            bridge._stats.errors_from_sber,
            raw,
        )

handle_change_group async

handle_change_group(payload)

Handle device group/room change from Sber.

Only stores the redefinition locally. Does NOT re-publish config to avoid an infinite loop: Sber sends change_group → we publish config → Sber sends change_group again → loop forever.

Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
async def handle_change_group(self, payload: bytes) -> None:
    """Handle device group/room change from Sber.

    Only stores the redefinition locally. Does NOT re-publish config
    to avoid an infinite loop: Sber sends change_group → we publish
    config → Sber sends change_group again → loop forever.
    """
    bridge = self._bridge
    try:
        data = json.loads(payload)
    except json.JSONDecodeError as exc:
        _LOGGER.debug(
            "Malformed change_group_device_request payload (json): %r%s",
            payload[:200] if isinstance(payload, (bytes, str)) else payload,
            exc,
        )
        return
    entity_id = data.get("device_id")
    if not entity_id:
        return
    existing = dict(bridge._redef_store.raw.get(entity_id, {}))
    existing["home"] = data.get("home")
    existing["room"] = data.get("room")
    bridge._redef_store.raw[entity_id] = existing
    bridge._redef_store.schedule_persist()
    _LOGGER.info("Sber group change stored: %s → room=%s", entity_id, data.get("room"))

handle_rename_device async

handle_rename_device(payload)

Handle device rename from Sber.

Only stores the redefinition locally. Does NOT re-publish config to avoid potential loops.

Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
async def handle_rename_device(self, payload: bytes) -> None:
    """Handle device rename from Sber.

    Only stores the redefinition locally. Does NOT re-publish config
    to avoid potential loops.
    """
    bridge = self._bridge
    try:
        data = json.loads(payload)
    except json.JSONDecodeError as exc:
        _LOGGER.debug(
            "Malformed rename_device_request payload (json): %r%s",
            payload[:200] if isinstance(payload, (bytes, str)) else payload,
            exc,
        )
        return
    entity_id = data.get("device_id")
    new_name = data.get("new_name")
    if entity_id and new_name:
        redef = bridge._redef_store.raw.setdefault(entity_id, {})
        redef["name"] = new_name
        bridge._redef_store.schedule_persist()
        _LOGGER.info("Sber rename stored: %s%s", entity_id, new_name)

handle_global_config

handle_global_config(payload)

Handle global config from Sber (http_api_endpoint).

Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
def handle_global_config(self, payload: bytes) -> None:
    """Handle global config from Sber (http_api_endpoint)."""
    try:
        data = json.loads(payload)
        endpoint = data.get("http_api_endpoint", "")
        if endpoint:
            _LOGGER.info("Sber HTTP API endpoint: %s", endpoint)
    except json.JSONDecodeError as exc:
        _LOGGER.debug(
            "Malformed global_config payload (json): %r%s",
            payload[:200] if isinstance(payload, (bytes, str)) else payload,
            exc,
        )