Перейти к содержанию

MessageLogger

Ring-buffer для MQTT-сообщений (входящих и исходящих) плюс real-time fan-out подписчикам через WebSocket (DevTools panel).

Извлечён из SberBridge в v1.25.1. Сохраняет последние maxlen сообщений в collections.deque, поддерживает subscribe(callback) → возвращает unsubscribe callable, и resize(new_maxlen) для runtime-изменения размера буфера.

MQTT message ring-buffer logger with real-time subscriber support.

Extracted from SberBridge to isolate DevTools logging from bridge core logic (SRP). MessageLogger stores the last maxlen messages in a deque and fans them out to any number of subscriber callbacks for WebSocket push.

MessageLogger

MessageLogger(maxlen)

Ring-buffer logger for MQTT messages (incoming and outgoing).

Initialize the logger with a ring-buffer of the given size.

Parameters:

Name Type Description Default
maxlen int

Maximum number of messages kept in the ring buffer.

required
Source code in custom_components/sber_mqtt_bridge/message_logger.py
def __init__(self, maxlen: int) -> None:
    """Initialize the logger with a ring-buffer of the given size.

    Args:
        maxlen: Maximum number of messages kept in the ring buffer.
    """
    self._log: deque[dict[str, Any]] = deque(maxlen=maxlen)
    self._subscribers: set[Callable[[dict], None]] = set()

entries property

entries

Return a snapshot of the ring buffer as a list (oldest first).

maxlen property

maxlen

Return the current ring-buffer capacity.

resize

resize(new_maxlen)

Replace the underlying deque with a new one of the requested size.

Keeps the most recent new_maxlen entries.

Parameters:

Name Type Description Default
new_maxlen int

New ring-buffer capacity.

required
Source code in custom_components/sber_mqtt_bridge/message_logger.py
def resize(self, new_maxlen: int) -> None:
    """Replace the underlying deque with a new one of the requested size.

    Keeps the most recent ``new_maxlen`` entries.

    Args:
        new_maxlen: New ring-buffer capacity.
    """
    if new_maxlen == self._log.maxlen:
        return
    old = list(self._log)
    self._log = deque(old[-new_maxlen:], maxlen=new_maxlen)

log

log(direction, topic, payload)

Append a message to the ring buffer and notify subscribers.

Parameters:

Name Type Description Default
direction str

Either "in" (received) or "out" (sent).

required
topic str

MQTT topic string.

required
payload str

Decoded payload (may be raw text for debugging).

required
Source code in custom_components/sber_mqtt_bridge/message_logger.py
def log(self, direction: str, topic: str, payload: str) -> None:
    """Append a message to the ring buffer and notify subscribers.

    Args:
        direction: Either ``"in"`` (received) or ``"out"`` (sent).
        topic: MQTT topic string.
        payload: Decoded payload (may be raw text for debugging).
    """
    msg_dict: dict[str, Any] = {
        "time": time.time(),
        "direction": direction,
        "topic": topic,
        "payload": payload,
    }
    self._log.append(msg_dict)
    for cb in list(self._subscribers):
        try:
            cb(msg_dict)
        except (RuntimeError, ValueError, TypeError, AttributeError):
            _LOGGER.exception("Error in message subscriber callback")

clear

clear()

Remove all messages from the ring buffer.

Source code in custom_components/sber_mqtt_bridge/message_logger.py
def clear(self) -> None:
    """Remove all messages from the ring buffer."""
    self._log.clear()

subscribe

subscribe(callback_fn)

Subscribe to new messages in real time.

Parameters:

Name Type Description Default
callback_fn Callable[[dict], None]

Function called with each new message dict.

required

Returns:

Type Description
Callable[[], None]

Unsubscribe callable — invoke once to detach the listener.

Source code in custom_components/sber_mqtt_bridge/message_logger.py
def subscribe(self, callback_fn: Callable[[dict], None]) -> Callable[[], None]:
    """Subscribe to new messages in real time.

    Args:
        callback_fn: Function called with each new message dict.

    Returns:
        Unsubscribe callable — invoke once to detach the listener.
    """
    self._subscribers.add(callback_fn)

    def unsub() -> None:
        self._subscribers.discard(callback_fn)

    return unsub