API reference

Classes:

Name Description
Device

Base class for devices.

PDevice

Minimal required protocol for a recognizable device in Redsun.

Device

Bases: PDevice, ABC

Base class for devices.

Users may subclass from this device and implement their own configuration properties and methods.

Parameters:

Name Type Description Default
name str

Name of the device. Serves as a unique identifier for the object created from it.

required
kwargs Any

Additional keyword arguments for device subclasses.

{}

Methods:

Name Description
describe_configuration

Provide a description of the device configuration.

read_configuration

Provide a reading of the device configuration.

Attributes:

Name Type Description
name str

The name of the device, serving as a unique identifier.

parent None

Parent of the device. Always returns None for compliance with HasParent protocol.

Source code in src/sunflare/device/_base.py
class Device(PDevice, abc.ABC):
    """Base class for devices.

    Users may subclass from this device and implement their own
    configuration properties and methods.

    Parameters
    ----------
    name : str
        Name of the device. Serves as a unique identifier for the object created from it.
    kwargs : Any, optional
        Additional keyword arguments for device subclasses.
    """

    @abc.abstractmethod
    def __init__(self, name: str, /, **kwargs: Any) -> None:
        self._name = name
        super().__init__(**kwargs)

    @abc.abstractmethod
    def describe_configuration(self) -> dict[str, Descriptor]:
        """Provide a description of the device configuration.

        Subclasses should override this method to provide their own
        configuration description compatible with the Bluesky event model.

        Returns
        -------
        dict[str, Descriptor]
            A dictionary with the description of each field of the device configuration.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def read_configuration(self) -> dict[str, Reading[Any]]:
        """Provide a reading of the device configuration.

        Subclasses should override this method to provide their own
        configuration reading compatible with the Bluesky event model.

        Returns
        -------
        dict[str, Reading[Any]]
            A dictionary with the reading of each field of the device configuration.
        """
        raise NotImplementedError

    @property
    def name(self) -> str:
        """The name of the device, serving as a unique identifier."""
        return self._name

    @property
    def parent(self) -> None:
        """Parent of the device. Always returns None for compliance with the `HasParent` protocol."""
        return None

name property

name: str

The name of the device, serving as a unique identifier.

parent property

parent: None

Parent of the device. Always returns None for compliance with HasParent protocol.

describe_configuration abstractmethod

describe_configuration() -> dict[str, Descriptor]

Provide a description of the device configuration.

Subclasses should override this method to provide their own configuration description compatible with the Bluesky event model.

Returns:

Type Description
dict[str, Descriptor]

A dictionary with the description of each field of the device configuration.

Source code in src/sunflare/device/_base.py
@abc.abstractmethod
def describe_configuration(self) -> dict[str, Descriptor]:
    """Provide a description of the device configuration.

    Subclasses should override this method to provide their own
    configuration description compatible with the Bluesky event model.

    Returns
    -------
    dict[str, Descriptor]
        A dictionary with the description of each field of the device configuration.
    """
    raise NotImplementedError

read_configuration abstractmethod

read_configuration() -> dict[str, Reading[Any]]

Provide a reading of the device configuration.

Subclasses should override this method to provide their own configuration reading compatible with the Bluesky event model.

Returns:

Type Description
dict[str, Reading[Any]]

A dictionary with the reading of each field of the device configuration.

Source code in src/sunflare/device/_base.py
@abc.abstractmethod
def read_configuration(self) -> dict[str, Reading[Any]]:
    """Provide a reading of the device configuration.

    Subclasses should override this method to provide their own
    configuration reading compatible with the Bluesky event model.

    Returns
    -------
    dict[str, Reading[Any]]
        A dictionary with the reading of each field of the device configuration.
    """
    raise NotImplementedError
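
The subclassing pattern described above can be sketched as follows. This is a minimal illustration, not the library's code: `Device` here is a stand-in that copies the documented constructor and abstract methods so the block runs without `sunflare` installed, `Stage` and its `step_size` field are hypothetical, and the descriptor and reading dictionaries assume the usual Bluesky event-model keys (`source`, `dtype`, `shape` and `value`, `timestamp`).

```python
import abc
import time
from typing import Any


class Device(abc.ABC):
    """Stand-in mirroring the documented base class (not the real sunflare import)."""

    @abc.abstractmethod
    def __init__(self, name: str, /, **kwargs: Any) -> None:
        self._name = name
        super().__init__(**kwargs)

    @abc.abstractmethod
    def describe_configuration(self) -> dict[str, dict[str, Any]]: ...

    @abc.abstractmethod
    def read_configuration(self) -> dict[str, dict[str, Any]]: ...

    @property
    def name(self) -> str:
        return self._name

    @property
    def parent(self) -> None:
        return None


class Stage(Device):
    """Hypothetical device with a single configuration field, ``step_size``."""

    def __init__(self, name: str, /, *, step_size: float = 1.0) -> None:
        super().__init__(name)
        self.step_size = step_size

    def describe_configuration(self) -> dict[str, dict[str, Any]]:
        # One descriptor per configuration field: its source and data type.
        return {
            f"{self.name}-step_size": {
                "source": "settings",
                "dtype": "number",
                "shape": [],
            }
        }

    def read_configuration(self) -> dict[str, dict[str, Any]]:
        # One reading per configuration field, keyed like the descriptors.
        return {
            f"{self.name}-step_size": {
                "value": self.step_size,
                "timestamp": time.time(),
            }
        }


stage = Stage("stage", step_size=0.5)
print(stage.name, stage.read_configuration())
```

Keeping the keys of `describe_configuration` and `read_configuration` identical lets a consumer pair each reading with its descriptor.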

PDevice

Bases: HasName, HasParent, Configurable[Any], Protocol

Minimal required protocol for a recognizable device in Redsun.
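
Because the protocol is runtime-checkable, a device is recognized by its members rather than by inheritance. The sketch below assumes that the bases contribute `name`, `parent`, `describe_configuration` and `read_configuration`; `PDeviceLike`, `Camera` and `NotADevice` are hypothetical names used only for illustration.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class PDeviceLike(Protocol):
    """Stand-in listing the members assumed to come from the documented bases."""

    @property
    def name(self) -> str: ...

    @property
    def parent(self) -> Any: ...

    def describe_configuration(self) -> dict[str, Any]: ...

    def read_configuration(self) -> dict[str, Any]: ...


class Camera:
    """Plain class: no inheritance from the protocol, only matching members."""

    name = "camera"
    parent = None

    def describe_configuration(self) -> dict[str, Any]:
        return {}

    def read_configuration(self) -> dict[str, Any]:
        return {}


class NotADevice:
    """Has a name but none of the configuration methods."""

    name = "incomplete"


# isinstance() only checks that every protocol member is present.
is_device = isinstance(Camera(), PDeviceLike)
is_not_device = isinstance(NotADevice(), PDeviceLike)
print(is_device, is_not_device)
```

The runtime check verifies member presence only; signatures and return types are left to a static type checker.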

Source code in src/sunflare/device/_base.py
@runtime_checkable
class PDevice(HasName, HasParent, Configurable[Any], Protocol):  # pragma: no cover
    """Minimal required protocol for a recognizable device in Redsun."""

Classes:

Name Description
PPresenter

Presenter protocol class.

Presenter

Presenter base class.

PPresenter

Bases: Protocol

Presenter protocol class.

Attributes:

Name Type Description
name str

Identity key of the presenter.

devices Mapping[str, Device]

Reference to the devices used in the presenter.

Notes

Access to the virtual container is optional and should be acquired by implementing sunflare.virtual.IsProvider or sunflare.virtual.IsInjectable.

Source code in src/sunflare/presenter/_base.py
@runtime_checkable
class PPresenter(Protocol):  # pragma: no cover
    """Presenter protocol class.

    Attributes
    ----------
    name : str
        Identity key of the presenter.
    devices : Mapping[str, sunflare.device.Device]
        Reference to the devices used in the presenter.

    Notes
    -----
    Access to the virtual container is optional and should be acquired
    by implementing :class:`~sunflare.virtual.IsProvider` or
    :class:`~sunflare.virtual.IsInjectable`.
    """

    name: str
    devices: Mapping[str, Device]

Presenter

Bases: PPresenter, ABC

Presenter base class.

Parameters:

Name Type Description Default
name str

Identity key of the presenter. Passed as positional-only argument.

required
devices Mapping[str, Device]

Reference to the devices used in the presenter.

required
kwargs Any

Additional keyword arguments for presenter subclasses.

{}
Source code in src/sunflare/presenter/_base.py
class Presenter(PPresenter, ABC):
    """Presenter base class.

    Parameters
    ----------
    name : str
        Identity key of the presenter. Passed as positional-only argument.
    devices : Mapping[str, sunflare.device.Device]
        Reference to the devices used in the presenter.
    kwargs : Any, optional
        Additional keyword arguments for presenter subclasses.
    """

    @abstractmethod
    def __init__(
        self,
        name: str,
        devices: Mapping[str, Device],
        /,
        **kwargs: Any,
    ) -> None:
        self.name = name
        self.devices = devices
        super().__init__(**kwargs)
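
A presenter subclass might look like the sketch below. `Presenter` is a stand-in copying the documented constructor so the block runs on its own, and `StagePresenter` with its `stages` attribute is hypothetical. The last part shows the effect of the positional-only marker: passing `name` and `devices` as keywords fails.

```python
from abc import ABC, abstractmethod
from collections.abc import Mapping
from typing import Any


class Presenter(ABC):
    """Stand-in mirroring the documented constructor (not the real sunflare import)."""

    @abstractmethod
    def __init__(self, name: str, devices: Mapping[str, Any], /, **kwargs: Any) -> None:
        self.name = name
        self.devices = devices
        super().__init__(**kwargs)


class StagePresenter(Presenter):
    """Hypothetical presenter that keeps only the devices it cares about."""

    def __init__(self, name: str, devices: Mapping[str, Any], /, **kwargs: Any) -> None:
        super().__init__(name, devices, **kwargs)
        self.stages = {k: v for k, v in devices.items() if k.startswith("stage")}


devices = {"stage-x": object(), "camera": object()}
presenter = StagePresenter("stage-ctrl", devices)

# name and devices are positional-only: keywords end up in **kwargs,
# so the required positional arguments are reported as missing.
try:
    StagePresenter(name="stage-ctrl", devices=devices)
except TypeError as exc:
    keyword_error = exc
else:
    keyword_error = None

print(presenter.name, list(presenter.stages), type(keyword_error).__name__)
```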

Classes:

Name Description
RedSunConfig

Base configuration schema for Redsun applications.

VirtualContainer

Data exchange and dependency injection layer.

HasShutdown

Protocol marking your class as capable of shutting down.

IsInjectable

Protocol marking a class as injectable with dependencies from the container.

IsProvider

Protocol marking a class as a provider of dependencies.

Attributes:

Name Type Description
CallbackType TypeAlias

Type alias for document callback functions.

SignalCache TypeAlias

Cache type for storing signal instances registered from component classes.

CallbackType module-attribute

CallbackType: TypeAlias = (
    Callable[[str, Document], None] | DocumentRouter
)

Type alias for document callback functions.

SignalCache module-attribute

SignalCache: TypeAlias = dict[str, SignalInstance]

Cache type for storing signal instances registered from component classes.

RedSunConfig

Bases: TypedDict

Base configuration schema for Redsun applications.

Parameters:

Name Type Description Default
schema_version Required[float]

Plugin schema version.

required
frontend Required[str]

Frontend toolkit identifier (e.g. "pyqt", "pyside").

required
session NotRequired[str]

Session display name.

"redsun"
metadata NotRequired[dict[str, Any]]

Additional session-specific metadata to include in the configuration.

{}

Attributes:

Name Type Description
schema_version Required[float]

Plugin schema version.

frontend Required[str]

Frontend toolkit identifier (e.g. "pyqt", "pyside").

session NotRequired[str]

Session display name. If not provided, default is "redsun".

metadata NotRequired[dict[str, Any]]

Additional session-specific metadata to include in the configuration.

Source code in src/sunflare/virtual/_config.py
class RedSunConfig(TypedDict, total=False):
    """Base configuration schema for Redsun applications."""

    schema_version: Required[float]
    """Plugin schema version."""

    frontend: Required[str]
    """Frontend toolkit identifier (e.g. `"pyqt"`, `"pyside"`)."""

    session: NotRequired[str]
    """Session display name. If not provided, default is `"redsun"`."""

    metadata: NotRequired[dict[str, Any]]
    """Additional session-specific metadata to include in the configuration."""

schema_version instance-attribute

schema_version: Required[float]

Plugin schema version.

frontend instance-attribute

frontend: Required[str]

Frontend toolkit identifier (e.g. "pyqt", "pyside").

session instance-attribute

session: NotRequired[str]

Session display name. If not provided, default is "redsun".

metadata instance-attribute

metadata: NotRequired[dict[str, Any]]

Additional session-specific metadata to include in the configuration.

VirtualContainer

Bases: DynamicContainer, Loggable

Data exchange and dependency injection layer.

VirtualContainer is a DynamicContainer (dependency_injector.containers.DynamicContainer) that also acts as a runtime signal bus and data sharing layer for an application.

Methods:

Name Description
register_signals

Register the signals of an object in the virtual container.

register_callbacks

Register one or more document callbacks in the virtual container.

Attributes:

Name Type Description
schema_version float

The plugin schema version specified in the configuration.

frontend str

The frontend toolkit identifier specified in the configuration.

session str

The session display name specified in the configuration.

metadata dict[str, object]

The session metadata specified in the configuration.

callbacks dict[str, CallbackType]

The currently registered document callbacks.

signals dict[str, SignalCache]

The currently registered signals.

Source code in src/sunflare/virtual/_container.py
class VirtualContainer(dic.DynamicContainer, Loggable):
    """Data exchange and dependency injection layer.

    `VirtualContainer` is a [`DynamicContainer`][dependency_injector.containers.DynamicContainer]
    that also acts as a runtime signal bus and data sharing layer for an application.
    """

    _signals = dip.Factory(dict[str, SignalCache])
    _callbacks = dip.Factory(dict[str, CallbackType])
    _config = dip.Singleton(_FrozenConfig)

    @property
    def schema_version(self) -> float:
        """The plugin schema version specified in the configuration."""
        return self._config().schema_version

    @property
    def frontend(self) -> str:
        """The frontend toolkit identifier specified in the configuration."""
        return self._config().frontend

    @property
    def session(self) -> str:
        """The session display name specified in the configuration."""
        return self._config().session

    @property
    def metadata(self) -> dict[str, object]:
        """The session metadata specified in the configuration."""
        return self._config().metadata

    def _set_configuration(self, config: RedSunConfig) -> None:
        """Set the application configuration.

        Private for use by the application layer at build time.

        Parameters
        ----------
        config : RedSunConfig
            The application configuration to set.
        """
        self._config.set_kwargs(
            schema_version=config["schema_version"],
            frontend=config["frontend"],
            session=config.get("session", "redsun"),
            metadata=config.get("metadata", {}),
        )

    def register_signals(
        self, owner: HasName, name: str | None = None, only: Iterable[str] | None = None
    ) -> None:
        """Register the signals of an object in the virtual container.

        Parameters
        ----------
        owner : HasName
            The instance whose class's signals are to be cached.
            Must provide a `name` attribute.
        name : str | None
            An optional name to use as the key for caching the signals.
            If not provided, the `name` of `owner` will be used.
        only : Iterable[str], optional
            A list of signal names to cache. If not provided, all
            signals in the class will be cached automatically by inspecting
            the class attributes.

        Notes
        -----
        This method inspects the attributes of the owner's class to find
        [`psygnal.Signal`][psygnal.Signal] descriptors. For each such descriptor, it
        retrieves the [`psygnal.SignalInstance`][psygnal.SignalInstance] from the owner using
        the descriptor protocol and stores it in the registry.
        """
        owner_class = type(owner)
        if name is not None:
            cache_entry = name
        else:
            cache_entry = owner.name

        if only is None:
            only = [
                name
                for name in dir(owner_class)
                if isinstance(getattr(owner_class, name, None), Signal)
            ]

        batch: dict[str, SignalInstance] = {}
        for name in only:
            signal_descriptor = getattr(owner_class, name, None)
            if isinstance(signal_descriptor, Signal):
                signal_instance = getattr(owner, name)
                batch[name] = signal_instance
        if batch:
            self._signals.add_kwargs(**{cache_entry: batch})

    @staticmethod
    def _validate_callback(callback: object) -> CallbackType:
        """Validate that *callback* is an acceptable ``CallbackType``.

        Parameters
        ----------
        callback :
            The object to validate.

        Returns
        -------
        CallbackType
            The validated callback, unchanged.

        Raises
        ------
        TypeError
            If *callback* is not callable, or if it is a callable but
            its call signature is not compatible with ``(str, Document)``.
        """
        if isinstance(callback, DocumentRouter):
            return callback

        if not callable(callback):
            raise TypeError(
                f"{callback!r} is not callable. "
                "A callback must be a DocumentRouter subclass instance or a "
                "callable accepting (str, Document) arguments."
            )

        try:
            inspect.signature(callback.__call__).bind(None, None)
        except TypeError as e:
            raise TypeError(
                f"{callback!r} is callable but its signature is not compatible "
                "with the expected (str, Document) callback interface."
            ) from e

        return callback

    def register_callbacks(
        self,
        owner: HasName,
        name: str | None = None,
        callback_map: dict[str, CallbackType] | None = None,
    ) -> None:
        """Register one or more document callbacks in the virtual container.

        Accepts any object that is a valid ``CallbackType`` and exposes a
        ``name`` attribute used as the registry key.  Two forms are supported:

        * A [DocumentRouter][event_model.DocumentRouter] subclass instance;
        * Any other object that implements ``__call__(self, name, doc)`` with
          the correct two-parameter signature.

        When *callback_map* is provided the owner itself is not registered;
        instead each entry in the mapping is validated and registered
        independently under its own key, allowing a single owner to expose
        multiple callbacks.

        Parameters
        ----------
        owner : HasName
            The component registering callbacks.  Must expose a ``name``
            attribute.  When *callback_map* is ``None``, *owner* itself is
            registered as the callback.
        name : str | None
            Override for the registry key used when registering *owner*
            directly.  Ignored when *callback_map* is provided.
            Defaults to ``owner.name``.
        callback_map : dict[str, CallbackType] | None
            Optional mapping of registry key to callback object.  When
            supplied, each value is validated and registered under its
            corresponding key; *name* is ignored.

        Raises
        ------
        TypeError
            If a callback is not callable or its signature is incompatible
            with ``(str, Document)``.
        """
        if callback_map is not None:
            for key, callback in callback_map.items():
                self._callbacks.add_kwargs(**{key: self._validate_callback(callback)})
            return

        cache_entry = name if name is not None else owner.name
        self._callbacks.add_kwargs(**{cache_entry: self._validate_callback(owner)})

    @property
    def callbacks(self) -> dict[str, CallbackType]:
        """The currently registered document callbacks."""
        return self._callbacks()

    @property
    def signals(self) -> dict[str, SignalCache]:
        """The currently registered signals."""
        return self._signals()

schema_version property

schema_version: float

The plugin schema version specified in the configuration.

frontend property

frontend: str

The frontend toolkit identifier specified in the configuration.

session property

session: str

The session display name specified in the configuration.

metadata property

metadata: dict[str, object]

The session metadata specified in the configuration.

callbacks property

callbacks: dict[str, CallbackType]

The currently registered document callbacks.

signals property

signals: dict[str, SignalCache]

The currently registered signals.

register_signals

register_signals(
    owner: HasName,
    name: str | None = None,
    only: Iterable[str] | None = None,
) -> None

Register the signals of an object in the virtual container.

Parameters:

Name Type Description Default
owner HasName

The instance whose class's signals are to be cached. Must provide a name attribute.

required
name str | None

An optional name to use as the key for caching the signals. If not provided, the name of owner will be used.

None
only Iterable[str]

A list of signal names to cache. If not provided, all signals in the class will be cached automatically by inspecting the class attributes.

None
Notes

This method inspects the attributes of the owner's class to find psygnal.Signal descriptors. For each such descriptor, it retrieves the psygnal.SignalInstance from the owner using the descriptor protocol and stores it in the registry.

Source code in src/sunflare/virtual/_container.py
def register_signals(
    self, owner: HasName, name: str | None = None, only: Iterable[str] | None = None
) -> None:
    """Register the signals of an object in the virtual container.

    Parameters
    ----------
    owner : HasName
        The instance whose class's signals are to be cached.
        Must provide a `name` attribute.
    name : str | None
        An optional name to use as the key for caching the signals.
        If not provided, the `name` of `owner` will be used.
    only : Iterable[str], optional
        A list of signal names to cache. If not provided, all
        signals in the class will be cached automatically by inspecting
        the class attributes.

    Notes
    -----
    This method inspects the attributes of the owner's class to find
    [`psygnal.Signal`][psygnal.Signal] descriptors. For each such descriptor, it
    retrieves the [`psygnal.SignalInstance`][psygnal.SignalInstance] from the owner using
    the descriptor protocol and stores it in the registry.
    """
    owner_class = type(owner)
    if name is not None:
        cache_entry = name
    else:
        cache_entry = owner.name

    if only is None:
        only = [
            name
            for name in dir(owner_class)
            if isinstance(getattr(owner_class, name, None), Signal)
        ]

    batch: dict[str, SignalInstance] = {}
    for name in only:
        signal_descriptor = getattr(owner_class, name, None)
        if isinstance(signal_descriptor, Signal):
            signal_instance = getattr(owner, name)
            batch[name] = signal_instance
    if batch:
        self._signals.add_kwargs(**{cache_entry: batch})

register_callbacks

register_callbacks(
    owner: HasName,
    name: str | None = None,
    callback_map: dict[str, CallbackType] | None = None,
) -> None

Register one or more document callbacks in the virtual container.

Accepts any object that is a valid CallbackType and exposes a name attribute used as the registry key. Two forms are supported:

  • A DocumentRouter (event_model.DocumentRouter) subclass instance;
  • Any other object that implements __call__(self, name, doc) with the correct two-parameter signature.

When callback_map is provided the owner itself is not registered; instead each entry in the mapping is validated and registered independently under its own key, allowing a single owner to expose multiple callbacks.

Parameters:

Name Type Description Default
owner HasName

The component registering callbacks. Must expose a name attribute. When callback_map is None, owner itself is registered as the callback.

required
name str | None

Override for the registry key used when registering owner directly. Ignored when callback_map is provided. Defaults to owner.name.

None
callback_map dict[str, CallbackType] | None

Optional mapping of registry key to callback object. When supplied, each value is validated and registered under its corresponding key; name is ignored.

None

Raises:

Type Description
TypeError

If a callback is not callable or its signature is incompatible with (str, Document).

Source code in src/sunflare/virtual/_container.py
def register_callbacks(
    self,
    owner: HasName,
    name: str | None = None,
    callback_map: dict[str, CallbackType] | None = None,
) -> None:
    """Register one or more document callbacks in the virtual container.

    Accepts any object that is a valid ``CallbackType`` and exposes a
    ``name`` attribute used as the registry key.  Two forms are supported:

    * A [DocumentRouter][event_model.DocumentRouter] subclass instance;
    * Any other object that implements ``__call__(self, name, doc)`` with
      the correct two-parameter signature.

    When *callback_map* is provided the owner itself is not registered;
    instead each entry in the mapping is validated and registered
    independently under its own key, allowing a single owner to expose
    multiple callbacks.

    Parameters
    ----------
    owner : HasName
        The component registering callbacks.  Must expose a ``name``
        attribute.  When *callback_map* is ``None``, *owner* itself is
        registered as the callback.
    name : str | None
        Override for the registry key used when registering *owner*
        directly.  Ignored when *callback_map* is provided.
        Defaults to ``owner.name``.
    callback_map : dict[str, CallbackType] | None
        Optional mapping of registry key to callback object.  When
        supplied, each value is validated and registered under its
        corresponding key; *name* is ignored.

    Raises
    ------
    TypeError
        If a callback is not callable or its signature is incompatible
        with ``(str, Document)``.
    """
    if callback_map is not None:
        for key, callback in callback_map.items():
            self._callbacks.add_kwargs(**{key: self._validate_callback(callback)})
        return

    cache_entry = name if name is not None else owner.name
    self._callbacks.add_kwargs(**{cache_entry: self._validate_callback(owner)})
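
The two registration forms can be sketched with a plain dictionary as the registry. This is an illustration under assumptions, not the container's code: `validate_callback` is a hypothetical variant that calls `inspect.signature` on the callable itself rather than on its `__call__` attribute, the `DocumentRouter` branch is omitted, and `Recorder`, `print_doc` and `wrong_arity` are made-up callbacks.

```python
import inspect
from typing import Any


def validate_callback(callback: Any) -> Any:
    """Return *callback* unchanged if it can be called as callback(name, doc)."""
    if not callable(callback):
        raise TypeError(f"{callback!r} is not callable.")
    try:
        # bind() checks the arity without actually invoking the callback.
        inspect.signature(callback).bind("start", {})
    except TypeError as exc:
        raise TypeError(f"{callback!r} does not accept (name, doc).") from exc
    return callback


def print_doc(name: str, doc: dict[str, Any]) -> None:
    print(name, doc)


def wrong_arity(doc: dict[str, Any]) -> None:
    """Takes one argument only, so it is rejected."""


class Recorder:
    """Callable object that also carries the name used as the default key."""

    name = "recorder"

    def __init__(self) -> None:
        self.docs: list[tuple[str, dict[str, Any]]] = []

    def __call__(self, name: str, doc: dict[str, Any]) -> None:
        self.docs.append((name, doc))


registry: dict[str, Any] = {}
recorder = Recorder()

# Direct form: the owner itself is the callback, keyed by owner.name.
registry[recorder.name] = validate_callback(recorder)

# callback_map form: several callbacks, each under its own key.
for key, callback in {"printer": print_doc}.items():
    registry[key] = validate_callback(callback)

# Every registered callback receives the same (name, doc) pair.
for callback in registry.values():
    callback("start", {"uid": "abc"})
```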

HasShutdown

Bases: Protocol

Protocol marking your class as capable of shutting down.

Methods:

Name Description
shutdown

Shutdown an object. Performs cleanup operations.

Source code in src/sunflare/virtual/_protocols.py
@runtime_checkable
class HasShutdown(Protocol):  # pragma: no cover
    """Protocol marking your class as capable of shutting down."""

    @abstractmethod
    def shutdown(self) -> None:
        """Shutdown an object. Performs cleanup operations."""
        ...

shutdown abstractmethod

shutdown() -> None

Shutdown an object. Performs cleanup operations.

Source code in src/sunflare/virtual/_protocols.py
@abstractmethod
def shutdown(self) -> None:
    """Shutdown an object. Performs cleanup operations."""
    ...
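
Since the protocol is runtime-checkable, an application can select the components that need cleanup with `isinstance`. The sketch below redefines the protocol locally so it runs on its own; `Laser` and `Label` are hypothetical components.

```python
from abc import abstractmethod
from typing import Protocol, runtime_checkable


@runtime_checkable
class HasShutdown(Protocol):
    """Local copy of the documented protocol."""

    @abstractmethod
    def shutdown(self) -> None: ...


class Laser:
    def __init__(self) -> None:
        self.enabled = True

    def shutdown(self) -> None:
        self.enabled = False  # release the hardware


class Label:
    """No shutdown method: skipped by the loop below."""


components = [Laser(), Label(), Laser()]
closed = 0
for component in components:
    # Structural check: any object with a shutdown() method qualifies.
    if isinstance(component, HasShutdown):
        component.shutdown()
        closed += 1

print(closed)
```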

IsInjectable

Bases: Protocol

Protocol marking a class as injectable with dependencies from the container.

Methods:

Name Description
inject_dependencies

Inject dependencies from the container.

Source code in src/sunflare/virtual/_protocols.py
@runtime_checkable
class IsInjectable(Protocol):  # pragma: no cover
    """Protocol marking a class as injectable with dependencies from the container."""

    @abstractmethod
    def inject_dependencies(self, container: VirtualContainer) -> None:
        """Inject dependencies from the container."""
        ...

inject_dependencies abstractmethod

inject_dependencies(container: VirtualContainer) -> None

Inject dependencies from the container.

Source code in src/sunflare/virtual/_protocols.py
@abstractmethod
def inject_dependencies(self, container: VirtualContainer) -> None:
    """Inject dependencies from the container."""
    ...

IsProvider

Bases: Protocol

Protocol marking a class as a provider of dependencies.

Methods:

Name Description
register_providers

Register providers in the virtual container.

Source code in src/sunflare/virtual/_protocols.py
@runtime_checkable
class IsProvider(Protocol):  # pragma: no cover
    """Protocol marking a class as a provider of dependencies."""

    @abstractmethod
    def register_providers(self, container: VirtualContainer) -> None:
        """Register providers in the virtual container."""
        ...

register_providers abstractmethod

register_providers(container: VirtualContainer) -> None

Register providers in the virtual container.

Source code in src/sunflare/virtual/_protocols.py
@abstractmethod
def register_providers(self, container: VirtualContainer) -> None:
    """Register providers in the virtual container."""
    ...

Modules:

Name Description
qt

Classes:

Name Description
PView

Minimal protocol a view component should implement.

View

Base view class.

ViewPosition

Supported view positions.

PView

Bases: Protocol

Minimal protocol a view component should implement.

Attributes:

Name Type Description
name str

Identity key of the view.

Notes

Access to the virtual container is optional and should be acquired by implementing sunflare.virtual.IsInjectable.

Source code in src/sunflare/view/_base.py
@runtime_checkable
class PView(Protocol):
    """Minimal protocol a view component should implement.

    Attributes
    ----------
    name : str
        Identity key of the view.

    Notes
    -----
    Access to the virtual container is optional and should be acquired
    by implementing :class:`~sunflare.virtual.IsInjectable`.
    """

    name: str

    @property
    @abstractmethod
    def view_position(self) -> ViewPosition:
        """Position of the view component in the main view of the UI."""

view_position abstractmethod property

view_position: ViewPosition

Position of the view component in the main view of the UI.

View

Bases: PView, ABC

Base view class.

Parameters:

Name Type Description Default
name str

Identity key of the view. Passed as positional-only argument.

required
kwargs Any

Additional keyword arguments for view subclasses.

{}

Attributes:

Name Type Description
view_position ViewPosition

Position of the view component in the main view of the UI.

Source code in src/sunflare/view/_base.py
class View(PView, ABC):
    """Base view class.

    Parameters
    ----------
    name : str
        Identity key of the view. Passed as positional-only argument.
    kwargs : ``Any``, optional
        Additional keyword arguments for view subclasses.
    """

    @abstractmethod
    def __init__(
        self,
        name: str,
        /,
        **kwargs: Any,
    ) -> None:
        self.name = name
        super().__init__(**kwargs)

    @property
    @abstractmethod
    def view_position(self) -> ViewPosition:
        """Position of the view component in the main view of the UI."""

view_position abstractmethod property

view_position: ViewPosition

Position of the view component in the main view of the UI.
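
A concrete view has to implement both the constructor and the `view_position` property. The sketch below uses stand-ins for `View` and `ViewPosition` so it runs without `sunflare` or a GUI toolkit; `StageView` and `IncompleteView` are hypothetical.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any


class ViewPosition(str, Enum):
    """Local copy of the documented enumeration."""

    CENTER = "center"
    LEFT = "left"
    RIGHT = "right"
    TOP = "top"
    BOTTOM = "bottom"


class View(ABC):
    """Stand-in mirroring the documented base class."""

    @abstractmethod
    def __init__(self, name: str, /, **kwargs: Any) -> None:
        self.name = name
        super().__init__(**kwargs)

    @property
    @abstractmethod
    def view_position(self) -> ViewPosition: ...


class StageView(View):
    """Hypothetical view docked on the left side."""

    def __init__(self, name: str, /, **kwargs: Any) -> None:
        super().__init__(name, **kwargs)

    @property
    def view_position(self) -> ViewPosition:
        return ViewPosition.LEFT


class IncompleteView(View):
    """Forgets view_position, so it cannot be instantiated."""

    def __init__(self, name: str, /) -> None:
        super().__init__(name)


view = StageView("stage-view")

try:
    IncompleteView("broken")
except TypeError as exc:
    abstract_error = exc
else:
    abstract_error = None

print(view.name, view.view_position.value, type(abstract_error).__name__)
```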

ViewPosition

Bases: str, Enum

Supported view positions.

Used to define the position of a view component in the main view of the UI.

Warning

These values are based on how Qt manages dock widgets. They may change in the future.

Attributes:

Name Type Description
CENTER str

Center view position.

LEFT str

Left view position.

RIGHT str

Right view position.

TOP str

Top view position.

BOTTOM str

Bottom view position.

Source code in src/sunflare/view/__init__.py
@unique
class ViewPosition(str, Enum):
    """Supported view positions.

    Used to define the position of a view component in the main view of the UI.

    !!! warning
        These values are based on how Qt manages dock widgets.
        They may change in the future.

    Attributes
    ----------
    CENTER : str
        Center view position.
    LEFT : str
        Left view position.
    RIGHT : str
        Right view position.
    TOP : str
        Top view position.
    BOTTOM : str
        Bottom view position.
    """

    CENTER = "center"
    LEFT = "left"
    RIGHT = "right"
    TOP = "top"
    BOTTOM = "bottom"
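
Because the enumeration also derives from `str`, its members compare equal to plain strings and can be looked up from a string value, for instance one read from a configuration file. The sketch below works on a local copy of the class.

```python
from enum import Enum, unique


@unique
class ViewPosition(str, Enum):
    """Local copy of the documented enumeration."""

    CENTER = "center"
    LEFT = "left"
    RIGHT = "right"
    TOP = "top"
    BOTTOM = "bottom"


# Lookup by value returns the existing member, not a new object.
from_config = ViewPosition("left")
same_member = from_config is ViewPosition.LEFT

# The str mixin makes members comparable with plain strings.
equals_plain_string = ViewPosition.TOP == "top"

# Unknown values are rejected with ValueError.
try:
    ViewPosition("floating")
except ValueError as exc:
    lookup_error = exc
else:
    lookup_error = None

print(from_config.value, same_member, equals_plain_string)
```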

Classes:

Name Description
Loggable

Mixin class that adds a logger to a class instance with extra contextual information.

Loggable

Mixin class that adds a logger to a class instance with extra contextual information.

Attributes:

Name Type Description
logger _LoggerAdapter

Logger instance with contextual information.

Source code in src/sunflare/log.py
class Loggable:
    """Mixin class that adds a logger to a class instance with extra contextual information."""

    @cached_property
    def logger(self) -> _LoggerAdapter:
        """Logger instance with contextual information."""
        return ContextualAdapter(logging.getLogger("redsun"), self)

logger cached property

logger: _LoggerAdapter

Logger instance with contextual information.
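
A minimal sketch of the mixin in use follows. The real `ContextualAdapter` is not shown on this page, so the one below is a hypothetical replacement that prefixes each message with the owner's class name; `Shutter` is a made-up class.

```python
import logging
from functools import cached_property
from typing import Any


class ContextualAdapter(logging.LoggerAdapter):
    """Hypothetical adapter: prefixes messages with the owner's class name."""

    def __init__(self, logger: logging.Logger, owner: object) -> None:
        super().__init__(logger, {"owner": type(owner).__name__})

    def process(self, msg: Any, kwargs: Any) -> tuple[Any, Any]:
        return f"[{self.extra['owner']}] {msg}", kwargs


class Loggable:
    """Stand-in mirroring the documented mixin."""

    @cached_property
    def logger(self) -> logging.LoggerAdapter:
        return ContextualAdapter(logging.getLogger("redsun"), self)


class Shutter(Loggable):
    def open(self) -> None:
        self.logger.info("opening")


logging.basicConfig(level=logging.INFO)
shutter = Shutter()
shutter.open()

# process() is what the adapter applies to every message before logging it.
message, _ = shutter.logger.process("opening", {})
print(message)
```

With `cached_property`, the adapter is built on first access and reused afterwards, so every instance gets exactly one logger.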


Classes:

Name Description
Status

Track the status of a potentially-lengthy action like moving or triggering.

RunEngine

The Run Engine executes messages and emits Documents.

Status

Bases: Status

Track the status of a potentially-lengthy action like moving or triggering.

Parameters:

Name Type Description Default
timeout float | None

The amount of time to wait before marking the Status as failed. If None (default) wait forever. It is strongly encouraged to set a finite timeout. If settle_time below is set, that time is added to the effective timeout.

None
settle_time float | None

The amount of time to wait between the caller specifying that the status has completed and running callbacks. Default is 0.

0
Notes

Theory of operation:

This employs two threading.Event objects, one thread that runs for (timeout + settle_time) seconds, and one thread that runs for settle_time seconds (if settle_time is nonzero).

At init time, a timeout and settle_time are specified. A thread is started, on which user callbacks, registered after init time via add_callback, will eventually be run. The thread waits for an Event to be set or (timeout + settle_time) seconds to pass, whichever happens first.

If (timeout + settle_time) expires and the Event has not been set, an internal Exception is set to StatusTimeoutError, and a second Event is set, marking the Status as done and failed. The callbacks are run.

If a callback is registered after the Status is done, it will be run immediately.

If the first Event is set before (timeout + settle_time) expires, then the second Event is set and no internal Exception is set, marking the Status as done and successful. The callbacks are run.

There are two methods that directly set the first Event. One, set_exception, sets it directly after setting the internal Exception. The other, set_finished, starts a threading.Timer that will set it after a delay (the settle_time). One of these methods may be called, and at most once. If one is called twice or if both are called, InvalidState is raised. If they are called too late to prevent a StatusTimeoutError, they are ignored but one call is still allowed. Thus, an external callback, e.g. pyepics, may report success or failure after the Status object has expired, but to no effect because the callbacks have already been called and the program has moved on.
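The mechanism described in these notes can be sketched with the standard library alone. `MiniStatus` below is a simplified, hypothetical stand-in, not the library class: it omits the `InvalidState` checks and `set_exception`, and it uses the built-in `TimeoutError` and `RuntimeError` in place of `StatusTimeoutError` and `WaitTimeoutError`.

```python
import threading
from collections import deque
from typing import Callable, Optional


class MiniStatus:
    """Simplified stand-in for the two-Event mechanism (not the real class)."""

    def __init__(self, timeout: Optional[float] = None, settle_time: float = 0.0) -> None:
        self._settled_event = threading.Event()  # first Event: completion requested
        self._event = threading.Event()  # second Event: the status is done
        self._lock = threading.RLock()
        self._callbacks: deque = deque()
        self._exception: Optional[BaseException] = None
        self._timeout = timeout
        self._settle_time = settle_time
        threading.Thread(target=self._run_callbacks, daemon=True).start()

    def _run_callbacks(self) -> None:
        total = None if self._timeout is None else self._timeout + self._settle_time
        if not self._settled_event.wait(total):
            # Nobody completed the action in time: record a timeout failure.
            self._exception = TimeoutError("timed out")
        with self._lock:
            self._event.set()
        for cb in self._callbacks:
            cb(self)
        self._callbacks.clear()

    def set_finished(self) -> None:
        # With a settle time, a Timer sets the first Event after the delay.
        if self._settle_time > 0:
            threading.Timer(self._settle_time, self._settled_event.set).start()
        else:
            self._settled_event.set()

    def add_callback(self, cb: Callable[["MiniStatus"], None]) -> None:
        with self._lock:
            if self._event.is_set():
                cb(self)  # already done: run immediately
            else:
                self._callbacks.append(cb)

    def wait(self, timeout: Optional[float] = None) -> None:
        if not self._event.wait(timeout):
            raise RuntimeError("not done yet")
        if self._exception is not None:
            raise self._exception


called = threading.Event()
status = MiniStatus(timeout=5.0)
status.add_callback(lambda s: called.set())
status.set_finished()
status.wait(timeout=2.0)  # returns once the second Event is set
called.wait(timeout=2.0)  # the callback runs on the background thread
```

Note that `wait` can return slightly before the callbacks have run, since they are invoked on the background thread after the second Event is set; the sketch waits on a separate Event for that reason.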

Methods:

Name Description
set_exception

Mark as finished but failed with the given Exception.

set_finished

Mark as finished successfully.

exception

Return the exception raised by the action.

wait

Block until the action completes.

add_callback

Register a callback to be called once when the Status finishes.

Attributes:

Name Type Description
timeout float | None

The timeout for this action.

settle_time float

A delay between when set_finished is called and when the Status is done.

done bool

Boolean indicating whether associated operation has completed.

success bool

Boolean indicating whether associated operation has completed successfully.

callbacks deque[Callable[[Status], None]]

Callbacks to be run when the status is marked as finished.

Source code in src/sunflare/engine/_status.py
class Status(PStatus):
    """
    Track the status of a potentially-lengthy action like moving or triggering.

    Parameters
    ----------
    timeout: float, optional
        The amount of time to wait before marking the Status as failed.  If
        ``None`` (default) wait forever. It is strongly encouraged to set a
        finite timeout.  If settle_time below is set, that time is added to the
        effective timeout.
    settle_time: float, optional
        The amount of time to wait between the caller specifying that the
        status has completed and running callbacks. Default is 0.

    Notes
    -----
    Theory of operation:

    This employs two ``threading.Event`` objects, one thread that runs for
    (timeout + settle_time) seconds, and one thread that runs for
    settle_time seconds (if settle_time is nonzero).

    At __init__ time, a *timeout* and *settle_time* are specified. A thread
    is started, on which user callbacks, registered after __init__ time via
    [`add_callback`](), will eventually be run. The thread waits for an
    Event to be set or (timeout + settle_time) seconds to pass, whichever
    happens first.

    If (timeout + settle_time) expires and the Event has not
    been set, an internal Exception is set to ``StatusTimeoutError``, and a
    second Event is set, marking the Status as done and failed. The
    callbacks are run.

    If a callback is registered after the Status is done, it will be run
    immediately.

    If the first Event is set before (timeout + settle_time) expires,
    then the second Event is set and no internal Exception is set, marking
    the Status as done and successful. The callbacks are run.

    There are two methods that directly set the first Event. One,
    [`set_exception`](), sets it directly after setting the internal
    Exception.  The other, [`set_finished`](), starts a
    ``threading.Timer`` that will set it after a delay (the settle_time).
    One of these methods may be called, and at most once. If one is called
    twice or if both are called, ``InvalidState`` is raised. If they are
    called too late to prevent a ``StatusTimeoutError``, they are ignored
    but one call is still allowed. Thus, an external callback, e.g. pyepics,
    may report success or failure after the Status object has expired, but
    to no effect because the callbacks have already been called and the
    program has moved on.
    """

    def __init__(self, *, timeout: float | None = None, settle_time: float | None = 0):
        super().__init__()
        self._logger = logging.getLogger("redsun")
        self._tname = None
        self._lock = threading.RLock()
        self._event = threading.Event()  # state associated with done-ness
        self._settled_event = threading.Event()
        # "Externally initiated" means set_finished() or set_exception(exc) was
        # called, as opposed to completion via an internal timeout.
        self._externally_initiated_completion_lock = threading.Lock()
        self._externally_initiated_completion = False
        self._callbacks: deque[Callable[[Status], None]] = deque()
        self._exception: BaseException | None = None

        if settle_time is None:
            settle_time = 0.0

        self._settle_time = float(settle_time)

        if timeout is not None:
            timeout = float(timeout)
        self._timeout = timeout

        self._callback_thread = threading.Thread(
            target=self._run_callbacks, daemon=True, name=self._tname
        )
        self._callback_thread.start()

    @property
    def timeout(self) -> float | None:
        """The timeout for this action.

        This is set when the Status is created, and it cannot be changed.
        """
        return self._timeout

    @property
    def settle_time(self) -> float:
        """A delay between when [`set_finished`]() is called and when the Status is done.

        This is set when the Status is created, and it cannot be changed.
        """
        return self._settle_time

    @property
    def done(self) -> bool:
        """Boolean indicating whether associated operation has completed.

        This becomes True after [`set_finished`]() or [`set_exception`]()
        is called, or when the timeout expires.
        Once True, it can never become False.
        """
        return self._event.is_set()

    @property
    def success(self) -> bool:
        """Boolean indicating whether associated operation has completed successfully.

        This is True only when the Status is done and no exception
        (including a timeout) has been recorded.
        Once True, it can never become False.
        """
        return self.done and self._exception is None

    def _handle_failure(self) -> None:
        """Do something if an exception occurred during the action."""
        # TODO: implement this; maybe ophyd has a good example
        pass

    def _settled(self) -> None:
        """Connect to this when status has completed and settled."""
        # TODO: implement this; maybe ophyd has a good example
        pass

    def _run_callbacks(self) -> None:
        """Set the Event and run the callbacks."""
        if self.timeout is None:
            timeout = None
        else:
            timeout = self.timeout + self.settle_time
        if not self._settled_event.wait(timeout):
            # We have timed out. It's possible that set_finished() has already
            # been called but we got here before the settle_time timer expired.
            # And it's possible that, in the gap between the above statement
            # timing out and grabbing the lock just below,
            # set_exception(exc) has been called. Both of these possibilities
            # are accounted for.
            self._logger.warning("%r has timed out", self)
            with self._externally_initiated_completion_lock:
                # Set the exception and mark the Status as done, unless
                # set_exception(exc) was called externally before we grabbed
                # the lock.
                if self._exception is None:
                    exc = StatusTimeoutError(
                        f"Status {self!r} failed to complete in specified timeout."
                    )
                    self._exception = exc
        # Mark this as "settled".
        try:
            self._settled()
        except Exception:
            # No alternative but to log this. We can't supersede set_exception,
            # and we have to continue and run the callbacks.
            self._logger.exception("%r encountered error during _settled()", self)
        # Now we know whether or not we have succeeded or failed, either by
        # timeout above or by set_exception(exc), so we can set the Event that
        # will mark this Status as done.
        with self._lock:
            self._event.set()
        if self._exception is not None:
            try:
                self._handle_failure()
            except Exception:
                self._logger.exception(
                    "%r encountered an error during _handle_failure()", self
                )
        # The callbacks have access to self, from which they can distinguish
        # success or failure.
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                self._logger.exception(
                    "An error was raised on a background thread while "
                    "running the callback %r(%r).",
                    cb,
                    self,
                )
        self._callbacks.clear()

    def set_exception(self, exc: BaseException) -> None:
        """Mark as finished but failed with the given Exception.

        This method should generally not be called by the *recipient* of this
        Status object, but only by the object that created and returned it.

        Parameters
        ----------
        exc: BaseException
            The exception that caused the failure.
        """
        # Since we rely on this being raise-able later, check proactively to
        # avoid potentially very confusing failures.
        if not (isinstance(exc, BaseException)):
            # Note that Python allows `raise Exception` or `raise Exception()`,
            # but only an exception instance passes the check above.
            raise ValueError(f"Expected an Exception, got {exc!r}")

        # Ban certain Timeout subclasses that have special significance. This
        # would probably never come up except due to some rare user error, but
        # if it did it could be very confusing indeed!
        for exc_class in (StatusTimeoutError, WaitTimeoutError):
            if isinstance(exc, exc_class):
                raise ValueError(
                    f"{exc_class} has special significance and cannot be set "
                    "as the exception. Use a plain TimeoutError or some other "
                    "subclass thereof."
                )

        with self._externally_initiated_completion_lock:
            if self._externally_initiated_completion:
                raise InvalidState(
                    "Either set_finished() or set_exception() has "
                    f"already been called on {self!r}"
                )
            self._externally_initiated_completion = True
            if isinstance(self._exception, StatusTimeoutError):
                # We have already timed out.
                return
            self._exception = exc
            self._settled_event.set()

    def set_finished(self) -> None:
        """Mark as finished successfully.

        This method should generally not be called by the *recipient* of this
        Status object, but only by the object that created and returned it.
        """
        with self._externally_initiated_completion_lock:
            if self._externally_initiated_completion:
                raise InvalidState(
                    "Either set_finished() or set_exception() has "
                    f"already been called on {self!r}"
                )
            self._externally_initiated_completion = True
        # Note that in either case, the callbacks themselves are run from the
        # same thread. This just sets an Event, either from this thread (the
        # one calling set_finished) or the thread created below.
        if self.settle_time > 0:
            threading.Timer(self.settle_time, self._settled_event.set).start()
        else:
            self._settled_event.set()

    def exception(self, timeout: float | None = None) -> BaseException | None:
        """Return the exception raised by the action.

        If the action has completed successfully, return ``None``. If it has
        finished in error, return the exception.

        Parameters
        ----------
        timeout: float | None, optional
            If None (default) wait indefinitely until the status finishes.

        Returns
        -------
        Exception:
            The exception raised by the action. If the action has completed
            successfully, return ``None``.

        Raises
        ------
        WaitTimeoutError
            If the status has not completed within ``timeout`` (starting from
            when this method was called, not from the beginning of the action).
        """
        if not self._event.wait(timeout=timeout):
            raise WaitTimeoutError("Status has not completed yet.")
        return self._exception

    def wait(self, timeout: float | None = None) -> None:
        """Block until the action completes.

        When the action has finished successfully, return ``None``. If the
        action has failed, raise the exception.

        Parameters
        ----------
        timeout: ``float``, optional
            If ``None`` (default) wait indefinitely until the status finishes.

        Raises
        ------
        WaitTimeoutError
            If the status has not completed within ``timeout`` (starting from
            when this method was called, not from the beginning of the action).
        StatusTimeoutError
            If the status has failed because the *timeout* that it was
            initialized with has expired.
        Exception
            This is ``status.exception()``, raised if the status has finished
            with an error.  This may include ``TimeoutError``, which
            indicates that the action itself raised ``TimeoutError``, distinct
            from ``WaitTimeoutError`` above.
        """
        if not self._event.wait(timeout=timeout):
            raise WaitTimeoutError("Status has not completed yet.")
        if self._exception is not None:
            raise self._exception

    @property
    def callbacks(self) -> deque[Callable[[Status], None]]:
        """Callbacks to be run when the status is marked as finished."""
        return self._callbacks

    def add_callback(self, callback: Callable[[Status], None]) -> None:
        """Register a callback to be called once when the Status finishes.

        The callback will be called exactly once. If the Status is finished
        before a callback is added, it will be called immediately. This is
        threadsafe.
        The callback will be called regardless of success or failure. The
        callback has access to this status object, so it can distinguish success
        or failure by inspecting the object.

        Parameters
        ----------
        callback: ``Callable[[Status], None]``
            The callback to be called when the status is marked as finished.

            ```python
            def callback(status: Status) -> None:
                # Do something with the status object
                ...
            ```
        """
        with self._lock:
            if self.done:
                # Call it once and do not hold a reference to it.
                callback(self)
            else:
                # Hold a strong reference to this. In other contexts we tend to
                # hold weak references to callbacks, but this is a single-shot
                # callback, so we will hold a strong reference until we call it,
                # and then clear this cache to drop the reference(s).
                self._callbacks.append(callback)

timeout property

timeout: float | None

The timeout for this action.

This is set when the Status is created, and it cannot be changed.

settle_time property

settle_time: float

A delay between when set_finished is called and when the Status is done.

This is set when the Status is created, and it cannot be changed.

done property

done: bool

Boolean indicating whether associated operation has completed.

This becomes True after set_finished or set_exception is called, or when the timeout expires. Once True, it can never become False.

success property

success: bool

Boolean indicating whether associated operation has completed successfully.

This is True only when the Status is done and no exception (including a timeout) has been recorded. Once True, it can never become False.

callbacks property

callbacks: deque[Callable[[Status], None]]

Callbacks to be run when the status is marked as finished.

set_exception

set_exception(exc: BaseException) -> None

Mark as finished but failed with the given Exception.

This method should generally not be called by the recipient of this Status object, but only by the object that created and returned it.

Parameters:

Name Type Description Default
exc BaseException

The exception that caused the failure.

required
Source code in src/sunflare/engine/_status.py
def set_exception(self, exc: BaseException) -> None:
    """Mark as finished but failed with the given Exception.

    This method should generally not be called by the *recipient* of this
    Status object, but only by the object that created and returned it.

    Parameters
    ----------
    exc: BaseException
        The exception that caused the failure.
    """
    # Since we rely on this being raise-able later, check proactively to
    # avoid potentially very confusing failures.
    if not (isinstance(exc, BaseException)):
        # Note that Python allows `raise Exception` or `raise Exception()`,
        # but only an exception instance passes the check above.
        raise ValueError(f"Expected an Exception, got {exc!r}")

    # Ban certain Timeout subclasses that have special significance. This
    # would probably never come up except due to some rare user error, but
    # if it did it could be very confusing indeed!
    for exc_class in (StatusTimeoutError, WaitTimeoutError):
        if isinstance(exc, exc_class):
            raise ValueError(
                f"{exc_class} has special significance and cannot be set "
                "as the exception. Use a plain TimeoutError or some other "
                "subclass thereof."
            )

    with self._externally_initiated_completion_lock:
        if self._externally_initiated_completion:
            raise InvalidState(
                "Either set_finished() or set_exception() has "
                f"already been called on {self!r}"
            )
        self._externally_initiated_completion = True
        if isinstance(self._exception, StatusTimeoutError):
            # We have already timed out.
            return
        self._exception = exc
        self._settled_event.set()

set_finished

set_finished() -> None

Mark as finished successfully.

This method should generally not be called by the recipient of this Status object, but only by the object that created and returned it.

Source code in src/sunflare/engine/_status.py
def set_finished(self) -> None:
    """Mark as finished successfully.

    This method should generally not be called by the *recipient* of this
    Status object, but only by the object that created and returned it.
    """
    with self._externally_initiated_completion_lock:
        if self._externally_initiated_completion:
            raise InvalidState(
                "Either set_finished() or set_exception() has "
                f"already been called on {self!r}"
            )
        self._externally_initiated_completion = True
    # Note that in either case, the callbacks themselves are run from the
    # same thread. This just sets an Event, either from this thread (the
    # one calling set_finished) or the thread created below.
    if self.settle_time > 0:
        threading.Timer(self.settle_time, self._settled_event.set).start()
    else:
        self._settled_event.set()

exception

exception(
    timeout: float | None = None,
) -> BaseException | None

Return the exception raised by the action.

If the action has completed successfully, return None. If it has finished in error, return the exception.

Parameters:

Name Type Description Default
timeout float | None

If None (default) wait indefinitely until the status finishes.

None

Returns:

Name Type Description
Exception BaseException | None

The exception raised by the action. If the action has completed successfully, return None.

Raises:

Type Description
WaitTimeoutError

If the status has not completed within timeout (starting from when this method was called, not from the beginning of the action).

Source code in src/sunflare/engine/_status.py
def exception(self, timeout: float | None = None) -> BaseException | None:
    """Return the exception raised by the action.

    If the action has completed successfully, return ``None``. If it has
    finished in error, return the exception.

    Parameters
    ----------
    timeout: float | None, optional
        If None (default) wait indefinitely until the status finishes.

    Returns
    -------
    Exception:
        The exception raised by the action. If the action has completed
        successfully, return ``None``.

    Raises
    ------
    WaitTimeoutError
        If the status has not completed within ``timeout`` (starting from
        when this method was called, not from the beginning of the action).
    """
    if not self._event.wait(timeout=timeout):
        raise WaitTimeoutError("Status has not completed yet.")
    return self._exception

wait

wait(timeout: float | None = None) -> None

Block until the action completes.

When the action has finished successfully, return None. If the action has failed, raise the exception.

Parameters:

Name Type Description Default
timeout float | None

If None (default) wait indefinitely until the status finishes.

None

Raises:

Type Description
WaitTimeoutError

If the status has not completed within timeout (starting from when this method was called, not from the beginning of the action).

StatusTimeoutError

If the status has failed because the timeout that it was initialized with has expired.

Exception

This is status.exception(), raised if the status has finished with an error. This may include TimeoutError, which indicates that the action itself raised TimeoutError, distinct from WaitTimeoutError above.

Source code in src/sunflare/engine/_status.py
def wait(self, timeout: float | None = None) -> None:
    """Block until the action completes.

    When the action has finished successfully, return ``None``. If the
    action has failed, raise the exception.

    Parameters
    ----------
    timeout: ``float``, optional
        If ``None`` (default) wait indefinitely until the status finishes.

    Raises
    ------
    WaitTimeoutError
        If the status has not completed within ``timeout`` (starting from
        when this method was called, not from the beginning of the action).
    StatusTimeoutError
        If the status has failed because the *timeout* that it was
        initialized with has expired.
    Exception
        This is ``status.exception()``, raised if the status has finished
        with an error.  This may include ``TimeoutError``, which
        indicates that the action itself raised ``TimeoutError``, distinct
        from ``WaitTimeoutError`` above.
    """
    if not self._event.wait(timeout=timeout):
        raise WaitTimeoutError("Status has not completed yet.")
    if self._exception is not None:
        raise self._exception

add_callback

add_callback(callback: Callable[[Status], None]) -> None

Register a callback to be called once when the Status finishes.

The callback will be called exactly once. If the Status is finished before a callback is added, it will be called immediately. This is threadsafe. The callback will be called regardless of success or failure. The callback has access to this status object, so it can distinguish success or failure by inspecting the object.

Parameters:

Name Type Description Default
callback Callable[[Status], None]

The callback to be called when the status is marked as finished.

def callback(status: Status) -> None:
    # Do something with the status object
    ...
required
Source code in src/sunflare/engine/_status.py
def add_callback(self, callback: Callable[[Status], None]) -> None:
    """Register a callback to be called once when the Status finishes.

    The callback will be called exactly once. If the Status is finished
    before a callback is added, it will be called immediately. This is
    threadsafe.
    The callback will be called regardless of success or failure. The
    callback has access to this status object, so it can distinguish success
    or failure by inspecting the object.

    Parameters
    ----------
    callback: ``Callable[[Status], None]``
        The callback to be called when the status is marked as finished.

        ```python
        def callback(status: Status) -> None:
            # Do something with the status object
            ...
        ```
    """
    with self._lock:
        if self.done:
            # Call it once and do not hold a reference to it.
            callback(self)
        else:
            # Hold a strong reference to this. In other contexts we tend to
            # hold weak references to callbacks, but this is a single-shot
            # callback, so we will hold a strong reference until we call it,
            # and then clear this cache to drop the reference(s).
            self._callbacks.append(callback)

RunEngine

Bases: RunEngine

The Run Engine executes messages and emits Documents.

This is a wrapper for the bluesky.run_engine.RunEngine class that allows execution without blocking the main thread. The main difference is that the __call__ method is executed in a separate thread, and it returns a concurrent.futures.Future object representing the result of the plan execution.
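The non-blocking call pattern described above can be sketched with the standard library. This is an illustration of the general technique only, not the wrapper's actual implementation; `run_in_thread` and `fake_plan` are hypothetical names, and the returned identifiers are made up.

```python
import threading
from concurrent.futures import Future
from typing import Any, Callable


def run_in_thread(func: Callable[..., Any], *args: Any) -> Future:
    """Run ``func`` on a worker thread and return a Future for its outcome."""
    future: Future = Future()

    def worker() -> None:
        # Mark as running; returns False if the future was cancelled beforehand.
        if not future.set_running_or_notify_cancel():
            return
        try:
            future.set_result(func(*args))
        except BaseException as exc:
            future.set_exception(exc)

    threading.Thread(target=worker, daemon=True).start()
    return future


def fake_plan(n: int) -> tuple:
    # Stand-in for plan execution; returns made-up run identifiers.
    return tuple(f"uid-{i}" for i in range(n))


future = run_in_thread(fake_plan, 2)  # returns immediately
uids = future.result(timeout=5.0)  # blocks only when the result is needed
```

The caller stays free to do other work between starting the call and asking the Future for its result; an exception raised by the callable is re-raised by `future.result()`.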

Parameters:

Name Type Description Default
md dict[str, Any]

The default is a standard Python dictionary, but fancier objects can be used to store long-term history and persist it between sessions. The standard configuration instantiates a Run Engine with historydict.HistoryDict, a simple interface to a sqlite file. Any object supporting __getitem__, __setitem__, and clear will work.

None
loop AbstractEventLoop | None

An asyncio event loop to be used for executing plans, e.g., asyncio.get_event_loop() or asyncio.new_event_loop(). If not provided, the RunEngine will create a new event loop using asyncio.new_event_loop().

None
preprocessors list

Generator functions that take in a plan (generator instance) and modify its messages on the way out. Suitable examples include the functions in the module bluesky.plans with names ending in 'wrapper'. Functions are composed in order: the preprocessors [f, g] are applied like f(g(plan)).

None
md_validator Callable[[dict[str, Any]], None]

A function that raises and prevents starting a run if it deems the metadata to be invalid or incomplete. What counts as invalid is completely up to the user. The function's return value is ignored.

None
md_normalizer Callable[[dict[str, Any]], dict[str, Any]]

A function that, similar to md_validator, raises and prevents starting a run if it deems the metadata to be invalid or incomplete. If it succeeds, it returns the normalized/transformed version of the original metadata. What counts as invalid is completely up to the user.

None
scan_id_source Callable[[dict[str, Any]], int | Awaitable[int]]

A (possibly async) function used to calculate scan_id. The default increments scan_id by 1 each time, but a custom function can supply a scan_id from any source. It should return the updated scan_id value.

default_scan_id_source
call_returns_result bool

A flag that controls the return value of __call__. If True, the RunEngine will return a RunEngineResult object that contains information about the plan that was run. If False, the RunEngine will return a tuple of uids. The potential return value is encapsulated in the returned Future object, accessible via future.result(). Defaults to True.

True
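The composition order documented for `preprocessors` can be illustrated with plain generators. This sketch uses strings as stand-ins for `bluesky.Msg` objects, and `tag` and `apply_preprocessors` are hypothetical helpers; it shows only the documented ordering, not how the RunEngine applies preprocessors internally.

```python
from functools import reduce
from typing import Callable, Iterator, List

Plan = Iterator[str]  # stand-in: real plans yield bluesky.Msg objects


def tag(prefix: str) -> Callable[[Plan], Plan]:
    """Build a hypothetical preprocessor that wraps every message in a prefix."""

    def wrapper(plan: Plan) -> Plan:
        for msg in plan:
            yield f"{prefix}({msg})"

    return wrapper


def apply_preprocessors(plan: Plan, preprocessors: List[Callable[[Plan], Plan]]) -> Plan:
    # [f, g] must behave like f(g(plan)), so the last entry is applied first.
    return reduce(lambda p, pre: pre(p), reversed(preprocessors), plan)


def plan() -> Plan:
    yield "open"
    yield "close"


f, g = tag("f"), tag("g")
messages = list(apply_preprocessors(plan(), [f, g]))
# messages == ["f(g(open))", "f(g(close))"]
```

The first preprocessor in the list is therefore the outermost one and sees the messages last.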

Attributes:

Name Type Description
md

Direct access to the dict-like persistent storage described above

record_interruptions

False by default. Set to True to generate an extra event stream that records any interruptions (pauses, suspensions).

state

{'idle', 'running', 'paused'}

suspenders

Read-only collection of bluesky.suspenders.SuspenderBase objects which can suspend and resume execution; see related methods.

preprocessors list

Generator functions that take in a plan (generator instance) and modify its messages on the way out. Suitable examples include the functions in the module bluesky.plans with names ending in 'wrapper'. Functions are composed in order: the preprocessors [f, g] are applied like f(g(plan)).

msg_hook

Callable that receives all messages before they are processed (useful for logging or other development purposes); expected signature is f(msg) where msg is a bluesky.Msg, a kind of namedtuple; default is None.

state_hook

Callable with signature f(new_state, old_state) that will be called whenever the RunEngine's state attribute is updated; default is None

waiting_hook

Callable with signature f(status_object) that will be called whenever the RunEngine is waiting for long-running commands (trigger, set, kickoff, complete) to complete. This hook is useful to incorporate a progress bar.

ignore_callback_exceptions

Boolean, False by default.

call_returns_result

Boolean, True by default. If False, RunEngine will return uuid list after running a plan. If True, RunEngine will return a RunEngineResult object that contains the plan result, error status, and uuid list.

loop asyncio event loop

e.g., asyncio.get_event_loop() or asyncio.new_event_loop()

max_depth

Maximum stack depth; set this to prevent users from calling the RunEngine inside a function (which can result in unexpected behavior and breaks introspection tools). Default is None. For built-in Python interpreter, set to 2. For IPython, set to 11 (tested on IPython 5.1.0; other versions may vary).

pause_msg str

The message printed when a run is interrupted. This message includes instructions for changing the state of the RunEngine. It is set to bluesky.run_engine.PAUSE_MSG by default and can be modified as needed.

commands

The list of commands available to Msg.

Methods:

Name Description
resume

Resume the paused plan in a separate thread.

Source code in src/sunflare/engine/_wrapper.py
class RunEngine(BlueskyRunEngine):
    """The Run Engine executes messages and emits Documents.

    This is a wrapper for the `bluesky.run_engine.RunEngine` class that
    allows execution without blocking the main thread.
    The main difference is that the ``__call__`` method
    is executed in a separate thread,
    and it returns a concurrent.futures.Future object
    representing the result of the plan execution.

    Parameters
    ----------
    md : dict[str, Any], optional
        The default is a standard Python dictionary, but fancier
        objects can be used to store long-term history and persist
        it between sessions. The standard configuration
        instantiates a Run Engine with historydict.HistoryDict, a
        simple interface to a sqlite file. Any object supporting
        `__getitem__`, `__setitem__`, and `clear` will work.

    loop : asyncio.AbstractEventLoop, optional
        An asyncio event loop to be used for executing plans, e.g.,
        ``asyncio.get_event_loop()`` or ``asyncio.new_event_loop()``.
        If not provided, the RunEngine will create a new event loop
        using ``asyncio.new_event_loop()``.

    preprocessors : list, optional
        Generator functions that take in a plan (generator instance) and
        modify its messages on the way out. Suitable examples include
        the functions in the module ``bluesky.plans`` with names ending in
        'wrapper'.  Functions are composed in order: the preprocessors
        ``[f, g]`` are applied like ``f(g(plan))``.

    md_validator : Callable[[dict[str, Any]], None], optional
        A function that raises, and thereby prevents a run from starting,
        if it deems the metadata invalid or incomplete. What counts as
        invalid is completely up to the user. The function's return value
        is ignored.

    md_normalizer : Callable[[dict[str, Any]], dict[str, Any]], optional
        A function that, like ``md_validator``, raises and prevents a run
        from starting if it deems the metadata invalid or incomplete.
        If it succeeds, it returns the normalized/transformed version of
        the original metadata.

    scan_id_source : Callable[[dict[str, Any]], int | Awaitable[int]], optional
        A (possibly async) function used to calculate ``scan_id``.
        The default increments ``scan_id`` by 1 each time, but a custom
        function can obtain a ``scan_id`` from any source.
        Expected return: the updated ``scan_id`` value.

    call_returns_result : bool, default True
        A flag that controls the return value of ``__call__``.
        If ``True``, the ``RunEngine`` will return a :class:`RunEngineResult`
        object that contains information about the plan that was run.
        If ``False``, the ``RunEngine`` will return a tuple of uids.
        Either value is wrapped in the returned Future object and is
        accessible via ``future.result()``.


    Attributes
    ----------
    md
        Direct access to the dict-like persistent storage described above

    record_interruptions
        False by default. Set to True to generate an extra event stream
        that records any interruptions (pauses, suspensions).

    state
        {'idle', 'running', 'paused'}

    suspenders
        Read-only collection of `bluesky.suspenders.SuspenderBase` objects
        which can suspend and resume execution; see related methods.

    preprocessors : list
        Generator functions that take in a plan (generator instance) and
        modify its messages on the way out. Suitable examples include
        the functions in the module ``bluesky.plans`` with names ending in
        'wrapper'.  Functions are composed in order: the preprocessors
        ``[f, g]`` are applied like ``f(g(plan))``.

    msg_hook
        Callable that receives all messages before they are processed
        (useful for logging or other development purposes); expected
        signature is ``f(msg)`` where ``msg`` is a ``bluesky.Msg``, a
        kind of namedtuple; default is None.

    state_hook
        Callable with signature ``f(new_state, old_state)`` that will be
        called whenever the RunEngine's state attribute is updated; default
        is None

    waiting_hook
        Callable with signature ``f(status_object)`` that will be called
        whenever the RunEngine is waiting for long-running commands
        (trigger, set, kickoff, complete) to complete. This hook is useful to
        incorporate a progress bar.

    ignore_callback_exceptions
        Boolean, False by default.

    call_returns_result
        Boolean, True by default in this wrapper. If False, the returned
        Future resolves to a tuple of run uids after the plan finishes.
        If True, it resolves to a RunEngineResult object that contains
        the plan result, error status, and uids.

    loop : asyncio event loop
        e.g., ``asyncio.get_event_loop()`` or ``asyncio.new_event_loop()``

    max_depth
        Maximum stack depth; set this to prevent users from calling the
        RunEngine inside a function (which can result in unexpected
        behavior and breaks introspection tools). Default is None.
        For built-in Python interpreter, set to 2. For IPython, set to 11
        (tested on IPython 5.1.0; other versions may vary).

    pause_msg : str
        The message printed when a run is interrupted. This message
        includes instructions for changing the state of the RunEngine.
        Bluesky sets it to ``bluesky.run_engine.PAUSE_MSG`` by default;
        this wrapper overrides it with an empty string. It can be
        modified as needed.

    commands
        The list of commands available to Msg.

    """

    def __init__(
        self,
        md: dict[str, Any] | None = None,
        *,
        loop: asyncio.AbstractEventLoop | None = None,
        preprocessors: list[Preprocessor] | None = None,
        md_validator: MDValidator | None = None,
        md_normalizer: MDNormalizer | None = None,
        scan_id_source: MDScanIDSource | None = default_scan_id_source,
        call_returns_result: bool = True,
    ):
        # single worker thread: submitted plans run one at a time,
        # off the calling thread
        self._executor = ThreadPoolExecutor(max_workers=1)

        super().__init__(
            md=md,
            loop=loop,
            preprocessors=preprocessors,
            md_validator=md_validator,
            md_normalizer=md_normalizer,
            scan_id_source=scan_id_source,  # type: ignore[arg-type]
            call_returns_result=call_returns_result,
            # force the context_managers to be empty,
            # otherwise the RunEngine will try to use the
            # SignalHandler context manager
            context_managers=[],
        )

        # override pause message to be an empty string
        self.pause_msg = ""

    def __call__(  # type: ignore[override]
        self,
        plan: Iterable[Msg],
        subs: Subscribers | None = None,
        /,
        **metadata_kw: Any,
    ) -> Future[RunEngineResult | tuple[str, ...]]:
        """Execute a plan.

        Any keyword arguments will be interpreted as metadata and recorded with
        any run(s) created by executing the plan. Notice that the plan
        (required) and extra subscriptions (optional) must be given as
        positional arguments.

        Parameters
        ----------
        plan : typing.Iterable[`bluesky.utils.Msg`]
            A generator that yields ``Msg`` objects (or an iterable that
            returns such a generator).
        subs : `bluesky.utils.Subscribers`, optional (positional only)
            Temporary subscriptions (a.k.a. callbacks) to be used on this run.
            For convenience, any of the following are accepted:

            * a callable, which will be subscribed to 'all'
            * a list of callables, which again will be subscribed to 'all'
            * a dictionary, mapping specific subscriptions to callables or
              lists of callables; valid keys are {'all', 'start', 'stop',
              'event', 'descriptor'}

        Returns
        -------
        Future[RunEngineResult | tuple[str, ...]]
            Future object representing the result of the plan execution.

        The result contained in the future is either:
        uids : tuple[str, ...]
            tuple of uids (i.e. RunStart Document uids) of run(s)
            if :attr:`RunEngine._call_returns_result` is ``False``
        result : :class:`RunEngineResult`
            if :attr:`RunEngine._call_returns_result` is ``True``
        """
        return self._executor.submit(
            super().__call__,
            plan,
            subs,
            **metadata_kw,
        )

    def resume(self) -> Future[RunEngineResult | tuple[str, ...]]:
        """Resume the paused plan in a separate thread.

        If the plan has been paused, the initial
        future returned by ``__call__`` will be set as completed.

        With this method, the plan is resumed in a separate thread,
        and a new future is returned.

        Returns
        -------
        ``Future[RunEngineResult | tuple[str, ...]]``
            Future object representing the result of the resumed plan.
        """
        return self._executor.submit(super().resume)

resume

resume() -> Future[RunEngineResult | tuple[str, ...]]

Resume the paused plan in a separate thread.

If the plan has been paused, the initial future returned by __call__ will be set as completed.

With this method, the plan is resumed in a separate thread, and a new future is returned.

Returns:

Type Description
Future[RunEngineResult | tuple[str, ...]]

Future object representing the result of the resumed plan.

Source code in src/sunflare/engine/_wrapper.py
def resume(self) -> Future[RunEngineResult | tuple[str, ...]]:
    """Resume the paused plan in a separate thread.

    If the plan has been paused, the initial
    future returned by ``__call__`` will be set as completed.

    With this method, the plan is resumed in a separate thread,
    and a new future is returned.

    Returns
    -------
    ``Future[RunEngineResult | tuple[str, ...]]``
        Future object representing the result of the resumed plan.
    """
    return self._executor.submit(super().resume)
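
The non-blocking behaviour described for `__call__` and `resume` can be sketched with the standard library alone. The example below is an illustration of the pattern, not the wrapper's actual code: `BlockingRunner` and `NonBlockingRunner` are hypothetical stand-ins, and the "plan" is replaced by a short sleep.

```python
from __future__ import annotations

import time
from concurrent.futures import Future, ThreadPoolExecutor


class BlockingRunner:
    """Hypothetical stand-in for a class whose ``__call__`` blocks."""

    def __call__(self, steps: int) -> tuple[str, ...]:
        time.sleep(0.01 * steps)  # pretend to execute a plan
        return tuple(f"uid-{i}" for i in range(steps))


class NonBlockingRunner(BlockingRunner):
    """Submit the blocking call to a single worker thread."""

    def __init__(self) -> None:
        # One worker keeps submitted calls strictly sequential.
        self._executor = ThreadPoolExecutor(max_workers=1)

    def __call__(self, steps: int) -> Future[tuple[str, ...]]:  # type: ignore[override]
        return self._executor.submit(super().__call__, steps)


runner = NonBlockingRunner()
future = runner(3)               # returns immediately with a Future
uids = future.result(timeout=5)  # blocks only when the result is requested
runner._executor.shutdown()
print(uids)
```

A caller that must not block (a GUI thread, for instance) can attach `future.add_done_callback(...)` instead of calling `future.result()` directly.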

Storage infrastructure for sunflare devices.

This subpackage provides the dependency-free primitives for storage; the exported classes are listed under Classes below.

Concrete backend classes (e.g. ZarrWriter) are internal implementation details and are not exported from this package. The application container is responsible for selecting and instantiating the correct backend based on the session configuration.

Devices that require storage declare it explicitly in their class body:

from sunflare.storage import StorageDescriptor


class MyDetector(Device):
    storage = StorageDescriptor()
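
To show what declaring a descriptor in a class body buys, here is a minimal sketch of the Python descriptor protocol. It is not the implementation of `StorageDescriptor`; `SlotDescriptor` and `Detector` are hypothetical names, and the stored value is a plain string rather than a storage backend.

```python
from __future__ import annotations

from typing import Any


class SlotDescriptor:
    """Hypothetical descriptor that stores one value per instance."""

    def __set_name__(self, owner: type, name: str) -> None:
        # Remember a private attribute name derived from the slot name.
        self._private = f"_{name}"

    def __get__(self, obj: Any, objtype: type | None = None) -> Any:
        if obj is None:
            return self  # class-level access returns the descriptor itself
        return getattr(obj, self._private, None)

    def __set__(self, obj: Any, value: Any) -> None:
        setattr(obj, self._private, value)


class Detector:
    storage = SlotDescriptor()


a, b = Detector(), Detector()
a.storage = "backend-for-a"  # only instance ``a`` receives a value
print(a.storage, b.storage)
```

Because the slot is declared on the class, a container can discover which device classes opt in to storage by inspecting class attributes, without instantiating anything.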

Classes:

Name Description
FrameSink

Device-facing handle for pushing frames to a storage backend.

SourceInfo

Metadata for a registered data source.

Writer

Abstract base class for data writers.

AutoIncrementFilenameProvider

Returns a numerically incrementing filename on each call.

FilenameProvider

Callable that produces a filename (without extension) for a device.

PathInfo

Where and how a storage backend should write data for one device.

PathProvider

Callable that produces PathInfo for a device.

StaticFilenameProvider

Always returns the same filename.

StaticPathProvider

Provides PathInfo rooted at a fixed base URI.

UUIDFilenameProvider

Returns a fresh UUID4 string on every call.

HasStorage

Protocol for devices that have opted in to storage.

StorageDescriptor

Descriptor that manages the storage slot on a device.

StorageProxy

Protocol that devices use to interact with a storage backend.

FrameSink

Device-facing handle for pushing frames to a storage backend.

Returned by Writer.prepare. Devices write frames by calling write; the sink routes each frame to the correct array inside the shared Writer and updates the frame counter atomically.

Calling close is equivalent to calling Writer.complete for this source — it signals that no more frames will arrive and triggers backend finalisation once all active sinks have been closed.

Parameters:

Name Type Description Default
writer Writer

The shared writer that owns this sink.

required
name str

Source name this sink is bound to.

required

Methods:

Name Description
write

Push frame to the storage backend.

close

Signal that no more frames will be written from this sink.

Source code in src/sunflare/storage/_base.py
class FrameSink:
    """Device-facing handle for pushing frames to a storage backend.

    Returned by [`Writer.prepare`][sunflare.storage.Writer.prepare].
    Devices write frames by calling [`write`][sunflare.storage.FrameSink.write];
    the sink routes each frame to the correct array inside the shared
    [`Writer`][sunflare.storage.Writer] and updates the frame counter atomically.

    Calling [`close`][sunflare.storage.FrameSink.close] is equivalent to calling
    [`Writer.complete`][sunflare.storage.Writer.complete] for this source — it
    signals that no more frames will arrive and triggers backend finalisation
    once all active sinks have been closed.

    Parameters
    ----------
    writer : Writer
        The shared writer that owns this sink.
    name : str
        Source name this sink is bound to.
    """

    def __init__(self, writer: Writer, name: str) -> None:
        self._writer = writer
        self._name = name

    def write(self, frame: npt.NDArray[np.generic]) -> None:
        """Push *frame* to the storage backend.

        Thread-safe; multiple sinks may call `write` concurrently.

        Parameters
        ----------
        frame : npt.NDArray[np.generic]
            Array data to write.  dtype and shape must match the source
            registration from [`Writer.update_source`][sunflare.storage.Writer.update_source].
        """
        with self._writer._lock:
            self._writer._write_frame(self._name, frame)
            self._writer._sources[self._name].frames_written += 1

    def close(self) -> None:
        """Signal that no more frames will be written from this sink.

        Delegates to [`Writer.complete`][sunflare.storage.Writer.complete].
        The backend is finalised once all active sinks have called `close`.
        """
        self._writer.complete(self._name)

write

write(frame: NDArray[generic]) -> None

Push frame to the storage backend.

Thread-safe; multiple sinks may call write concurrently.

Parameters:

Name Type Description Default
frame NDArray[generic]

Array data to write. dtype and shape must match the source registration from Writer.update_source.

required
Source code in src/sunflare/storage/_base.py
def write(self, frame: npt.NDArray[np.generic]) -> None:
    """Push *frame* to the storage backend.

    Thread-safe; multiple sinks may call `write` concurrently.

    Parameters
    ----------
    frame : npt.NDArray[np.generic]
        Array data to write.  dtype and shape must match the source
        registration from [`Writer.update_source`][sunflare.storage.Writer.update_source].
    """
    with self._writer._lock:
        self._writer._write_frame(self._name, frame)
        self._writer._sources[self._name].frames_written += 1

close

close() -> None

Signal that no more frames will be written from this sink.

Delegates to Writer.complete. The backend is finalised once all active sinks have called close.

Source code in src/sunflare/storage/_base.py
def close(self) -> None:
    """Signal that no more frames will be written from this sink.

    Delegates to [`Writer.complete`][sunflare.storage.Writer.complete].
    The backend is finalised once all active sinks have called `close`.
    """
    self._writer.complete(self._name)
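
The thread-safety claim for `write` rests on performing the backend write and the counter update under one shared lock. The sketch below reproduces that idea with hypothetical stand-ins (`CountingStore`, `Sink`) and `bytes` objects in place of NumPy frames; it is not the library's code.

```python
from __future__ import annotations

import threading


class CountingStore:
    """Hypothetical stand-in for the shared writer."""

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.frames: dict[str, list[bytes]] = {}
        self.frames_written: dict[str, int] = {}


class Sink:
    """Hypothetical sink bound to one source name."""

    def __init__(self, store: CountingStore, name: str) -> None:
        self._store = store
        self._name = name
        store.frames[name] = []
        store.frames_written[name] = 0

    def write(self, frame: bytes) -> None:
        # Store the frame and bump the counter as one atomic step.
        with self._store.lock:
            self._store.frames[self._name].append(frame)
            self._store.frames_written[self._name] += 1


def produce(sink: Sink, count: int) -> None:
    for i in range(count):
        sink.write(bytes([i % 256]))


store = CountingStore()
sinks = [Sink(store, f"cam{i}") for i in range(3)]
threads = [threading.Thread(target=produce, args=(s, 200)) for s in sinks]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(store.frames_written)
```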

SourceInfo dataclass

Metadata for a registered data source.

Parameters:

Name Type Description Default
name str

Name of the data source (e.g. the device name).

required
dtype dtype[generic]

NumPy data type of the source frames.

required
shape tuple[int, ...]

Shape of individual frames from the source.

required
data_key str

Bluesky data key for stream documents.

required
mimetype str

MIME type hint for the storage backend.

'application/octet-stream'
frames_written int

Running count of frames written so far.

0
collection_counter int

Frames reported in the current collection cycle.

0
stream_resource_uid str

UID of the current StreamResource document.

str(uuid.uuid4())
extra dict[str, Any]

Optional extra metadata for backend-specific use (e.g. OME-Zarr axis labels, physical units). Base Writer ignores this field; specialised subclasses may read it.

{}
Source code in src/sunflare/storage/_base.py
@dataclass
class SourceInfo:
    """Metadata for a registered data source.

    Attributes
    ----------
    name : str
        Name of the data source (e.g. the device name).
    dtype : np.dtype[np.generic]
        NumPy data type of the source frames.
    shape : tuple[int, ...]
        Shape of individual frames from the source.
    data_key : str
        Bluesky data key for stream documents.
    mimetype : str
        MIME type hint for the storage backend.
    frames_written : int
        Running count of frames written so far.
    collection_counter : int
        Frames reported in the current collection cycle.
    stream_resource_uid : str
        UID of the current `StreamResource` document.
    extra : dict[str, Any]
        Optional extra metadata for backend-specific use (e.g. OME-Zarr
        axis labels, physical units).  Base [`Writer`][sunflare.storage.Writer]
        ignores this field; specialised subclasses may read it.
    """

    name: str
    dtype: np.dtype[np.generic]
    shape: tuple[int, ...]
    data_key: str
    mimetype: str = "application/octet-stream"
    frames_written: int = 0
    collection_counter: int = 0
    stream_resource_uid: str = field(default_factory=lambda: str(uuid.uuid4()))
    extra: dict[str, Any] = field(default_factory=dict)
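
The two `default_factory` fields matter because they are evaluated once per instance rather than once per class. The following sketch uses a trimmed-down, hypothetical `Source` dataclass (not `SourceInfo` itself) to show that each instance gets its own UID and its own `extra` dictionary.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Source:
    """Hypothetical, reduced analogue of a source record."""

    name: str
    frames_written: int = 0
    # Factories run at instantiation, so values are never shared.
    uid: str = field(default_factory=lambda: str(uuid.uuid4()))
    extra: dict[str, Any] = field(default_factory=dict)


a, b = Source("cam0"), Source("cam1")
a.extra["units"] = "um"  # mutating one instance leaves the other untouched
print(a.uid != b.uid, b.extra)
```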

Writer

Bases: ABC, Loggable

Abstract base class for data writers.

This interface loosely follows the Bluesky Flyable protocol while remaining generic — methods do not need to return a Status object; that is left to the device that owns the writer.

A single Writer instance is shared by all devices in a session. Each device registers itself as a source via update_source and obtains a dedicated FrameSink via prepare.

Call order per acquisition:

  1. update_source(name, dtype, shape) — register the device
  2. prepare(name, capacity) — returns a FrameSink
  3. kickoff() — opens the backend
  4. sink.write(frame) — push frames (thread-safe)
  5. sink.close() — signals completion (calls complete)

Subclasses must implement:

  • mimetype — MIME type string for this backend
  • prepare — source-specific setup; must call super().prepare()
  • kickoff — open the backend; must call super().kickoff()
  • _write_frame — write one frame to the backend
  • _finalize — close the backend when all sources are complete
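
The call order and the subclass hooks above can be exercised with a reduced, self-contained model. Everything in this sketch is hypothetical (`MiniWriter`, `MiniSink`, `InMemoryWriter`); it mirrors the documented lifecycle with plain lists instead of NumPy arrays and omits dtype, shape, capacity, and stream documents.

```python
from __future__ import annotations

import abc
import threading


class MiniSink:
    """Hypothetical sink: forwards frames and completion to its writer."""

    def __init__(self, writer: MiniWriter, name: str) -> None:
        self._writer, self._name = writer, name

    def write(self, frame: list[int]) -> None:
        with self._writer._lock:
            self._writer._write_frame(self._name, frame)

    def close(self) -> None:
        self._writer.complete(self._name)


class MiniWriter(abc.ABC):
    """Hypothetical, reduced model of the documented lifecycle."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._is_open = False
        self._sources: set[str] = set()
        self._active: set[str] = set()

    def update_source(self, name: str) -> None:
        if self._is_open:
            raise RuntimeError("Cannot update sources while writer is open.")
        self._sources.add(name)

    def prepare(self, name: str) -> MiniSink:
        if name not in self._sources:
            raise KeyError(name)
        self._active.add(name)
        return MiniSink(self, name)

    def kickoff(self) -> None:
        self._is_open = True

    def complete(self, name: str) -> None:
        self._active.discard(name)
        if not self._active:  # last sink closed: finalise the backend
            self._finalize()
            self._is_open = False

    @abc.abstractmethod
    def _write_frame(self, name: str, frame: list[int]) -> None: ...

    @abc.abstractmethod
    def _finalize(self) -> None: ...


class InMemoryWriter(MiniWriter):
    """Backend that keeps frames in a dict instead of on disk."""

    def __init__(self) -> None:
        super().__init__()
        self.data: dict[str, list[list[int]]] = {}
        self.finalized = False

    def prepare(self, name: str) -> MiniSink:
        sink = super().prepare(name)  # validates the source first
        self.data[name] = []
        return sink

    def _write_frame(self, name: str, frame: list[int]) -> None:
        self.data[name].append(frame)

    def _finalize(self) -> None:
        self.finalized = True


writer = InMemoryWriter()
names = ("cam0", "cam1")
for source in names:
    writer.update_source(source)                   # 1. register
sinks = {s: writer.prepare(s) for s in names}      # 2. prepare
writer.kickoff()                                   # 3. open the backend
sinks["cam0"].write([1, 2, 3])                     # 4. push frames
sinks["cam1"].write([4, 5, 6])
sinks["cam0"].close()                              # 5. first close
still_open = writer._is_open                       # one sink is still active
sinks["cam1"].close()                              # last close finalises
print(still_open, writer.finalized)
```

The point of the model is the last three lines: the backend stays open until every active sink has been closed, and only then is `_finalize` invoked.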

Parameters:

Name Type Description Default
name str

Unique name for this writer instance (used for logging).

required

Methods:

Name Description
update_source

Register or update a data source.

clear_source

Remove a registered data source.

get_indices_written

Return the number of frames written for a source.

reset_collection_state

Reset the collection counter for a new acquisition.

kickoff

Open the storage backend for a new acquisition.

prepare

Prepare storage for a specific source and return a frame sink.

complete

Mark acquisition complete for name.

collect_stream_docs

Yield StreamResource and StreamDatum documents for name.

Attributes:

Name Type Description
is_open bool

Return whether the writer is currently open.

name str

Return the name of this writer.

mimetype str

Return the MIME type string for this backend.

sources MappingProxyType[str, SourceInfo]

Return a read-only view of the registered data sources.

Source code in src/sunflare/storage/_base.py
class Writer(abc.ABC, Loggable):
    """Abstract base class for data writers.

    This interface loosely follows the Bluesky `Flyable` protocol while
    remaining generic — methods do not need to return a `Status` object;
    that is left to the device that owns the writer.

    A single `Writer` instance is shared by all devices in a session.
    Each device registers itself as a *source* via
    [`update_source`][sunflare.storage.Writer.update_source] and obtains
    a dedicated [`FrameSink`][sunflare.storage.FrameSink] via
    [`prepare`][sunflare.storage.Writer.prepare].

    Call order per acquisition:

    1. `update_source(name, dtype, shape)` — register the device
    2. `prepare(name, capacity)` — returns a [`FrameSink`][sunflare.storage.FrameSink]
    3. `kickoff()` — opens the backend
    4. `sink.write(frame)` — push frames (thread-safe)
    5. `sink.close()` — signals completion (calls [`complete`][sunflare.storage.Writer.complete])

    Subclasses must implement:

    - [`mimetype`][sunflare.storage.Writer.mimetype] — MIME type string for this backend
    - [`prepare`][sunflare.storage.Writer.prepare] — source-specific setup; must call `super().prepare()`
    - [`kickoff`][sunflare.storage.Writer.kickoff] — open the backend; must call `super().kickoff()`
    - `_write_frame` — write one frame to the backend
    - `_finalize` — close the backend when all sources are complete

    Parameters
    ----------
    name : str
        Unique name for this writer instance (used for logging).
    """

    def __init__(self, name: str) -> None:
        self._name = name
        self._store_path = ""
        self._lock = th.Lock()
        self._is_open = False
        self._sources: dict[str, SourceInfo] = {}
        self._active_sinks: set[str] = set()

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def is_open(self) -> bool:
        """Return whether the writer is currently open."""
        return self._is_open

    @property
    def name(self) -> str:
        """Return the name of this writer."""
        return self._name

    @property
    @abc.abstractmethod
    def mimetype(self) -> str:
        """Return the MIME type string for this backend."""
        ...

    @property
    def sources(self) -> MappingProxyType[str, SourceInfo]:
        """Return a read-only view of the registered data sources."""
        return MappingProxyType(self._sources)

    # ------------------------------------------------------------------
    # Source management
    # ------------------------------------------------------------------

    def update_source(
        self,
        name: str,
        dtype: np.dtype[np.generic],
        shape: tuple[int, ...],
        extra: dict[str, Any] | None = None,
    ) -> None:
        """Register or update a data source.

        Parameters
        ----------
        name : str
            Source name (typically the device name).
        dtype : np.dtype[np.generic]
            NumPy data type of the frames.
        shape : tuple[int, ...]
            Shape of individual frames.
        extra : dict[str, Any] | None
            Optional backend-specific metadata forwarded to
            [`SourceInfo`][sunflare.storage.SourceInfo].

        Raises
        ------
        RuntimeError
            If the writer is currently open.
        """
        if self._is_open:
            raise RuntimeError("Cannot update sources while writer is open.")

        data_key = f"{name}:buffer:stream"
        self._sources[name] = SourceInfo(
            name=name,
            dtype=dtype,
            shape=shape,
            data_key=data_key,
            mimetype=self.mimetype,
            extra=extra or {},
        )
        self.logger.debug(f"Updated source '{name}' with shape {shape}")

    def clear_source(self, name: str, *, raise_if_missing: bool = False) -> None:
        """Remove a registered data source.

        Parameters
        ----------
        name : str
            Source name to remove.
        raise_if_missing : bool
            If `True`, raise `KeyError` when the source is absent.

        Raises
        ------
        RuntimeError
            If the writer is currently open.
        KeyError
            If *raise_if_missing* is `True` and the source is absent.
        """
        if self._is_open:
            raise RuntimeError("Cannot clear sources while writer is open.")

        try:
            del self._sources[name]
            self.logger.debug(f"Cleared source '{name}'")
        except KeyError as exc:
            self.logger.error(f"Source '{name}' not found.")
            if raise_if_missing:
                raise exc

    def get_indices_written(self, name: str | None = None) -> int:
        """Return the number of frames written for a source.

        Parameters
        ----------
        name : str | None
            Source name.  If `None`, returns the minimum across all
            sources (useful for synchronisation checks).

        Raises
        ------
        KeyError
            If *name* is not registered.
        """
        if name is None:
            if not self._sources:
                return 0
            return min(s.frames_written for s in self._sources.values())

        if name not in self._sources:
            raise KeyError(f"Unknown source '{name}'")
        return self._sources[name].frames_written

    def reset_collection_state(self, name: str) -> None:
        """Reset the collection counter for a new acquisition.

        Parameters
        ----------
        name : str
            Source name to reset.
        """
        source = self._sources[name]
        source.collection_counter = 0
        source.stream_resource_uid = str(uuid.uuid4())

    # ------------------------------------------------------------------
    # Acquisition lifecycle
    # ------------------------------------------------------------------

    @abc.abstractmethod
    def kickoff(self) -> None:
        """Open the storage backend for a new acquisition.

        Subclasses must call `super().kickoff()` to set
        [`is_open`][sunflare.storage.Writer.is_open].
        Subsequent calls while already open must be no-ops.
        """
        if not self._is_open:
            self._is_open = True

    @abc.abstractmethod
    def prepare(self, name: str, capacity: int = 0) -> FrameSink:
        """Prepare storage for a specific source and return a frame sink.

        Called once per device per acquisition.  Resets per-source counters
        and returns a [`FrameSink`][sunflare.storage.FrameSink] bound to *name*.

        Parameters
        ----------
        name : str
            Source name.
        capacity : int
            Maximum frames to accept (`0` = unlimited).

        Returns
        -------
        FrameSink
            Bound sink; call `sink.write(frame)` to push frames.

        Raises
        ------
        KeyError
            If *name* has not been registered via
            [`update_source`][sunflare.storage.Writer.update_source].
        """
        source = self._sources[name]
        source.frames_written = 0
        source.collection_counter = 0
        source.stream_resource_uid = str(uuid.uuid4())
        self._active_sinks.add(name)
        return FrameSink(self, name)

    def complete(self, name: str) -> None:
        """Mark acquisition complete for *name*.

        Called automatically by [`FrameSink.close`][sunflare.storage.FrameSink.close].
        The backend is finalised once all active sinks have called `close`.

        Parameters
        ----------
        name : str
            Source name.
        """
        self._active_sinks.discard(name)
        if not self._active_sinks:
            self._finalize()
            self._is_open = False

    # ------------------------------------------------------------------
    # Backend hooks (subclass responsibility)
    # ------------------------------------------------------------------

    @abc.abstractmethod
    def _write_frame(self, name: str, frame: npt.NDArray[np.generic]) -> None:
        """Write one frame to the backend.

        Called by [`FrameSink.write`][sunflare.storage.FrameSink.write]
        under the writer lock.

        Parameters
        ----------
        name : str
            Source name.
        frame : npt.NDArray[np.generic]
            Frame data to write.
        """
        ...

    @abc.abstractmethod
    def _finalize(self) -> None:
        """Close the backend after all sinks have been closed."""
        ...

    # ------------------------------------------------------------------
    # Stream document generation
    # ------------------------------------------------------------------

    def collect_stream_docs(
        self,
        name: str,
        indices_written: int,
    ) -> Iterator[StreamAsset]:
        """Yield `StreamResource` and `StreamDatum` documents for *name*.

        Parameters
        ----------
        name : str
            Source name.
        indices_written : int
            Number of frames to report in this call.

        Yields
        ------
        StreamAsset
            Tuples of `("stream_resource", doc)` or `("stream_datum", doc)`.

        Raises
        ------
        KeyError
            If *name* is not registered.
        """
        if name not in self._sources:
            raise KeyError(f"Unknown source '{name}'")

        source = self._sources[name]

        if indices_written == 0:
            return

        frames_to_report = min(indices_written, source.frames_written)

        if source.collection_counter >= frames_to_report:
            return

        if source.collection_counter == 0:
            stream_resource: StreamResource = {
                "data_key": source.data_key,
                "mimetype": source.mimetype,
                "parameters": {"array_name": source.name},
                "uid": source.stream_resource_uid,
                "uri": self._store_path,
            }
            yield ("stream_resource", stream_resource)

        stream_datum: StreamDatum = {
            "descriptor": "",
            "indices": {"start": source.collection_counter, "stop": frames_to_report},
            "seq_nums": {"start": 0, "stop": 0},
            "stream_resource": source.stream_resource_uid,
            "uid": f"{source.stream_resource_uid}/{source.collection_counter}",
        }
        yield ("stream_datum", stream_datum)

        source.collection_counter = frames_to_report
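
The index bookkeeping in `collect_stream_docs` can be isolated as a pure function. The sketch below is a restatement of the logic shown in the listing above; the helper name `next_datum_range` and the driver loop are hypothetical, and no Bluesky documents are built.

```python
from __future__ import annotations


def next_datum_range(
    collection_counter: int, indices_written: int, frames_written: int
) -> tuple[int, int] | None:
    """Return the (start, stop) range to report, or None if nothing is new."""
    if indices_written == 0:
        return None
    # Never report more frames than have actually been written.
    frames_to_report = min(indices_written, frames_written)
    if collection_counter >= frames_to_report:
        return None
    return (collection_counter, frames_to_report)


counter = 0
reported = []
# Each pair is (indices_written requested, frames_written so far).
for requested, written in [(5, 5), (5, 5), (12, 10), (0, 10)]:
    rng = next_datum_range(counter, requested, written)
    if rng is not None:
        reported.append(rng)
        counter = rng[1]  # the next datum starts where this one stopped
print(reported)  # [(0, 5), (5, 10)]
```

Successive ranges are contiguous and non-overlapping, which is what lets a consumer reassemble the stream from the datum documents alone.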

is_open property

is_open: bool

Return whether the writer is currently open.

name property

name: str

Return the name of this writer.

mimetype abstractmethod property

mimetype: str

Return the MIME type string for this backend.

sources property

sources: MappingProxyType[str, SourceInfo]

Return a read-only view of the registered data sources.

update_source

update_source(
    name: str,
    dtype: dtype[generic],
    shape: tuple[int, ...],
    extra: dict[str, Any] | None = None,
) -> None

Register or update a data source.

Parameters:

Name Type Description Default
name str

Source name (typically the device name).

required
dtype dtype[generic]

NumPy data type of the frames.

required
shape tuple[int, ...]

Shape of individual frames.

required
extra dict[str, Any] | None

Optional backend-specific metadata forwarded to SourceInfo.

None

Raises:

Type Description
RuntimeError

If the writer is currently open.

Source code in src/sunflare/storage/_base.py
def update_source(
    self,
    name: str,
    dtype: np.dtype[np.generic],
    shape: tuple[int, ...],
    extra: dict[str, Any] | None = None,
) -> None:
    """Register or update a data source.

    Parameters
    ----------
    name : str
        Source name (typically the device name).
    dtype : np.dtype[np.generic]
        NumPy data type of the frames.
    shape : tuple[int, ...]
        Shape of individual frames.
    extra : dict[str, Any] | None
        Optional backend-specific metadata forwarded to
        [`SourceInfo`][sunflare.storage.SourceInfo].

    Raises
    ------
    RuntimeError
        If the writer is currently open.
    """
    if self._is_open:
        raise RuntimeError("Cannot update sources while writer is open.")

    data_key = f"{name}:buffer:stream"
    self._sources[name] = SourceInfo(
        name=name,
        dtype=dtype,
        shape=shape,
        data_key=data_key,
        mimetype=self.mimetype,
        extra=extra or {},
    )
    self.logger.debug(f"Updated source '{name}' with shape {shape}")

clear_source

clear_source(
    name: str, *, raise_if_missing: bool = False
) -> None

Remove a registered data source.

Parameters:

Name Type Description Default
name str

Source name to remove.

required
raise_if_missing bool

If True, raise KeyError when the source is absent.

False

Raises:

Type Description
RuntimeError

If the writer is currently open.

KeyError

If raise_if_missing is True and the source is absent.

Source code in src/sunflare/storage/_base.py
def clear_source(self, name: str, *, raise_if_missing: bool = False) -> None:
    """Remove a registered data source.

    Parameters
    ----------
    name : str
        Source name to remove.
    raise_if_missing : bool
        If `True`, raise `KeyError` when the source is absent.

    Raises
    ------
    RuntimeError
        If the writer is currently open.
    KeyError
        If *raise_if_missing* is `True` and the source is absent.
    """
    if self._is_open:
        raise RuntimeError("Cannot clear sources while writer is open.")

    try:
        del self._sources[name]
        self.logger.debug(f"Cleared source '{name}'")
    except KeyError as exc:
        self.logger.error(f"Source '{name}' not found.")
        if raise_if_missing:
            raise exc

get_indices_written

get_indices_written(name: str | None = None) -> int

Return the number of frames written for a source.

Parameters:

Name Type Description Default
name str | None

Source name. If None, returns the minimum across all sources (useful for synchronisation checks).

None

Raises:

Type Description
KeyError

If name is not registered.

Source code in src/sunflare/storage/_base.py
def get_indices_written(self, name: str | None = None) -> int:
    """Return the number of frames written for a source.

    Parameters
    ----------
    name : str | None
        Source name.  If `None`, returns the minimum across all
        sources (useful for synchronisation checks).

    Raises
    ------
    KeyError
        If *name* is not registered.
    """
    if name is None:
        if not self._sources:
            return 0
        return min(s.frames_written for s in self._sources.values())

    if name not in self._sources:
        raise KeyError(f"Unknown source '{name}'")
    return self._sources[name].frames_written
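
The lookup rules above can be tried out on a plain dictionary. This is only a sketch: the dictionary stands in for the writer's per-source `frames_written` counters, and the source names `camera_a` and `camera_b` are made up for illustration.

```python
from __future__ import annotations

# Stand-in for the writer's per-source counters; the real values live in
# SourceInfo.frames_written. The source names are hypothetical.
frames_written = {"camera_a": 12, "camera_b": 9}


def indices_written(counters: dict[str, int], name: str | None = None) -> int:
    """Mirror the lookup rules of get_indices_written on a plain dict."""
    if name is None:
        # No sources registered: nothing has been written yet.
        if not counters:
            return 0
        # The slowest source bounds what every source can safely report.
        return min(counters.values())
    if name not in counters:
        raise KeyError(f"Unknown source '{name}'")
    return counters[name]


print(indices_written(frames_written, "camera_a"))  # 12
print(indices_written(frames_written))              # 9
print(indices_written({}))                          # 0
```

Passing no name returns the count of the slowest source, which is why the `None` form suits synchronisation checks across several devices.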

reset_collection_state

reset_collection_state(name: str) -> None

Reset the collection counter for a new acquisition.

Parameters:

Name Type Description Default
name str

Source name to reset.

required
Source code in src/sunflare/storage/_base.py
def reset_collection_state(self, name: str) -> None:
    """Reset the collection counter for a new acquisition.

    Parameters
    ----------
    name : str
        Source name to reset.
    """
    source = self._sources[name]
    source.collection_counter = 0
    source.stream_resource_uid = str(uuid.uuid4())

kickoff abstractmethod

kickoff() -> None

Open the storage backend for a new acquisition.

Subclasses must call super().kickoff() to set is_open. Subsequent calls while already open must be no-ops.

Source code in src/sunflare/storage/_base.py
@abc.abstractmethod
def kickoff(self) -> None:
    """Open the storage backend for a new acquisition.

    Subclasses must call `super().kickoff()` to set
    [`is_open`][sunflare.storage.Writer.is_open].
    Subsequent calls while already open must be no-ops.
    """
    if not self._is_open:
        self._is_open = True

prepare abstractmethod

prepare(name: str, capacity: int = 0) -> FrameSink

Prepare storage for a specific source and return a frame sink.

Called once per device per acquisition. Resets per-source counters and returns a FrameSink bound to name.

Parameters:

Name Type Description Default
name str

Source name.

required
capacity int

Maximum frames to accept (0 = unlimited).

0

Returns:

Type Description
FrameSink

Bound sink; call sink.write(frame) to push frames.

Raises:

Type Description
KeyError

If name has not been registered via update_source.

Source code in src/sunflare/storage/_base.py
@abc.abstractmethod
def prepare(self, name: str, capacity: int = 0) -> FrameSink:
    """Prepare storage for a specific source and return a frame sink.

    Called once per device per acquisition.  Resets per-source counters
    and returns a [`FrameSink`][sunflare.storage.FrameSink] bound to *name*.

    Parameters
    ----------
    name : str
        Source name.
    capacity : int
        Maximum frames to accept (`0` = unlimited).

    Returns
    -------
    FrameSink
        Bound sink; call `sink.write(frame)` to push frames.

    Raises
    ------
    KeyError
        If *name* has not been registered via
        [`update_source`][sunflare.storage.Writer.update_source].
    """
    source = self._sources[name]
    source.frames_written = 0
    source.collection_counter = 0
    source.stream_resource_uid = str(uuid.uuid4())
    self._active_sinks.add(name)
    return FrameSink(self, name)

complete

complete(name: str) -> None

Mark acquisition complete for name.

Called automatically by FrameSink.close. The backend is finalised once all active sinks have called close.

Parameters:

Name Type Description Default
name str

Source name.

required
Source code in src/sunflare/storage/_base.py
def complete(self, name: str) -> None:
    """Mark acquisition complete for *name*.

    Called automatically by [`FrameSink.close`][sunflare.storage.FrameSink.close].
    The backend is finalised once all active sinks have called `close`.

    Parameters
    ----------
    name : str
        Source name.
    """
    self._active_sinks.discard(name)
    if not self._active_sinks:
        self._finalize()
        self._is_open = False
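
A minimal sketch of the bookkeeping behind `complete`, assuming two sources share one backend. `SinkTracker` is a hypothetical stand-in that keeps only the set of active sinks and a counter in place of the real `_finalize` hook; it is not the library's `Writer`.

```python
from __future__ import annotations


class SinkTracker:
    """Hypothetical stand-in that keeps only the bookkeeping of complete()."""

    def __init__(self) -> None:
        self.active_sinks: set[str] = set()
        self.is_open = False
        self.finalize_calls = 0  # counts how often the backend would be finalised

    def kickoff(self) -> None:
        if not self.is_open:
            self.is_open = True

    def prepare(self, name: str) -> None:
        self.active_sinks.add(name)

    def complete(self, name: str) -> None:
        # discard() does not raise if the name is not in the set.
        self.active_sinks.discard(name)
        if not self.active_sinks:
            self.finalize_calls += 1
            self.is_open = False


tracker = SinkTracker()
tracker.kickoff()
tracker.prepare("camera_a")
tracker.prepare("camera_b")

tracker.complete("camera_a")
print(tracker.is_open, tracker.finalize_calls)  # True 0

tracker.complete("camera_b")
print(tracker.is_open, tracker.finalize_calls)  # False 1
```

The backend stays open until the last active sink has completed, so devices can finish in any order.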

collect_stream_docs

collect_stream_docs(
    name: str, indices_written: int
) -> Iterator[StreamAsset]

Yield StreamResource and StreamDatum documents for name.

Parameters:

Name Type Description Default
name str

Source name.

required
indices_written int

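Cumulative frame count to report up to. The value is capped at the number of frames actually written, and only frames not covered by earlier calls are reported.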

required

Yields:

Type Description
StreamAsset

Tuples of ("stream_resource", doc) or ("stream_datum", doc).

Raises:

Type Description
KeyError

If name is not registered.

Source code in src/sunflare/storage/_base.py
def collect_stream_docs(
    self,
    name: str,
    indices_written: int,
) -> Iterator[StreamAsset]:
    """Yield `StreamResource` and `StreamDatum` documents for *name*.

    Parameters
    ----------
    name : str
        Source name.
    indices_written : int
        Cumulative frame count to report up to.  Capped at the number of
        frames actually written; only frames not yet reported are emitted.

    Yields
    ------
    StreamAsset
        Tuples of `("stream_resource", doc)` or `("stream_datum", doc)`.

    Raises
    ------
    KeyError
        If *name* is not registered.
    """
    if name not in self._sources:
        raise KeyError(f"Unknown source '{name}'")

    source = self._sources[name]

    if indices_written == 0:
        return

    frames_to_report = min(indices_written, source.frames_written)

    if source.collection_counter >= frames_to_report:
        return

    if source.collection_counter == 0:
        stream_resource: StreamResource = {
            "data_key": source.data_key,
            "mimetype": source.mimetype,
            "parameters": {"array_name": source.name},
            "uid": source.stream_resource_uid,
            "uri": self._store_path,
        }
        yield ("stream_resource", stream_resource)

    stream_datum: StreamDatum = {
        "descriptor": "",
        "indices": {"start": source.collection_counter, "stop": frames_to_report},
        "seq_nums": {"start": 0, "stop": 0},
        "stream_resource": source.stream_resource_uid,
        "uid": f"{source.stream_resource_uid}/{source.collection_counter}",
    }
    yield ("stream_datum", stream_datum)

    source.collection_counter = frames_to_report
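
The index arithmetic in the listing above is easier to follow with concrete numbers. The sketch below keeps only the two counters and the yielded index ranges; `Counters` and `report` are hypothetical stand-ins, and the document bodies are reduced to the `start`/`stop` pair.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterator


@dataclass
class Counters:
    """Hypothetical stand-in for the two SourceInfo counters used above."""

    frames_written: int = 0
    collection_counter: int = 0


def report(source: Counters, indices_written: int) -> Iterator[tuple[str, dict[str, int]]]:
    """Yield document kinds and index ranges, following the listing above."""
    if indices_written == 0:
        return
    # Never report more frames than were actually written.
    frames_to_report = min(indices_written, source.frames_written)
    if source.collection_counter >= frames_to_report:
        return
    # The resource document is emitted only on the first report.
    if source.collection_counter == 0:
        yield ("stream_resource", {})
    yield ("stream_datum", {"start": source.collection_counter, "stop": frames_to_report})
    source.collection_counter = frames_to_report


src = Counters(frames_written=5)
first = list(report(src, 3))    # resource, then a datum covering frames 0 to 3
src.frames_written = 8
second = list(report(src, 10))  # request capped at 8: one datum covering 3 to 8
third = list(report(src, 10))   # nothing new to report: empty list
print(first)
print(second)
print(third)
```

Each call picks up where the previous one stopped, so consecutive `stream_datum` ranges never overlap.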

AutoIncrementFilenameProvider

Returns a numerically incrementing filename on each call.

Parameters:

Name Type Description Default
base str

Optional base prefix for the filename.

''
max_digits int

Zero-padding width for the counter.

5
start int

Initial counter value.

0
step int

Increment per call.

1
delimiter str

Separator between base and counter.

'_'
Source code in src/sunflare/storage/_path.py
class AutoIncrementFilenameProvider:
    """Returns a numerically incrementing filename on each call.

    Parameters
    ----------
    base : str
        Optional base prefix for the filename.
    max_digits : int
        Zero-padding width for the counter.
    start : int
        Initial counter value.
    step : int
        Increment per call.
    delimiter : str
        Separator between *base* and counter.
    """

    def __init__(
        self,
        base: str = "",
        max_digits: int = 5,
        start: int = 0,
        step: int = 1,
        delimiter: str = "_",
    ) -> None:
        self._base = base
        self._max_digits = max_digits
        self._current = start
        self._step = step
        self._delimiter = delimiter

    def __call__(self, device_name: str | None = None) -> str:
        """Return the next incremented filename."""
        if len(str(self._current)) > self._max_digits:
            raise ValueError(f"Counter exceeded maximum of {self._max_digits} digits")
        padded = f"{self._current:0{self._max_digits}}"
        name = f"{self._base}{self._delimiter}{padded}" if self._base else padded
        self._current += self._step
        return name
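
To illustrate the naming scheme, the formatting step of `__call__` can be isolated in a small function. `format_name` is a hypothetical helper written for this sketch; it reproduces the padding and prefix logic of the listing above without the counter state.

```python
def format_name(counter: int, base: str = "", max_digits: int = 5, delimiter: str = "_") -> str:
    """Format one filename the way the __call__ listing above does."""
    if len(str(counter)) > max_digits:
        raise ValueError(f"Counter exceeded maximum of {max_digits} digits")
    # Nested format spec: zero-pad the counter to max_digits characters.
    padded = f"{counter:0{max_digits}}"
    # The delimiter is only used when a base prefix is given.
    return f"{base}{delimiter}{padded}" if base else padded


print(format_name(0, base="scan"))   # scan_00000
print(format_name(42, base="scan"))  # scan_00042
print(format_name(7, max_digits=3))  # 007
```

With the default `start=0` and `step=1`, successive calls on a provider built with `base="scan"` would therefore return `scan_00000`, `scan_00001`, and so on, until the counter no longer fits in `max_digits` digits and a `ValueError` is raised.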

FilenameProvider

Bases: Protocol

Callable that produces a filename (without extension) for a device.

Source code in src/sunflare/storage/_path.py
@runtime_checkable
class FilenameProvider(Protocol):
    """Callable that produces a filename (without extension) for a device."""

    def __call__(self, device_name: str | None = None) -> str:
        """Return a filename for the given device.

        Parameters
        ----------
        device_name : str | None
            Name of the device requesting a filename.  Implementations may
            ignore this if the filename is device-agnostic.

        Returns
        -------
        str
            A filename string without extension.
        """
        ...
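
Because the protocol is structural, any callable with a compatible signature can act as a filename provider. The sketch below is an assumption-laden illustration: `FilenameProviderLike` is a local copy of the protocol shape so that the example runs without sunflare installed, and `PrefixedFilenameProvider` is a hypothetical provider, not part of the library.

```python
from __future__ import annotations

from typing import Protocol, runtime_checkable


@runtime_checkable
class FilenameProviderLike(Protocol):
    """Local copy of the protocol shape, so the sketch runs without sunflare."""

    def __call__(self, device_name: str | None = None) -> str: ...


class PrefixedFilenameProvider:
    """Hypothetical provider: '<prefix>-<device name>', or just the prefix."""

    def __init__(self, prefix: str) -> None:
        self._prefix = prefix

    def __call__(self, device_name: str | None = None) -> str:
        return f"{self._prefix}-{device_name}" if device_name else self._prefix


provider = PrefixedFilenameProvider("run1")
print(isinstance(provider, FilenameProviderLike))  # True
print(provider("camera"))                          # run1-camera
print(provider())                                  # run1
```

Note that an `isinstance` check against a `runtime_checkable` protocol only verifies that `__call__` exists; it does not validate the signature or the return type.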

PathInfo dataclass

Where and how a storage backend should write data for one device.

Parameters:

Name Type Description Default
store_uri str

URI of the store root. For local Zarr this is a file:// URI. Example: "file:///data/scan001.zarr".

required
array_key str

Key (array name) within the store for this device's data, typically the device name.

required
capacity int

Maximum number of frames to accept. 0 means unlimited.

0
mimetype_hint str

MIME type hint for the backend. Consumers may use this to select the correct reader.

'application/x-zarr'
extra dict[str, Any]

Optional backend-specific metadata (e.g. OME-Zarr axis labels, physical units). Base writers ignore this field.

{}
Source code in src/sunflare/storage/_path.py
@dataclass
class PathInfo:
    """Where and how a storage backend should write data for one device.

    Attributes
    ----------
    store_uri : str
        URI of the store root.  For local Zarr this is a ``file://`` URI.
        Example: ``"file:///data/scan001.zarr"``.
    array_key : str
        Key (array name) within the store for this device's data.
        Typically the device name; the field itself has no default.
    capacity : int
        Maximum number of frames to accept.  `0` means unlimited.
    mimetype_hint : str
        MIME type hint for the backend.  Consumers may use this to select
        the correct reader.
    extra : dict[str, Any]
        Optional backend-specific metadata (e.g. OME-Zarr axis labels,
        physical units).  Base writers ignore this field.
    """

    store_uri: str
    array_key: str
    capacity: int = 0
    mimetype_hint: str = "application/x-zarr"
    extra: dict[str, Any] = field(default_factory=dict)

PathProvider

Bases: Protocol

Callable that produces PathInfo for a device.

Source code in src/sunflare/storage/_path.py
@runtime_checkable
class PathProvider(Protocol):
    """Callable that produces [`PathInfo`][sunflare.storage.PathInfo] for a device."""

    def __call__(self, device_name: str | None = None) -> PathInfo:
        """Return path information for the given device.

        Parameters
        ----------
        device_name : str | None
            Name of the device requesting path information.

        Returns
        -------
        PathInfo
            Complete path and storage metadata for the device.
        """
        ...

StaticFilenameProvider

Always returns the same filename.

Parameters:

Name Type Description Default
filename str

The filename string to return on every call.

required
Source code in src/sunflare/storage/_path.py
class StaticFilenameProvider:
    """Always returns the same filename.

    Parameters
    ----------
    filename : str
        The filename string to return on every call.
    """

    def __init__(self, filename: str) -> None:
        self._filename = filename

    def __call__(self, device_name: str | None = None) -> str:
        """Return the static filename."""
        return self._filename

StaticPathProvider

Provides PathInfo rooted at a fixed base URI.

Composes a FilenameProvider (for the array key / filename) with a fixed base_uri (for the store location).

Parameters:

Name Type Description Default
filename_provider FilenameProvider

Callable that returns a filename for each device.

required
base_uri str

Base URI for the store root (e.g. "file:///data").

required
mimetype_hint str

MIME type hint forwarded to PathInfo.

'application/x-zarr'
capacity int

Default frame capacity forwarded to PathInfo.

0
Source code in src/sunflare/storage/_path.py
class StaticPathProvider:
    """Provides [`PathInfo`][sunflare.storage.PathInfo] rooted at a fixed base URI.

    Composes a [`FilenameProvider`][sunflare.storage.FilenameProvider]
    (for the array key / filename) with a fixed *base_uri* (for the store location).

    Parameters
    ----------
    filename_provider : FilenameProvider
        Callable that returns a filename for each device.
    base_uri : str
        Base URI for the store root (e.g. `"file:///data"`).
    mimetype_hint : str
        MIME type hint forwarded to [`PathInfo`][sunflare.storage.PathInfo].
    capacity : int
        Default frame capacity forwarded to [`PathInfo`][sunflare.storage.PathInfo].
    """

    def __init__(
        self,
        filename_provider: FilenameProvider,
        base_uri: str,
        mimetype_hint: str = "application/x-zarr",
        capacity: int = 0,
    ) -> None:
        self._filename_provider = filename_provider
        self._base_uri = base_uri.rstrip("/")
        self._mimetype_hint = mimetype_hint
        self._capacity = capacity

    def __call__(self, device_name: str | None = None) -> PathInfo:
        """Return [`PathInfo`][sunflare.storage.PathInfo] for *device_name*."""
        filename = self._filename_provider(device_name)
        store_uri = f"{self._base_uri}/{filename}"
        array_key = device_name or filename
        return PathInfo(
            store_uri=store_uri,
            array_key=array_key,
            capacity=self._capacity,
            mimetype_hint=self._mimetype_hint,
        )
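
The way the store URI and array key are derived can be shown in isolation. `compose` is a hypothetical helper for this sketch; it combines the trailing-slash stripping from `__init__` with the composition done in `__call__`, and returns a plain tuple instead of a `PathInfo`.

```python
from __future__ import annotations


def compose(base_uri: str, filename: str, device_name: str | None = None) -> tuple[str, str]:
    """Return (store_uri, array_key) following the listing above."""
    # Trailing slashes on the base are stripped so the join never doubles them.
    store_uri = f"{base_uri.rstrip('/')}/{filename}"
    # The device name wins; the filename is only a fallback.
    array_key = device_name or filename
    return store_uri, array_key


print(compose("file:///data/", "scan001", "camera"))  # ('file:///data/scan001', 'camera')
print(compose("file:///data", "scan001"))             # ('file:///data/scan001', 'scan001')
```

As far as the listing shows, no file extension is appended to the filename, so any extension expected by the backend would have to come from the filename provider or the backend itself.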

UUIDFilenameProvider

Returns a fresh UUID4 string on every call.

Each call produces a new UUID, so files from different acquisitions are never overwritten.

Source code in src/sunflare/storage/_path.py
class UUIDFilenameProvider:
    """Returns a fresh UUID4 string on every call.

    Each call produces a new UUID, so files from different acquisitions
    are never overwritten.
    """

    def __call__(self, device_name: str | None = None) -> str:
        """Return a new UUID4 filename."""
        return str(uuid.uuid4())

HasStorage

Bases: Protocol

Protocol for devices that have opted in to storage.

Source code in src/sunflare/storage/_proxy.py
@runtime_checkable
class HasStorage(Protocol, metaclass=_HasStorageMeta):
    """Protocol for devices that have opted in to storage."""

    storage: StorageProxy

StorageDescriptor

Descriptor that manages the storage slot on a device.

The private attribute name is derived from the descriptor's own name at class-creation time via __set_name__ (e.g. a class attribute named storage produces a backing attribute _storage). Reading and writing go through object.__getattribute__ and object.__setattr__ rather than __dict__ access, so the descriptor works correctly on classes that define __slots__ as long as the backing slot is declared.

This descriptor is public so users can reference it explicitly in custom device classes:

from sunflare.device import Device
from sunflare.storage import StorageDescriptor


class MyDevice(Device):
    storage = StorageDescriptor()
Source code in src/sunflare/storage/_proxy.py
class StorageDescriptor:
    """Descriptor that manages the `storage` slot on a device.

    The private attribute name is derived from the descriptor's own name
    at class-creation time via `__set_name__` (e.g. a class attribute
    named `storage` produces a backing attribute `_storage`).  Reading
    and writing go through `object.__getattribute__` and
    `object.__setattr__` rather than `__dict__` access, so the descriptor
    works correctly on classes that define `__slots__` as long as the
    backing slot is declared.

    This descriptor is public so users can reference it explicitly in
    custom device classes:

    ```python
    from sunflare.device import Device
    from sunflare.storage import StorageDescriptor


    class MyDevice(Device):
        storage = StorageDescriptor()
    ```
    """

    def __init__(self) -> None:
        # Fallback name used when the descriptor is instantiated outside a
        # class body (e.g. in tests) before __set_name__ is called.
        self._private_name: str = "_storage"

    def __set_name__(self, owner: type, name: str) -> None:
        self._private_name = f"_{name}"

    @overload
    def __get__(self, obj: None, objtype: type) -> StorageDescriptor: ...

    @overload
    def __get__(self, obj: Any, objtype: type | None) -> StorageProxy | None: ...

    def __get__(
        self,
        obj: Any,
        objtype: type | None = None,
    ) -> StorageDescriptor | StorageProxy | None:
        if obj is None:
            return self
        try:
            result: StorageProxy | None = object.__getattribute__(
                obj, self._private_name
            )
        except AttributeError:
            result = None
        return result

    def __set__(self, obj: Any, value: StorageProxy | None) -> None:
        object.__setattr__(obj, self._private_name, value)
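
The `__slots__` behaviour described above can be checked with a self-contained sketch. `SlotDescriptor` is a hypothetical stand-in that reproduces the get/set logic of the listing (without the typing overloads) so the example runs without sunflare, and `SlottedDevice` is a made-up class rather than a real `Device` subclass.

```python
from __future__ import annotations

from typing import Any


class SlotDescriptor:
    """Hypothetical stand-in reproducing the get/set logic listed above."""

    def __init__(self) -> None:
        self._private_name = "_storage"

    def __set_name__(self, owner: type, name: str) -> None:
        self._private_name = f"_{name}"

    def __get__(self, obj: Any, objtype: type | None = None) -> Any:
        if obj is None:
            return self
        try:
            return object.__getattribute__(obj, self._private_name)
        except AttributeError:
            # Backing attribute never assigned: report "no backend".
            return None

    def __set__(self, obj: Any, value: Any) -> None:
        object.__setattr__(obj, self._private_name, value)


class SlottedDevice:
    # The backing slot must be declared, otherwise __set__ raises AttributeError.
    __slots__ = ("_storage",)
    storage = SlotDescriptor()


dev = SlottedDevice()
print(dev.storage)            # None
dev.storage = "fake-backend"  # any object works for this sketch
print(dev.storage)            # fake-backend
print(isinstance(SlottedDevice.storage, SlotDescriptor))  # True
```

Reading the attribute before anything is assigned returns `None` instead of raising, which matches the convention that `storage` is `None` when no backend is configured.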

StorageProxy

Bases: Protocol

Protocol that devices use to interact with a storage backend.

Writer instances implement this protocol, so device code remains independent of the concrete backend.

Devices access the backend via their storage attribute, which is None when no backend has been configured for the session.

Methods:

Name Description
update_source

Register or update a data source on the backend.

prepare

Prepare the backend for name and return a FrameSink.

kickoff

Open the storage backend.

complete

Signal that name has finished writing.

get_indices_written

Return the number of frames written for name.

collect_stream_docs

Yield Bluesky stream documents for name.

Source code in src/sunflare/storage/_proxy.py
@runtime_checkable
class StorageProxy(Protocol):
    """Protocol that devices use to interact with a storage backend.

    [`Writer`][sunflare.storage.Writer] instances implement this protocol,
    so device code remains independent of the concrete backend.

    Devices access the backend via their `storage` attribute, which is
    `None` when no backend has been configured for the session.
    """

    def update_source(
        self,
        name: str,
        dtype: np.dtype[np.generic],
        shape: tuple[int, ...],
        extra: dict[str, Any] | None = None,
    ) -> None:
        """Register or update a data source on the backend."""
        ...

    def prepare(self, name: str, capacity: int = 0) -> FrameSink:
        """Prepare the backend for *name* and return a [`FrameSink`][sunflare.storage.FrameSink]."""
        ...

    def kickoff(self) -> None:
        """Open the storage backend."""
        ...

    def complete(self, name: str) -> None:
        """Signal that *name* has finished writing."""
        ...

    def get_indices_written(self, name: str | None = None) -> int:
        """Return the number of frames written for *name*."""
        ...

    def collect_stream_docs(
        self,
        name: str,
        indices_written: int,
    ) -> Iterator[StreamAsset]:
        """Yield Bluesky stream documents for *name*."""
        ...
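
Since `storage` may be `None`, device code typically guards every backend call. The following is a rough sketch under stated assumptions: `RecordingBackend` is a hypothetical stub exposing only `update_source`, `FakeCamera` is a made-up device, and the dtype is passed as the plain string `"uint16"` to avoid a NumPy dependency, whereas the real signature expects a NumPy dtype.

```python
from __future__ import annotations

from typing import Any


class RecordingBackend:
    """Hypothetical stub that records update_source calls."""

    def __init__(self) -> None:
        self.sources: dict[str, tuple[Any, tuple[int, ...], dict[str, Any]]] = {}

    def update_source(
        self,
        name: str,
        dtype: Any,
        shape: tuple[int, ...],
        extra: dict[str, Any] | None = None,
    ) -> None:
        self.sources[name] = (dtype, shape, extra or {})


class FakeCamera:
    """Made-up device holding an optional storage backend."""

    def __init__(self, name: str, storage: RecordingBackend | None = None) -> None:
        self.name = name
        self.storage = storage

    def register(self) -> bool:
        # No backend configured for this session: skip registration.
        if self.storage is None:
            return False
        # Placeholder dtype string; real code would pass a NumPy dtype.
        self.storage.update_source(self.name, "uint16", (512, 512))
        return True


backend = RecordingBackend()
print(FakeCamera("cam").register())           # False
print(FakeCamera("cam", backend).register())  # True
print(backend.sources["cam"])                 # ('uint16', (512, 512), {})
```

Keeping the guard in one place lets the same device class run with or without a storage backend.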

update_source

update_source(
    name: str,
    dtype: dtype[generic],
    shape: tuple[int, ...],
    extra: dict[str, Any] | None = None,
) -> None

Register or update a data source on the backend.

Source code in src/sunflare/storage/_proxy.py
def update_source(
    self,
    name: str,
    dtype: np.dtype[np.generic],
    shape: tuple[int, ...],
    extra: dict[str, Any] | None = None,
) -> None:
    """Register or update a data source on the backend."""
    ...

prepare

prepare(name: str, capacity: int = 0) -> FrameSink

Prepare the backend for name and return a FrameSink.

Source code in src/sunflare/storage/_proxy.py
def prepare(self, name: str, capacity: int = 0) -> FrameSink:
    """Prepare the backend for *name* and return a [`FrameSink`][sunflare.storage.FrameSink]."""
    ...

kickoff

kickoff() -> None

Open the storage backend.

Source code in src/sunflare/storage/_proxy.py
def kickoff(self) -> None:
    """Open the storage backend."""
    ...

complete

complete(name: str) -> None

Signal that name has finished writing.

Source code in src/sunflare/storage/_proxy.py
def complete(self, name: str) -> None:
    """Signal that *name* has finished writing."""
    ...

get_indices_written

get_indices_written(name: str | None = None) -> int

Return the number of frames written for name.

Source code in src/sunflare/storage/_proxy.py
def get_indices_written(self, name: str | None = None) -> int:
    """Return the number of frames written for *name*."""
    ...

collect_stream_docs

collect_stream_docs(
    name: str, indices_written: int
) -> Iterator[StreamAsset]

Yield Bluesky stream documents for name.

Source code in src/sunflare/storage/_proxy.py
def collect_stream_docs(
    self,
    name: str,
    indices_written: int,
) -> Iterator[StreamAsset]:
    """Yield Bluesky stream documents for *name*."""
    ...