sunflare.virtual
#
- class sunflare.virtual.VirtualBus#
VirtualBus
: signal router for data exchange.Supports logging via
Loggable
.The
VirtualBus
is a mechanism to exchange data between different parts of the system. Communication can happen between plugins on the same layer as well as between different layers of the system.It can be used to emit notifications and carry information to other plugins.
The bus offers two communication mechanisms:
psygnal.Signal for control signaling;
ZMQ sockets for data exchange.
- shutdown()#
Shutdown the virtual bus.
Closes the ZMQ context and terminates the streamer queue.
- Return type:
None
- register_signals(owner, only=None)#
Register the signals of an object in the virtual bus.
- Parameters:
owner (
object
) – The instance whose class’s signals are to be cached.only (
Iterable[str]
, optional) – A list of signal names to cache. If not provided, all signals in the class will be cached automatically by inspecting the class attributes.
- Return type:
None
Notes
This method inspects the attributes of the owner’s class to find
psygnal.Signal
descriptors. For each such descriptor, it retrieves theSignalInstance
from the owner using the descriptor protocol and stores it in the registry.
- connect_subscriber(topic: str) tuple[Socket[bytes], Poller] #
- connect_subscriber(topic: Iterable[str]) tuple[Socket[bytes], Poller]
Connect a subscriber to the virtual bus.
A subscriber can be attached to different topics (or receive all messages if no topic) via the
topic
parameter.A subtopic of a given topic can be specified as “<main_topic>/<sub_topic>/…”.
Usage:
# registering to all topics socket, poller = bus.connect_subscriber() # or more explicitly socket, poller = bus.connect_subscriber("") # registering to a specific topic socket, poller = bus.connect_subscriber("topic") # you can also use a list, where the first # entry is the main topic and the rest are subtopics # "topic/subtopic" socket, poller = bus.connect_subscriber("topic/subtopic") socket, poller = bus.connect_subscriber(["topic", "subtopic"]) # "topic/subtopic/subsubtopic" socket, poller = bus.connect_subscriber("topic/subtopic/subsubtopic") socket, poller = bus.connect_subscriber( ["topic", "subtopic", "subsubtopic"] )
- Parameters:
topic (
str | Iterable[str]
, optional) – The topic(s) to subscribe to. If not provided, socket is subscribed to all topics.is_async (
bool
, optional) – Whether to return an asyncio-compatible socket. Default isFalse
.
- Returns:
A tuple containing the subscriber socket and its poller.
- Return type:
tuple[zmq.SyncSocket, zmq.Poller]
- connect_publisher()#
Connect a publisher to the virtual bus.
- Parameters:
asyncio (
bool
, keyword-only, optional) – Whether to return an asyncio-compatible socket. Default isFalse
.- Returns:
The publisher socket.
- Return type:
zmq.SyncSocket | zmq.asyncio.Socket
- sunflare.virtual.slot(func=None, *, private=False)#
Decorate a function (or class method) as a slot.
- Parameters:
func (
F
, optional) – The function to decorate. If not provided, the decorator must be applied with arguments.private (
bool
, optional) – Mark the slot as private. Default isFalse
.
- Returns:
Either the decorated function or a callable decorator.
- Return type:
F | Callable[[F], F]
- class sunflare.virtual.HasConnection(*args, **kwargs)#
Protocol marking your class as requesting connection to other signals.
Tip
This protocol is optional and only usable with
Controllers
andWidgets
.Models
will not be affected by this protocol.- abstract connection_phase()#
Connect to other objects via the virtual bus.
At application start-up, objects within Redsun can’t know what signals are available from other parts of the session. This method is invoked after the object’s construction and after registration_phase as well, allowing to connect to all available registered signals in the virtual bus. Objects may be able to connect to other signals even after this phase (provided those signals were registered before).
An implementation example:
def connection_phase(self) -> None: # you can connect signals from another controller to your local slots... self.virtual_bus["OtherController"]["signal"].connect(self._my_slot) # ... or to other signals ... self.virtual_bus["OtherController"]["signal"].connect(self.sigMySignal) # ... or connect to widgets self.virtual_bus["OtherWidget"]["sigWidget"].connect(self._my_slot)
- Return type:
None
- class sunflare.virtual.HasRegistration(*args, **kwargs)#
Protocol marking your class as capable of emitting signals.
Tip
This protocol is optional and only available for
Controllers
andWidgets
.Models
will not be affected by this protocol.- abstract registration_phase()#
Register the signals listed in this method to expose them to the virtual bus.
At application start-up, controllers can’t know what signals are available from other controllers. This method is called after all controllers are initialized to allow them to register their signals. Controllers may be able to register further signals even after this phase (but not before the connection_phase ended).
Only signals defined in your object can be registered.
An implementation example:
def registration_phase(self) -> None: # you can register all signals... self.virtual_bus.register_signals(self) # ... or only a selection of them self.virtual_bus.register_signals(self, only=["signal"])
- Return type:
None
- class sunflare.virtual.HasShutdown(*args, **kwargs)#
Protocol marking your class as capable of shutting down.
Tip
This protocol is optional and only available for
Controllers
.Widgets
andModels
will not be affected by this protocol.- abstract shutdown()#
Shutdown an object. Performs cleanup operations.
If the object holds any kind of resources, this method should invoke any equivalent shutdown method for each resource.
- Return type:
None
- class sunflare.virtual.Publisher(virtual_bus)#
Publisher mixin class.
Creates a publisher socket for the virtual bus, which can be used to send messages to subscribers over the virtual bus.
Provides a built-in reference to the application logger.
- Parameters:
virtual_bus (
VirtualBus
) – Virtual bus.
- pub_socket#
Publisher socket.
- Type:
zmq.Socket[bytes]
- logger#
Reference to application logger.
- Type:
logging.Logger
- class sunflare.virtual.Subscriber(virtual_bus, topics='')#
Subscriber mixin class.
The synchronous subscriber deploys a background thread which will poll the virtual bus for incoming messages.
Provides a built-in reference to the application logger.
- Parameters:
virtual_bus (
VirtualBus
) – Virtual bus.topics (
str | Iterable[str]
, optional) – Subscriber topics. Default is""
(receive all messages).
- sub_socket#
Subscriber socket.
- Type:
zmq.Socket[bytes]
- sub_poller#
Poller for the subscriber socket.
- Type:
zmq.Poller
- sub_thread#
Subscriber thread.
- Type:
threading.Thread
- sub_topics#
Subscriber topics.
- Type:
str | Iterable[str]
- logger#
Reference to application logger.
- Type:
logging.Logger
- abstract consume(content)#
Consume the incoming message.
The user must implement this method to process incoming messages.
- Parameters:
content (
list[bytes]
) – Incoming message.- Return type:
None
- sunflare.virtual.encode(obj)#
Encode an object in msgpack format.
- Parameters:
obj (
T
) – The object to encode.- Returns:
The encoded object.
- Return type:
bytes
- sunflare.virtual.decode(data)#
Decode a serialized message to an object.
Note
For correct type checking, the decoded object should be casted by the caller of this function using
typing.cast
.- Parameters:
data (
bytes
) – The encoded data.- Returns:
The decoded object.
- Return type:
object