SberCommandDispatcher
Интерпретация входящих Sber MQTT-сообщений и диспатч side-эффектов.
Владеет обработчиками handle_command, handle_status_request,
handle_config_request, handle_error, handle_change_group,
handle_rename_device, handle_global_config.
Извлечён из SberBridge в v1.25.1 для изоляции Sber-протокольной
логики от транспорта и HA state forwarding (SRP). Держит ссылку на
родительский SberBridge, так как ряд handler'ов мутирует состояние
моста (entities, redefinitions, acknowledgements) и инициирует publish.
Sber MQTT command dispatcher.
Handles commands, status/config requests, errors, change_group and
rename_device messages from the Sber cloud. Extracted from
:class:SberBridge to isolate Sber-protocol command interpretation
from transport and HA state forwarding (SRP).
The dispatcher holds a reference to its parent :class:SberBridge
because several command handlers need to mutate bridge state
(entities, redefinitions, acknowledgements) and invoke publish
operations. The coupling is explicit and one-way.
BridgeCommandContext
Bases: Protocol
Narrow interface for SberCommandDispatcher's bridge dependency.
Exposes the concrete collaborators owned by SberBridge — the
dispatcher reaches publish, redefinitions, and DevTools flows
through them, not through bridge-proxied wrapper methods.
SberCommandDispatcher
SberCommandDispatcher(bridge)
Interprets incoming Sber MQTT payloads and dispatches side effects.
Each handle_* method corresponds to one topic suffix in the Sber
down/* namespace. The bridge's _mqtt_dispatch table routes
incoming messages to the matching handler.
Initialize the dispatcher bound to a bridge context.
Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
| def __init__(self, bridge: BridgeCommandContext) -> None:
"""Initialize the dispatcher bound to a bridge context."""
self._bridge = bridge
|
handle_command
async
Handle a command from Sber cloud → execute HA service.
During the reconnect grace period, commands are rejected and
current HA states are re-published so that Sber cloud accepts
HA as the authoritative source of truth.
Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
| async def handle_command(self, payload: bytes) -> None:
"""Handle a command from Sber cloud → execute HA service.
During the reconnect grace period, commands are rejected and
current HA states are re-published so that Sber cloud accepts
HA as the authoritative source of truth.
"""
bridge = self._bridge
data = parse_sber_command(payload)
bridge._stats.commands_received += 1
devices = data.get("devices", {})
if await self._handle_reconnect_grace(devices):
return
_LOGGER.debug("Sber command for %d device(s): %s", len(devices), list(devices.keys()))
context = Context()
self._open_command_trace(devices, context)
update_state_ids: list[str] = []
for entity_id, cmd_data in devices.items():
if await self._process_one_entity(entity_id, cmd_data, context):
update_state_ids.append(entity_id)
commanded_ids = [eid for eid in devices if eid in bridge._entities]
if update_state_ids:
await bridge._publisher.publish_states(update_state_ids, force=True)
# Immediate echo ack: publish the received command states back to
# Sber within milliseconds so its ack timer does not expire before
# HA propagates the real state change. Required for integrations
# that delay/omit ``state_changed`` events on no-op commands (e.g.
# HA WLED integration with WLED 16.0.0 — see GitHub issue #35 and
# HA core issue #170435).
if commanded_ids:
await bridge._publisher.publish_command_echo(devices)
self._schedule_confirms(commanded_ids)
# Receiving any command is positive evidence that Sber accepted at
# least one entity — re-evaluate the silent-rejection issue so a
# stale repair tile clears as soon as the user activates the device.
self._refresh_repair_issues()
|
handle_status_request
async
handle_status_request(payload)
Handle a status request from Sber cloud.
If Sber asks about entities not in our current set, automatically
re-publishes the device config so Sber is aware of the correct list.
A status_request also counts as Sber acknowledgment.
Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
| async def handle_status_request(self, payload: bytes) -> None:
"""Handle a status request from Sber cloud.
If Sber asks about entities not in our current set, automatically
re-publishes the device config so Sber is aware of the correct list.
A status_request also counts as Sber acknowledgment.
"""
bridge = self._bridge
requested_ids = parse_sber_status_request(payload)
bridge._stats.status_requests += 1
bridge._ack_audit.acknowledge()
if requested_ids:
unknown = [eid for eid in requested_ids if eid not in bridge._entities and eid != "root"]
if unknown:
_LOGGER.info(
"Sber asked about unknown entities, re-publishing config: %s",
unknown,
)
await bridge._publisher.publish_config()
if requested_ids:
for eid in requested_ids:
bridge._stats.acknowledged_entities.add(eid)
_LOGGER.info(
"Sber status request for %d specific entities: %s",
len(requested_ids),
requested_ids,
)
else:
bridge._stats.acknowledged_entities.update(bridge._enabled_entity_ids)
_LOGGER.info(
"Sber status request for ALL entities (%d)",
len(bridge._enabled_entity_ids),
)
await bridge._publisher.publish_states(requested_ids if requested_ids else None, force=True)
# status_request is the strongest single ack signal we get from
# Sber (it explicitly enumerates accepted entities or asks for
# the whole set). Refresh the repair issues so the silent-
# rejection tile clears in real time, not only on next reload.
self._refresh_repair_issues()
|
handle_config_request
async
Handle config request from Sber cloud — send device list.
Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
| async def handle_config_request(self) -> None:
"""Handle config request from Sber cloud — send device list."""
bridge = self._bridge
bridge._stats.config_requests += 1
bridge._ack_audit.acknowledge()
_LOGGER.info(
"Sber config request received (will publish %d entities)",
len(bridge._enabled_entity_ids),
)
await bridge._publisher.publish_config()
|
handle_error
Handle error message from Sber cloud.
Parses the error payload, stores the detail in stats for repair
issue creation, and logs the error.
Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
| def handle_error(self, payload: bytes) -> None:
"""Handle error message from Sber cloud.
Parses the error payload, stores the detail in stats for repair
issue creation, and logs the error.
"""
bridge = self._bridge
bridge._stats.errors_from_sber += 1
try:
error_data = json.loads(payload)
detail = json.dumps(error_data, ensure_ascii=False)
bridge._stats.last_error_detail = detail[:500]
_LOGGER.warning(
"Sber error (#%d): %s",
bridge._stats.errors_from_sber,
detail,
)
except (json.JSONDecodeError, TypeError):
raw = payload.decode(errors="replace")[:500]
bridge._stats.last_error_detail = raw
_LOGGER.warning(
"Sber error (#%d, raw): %s",
bridge._stats.errors_from_sber,
raw,
)
|
handle_change_group
async
handle_change_group(payload)
Handle device group/room change from Sber.
Only stores the redefinition locally. Does NOT re-publish config
to avoid an infinite loop: Sber sends change_group → we publish
config → Sber sends change_group again → loop forever.
Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
| async def handle_change_group(self, payload: bytes) -> None:
"""Handle device group/room change from Sber.
Only stores the redefinition locally. Does NOT re-publish config
to avoid an infinite loop: Sber sends change_group → we publish
config → Sber sends change_group again → loop forever.
"""
bridge = self._bridge
try:
data = json.loads(payload)
except json.JSONDecodeError as exc:
_LOGGER.debug(
"Malformed change_group_device_request payload (json): %r — %s",
payload[:200] if isinstance(payload, (bytes, str)) else payload,
exc,
)
return
entity_id = data.get("device_id")
if not entity_id:
return
existing = dict(bridge._redef_store.raw.get(entity_id, {}))
existing["home"] = data.get("home")
existing["room"] = data.get("room")
bridge._redef_store.raw[entity_id] = existing
bridge._redef_store.schedule_persist()
_LOGGER.info("Sber group change stored: %s → room=%s", entity_id, data.get("room"))
|
handle_rename_device
async
handle_rename_device(payload)
Handle device rename from Sber.
Only stores the redefinition locally. Does NOT re-publish config
to avoid potential loops.
Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
| async def handle_rename_device(self, payload: bytes) -> None:
"""Handle device rename from Sber.
Only stores the redefinition locally. Does NOT re-publish config
to avoid potential loops.
"""
bridge = self._bridge
try:
data = json.loads(payload)
except json.JSONDecodeError as exc:
_LOGGER.debug(
"Malformed rename_device_request payload (json): %r — %s",
payload[:200] if isinstance(payload, (bytes, str)) else payload,
exc,
)
return
entity_id = data.get("device_id")
new_name = data.get("new_name")
if entity_id and new_name:
redef = bridge._redef_store.raw.setdefault(entity_id, {})
redef["name"] = new_name
bridge._redef_store.schedule_persist()
_LOGGER.info("Sber rename stored: %s → %s", entity_id, new_name)
|
handle_global_config
handle_global_config(payload)
Handle global config from Sber (http_api_endpoint).
Source code in custom_components/sber_mqtt_bridge/command_dispatcher.py
| def handle_global_config(self, payload: bytes) -> None:
"""Handle global config from Sber (http_api_endpoint)."""
try:
data = json.loads(payload)
endpoint = data.get("http_api_endpoint", "")
if endpoint:
_LOGGER.info("Sber HTTP API endpoint: %s", endpoint)
except json.JSONDecodeError as exc:
_LOGGER.debug(
"Malformed global_config payload (json): %r — %s",
payload[:200] if isinstance(payload, (bytes, str)) else payload,
exc,
)
|