HaStateForwarder

Forwards HA state_changed events to Sber. It owns the state-change subscription (via async_track_state_change_event), the routing of linked sensors to their primary entity, the detection of feature-list changes (which triggers a config republish), and the debouncing of state publishes.

Extracted from SberBridge in v1.25.1. It does not own bridge state: it reads entities and the linked mapping through callbacks, so the bridge remains the single source of truth.

HA state change → Sber publish forwarder.

Owns the HA state-change subscription, linked-entity routing, debouncing of rapid updates, and the feature-change detection that triggers config republish. Extracted from SberBridge to isolate the HA-facing event loop from the MQTT transport (SRP).

Usage::

forwarder = HaStateForwarder(
    hass=hass,
    debounce_delay=0.1,
    get_entities=lambda: bridge.entities,
    get_linked_reverse=lambda: bridge.linked_reverse_map,
    on_publish_states=bridge.async_publish_entity_ids,
    on_republish_config=bridge.async_republish_config,
    create_safe_task=bridge._create_safe_task,
)
forwarder.subscribe([...])
...
forwarder.unsubscribe_all()

HaStateForwarder

HaStateForwarder(*, hass, debounce_delay, get_entities, get_linked_reverse, on_publish_states, on_republish_config, create_safe_task, on_trace_state_change=None)

Forward HA state_changed events to Sber via bridge callbacks.

Responsibilities
  • Subscribe / unsubscribe to HA state-change events for a set of entity IDs (primary + linked).
  • Route linked sensor updates to their primary entity.
  • Detect unfilled → filled transitions and feature-set changes, triggering a config republish via callback.
  • Debounce rapid state changes so multiple updates within debounce_delay seconds coalesce into a single publish.

This class owns NO bridge state — it reads entities and linked mappings through callbacks so the bridge remains the single source of truth.
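
The routing responsibility can be illustrated with a small sketch. It assumes the linked-reverse mapping has the shape linked_entity_id -> (primary_entity_id, role); the source only gives the type dict[str, tuple[str, str]], so the meaning of the two tuple elements is an assumption. resolve_primary is a hypothetical helper, not a method of the class, and the entity IDs are made up.

```python
from __future__ import annotations

from collections.abc import Callable


def resolve_primary(
    entity_id: str,
    get_entities: Callable[[], dict[str, object]],
    get_linked_reverse: Callable[[], dict[str, tuple[str, str]]],
) -> str | None:
    """Return the primary entity ID a state change should be published under.

    Hypothetical helper: assumes linked-reverse values are (primary_id, role).
    """
    entities = get_entities()  # read through the callback on every call
    if entity_id in entities:
        return entity_id  # already a primary entity
    linked = get_linked_reverse().get(entity_id)
    if linked is None:
        return None  # not tracked at all
    primary_id, _role = linked
    return primary_id if primary_id in entities else None


# Stand-in bridge state; the forwarder itself would not hold copies of these.
entities = {"climate.living_room": object()}
linked_reverse = {"sensor.living_room_temp": ("climate.living_room", "temperature")}

routed = resolve_primary(
    "sensor.living_room_temp", lambda: entities, lambda: linked_reverse
)
direct = resolve_primary(
    "climate.living_room", lambda: entities, lambda: linked_reverse
)
unknown = resolve_primary("light.hall", lambda: entities, lambda: linked_reverse)
```

Because the getters are called on every lookup, changes the bridge makes to its dictionaries are visible immediately, which is the point of passing callbacks instead of snapshots.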

Initialize the forwarder.

Parameters:

  • hass (HomeAssistant, required): Home Assistant core instance.
  • debounce_delay (float, required): Seconds to coalesce rapid state changes.
  • get_entities (Callable[[], dict[str, BaseEntity]], required): Callable returning the current entities dict.
  • get_linked_reverse (Callable[[], dict[str, tuple[str, str]]], required): Callable returning linked-reverse mapping.
  • on_publish_states (Callable[[list[str]], Awaitable[None]], required): Async callback to publish a list of entity IDs.
  • on_republish_config (Callable[[], Awaitable[None]], required): Async callback to force a config republish.
  • create_safe_task (Callable[..., Task], required): Bridge helper wrapping hass.async_create_task with error logging.
  • on_trace_state_change (Callable[[str | None, str, dict], None] | None, default None): Optional DevTools hook invoked on every processed primary state change with (context_id, entity_id, state_dict) so the correlation-trace collector can attach the event. No-op when None, which keeps tests free of DevTools wiring.

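
The create_safe_task parameter is described only as a bridge helper that wraps hass.async_create_task with error logging. Below is a minimal sketch of that pattern, using plain asyncio.create_task as a stand-in so it runs without Home Assistant. The function body and the logger name are assumptions, not the bridge's implementation.

```python
from __future__ import annotations

import asyncio
import logging
from collections.abc import Coroutine
from typing import Any

_LOGGER = logging.getLogger("safe_task_sketch")  # hypothetical logger name


def create_safe_task(coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
    """Schedule a coroutine and log, rather than lose, any exception it raises."""
    task = asyncio.create_task(coro)

    def _log_failure(done: asyncio.Task) -> None:
        if done.cancelled():
            return
        exc = done.exception()  # retrieving it marks the exception as handled
        if exc is not None:
            _LOGGER.error("Background task failed: %r", exc)

    task.add_done_callback(_log_failure)
    return task


async def _demo() -> list[str]:
    seen: list[str] = []

    async def ok() -> None:
        seen.append("ok")

    async def boom() -> None:
        raise RuntimeError("publish failed")

    tasks = [create_safe_task(ok()), create_safe_task(boom())]
    # return_exceptions=True keeps the demo alive when one task fails
    results = await asyncio.gather(*tasks, return_exceptions=True)
    seen.extend(type(r).__name__ for r in results if isinstance(r, Exception))
    return seen


demo_result = asyncio.run(_demo())
```

Injecting the helper rather than calling asyncio directly lets the forwarder stay unaware of how the bridge tracks or reports background failures.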
Source code in custom_components/sber_mqtt_bridge/ha_state_forwarder.py
def __init__(
    self,
    *,
    hass: HomeAssistant,
    debounce_delay: float,
    get_entities: Callable[[], dict[str, BaseEntity]],
    get_linked_reverse: Callable[[], dict[str, tuple[str, str]]],
    on_publish_states: Callable[[list[str]], Awaitable[None]],
    on_republish_config: Callable[[], Awaitable[None]],
    create_safe_task: Callable[..., asyncio.Task],
    on_trace_state_change: Callable[[str | None, str, dict], None] | None = None,
) -> None:
    """Initialize the forwarder.

    Args:
        hass: Home Assistant core instance.
        debounce_delay: Seconds to coalesce rapid state changes.
        get_entities: Callable returning the current entities dict.
        get_linked_reverse: Callable returning linked-reverse mapping.
        on_publish_states: Async callback to publish a list of entity IDs.
        on_republish_config: Async callback to force a config republish.
        create_safe_task: Bridge helper wrapping ``hass.async_create_task``
            with error logging.
        on_trace_state_change: Optional DevTools hook invoked on every
            processed primary state change with ``(context_id, entity_id,
            state_dict)`` so the correlation-trace collector can attach
            the event. No-op when ``None`` — keeps tests free of
            DevTools wiring.
    """
    self._hass = hass
    self._debounce_delay = debounce_delay
    self._get_entities = get_entities
    self._get_linked_reverse = get_linked_reverse
    self._on_publish_states = on_publish_states
    self._on_republish_config = on_republish_config
    self._create_safe_task = create_safe_task
    self._on_trace_state_change = on_trace_state_change

    self._unsub_listeners: list[Callable[[], None]] = []
    self._pending_publish_ids: set[str] = set()
    self._publish_timer: asyncio.TimerHandle | None = None
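
The private fields _pending_publish_ids and _publish_timer suggest a set-plus-timer debounce. The flush logic is not shown in this section, so the following is a sketch of the general technique under that assumption. DebouncedPublisher and its method names are hypothetical, and the choice to start the timer on the first event without resetting it on later ones is also an assumption.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable


class DebouncedPublisher:
    """Coalesce entity IDs queued within `delay` seconds into one publish call."""

    def __init__(
        self, delay: float, on_publish: Callable[[list[str]], Awaitable[None]]
    ) -> None:
        self._delay = delay
        self._on_publish = on_publish
        self._pending: set[str] = set()
        self._timer: asyncio.TimerHandle | None = None
        self._tasks: set[asyncio.Task] = set()  # keep references to running tasks

    def schedule(self, entity_id: str) -> None:
        """Queue an ID; start the timer only if none is already pending."""
        self._pending.add(entity_id)
        if self._timer is None:
            loop = asyncio.get_running_loop()
            self._timer = loop.call_later(self._delay, self._flush)

    def _flush(self) -> None:
        """Timer callback: hand the collected batch to the async publisher."""
        ids = sorted(self._pending)
        self._pending.clear()
        self._timer = None
        task = asyncio.get_running_loop().create_task(self._on_publish(ids))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)


async def _demo() -> list[list[str]]:
    batches: list[list[str]] = []

    async def publish(ids: list[str]) -> None:
        batches.append(ids)

    publisher = DebouncedPublisher(0.05, publish)
    for entity_id in ("light.a", "light.b", "light.a"):
        publisher.schedule(entity_id)  # three rapid changes, two distinct IDs
    await asyncio.sleep(0.2)  # let the timer fire and the publish task run
    return batches


batches = asyncio.run(_demo())
```

Using a set for the pending IDs means an entity that changes several times inside one window is published once.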

set_debounce_delay

set_debounce_delay(delay)

Update the debounce delay at runtime.

Source code in custom_components/sber_mqtt_bridge/ha_state_forwarder.py
def set_debounce_delay(self, delay: float) -> None:
    """Update the debounce delay at runtime."""
    self._debounce_delay = delay

subscribe

subscribe(entity_ids)

Replace the set of tracked entities and resubscribe.

Unsubscribes any previous listeners first, so subsequent calls are idempotent.

Parameters:

  • entity_ids (list[str], required): Combined list of primary + linked entity IDs to track. An empty list is allowed: any previous listeners are removed and no new subscription is created.

Source code in custom_components/sber_mqtt_bridge/ha_state_forwarder.py
def subscribe(self, entity_ids: list[str]) -> None:
    """Replace the set of tracked entities and resubscribe.

    Unsubscribes any previous listeners first, so subsequent calls
    are idempotent.

    Args:
        entity_ids: Combined list of primary + linked entity IDs to
            track.  Empty list is allowed and results in a no-op.
    """
    self.unsubscribe_all()
    if not entity_ids:
        return
    unsub = async_track_state_change_event(
        self._hass,
        entity_ids,
        self._on_ha_state_changed,
    )
    self._unsub_listeners.append(unsub)
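
The replace-and-resubscribe behaviour can be exercised without Home Assistant by substituting a fake tracker. In this sketch fake_track stands in for async_track_state_change_event (it only mimics the "returns an unsubscribe callable" contract), and MiniForwarder is a hypothetical reduction of the class to its subscription bookkeeping.

```python
from __future__ import annotations

from collections.abc import Callable

active: list[list[str]] = []  # entity-ID lists that have a live fake subscription


def fake_track(entity_ids: list[str]) -> Callable[[], None]:
    """Stand-in tracker: register the IDs and return an unsubscribe callable."""
    active.append(entity_ids)
    return lambda: active.remove(entity_ids)


class MiniForwarder:
    def __init__(self) -> None:
        self._unsub_listeners: list[Callable[[], None]] = []

    def subscribe(self, entity_ids: list[str]) -> None:
        self.unsubscribe_all()  # drop the previous subscription first
        if not entity_ids:
            return
        self._unsub_listeners.append(fake_track(entity_ids))

    def unsubscribe_all(self) -> None:
        for unsub in self._unsub_listeners:
            unsub()
        self._unsub_listeners.clear()


fwd = MiniForwarder()
fwd.subscribe(["light.a", "sensor.a_power"])
fwd.subscribe(["light.a", "sensor.a_power"])  # same list again: still one subscription
after_repeat = [list(ids) for ids in active]
fwd.subscribe(["switch.b"])  # a different list replaces the old one
after_replace = [list(ids) for ids in active]
fwd.subscribe([])  # empty list: old subscription removed, nothing new added
after_empty = list(active)
```

Note that the empty-list call still tears down whatever was subscribed before it returns.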

unsubscribe_all

unsubscribe_all()

Unsubscribe from HA state-change events and cancel pending publish.

Source code in custom_components/sber_mqtt_bridge/ha_state_forwarder.py
def unsubscribe_all(self) -> None:
    """Unsubscribe from HA state-change events and cancel pending publish."""
    for unsub in self._unsub_listeners:
        unsub()
    self._unsub_listeners.clear()
    if self._publish_timer is not None:
        self._publish_timer.cancel()
        self._publish_timer = None
    self._pending_publish_ids.clear()
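
Cancelling the timer matters because a handle that is still pending at teardown would otherwise fire after the listeners are gone. A small standalone check of that asyncio behaviour (not code from the integration; the callback here is just a list append):

```python
import asyncio


async def _demo() -> tuple[bool, list[str]]:
    fired: list[str] = []
    loop = asyncio.get_running_loop()
    timer = loop.call_later(0.05, fired.append, "published")
    timer.cancel()  # the same call the teardown path makes on its pending timer
    await asyncio.sleep(0.1)  # wait past the original deadline
    return timer.cancelled(), fired


was_cancelled, fired = asyncio.run(_demo())
```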