sunflare.virtual

class sunflare.virtual.VirtualBus

VirtualBus: signal router for data exchange.

Supports logging via Loggable.

The VirtualBus is a mechanism to exchange data between different parts of the system. Communication can happen between plugins on the same layer as well as between different layers of the system.

It can be used to emit notifications and carry information to other plugins.

The bus offers two communication mechanisms:

  • psygnal.Signal for control signaling;

  • ZMQ sockets for data exchange.

shutdown()

Shut down the virtual bus.

Closes the ZMQ context and terminates the streamer queue.

Return type:

None

register_signals(owner, only=None)

Register the signals of an object in the virtual bus.

Parameters:
  • owner (object) – The instance whose class’s signals are to be cached.

  • only (Iterable[str], optional) – A list of signal names to cache. If not provided, all signals in the class will be cached automatically by inspecting the class attributes.

Return type:

None

Notes

This method inspects the attributes of the owner’s class to find psygnal.Signal descriptors. For each such descriptor, it retrieves the SignalInstance from the owner using the descriptor protocol and stores it in the registry.
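The inspection described in the note above can be sketched without psygnal installed. Everything below is a stand-in written for illustration: the `Signal` and `SignalInstance` classes only mimic the descriptor behaviour of their psygnal counterparts, and `collect_signals` is a hypothetical helper, not the actual body of `register_signals`.

```python
from __future__ import annotations

from collections.abc import Iterable


class SignalInstance:
    """Stand-in for psygnal.SignalInstance: a signal bound to one object."""

    def __init__(self, name: str) -> None:
        self.name = name


class Signal:
    """Stand-in for psygnal.Signal: a descriptor declared on the class."""

    def __set_name__(self, owner: type, name: str) -> None:
        self._name = name

    def __get__(self, instance: object, owner: type | None = None):
        if instance is None:
            # Accessed on the class: hand back the descriptor itself.
            return self
        # Accessed on an instance: create one SignalInstance and cache it.
        return instance.__dict__.setdefault(self._name, SignalInstance(self._name))


def collect_signals(
    owner: object, only: Iterable[str] | None = None
) -> dict[str, SignalInstance]:
    """Find Signal descriptors on the owner's class and bind them to the owner."""
    cls = type(owner)
    candidates = dir(cls) if only is None else only
    names = [n for n in candidates if isinstance(getattr(cls, n, None), Signal)]
    # getattr on the instance triggers the descriptor protocol.
    return {name: getattr(owner, name) for name in names}


class DemoController:  # hypothetical owner class
    sigA = Signal()
    sigB = Signal()
    not_a_signal = 3


controller = DemoController()
all_signals = collect_signals(controller)
selected = collect_signals(controller, only=["sigA"])
```

Looking the name up on the class first and on the instance second is what separates the descriptor (class level) from the per-object signal that ends up in the registry.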

connect_subscriber(topic: str) → tuple[Socket[bytes], Poller]
connect_subscriber(topic: Iterable[str]) → tuple[Socket[bytes], Poller]

Connect a subscriber to the virtual bus.

A subscriber can be attached to different topics via the topic parameter, or receive all messages if no topic is given.

A subtopic of a given topic can be specified as “<main_topic>/<sub_topic>/…”.

Usage:

# registering to all topics
socket, poller = bus.connect_subscriber()
# or more explicitly
socket, poller = bus.connect_subscriber("")

# registering to a specific topic
socket, poller = bus.connect_subscriber("topic")

# you can also use a list, where the first
# entry is the main topic and the rest are subtopics

# "topic/subtopic"
socket, poller = bus.connect_subscriber("topic/subtopic")
socket, poller = bus.connect_subscriber(["topic", "subtopic"])

# "topic/subtopic/subsubtopic"
socket, poller = bus.connect_subscriber("topic/subtopic/subsubtopic")
socket, poller = bus.connect_subscriber(
    ["topic", "subtopic", "subsubtopic"]
)

Parameters:
  • topic (str | Iterable[str], optional) – The topic(s) to subscribe to. If not provided, socket is subscribed to all topics.

  • is_async (bool, optional) – Whether to return an asyncio-compatible socket. Default is False.

Returns:

A tuple containing the subscriber socket and its poller.

Return type:

tuple[zmq.SyncSocket, zmq.Poller]
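To make the topic forms above concrete, here is a small sketch of how a list of topic parts could map to the "<main_topic>/<sub_topic>" string, and of how a subscription filters messages. Both helpers are hypothetical and not part of sunflare. The matching rule assumes the bus relies on plain ZeroMQ subscription filtering, which compares byte prefixes and treats the empty prefix as "match everything".

```python
from __future__ import annotations

from collections.abc import Iterable


def join_topic(topic: str | Iterable[str] = "") -> str:
    """Collapse a topic given as a string or as parts into one string."""
    if isinstance(topic, str):
        return topic
    return "/".join(topic)


def would_receive(subscription: str, message_topic: str) -> bool:
    """Prefix match on the encoded topic, as ZeroMQ SUB sockets do."""
    return message_topic.encode().startswith(subscription.encode())


nested = join_topic(["topic", "subtopic", "subsubtopic"])
```

Under that assumption, a subscriber registered to "topic" also receives "topic/subtopic" messages, while a subscriber registered to "topic/subtopic" does not receive messages published on "topic" alone.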

connect_publisher()

Connect a publisher to the virtual bus.

Parameters:

asyncio (bool, keyword-only, optional) – Whether to return an asyncio-compatible socket. Default is False.

Returns:

The publisher socket.

Return type:

zmq.SyncSocket | zmq.asyncio.Socket

sunflare.virtual.slot(func=None, *, private=False)

Decorate a function (or class method) as a slot.

Parameters:
  • func (F, optional) – The function to decorate. If not provided, the decorator must be applied with arguments.

  • private (bool, optional) – Mark the slot as private. Default is False.

Returns:

Either the decorated function or a callable decorator.

Return type:

F | Callable[[F], F]
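The two return forms correspond to the two ways a decorator with optional arguments can be applied. The sketch below re-implements that pattern for illustration only; the marker attributes `__isslot__` and `__isprivate__` are assumptions and may not match what sunflare actually stores on the function.

```python
from __future__ import annotations

from typing import Callable, TypeVar

F = TypeVar("F", bound=Callable[..., object])


def slot(func: F | None = None, *, private: bool = False):
    """Illustrative stand-in: usable as @slot or as @slot(private=True)."""

    def decorator(f: F) -> F:
        # Hypothetical markers; the real attribute names are not documented here.
        f.__isslot__ = True  # type: ignore[attr-defined]
        f.__isprivate__ = private  # type: ignore[attr-defined]
        return f

    if func is None:
        # Called with arguments: return the decorator to be applied next.
        return decorator
    # Applied bare: decorate the function right away.
    return decorator(func)


@slot
def on_data(value: int) -> int:
    return value * 2


@slot(private=True)
def on_internal(value: int) -> int:
    return value + 1
```

In both cases the original function is returned unchanged apart from the markers, so it can still be called directly.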

class sunflare.virtual.HasConnection(*args, **kwargs)

Protocol marking your class as requesting connection to other signals.

Tip

This protocol is optional and only usable with Controllers and Widgets. Models will not be affected by this protocol.

abstract connection_phase()

Connect to other objects via the virtual bus.

At application start-up, objects within Redsun can’t know what signals are available from other parts of the session. This method is invoked after the object’s construction and after registration_phase, allowing the object to connect to any signal already registered in the virtual bus. Objects may also connect to other signals after this phase, provided those signals were registered beforehand.

An implementation example:

def connection_phase(self) -> None:
    # you can connect signals from another controller to your local slots...
    self.virtual_bus["OtherController"]["signal"].connect(self._my_slot)

    # ... or to other signals ...
    self.virtual_bus["OtherController"]["signal"].connect(self.sigMySignal)

    # ... or connect to widgets
    self.virtual_bus["OtherWidget"]["sigWidget"].connect(self._my_slot)

Return type:

None

class sunflare.virtual.HasRegistration(*args, **kwargs)

Protocol marking your class as capable of emitting signals.

Tip

This protocol is optional and only available for Controllers and Widgets. Models will not be affected by this protocol.

abstract registration_phase()

Register the signals listed in this method to expose them to the virtual bus.

At application start-up, controllers can’t know what signals are available from other controllers. This method is called after all controllers are initialized to allow them to register their signals. Controllers may be able to register further signals after this phase, but only once connection_phase has ended.

Only signals defined in your object can be registered.

An implementation example:

def registration_phase(self) -> None:
    # you can register all signals...
    self.virtual_bus.register_signals(self)

    # ... or only a selection of them
    self.virtual_bus.register_signals(self, only=["signal"])

Return type:

None
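The ordering between registration_phase and connection_phase can be illustrated with a toy start-up routine. This is a sketch under stated assumptions: the dict-based `Bus`, the `Producer` and `Consumer` classes and the `start_up` function are all hypothetical, and the two protocols are redefined locally so the example runs without sunflare.

```python
from __future__ import annotations

from typing import Callable, Protocol, runtime_checkable

# Hypothetical registry: owner name -> signal name -> connected callbacks.
Bus = dict[str, dict[str, list[Callable[[int], None]]]]


@runtime_checkable
class HasRegistration(Protocol):
    def registration_phase(self) -> None: ...


@runtime_checkable
class HasConnection(Protocol):
    def connection_phase(self) -> None: ...


class Producer:
    def __init__(self, bus: Bus) -> None:
        self.bus = bus

    def registration_phase(self) -> None:
        # Expose a signal under this object's name.
        self.bus["Producer"] = {"sigNewData": []}


class Consumer:
    def __init__(self, bus: Bus) -> None:
        self.bus = bus
        self.received: list[int] = []

    def connection_phase(self) -> None:
        # Safe only because every registration has already happened.
        self.bus["Producer"]["sigNewData"].append(self.received.append)


def start_up(objects: list[object]) -> None:
    """Run all registrations first, then all connections."""
    for obj in objects:
        if isinstance(obj, HasRegistration):
            obj.registration_phase()
    for obj in objects:
        if isinstance(obj, HasConnection):
            obj.connection_phase()


bus: Bus = {}
consumer, producer = Consumer(bus), Producer(bus)
start_up([consumer, producer])  # construction order does not matter
for callback in bus["Producer"]["sigNewData"]:
    callback(42)
```

Running the phases in two separate passes is what lets the consumer be constructed before the producer and still find the producer's signal.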

class sunflare.virtual.HasShutdown(*args, **kwargs)

Protocol marking your class as capable of shutting down.

Tip

This protocol is optional and only available for Controllers. Widgets and Models will not be affected by this protocol.

abstract shutdown()

Shut down an object and perform cleanup operations.

If the object holds any kind of resources, this method should invoke the equivalent shutdown method of each resource.

Return type:

None
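As an illustration of delegating cleanup to each held resource, the following sketch shows a controller that owns a worker thread and a file. The class name and its resources are hypothetical; a real controller would release whatever it actually holds.

```python
import tempfile
import threading


class RecorderController:
    """Hypothetical controller holding two resources."""

    def __init__(self) -> None:
        self._stop_event = threading.Event()
        self._log_file = tempfile.TemporaryFile()
        # The worker simply blocks until it is asked to stop.
        self._worker = threading.Thread(target=self._stop_event.wait, daemon=True)
        self._worker.start()

    def shutdown(self) -> None:
        # Release each resource through its own shutdown-like call.
        self._stop_event.set()
        self._worker.join(timeout=1.0)
        self._log_file.close()


controller = RecorderController()
controller.shutdown()
```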

class sunflare.virtual.Publisher(virtual_bus)

Publisher mixin class.

Creates a publisher socket that can be used to send messages to subscribers over the virtual bus.

Provides a built-in reference to the application logger.

Parameters:

virtual_bus (VirtualBus) – Virtual bus.

pub_socket

Publisher socket.

Type:

zmq.Socket[bytes]

logger

Reference to application logger.

Type:

logging.Logger

class sunflare.virtual.Subscriber(virtual_bus, topics='')

Subscriber mixin class.

The synchronous subscriber deploys a background thread which will poll the virtual bus for incoming messages.

Provides a built-in reference to the application logger.

Parameters:
  • virtual_bus (VirtualBus) – Virtual bus.

  • topics (str | Iterable[str], optional) – Subscriber topics. Default is "" (receive all messages).

sub_socket

Subscriber socket.

Type:

zmq.Socket[bytes]

sub_poller

Poller for the subscriber socket.

Type:

zmq.Poller

sub_thread

Subscriber thread.

Type:

threading.Thread

sub_topics

Subscriber topics.

Type:

str | Iterable[str]

logger

Reference to application logger.

Type:

logging.Logger

abstract consume(content)

Consume the incoming message.

The user must implement this method to process incoming messages.

Parameters:

content (list[bytes]) – Incoming message.

Return type:

None
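The pattern of a background thread that polls for messages and hands them to an abstract consume method can be sketched with the standard library alone. This is not the sunflare implementation: a `queue.Queue` stands in for the ZMQ socket and poller, the `QueueSubscriber` and `Collector` classes and the `stop` method are hypothetical, and the `[topic, payload]` message layout is an assumption.

```python
from __future__ import annotations

import queue
import threading
from abc import ABC, abstractmethod


class QueueSubscriber(ABC):
    """Stand-in subscriber: polls a queue instead of a ZMQ socket."""

    def __init__(self, source: queue.Queue) -> None:
        self._source = source
        self._running = threading.Event()
        self._running.set()
        self.sub_thread = threading.Thread(target=self._poll, daemon=True)
        self.sub_thread.start()

    def _poll(self) -> None:
        while self._running.is_set():
            try:
                # Short timeout so the loop can notice a stop request.
                content = self._source.get(timeout=0.05)
            except queue.Empty:
                continue
            self.consume(content)

    def stop(self) -> None:
        self._running.clear()
        self.sub_thread.join(timeout=1.0)

    @abstractmethod
    def consume(self, content: list[bytes]) -> None:
        """Process one incoming message."""


class Collector(QueueSubscriber):
    def __init__(self, source: queue.Queue) -> None:
        # Set up state before the base class starts the polling thread.
        self.received: list[list[bytes]] = []
        self.done = threading.Event()
        super().__init__(source)

    def consume(self, content: list[bytes]) -> None:
        self.received.append(content)
        self.done.set()


messages: queue.Queue = queue.Queue()
collector = Collector(messages)
messages.put([b"topic", b"payload"])
collector.done.wait(timeout=2.0)
collector.stop()
```

Because consume runs on the background thread, any state it shares with the rest of the object should be accessed in a thread-safe way.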

sunflare.virtual.encode(obj)

Encode an object in msgpack format.

Parameters:

obj (T) – The object to encode.

Returns:

The encoded object.

Return type:

bytes

sunflare.virtual.decode(data)

Decode a serialized message to an object.

Note

For correct type checking, the caller of this function should cast the decoded object using typing.cast.

Parameters:

data (bytes) – The encoded data.

Returns:

The decoded object.

Return type:

object
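The note about typing.cast can be illustrated as follows. To keep the example free of third-party dependencies, the `decode` below is a stand-in that parses JSON; the real function decodes msgpack data. The payload and its keys are made up for the example.

```python
import json
from typing import cast


def decode(data: bytes) -> object:
    """Stand-in for sunflare.virtual.decode (JSON here, msgpack in sunflare)."""
    return json.loads(data)


raw = json.dumps({"exposure": 10, "gain": 2}).encode()

# decode() returns `object`, so a type checker rejects indexing its result.
# cast() tells the checker which type the caller expects; it performs no
# conversion or validation at runtime.
settings = cast(dict[str, int], decode(raw))
total = settings["exposure"] + settings["gain"]
```

Since the cast is not checked at runtime, the caller remains responsible for the decoded data actually having the claimed shape.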