basic_bot.commons.hub_state_monitor

HubStateMonitor Objects

class HubStateMonitor()

This class keeps the process-local copy of the hub state up to date as subscribed keys change. It starts a thread that listens for state updates from the central hub and applies them to the local state via hub_state.update_state_from_message_data.

Before applying a state update, it calls the on_state_update callback, if one is provided. This lets the caller act on the update before it is applied to the local state, and compare the current state with the state about to be applied.

The state update is applied to the local state via hub_state.update_state_from_message_data whether or not an on_state_update callback is provided, and regardless of the value the callback returns. To alter the state, you should always send an updateState message to the central hub.
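The ordering described above can be illustrated with a minimal, self-contained sketch. It does not use basic_bot: apply_update, changed_keys and record_diff are hypothetical names, plain dicts stand in for HubState, and the callback here takes only the incoming data rather than the three arguments of the real on_state_update. It only shows that the callback sees the old state before the update is applied, and that its return value has no effect.

```python
from typing import Any, Callable, Optional

State = dict[str, Any]


def changed_keys(current: State, incoming: State) -> State:
    """Return the incoming entries that are new or differ from the current state."""
    return {
        k: v for k, v in incoming.items()
        if k not in current or current[k] != v
    }


def apply_update(
    state: State,
    incoming: State,
    on_state_update: Optional[Callable[[State], Any]] = None,
) -> None:
    """Hypothetical stand-in for the monitor's update step (not the library's code)."""
    if on_state_update is not None:
        # Called first, while `state` still holds the old values.
        # Whatever the callback returns is ignored.
        on_state_update(incoming)
    # The update is always applied afterwards.
    state.update(incoming)


local_state: State = {"test_key": "test_value", "other_key": 1}
seen_diffs: list[State] = []


def record_diff(incoming: State) -> bool:
    """Record what is about to change; the return value has no effect."""
    seen_diffs.append(changed_keys(local_state, incoming))
    return False


apply_update(local_state, {"test_key": "new_value", "other_key": 1}, record_diff)
print(seen_diffs)   # [{'test_key': 'new_value'}]
print(local_state)  # {'test_key': 'new_value', 'other_key': 1}
```

Because the callback cannot veto or modify the update, a sketch like this is best suited to reacting to changes (logging, triggering behavior), not to filtering them.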

Usage:

from basic_bot.commons.hub_state import HubState
from basic_bot.commons.hub_state_monitor import HubStateMonitor

hub_state = HubState({"test_key": "test_value"})
monitor = HubStateMonitor(hub_state, "test_identity", ["test_key"])
monitor.start()
The above example starts a background thread that listens for updates to the "test_key" key from the central hub and updates the local state with each new value.

For a more complex example that uses callbacks, see the usage in the daphbot example's daphbot_service.

__init__

def __init__(
    hub_state: HubState,
    identity: str,
    subscribed_keys: Union[List[str], Literal["*"]],
    on_connect: Optional[Callable[
        [WebSocketClientProtocol],
        None,
    ]] = None,
    on_state_update: Optional[Callable[
        [
            WebSocketClientProtocol,
            str,
            dict[str, Any],
        ],
        None,
    ]] = None
) -> None

Instantiate a HubStateMonitor object.

Note that subscribed_keys may be an empty list if you just want to publish state updates to the central hub and not receive any state updates.
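The three shapes that subscribed_keys can take (a list of keys, an empty list, or the literal "*") can be pictured with the small sketch below. It is an illustration only: is_subscribed is a hypothetical helper, not part of basic_bot, and it assumes, as the type annotation suggests, that "*" means every key. How the monitor and the central hub actually handle subscriptions may differ.

```python
from typing import List, Literal, Union

SubscribedKeys = Union[List[str], Literal["*"]]


def is_subscribed(key: str, subscribed_keys: SubscribedKeys) -> bool:
    """Hypothetical helper: would updates to `key` be of interest?"""
    if subscribed_keys == "*":
        # Assumption: the wildcard covers every key.
        return True
    return key in subscribed_keys


print(is_subscribed("test_key", ["test_key"]))   # True
print(is_subscribed("other_key", ["test_key"]))  # False
print(is_subscribed("other_key", "*"))           # True
print(is_subscribed("test_key", []))             # False (publish-only case)
```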