Перейти к содержанию

SberBridge

Ядро интеграции: управление MQTT-соединением к Sber Cloud, диспетчеризация команд, отслеживание состояний HA-устройств.

Sber Smart Home MQTT Bridge - core bridge logic.

Manages: - Async MQTT connection to Sber cloud broker (aiomqtt) - HA state change listening and publishing to Sber - Sber command reception and forwarding to HA services - Connection health monitoring and device acknowledgment tracking

RECONNECT_INTERVAL_MIN module-attribute

RECONNECT_INTERVAL_MIN = SETTINGS_DEFAULTS[CONF_RECONNECT_MIN]

Default minimum seconds to wait before reconnecting after an MQTT connection loss.

RECONNECT_INTERVAL_MAX module-attribute

RECONNECT_INTERVAL_MAX = SETTINGS_DEFAULTS[CONF_RECONNECT_MAX]

Default maximum seconds to wait (5 minutes) with exponential backoff.

MAX_MQTT_PAYLOAD_SIZE module-attribute

MAX_MQTT_PAYLOAD_SIZE = SETTINGS_DEFAULTS[CONF_MAX_MQTT_PAYLOAD]

Default maximum MQTT payload size in bytes (1 MB) to prevent DoS from oversized messages.

RECONNECT_GRACE_TIMEOUT module-attribute

RECONNECT_GRACE_TIMEOUT = 30.0

Maximum seconds to wait for Sber acknowledgment after (re)connect.

After a reconnect, the bridge publishes HA states and waits for Sber to acknowledge them (via status_request or config_request) before accepting commands. This timeout is a fallback in case Sber never sends a request.

BridgeStats dataclass

BridgeStats(connected_since=None, messages_received=0, messages_sent=0, commands_received=0, config_requests=0, status_requests=0, errors_from_sber=0, publish_errors=0, last_message_time=None, reconnect_count=0, acknowledged_entities=set())

Connection statistics and health metrics for the Sber MQTT bridge.

connected_since class-attribute instance-attribute

connected_since = None

Timestamp when the current connection was established.

messages_received class-attribute instance-attribute

messages_received = 0

Total MQTT messages received from Sber.

messages_sent class-attribute instance-attribute

messages_sent = 0

Total MQTT messages published to Sber.

commands_received class-attribute instance-attribute

commands_received = 0

Total Sber commands processed.

config_requests class-attribute instance-attribute

config_requests = 0

Total config requests received from Sber.

status_requests class-attribute instance-attribute

status_requests = 0

Total status requests received from Sber.

errors_from_sber class-attribute instance-attribute

errors_from_sber = 0

Total error messages received from Sber.

publish_errors class-attribute instance-attribute

publish_errors = 0

Total failed publish attempts.

last_message_time class-attribute instance-attribute

last_message_time = None

Timestamp of the last message received.

reconnect_count class-attribute instance-attribute

reconnect_count = 0

Total number of reconnections since startup.

acknowledged_entities class-attribute instance-attribute

acknowledged_entities = field(default_factory=set)

Entity IDs that Sber has acknowledged (via status_request or command).

as_dict

as_dict()

Return stats as a serializable dict.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
def as_dict(self) -> dict:
    """Return stats as a serializable dict."""
    now = time.monotonic()
    return {
        "connected_since": self.connected_since,
        "connection_uptime_seconds": (round(now - self.connected_since, 1) if self.connected_since else None),
        "messages_received": self.messages_received,
        "messages_sent": self.messages_sent,
        "commands_received": self.commands_received,
        "config_requests": self.config_requests,
        "status_requests": self.status_requests,
        "errors_from_sber": self.errors_from_sber,
        "publish_errors": self.publish_errors,
        "reconnect_count": self.reconnect_count,
        "acknowledged_entities": sorted(self.acknowledged_entities),
    }

SberBridge

SberBridge(hass, entry)

Bridge between Home Assistant and Sber Smart Home MQTT cloud.

Initialize the bridge.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Initialize the bridge."""
    self._hass = hass
    self._entry = entry

    self._login: str = entry.data[CONF_SBER_LOGIN]
    self._password: str = entry.data[CONF_SBER_PASSWORD]
    self._broker: str = entry.data[CONF_SBER_BROKER]
    self._port: int = entry.data[CONF_SBER_PORT]
    self._verify_ssl: bool = entry.options.get(
        CONF_SBER_VERIFY_SSL, entry.data.get(CONF_SBER_VERIFY_SSL, True)
    )

    self._root_topic = f"{SBER_TOPIC_PREFIX}/{self._login}"
    self._down_topic = f"{self._root_topic}/down"

    self._entities: dict[str, BaseEntity] = {}
    self._enabled_entity_ids: list[str] = []
    self._redefinitions: dict[str, dict] = {}
    self._entity_links: dict[str, dict[str, str]] = {}
    """Primary entity → {role: linked_entity_id}."""
    self._linked_reverse: dict[str, tuple[str, str]] = {}
    """Linked entity_id → (primary_entity_id, role)."""

    self._mqtt_client: aiomqtt.Client | None = None
    self._connection_task: asyncio.Task | None = None
    self._running = False
    self._connected = False

    # Configurable operational settings (from config_entry.options)
    opts = entry.options
    self._reconnect_min: int = opts.get(CONF_RECONNECT_MIN, RECONNECT_INTERVAL_MIN)
    self._reconnect_max: int = opts.get(CONF_RECONNECT_MAX, RECONNECT_INTERVAL_MAX)
    self._reconnect_interval = self._reconnect_min
    self._debounce_delay: float = opts.get(CONF_DEBOUNCE_DELAY, 0.1)
    self._max_payload_size: int = opts.get(CONF_MAX_MQTT_PAYLOAD, MAX_MQTT_PAYLOAD_SIZE)

    self._unsub_state_listeners: list[Callable] = []
    self._unsub_lifecycle_listeners: list[Callable] = []

    self._stats = BridgeStats()
    self._last_config_publish_time: float | None = None

    # Gate: delay initial MQTT publish until HA is fully started so that
    # entity states (and therefore Sber features) are fully populated.
    self._ha_ready = asyncio.Event()

    # Debounce: coalesce rapid state changes into a single publish
    self._pending_publish_ids: set[str] = set()
    self._publish_timer: asyncio.TimerHandle | None = None

    # Reconnect guard: reject Sber commands until Sber acknowledges
    # our published states (via status_request or config_request).
    # This prevents Sber cloud from overriding HA state with stale cache.
    self._awaiting_sber_ack: bool = False
    """True while waiting for Sber to acknowledge our states after (re)connect."""
    self._awaiting_sber_ack_deadline: float = 0.0
    """Fallback deadline: stop waiting even without acknowledgment."""

    # Delayed confirm tasks per entity (dedup: cancel previous on new command)
    self._confirm_tasks: dict[str, asyncio.Task] = {}

    # Debounced redefinitions persistence (avoid reload mid-MQTT-loop)
    self._redef_dirty = False
    self._redef_timer: asyncio.TimerHandle | None = None

    # Ring buffer for MQTT message log (DevTools)
    log_size = opts.get(CONF_MESSAGE_LOG_SIZE, 50)
    self._message_log: deque[dict[str, Any]] = deque(maxlen=log_size)

    # Real-time subscribers for MQTT message log (DevTools WebSocket push)
    self._message_subscribers: set[Callable[[dict], None]] = set()

is_connected property

is_connected

Return True if connected to Sber MQTT.

connection_phase property

connection_phase

Return the current connection lifecycle phase.

Phases

starting — HA not fully loaded, waiting for integrations. connecting — MQTT connection in progress. awaiting_ack — connected, published config, waiting for Sber to acknowledge. ready — fully operational, accepting commands. disconnected — not connected to MQTT broker.

entities_count property

entities_count

Return the number of loaded Sber entities.

entities property

entities

Return the dict of loaded Sber entities (read-only view).

enabled_entity_ids property

enabled_entity_ids

Return a copy of the enabled entity ID list.

redefinitions property

redefinitions

Return a copy of the entity redefinitions mapping.

entity_links

Return the current entity link map.

linked_entity_ids property

linked_entity_ids

Return set of all linked entity IDs (not primary).

stats property

stats

Return bridge statistics as a serializable dict.

unacknowledged_entities property

unacknowledged_entities

Return entity IDs that were published but not yet acknowledged by Sber.

message_log property

message_log

Return a copy of the MQTT message log ring buffer.

clear_message_log

clear_message_log()

Clear the MQTT message log.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
def clear_message_log(self) -> None:
    """Clear the MQTT message log."""
    self._message_log.clear()

apply_settings

apply_settings(options)

Apply changed operational settings without full bridge restart.

Settings that take effect immediately: debounce_delay, max_mqtt_payload_size, message_log_size. Settings that take effect on next reconnect: reconnect_min, reconnect_max, verify_ssl.

Parameters:

Name Type Description Default
options dict

Config entry options dict.

required
Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
def apply_settings(self, options: dict) -> None:
    """Apply changed operational settings without full bridge restart.

    Settings that take effect immediately: debounce_delay, max_mqtt_payload_size,
    message_log_size.
    Settings that take effect on next reconnect: reconnect_min, reconnect_max, verify_ssl.

    Args:
        options: Config entry options dict.
    """
    self._debounce_delay = options.get(CONF_DEBOUNCE_DELAY, 0.1)
    self._max_payload_size = options.get(CONF_MAX_MQTT_PAYLOAD, MAX_MQTT_PAYLOAD_SIZE)
    self._reconnect_min = options.get(CONF_RECONNECT_MIN, RECONNECT_INTERVAL_MIN)
    self._reconnect_max = options.get(CONF_RECONNECT_MAX, RECONNECT_INTERVAL_MAX)
    self._verify_ssl = options.get(
        CONF_SBER_VERIFY_SSL, self._entry.data.get(CONF_SBER_VERIFY_SSL, True)
    )

    new_log_size = options.get(CONF_MESSAGE_LOG_SIZE, 50)
    if new_log_size != self._message_log.maxlen:
        old_messages = list(self._message_log)
        self._message_log = deque(old_messages[-new_log_size:], maxlen=new_log_size)

    _LOGGER.info("Bridge settings applied (debounce=%.2fs, log=%d)", self._debounce_delay, new_log_size)

async_publish_raw async

async_publish_raw(payload, target)

Publish arbitrary JSON payload to Sber MQTT for debugging.

Parameters:

Name Type Description Default
payload str

Raw JSON string to publish.

required
target str

Topic suffix — either "config" or "status".

required

Raises:

Type Description
RuntimeError

If not connected to MQTT broker.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
async def async_publish_raw(self, payload: str, target: str) -> None:
    """Publish arbitrary JSON payload to Sber MQTT for debugging.

    Args:
        payload: Raw JSON string to publish.
        target: Topic suffix — either "config" or "status".

    Raises:
        RuntimeError: If not connected to MQTT broker.
    """
    if not self._connected or self._mqtt_client is None:
        msg = "Not connected to MQTT"
        raise RuntimeError(msg)

    topic = f"{self._root_topic}/up/{target}"
    await self._mqtt_client.publish(topic, payload)
    self._stats.messages_sent += 1
    self._log_message("out", topic, payload)

subscribe_messages

subscribe_messages(callback_fn)

Subscribe to new MQTT messages in real time.

Parameters:

Name Type Description Default
callback_fn Callable[[dict], None]

Called with each new message dict.

required

Returns:

Type Description
Callable[[], None]

Unsubscribe callable.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
def subscribe_messages(self, callback_fn: Callable[[dict], None]) -> Callable[[], None]:
    """Subscribe to new MQTT messages in real time.

    Args:
        callback_fn: Called with each new message dict.

    Returns:
        Unsubscribe callable.
    """
    self._message_subscribers.add(callback_fn)

    def unsub() -> None:
        self._message_subscribers.discard(callback_fn)

    return unsub

async_start async

async_start()

Start the bridge: load entities, subscribe to HA events, connect MQTT.

HA state events are subscribed immediately (independent of MQTT connectivity) so that no state changes are lost while waiting for the first connection. MQTT connection is established in a background task with exponential backoff.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
async def async_start(self) -> None:
    """Start the bridge: load entities, subscribe to HA events, connect MQTT.

    HA state events are subscribed immediately (independent of MQTT connectivity)
    so that no state changes are lost while waiting for the first connection.
    MQTT connection is established in a background task with exponential backoff.
    """
    self._running = True
    self._load_exposed_entities()
    self._subscribe_ha_events()
    self._connection_task = asyncio.create_task(self._mqtt_connection_loop())

    # If HA is already running (e.g. integration reload), entities are
    # fully available — mark ready immediately.  Otherwise, wait for
    # EVENT_HOMEASSISTANT_STARTED to reload entities with real states.
    if self._hass.is_running:
        _LOGGER.debug("HA already running — entities loaded, marking ready")
        self._ha_ready.set()
    else:
        self._unsub_lifecycle_listeners.append(
            self._hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, self._on_homeassistant_started)
        )

async_stop async

async_stop()

Stop the bridge: disconnect MQTT, unsubscribe from HA events.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
async def async_stop(self) -> None:
    """Stop the bridge: disconnect MQTT, unsubscribe from HA events."""
    self._running = False

    # Cancel pending debounced publish
    if self._publish_timer is not None:
        self._publish_timer.cancel()
        self._publish_timer = None
    self._pending_publish_ids.clear()

    for unsub in self._unsub_state_listeners:
        unsub()
    self._unsub_state_listeners.clear()

    for unsub in self._unsub_lifecycle_listeners:
        unsub()
    self._unsub_lifecycle_listeners.clear()

    if self._connection_task:
        self._connection_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._connection_task
        self._connection_task = None

    self._connected = False

async_publish_entity_status async

async_publish_entity_status(entity_id)

Publish the current state of a single entity to Sber cloud.

Parameters:

Name Type Description Default
entity_id str

HA entity identifier.

required
Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
async def async_publish_entity_status(self, entity_id: str) -> None:
    """Publish the current state of a single entity to Sber cloud.

    Args:
        entity_id: HA entity identifier.
    """
    await self._publish_states([entity_id])

async_republish async

async_republish()

Force republish full device config to Sber cloud.

Source code in custom_components/sber_mqtt_bridge/sber_bridge.py
async def async_republish(self) -> None:
    """Force republish full device config to Sber cloud."""
    await self._publish_config()