Skip to content

Engine

Run engine

RunEngine

Bases: RunEngine

The Run Engine execute messages and emits Documents.

This is a wrapper for the bluesky.run_engine.RunEngine class that allows execution without blocking the main thread. The main difference is that the __call__ method is executed in a separate thread, and it returns a concurrent.futures.Future object representing the result of the plan execution.

Parameters:

Name Type Description Default
md dict[str, Any]

The default is a standard Python dictionary, but fancier objects can be used to store long-term history and persist it between sessions. The standard configuration instantiates a Run Engine with historydict.HistoryDict, a simple interface to a sqlite file. Any object supporting __getitem__, __setitem__, and clear will work.

None
loop AbstractEventLoop

An asyncio event loop to be used for executing plans. If not provided, the RunEngine will create a new event loop using asyncio.new_event_loop(); e.g., asyncio.get_event_loop() or asyncio.new_event_loop()

get_shared_loop()
preprocessors list

Generator functions that take in a plan (generator instance) and modify its messages on the way out. Suitable examples include the functions in the module bluesky.plans with names ending in 'wrapper'. Functions are composed in order: the preprocessors [f, g] are applied like f(g(plan)).

None
md_validator Callable[dict[str, Any], None]

a function that raises and prevents starting a run if it deems the metadata to be invalid or incomplete Function should raise if md is invalid. What that means is completely up to the user. The function's return value is ignored.

None
md_normalizer Callable[dict[str, Any], dict[str, Any]]

a function that, similar to md_validator, raises and prevents starting a run if it deems the metadata to be invalid or incomplete. If it succeeds, it returns the normalized/transformed version of the original metadata. Function should raise if md is invalid. What that means is completely up to the user. Expected return: normalized metadata

None
scan_id_source Callable[dict[str, Any], int | Awaitable[int]]

a (possibly async) function that will be used to calculate scan_id. Default is to increment scan_id by 1 each time. However you could pass in a customized function to get a scan_id from any source. Expected return: updated scan_id value

default_scan_id_source
call_returns_result bool

A flag that controls the return value of __call__. If True, the RunEngine will return a :class:RunEngineResult object that contains information about the plan that was run. If False, the RunEngine will return a tuple of uids. The potential return value is encapsulated in the returned Future object, accessible via future.result(). Defaults to True.

True

Attributes:

Name Type Description
md

Direct access to the dict-like persistent storage described above

record_interruptions

False by default. Set to True to generate an extra event stream that records any interruptions (pauses, suspensions).

state

{'idle', 'running', 'paused'}

suspenders

Read-only collection of bluesky.suspenders.SuspenderBase objects which can suspend and resume execution; see related methods.

preprocessors list

Generator functions that take in a plan (generator instance) and modify its messages on the way out. Suitable examples include the functions in the module bluesky.plans with names ending in 'wrapper'. Functions are composed in order: the preprocessors [f, g] are applied like f(g(plan)).

msg_hook

Callable that receives all messages before they are processed (useful for logging or other development purposes); expected signature is f(msg) where msg is a bluesky.Msg, a kind of namedtuple; default is None.

state_hook

Callable with signature f(new_state, old_state) that will be called whenever the RunEngine's state attribute is updated; default is None

waiting_hook

Callable with signature f(status_object) that will be called whenever the RunEngine is waiting for long-running commands (trigger, set, kickoff, complete) to complete. This hook is useful to incorporate a progress bar.

ignore_callback_exceptions

Boolean, False by default.

call_returns_result

Boolean, False by default. If False, RunEngine will return uuid list after running a plan. If True, RunEngine will return a RunEngineResult object that contains the plan result, error status, and uuid list.

loop asyncio event loop

e.g., asyncio.get_event_loop() or asyncio.new_event_loop()

max_depth

Maximum stack depth; set this to prevent users from calling the RunEngine inside a function (which can result in unexpected behavior and breaks introspection tools). Default is None. For built-in Python interpreter, set to 2. For IPython, set to 11 (tested on IPython 5.1.0; other versions may vary).

pause_msg str

The message printed when a run is interrupted. This message includes instructions of changing the state of the RunEngine. It is set to bluesky.run_engine.PAUSE_MSG by default and can be modified based on needs.

commands

The list of commands available to Msg.

Source code in src/redsun/engine/_wrapper.py
class RunEngine(BlueskyRunEngine):
    """The Run Engine execute messages and emits Documents.

    This is a wrapper for the `bluesky.run_engine.RunEngine` class that
    allows execution without blocking the main thread.
    The main difference is that the ``__call__`` method
    is executed in a separate thread,
    and it returns a concurrent.futures.Future object
    representing the result of the plan execution.

    Parameters
    ----------
    md : dict[str, Any], optional
        The default is a standard Python dictionary, but fancier
        objects can be used to store long-term history and persist
        it between sessions. The standard configuration
        instantiates a Run Engine with historydict.HistoryDict, a
        simple interface to a sqlite file. Any object supporting
        `__getitem__`, `__setitem__`, and `clear` will work.

    loop: asyncio.AbstractEventLoop, optional
        An asyncio event loop to be used for executing plans. If not provided,
        the RunEngine will create a new event loop using ``asyncio.new_event_loop()``;
        e.g., ``asyncio.get_event_loop()`` or ``asyncio.new_event_loop()``

    preprocessors : list, optional
        Generator functions that take in a plan (generator instance) and
        modify its messages on the way out. Suitable examples include
        the functions in the module ``bluesky.plans`` with names ending in
        'wrapper'.  Functions are composed in order: the preprocessors
        ``[f, g]`` are applied like ``f(g(plan))``.

    md_validator : Callable[dict[str, Any], None], optional
        a function that raises and prevents starting a run if it deems
        the metadata to be invalid or incomplete
        Function should raise if md is invalid. What that means is
        completely up to the user. The function's return value is
        ignored.

    md_normalizer : Callable[dict[str, Any], dict[str, Any]], optional
        a function that, similar to md_validator, raises and prevents starting
        a run if it deems the metadata to be invalid or incomplete.
        If it succeeds, it returns the normalized/transformed version of
        the original metadata.
        Function should raise if md is invalid. What that means is
        completely up to the user.
        Expected return: normalized metadata

    scan_id_source : Callable[dict[str, Any], int | Awaitable[int]], optional
        a (possibly async) function that will be used to calculate scan_id.
        Default is to increment scan_id by 1 each time. However you could pass
        in a customized function to get a scan_id from any source.
        Expected return: updated scan_id value

    call_returns_result : bool, default True
        A flag that controls the return value of ``__call__``.
        If ``True``, the ``RunEngine`` will return a :class:``RunEngineResult``
        object that contains information about the plan that was run.
        If ``False``, the ``RunEngine`` will return a tuple of uids.
        The potential return value is encapsulated in the returned Future object,
        accessible via ``future.result()``.
        Defaults to ``True``.


    Attributes
    ----------
    md
        Direct access to the dict-like persistent storage described above

    record_interruptions
        False by default. Set to True to generate an extra event stream
        that records any interruptions (pauses, suspensions).

    state
        {'idle', 'running', 'paused'}

    suspenders
        Read-only collection of `bluesky.suspenders.SuspenderBase` objects
        which can suspend and resume execution; see related methods.

    preprocessors : list
        Generator functions that take in a plan (generator instance) and
        modify its messages on the way out. Suitable examples include
        the functions in the module ``bluesky.plans`` with names ending in
        'wrapper'.  Functions are composed in order: the preprocessors
        ``[f, g]`` are applied like ``f(g(plan))``.

    msg_hook
        Callable that receives all messages before they are processed
        (useful for logging or other development purposes); expected
        signature is ``f(msg)`` where ``msg`` is a ``bluesky.Msg``, a
        kind of namedtuple; default is None.

    state_hook
        Callable with signature ``f(new_state, old_state)`` that will be
        called whenever the RunEngine's state attribute is updated; default
        is None

    waiting_hook
        Callable with signature ``f(status_object)`` that will be called
        whenever the RunEngine is waiting for long-running commands
        (trigger, set, kickoff, complete) to complete. This hook is useful to
        incorporate a progress bar.

    ignore_callback_exceptions
        Boolean, False by default.

    call_returns_result
        Boolean, False by default. If False, RunEngine will return uuid list
        after running a plan. If True, RunEngine will return a RunEngineResult
        object that contains the plan result, error status, and uuid list.

    loop : asyncio event loop
        e.g., ``asyncio.get_event_loop()`` or ``asyncio.new_event_loop()``

    max_depth
        Maximum stack depth; set this to prevent users from calling the
        RunEngine inside a function (which can result in unexpected
        behavior and breaks introspection tools). Default is None.
        For built-in Python interpreter, set to 2. For IPython, set to 11
        (tested on IPython 5.1.0; other versions may vary).

    pause_msg : str
        The message printed when a run is interrupted. This message
        includes instructions of changing the state of the RunEngine.
        It is set to ``bluesky.run_engine.PAUSE_MSG`` by default and
        can be modified based on needs.

    commands:
        The list of commands available to Msg.

    """

    # TODO: using get_shared_loop() like this is a bit
    # fragile; there should be a private function that ensures
    # the shared loop is created only once at application startup
    # and properly cleaned up at shutdown; this is just a quick solution to
    # get the shared loop working for now
    def __init__(
        self,
        md: dict[str, Any] | None = None,
        *,
        loop: asyncio.AbstractEventLoop = get_shared_loop(),
        preprocessors: list[Preprocessor] | None = None,
        md_validator: MDValidator | None = None,
        md_normalizer: MDNormalizer | None = None,
        scan_id_source: MDScanIDSource | None = default_scan_id_source,
        call_returns_result: bool = True,
    ):
        # force the context_managers to be empty,
        # otherwise the RunEngine will try to use the
        # SignalHandler context manager
        self._executor = ThreadPoolExecutor(max_workers=1)

        super().__init__(
            md=md,
            loop=loop,
            preprocessors=preprocessors,
            md_validator=md_validator,
            md_normalizer=md_normalizer,
            scan_id_source=scan_id_source,  # type: ignore[arg-type]
            call_returns_result=call_returns_result,
            context_managers=[],
        )

        # override pause message to be an empty string
        self.pause_msg = ""

        # register custom commands
        self._command_registry.update(
            {
                "wait_for_actions": self._wait_for_actions,
            }
        )

    def __call__(  # type: ignore[override]
        self,
        plan: Iterable[Msg],
        subs: Subscribers | None = None,
        /,
        **metadata_kw: Any,
    ) -> Future[RunEngineResult | tuple[str, ...]]:
        """Execute a plan.

        Any keyword arguments will be interpreted as metadata and recorded with
        any run(s) created by executing the plan. Notice that the plan
        (required) and extra subscriptions (optional) must be given as
        positional arguments.

        Parameters
        ----------
        plan : typing.Iterable[`bluesky.utils.Msg`]
            A generator or that yields ``Msg`` objects (or an iterable that
            returns such a generator).
        subs : `bluesky.utils.Subscribers`, optional (positional only)
            Temporary subscriptions (a.k.a. callbacks) to be used on this run.
            For convenience, any of the following are accepted:

            * a callable, which will be subscribed to 'all'
            * a list of callables, which again will be subscribed to 'all'
            * a dictionary, mapping specific subscriptions to callables or
              lists of callables; valid keys are {'all', 'start', 'stop',
              'event', 'descriptor'}

        Returns
        -------
        Future[RunEngineResult | tuple[str, ...]]
            Future object representing the result of the plan execution.

        The result contained in the future is either:
        uids : tuple
            list of uids (i.e. RunStart Document uids) of run(s)
            if :attr:`RunEngine._call_returns_result` is ``False``
        result : :class:`RunEngineResult`
            if :attr:`RunEngine._call_returns_result` is ``True``
        """
        return self._executor.submit(
            super().__call__,
            plan,
            subs,
            **metadata_kw,
        )

    def resume(self) -> Future[RunEngineResult | tuple[str, ...]]:
        """Resume the paused plan in a separate thread.

        If the plan has been paused, the initial
        future returned by ``__call__`` will be set as completed.

        With this method, the plan is resumed in a separate thread,
        and a new future is returned.

        Returns
        -------
        ``Future[RunEngineResult | tuple[str, ...]]``
            Future object representing the result of the resumed plan.
        """
        return self._executor.submit(super().resume)

    async def _wait_for_actions(self, msg: Msg) -> tuple[str, SRLatch] | None:
        """Instruct the run engine to wait for any of the given latches to be set or reset.

        Parameters
        ----------
        msg: Msg
            The message containing the latches to wait for.
            Packs a map of SRLatch in `msg.args` and a timeout in `msg.kwargs`.

            Expected message format:

            Msg("wait_for_actions", None, latches, timeout=timeout, wait_for="set")

        Returns
        -------
        tuple[str, SRLatch] | None
            A tuple containing the name and the latch that was set/reset to unblock the plan;
            None if timeout occurred before any latch changed state.
        """
        latch_map: Mapping[str, SRLatch] = msg.args[0]
        timeout: float | None = msg.kwargs.get("timeout", None)
        wait_for: Literal["set", "reset"] = msg.kwargs.get("wait_for", "set")

        # Create a mapping to track which task corresponds to which latch
        if wait_for == "set":
            latch_tasks = {
                asyncio.create_task(latch.wait_for_set(), name=name)
                for name, latch in latch_map.items()
            }
        else:
            latch_tasks = {
                asyncio.create_task(latch.wait_for_reset(), name=name)
                for name, latch in latch_map.items()
            }

        done, pending = await asyncio.wait(
            latch_tasks, return_when=asyncio.FIRST_COMPLETED, timeout=timeout
        )

        # Cancel all pending tasks
        for task in pending:
            task.cancel()

        # Return the latch that changed state
        if not done:
            return None
        completed_task = done.pop()
        task_name = completed_task.get_name()
        return task_name, latch_map[task_name]

resume

resume() -> Future[RunEngineResult | tuple[str, ...]]

Resume the paused plan in a separate thread.

If the plan has been paused, the initial future returned by __call__ will be set as completed.

With this method, the plan is resumed in a separate thread, and a new future is returned.

Returns:

Type Description
``Future[RunEngineResult | tuple[str, ...]]``

Future object representing the result of the resumed plan.

Source code in src/redsun/engine/_wrapper.py
def resume(self) -> Future[RunEngineResult | tuple[str, ...]]:
    """Resume the paused plan in a separate thread.

    If the plan has been paused, the initial
    future returned by ``__call__`` will be set as completed.

    With this method, the plan is resumed in a separate thread,
    and a new future is returned.

    Returns
    -------
    ``Future[RunEngineResult | tuple[str, ...]]``
        Future object representing the result of the resumed plan.
    """
    return self._executor.submit(super().resume)

Actions

Engine actions: decorators and types for continuous, interactive plans.

A continuous plan is one that runs in an infinite loop until explicitly stopped, and may support pause/resume and in-flight actions (user-triggered side effects while the plan is running).

This module provides:

  • SRLatch — an asyncio set-reset latch used to synchronise plan execution with external signals.
  • continous — a decorator that marks a plan function as continuous and records its togglable and pausable capabilities.
  • Action — a dataclass carrying metadata (name, description, toggle state) for a single in-flight action.
  • ContinousPlan — a typing.Protocol used for static typing and runtime isinstance checks on decorated plans.

Action dataclass

Metadata for an in-flight action on a continuous plan.

An Action is a user-triggerable side effect that can be fired while a continuous plan is running. It encapsulates an SRLatch synchronisation primitive so the plan can await the action being triggered.

Warning

The internal SRLatch is created lazily on first access of event_map, so Action objects can be constructed without a running event loop. The latch must only be accessed from within a plan.

Subclassable to add additional fields for domain-specific use cases.

Attributes:

Name Type Description
name str

Name of the action.

description str

Brief description of the action, usable as UI tooltip.

togglable bool

Whether the action is togglable or not.

toggle_states tuple[str, str]

Labels for the toggle states (on, off). Only used if togglable is True.

Source code in src/redsun/engine/actions.py
@dataclass(kw_only=True)
class Action:
    """Metadata for an in-flight action on a continuous plan.

    An `Action` is a user-triggerable side effect that can be fired while a
    continuous plan is running.  It encapsulates an `SRLatch` synchronisation
    primitive so the plan can ``await`` the action being triggered.

    !!! warning
        The internal `SRLatch` is created lazily on first access of
        `event_map`, so `Action` objects can be constructed without a running
        event loop.  The latch must only be accessed from within a plan.

    Subclassable to add additional fields for domain-specific use cases.
    """

    name: str
    """Name of the action."""

    description: str = field(default="")
    """Brief description of the action, usable as UI tooltip."""

    togglable: bool = field(default=False)
    """Whether the action is togglable or not."""

    toggle_states: tuple[str, str] = field(default=("On", "Off"))
    """Labels for the toggle states (on, off). Only used if `togglable` is True."""

    _latch: SRLatch | None = field(init=False, default=None, repr=False)

    @property
    def event_map(self) -> dict[str, SRLatch]:
        """Return the latch for this action as a single-entry dict keyed by name."""
        if not self._latch:
            self._latch = SRLatch()
        return {self.name: self._latch}

event_map property

event_map: dict[str, SRLatch]

Return the latch for this action as a single-entry dict keyed by name.

SRLatch

An asyncio Event-like object that behaves as a set-reset latch.

Wraps two asyncio.Event objects to allow waiting for either the set or the reset state of the latch. At construction the latch starts in the reset state.

Source code in src/redsun/engine/actions.py
class SRLatch:
    """An asyncio Event-like object that behaves as a set-reset latch.

    Wraps two `asyncio.Event` objects to allow waiting for either the *set*
    or the *reset* state of the latch.  At construction the latch starts in
    the **reset** state.
    """

    def __init__(self) -> None:
        self._flag: bool = False
        self._set_event: asyncio.Event = asyncio.Event()
        self._reset_event: asyncio.Event = asyncio.Event()
        self._reset_event.set()

    def set(self) -> None:
        """Set the internal flag to True.

        All coroutines waiting in `wait_for_set` are awakened.
        No-op if the flag is already set.
        """
        if not self._flag:
            self._flag = True
            self._set_event.set()
            self._reset_event.clear()

    def reset(self) -> None:
        """Reset the internal flag to False.

        All coroutines waiting in `wait_for_reset` are awakened.
        No-op if the flag is already reset.
        """
        if self._flag:
            self._flag = False
            self._reset_event.set()
            self._set_event.clear()

    def is_set(self) -> bool:
        """Return True if the internal flag is set, False otherwise."""
        return self._flag

    async def wait_for_set(self) -> None:
        """Wait until the internal flag is set.

        Returns immediately if the flag is already set; otherwise blocks
        until another coroutine calls `set`.
        """
        if self._flag:
            return
        await self._set_event.wait()

    async def wait_for_reset(self) -> None:
        """Wait until the internal flag is reset.

        Returns immediately if the flag is already reset; otherwise blocks
        until another coroutine calls `reset`.
        """
        if not self._flag:
            return
        await self._reset_event.wait()

set

set() -> None

Set the internal flag to True.

All coroutines waiting in wait_for_set are awakened. No-op if the flag is already set.

Source code in src/redsun/engine/actions.py
def set(self) -> None:
    """Set the internal flag to True.

    All coroutines waiting in `wait_for_set` are awakened.
    No-op if the flag is already set.
    """
    if not self._flag:
        self._flag = True
        self._set_event.set()
        self._reset_event.clear()

reset

reset() -> None

Reset the internal flag to False.

All coroutines waiting in wait_for_reset are awakened. No-op if the flag is already reset.

Source code in src/redsun/engine/actions.py
def reset(self) -> None:
    """Reset the internal flag to False.

    All coroutines waiting in `wait_for_reset` are awakened.
    No-op if the flag is already reset.
    """
    if self._flag:
        self._flag = False
        self._reset_event.set()
        self._set_event.clear()

is_set

is_set() -> bool

Return True if the internal flag is set, False otherwise.

Source code in src/redsun/engine/actions.py
def is_set(self) -> bool:
    """Return True if the internal flag is set, False otherwise."""
    return self._flag

wait_for_set async

wait_for_set() -> None

Wait until the internal flag is set.

Returns immediately if the flag is already set; otherwise blocks until another coroutine calls set.

Source code in src/redsun/engine/actions.py
async def wait_for_set(self) -> None:
    """Wait until the internal flag is set.

    Returns immediately if the flag is already set; otherwise blocks
    until another coroutine calls `set`.
    """
    if self._flag:
        return
    await self._set_event.wait()

wait_for_reset async

wait_for_reset() -> None

Wait until the internal flag is reset.

Returns immediately if the flag is already reset; otherwise blocks until another coroutine calls reset.

Source code in src/redsun/engine/actions.py
async def wait_for_reset(self) -> None:
    """Wait until the internal flag is reset.

    Returns immediately if the flag is already reset; otherwise blocks
    until another coroutine calls `reset`.
    """
    if not self._flag:
        return
    await self._reset_event.wait()

ContinousPlan

Bases: Protocol[P, R_co]

Protocol for plans decorated with continous.

Used both for static typing (as the return type of the continous decorator) and for runtime isinstance checks:

if isinstance(f, ContinousPlan):
    print(f.__togglable__, f.__pausable__)

Attributes:

Name Type Description
__togglable__ bool

Whether the plan is togglable (i.e. runs as an infinite loop that the run engine can stop).

__pausable__ bool

Whether the plan can be paused and resumed by the run engine.

Source code in src/redsun/engine/actions.py
@runtime_checkable
class ContinousPlan(Protocol[P, R_co]):
    """Protocol for plans decorated with `continous`.

    Used both for static typing (as the return type of the `continous`
    decorator) and for runtime ``isinstance`` checks:

    ```python
    if isinstance(f, ContinousPlan):
        print(f.__togglable__, f.__pausable__)
    ```

    Attributes
    ----------
    __togglable__ : bool
        Whether the plan is togglable (i.e. runs as an infinite loop that
        the run engine can stop).
    __pausable__ : bool
        Whether the plan can be paused and resumed by the run engine.
    """

    __togglable__: bool
    __pausable__: bool

    @abstractmethod
    def __call__(  # noqa: D102
        self, *args: P.args, **kwargs: P.kwargs
    ) -> R_co:  # pragma: no cover - protocol
        ...

continous

continous(
    func: Callable[P, R_co],
) -> ContinousPlan[P, R_co]
continous(
    *, togglable: bool = True, pausable: bool = False
) -> Callable[[Callable[P, R_co]], ContinousPlan[P, R_co]]
continous(
    func: Callable[P, R_co] | None = None,
    /,
    *,
    togglable: bool = True,
    pausable: bool = False,
) -> (
    Callable[[Callable[P, R_co]], ContinousPlan[P, R_co]]
    | ContinousPlan[P, R_co]
)

Mark a plan as continuous.

A continuous plan informs the view to provide UI controls that allow the user to start, stop, pause, and resume plan execution.

Can be used with or without arguments:

@continous
def my_plan() -> MsgGenerator[None]: ...


@continous(togglable=True, pausable=True)
def my_plan(detectors: Sequence[DetectorProtocol]) -> MsgGenerator[None]: ...

Parameters:

Name Type Description Default
togglable bool

Whether the plan runs as an infinite loop that the run engine can stop via a toggle button. Default is True.

True
pausable bool

Whether the plan can be paused and resumed by the run engine. Default is False.

False

Returns:

Type Description
ContinousPlan

The decorated plan function, typed as a ContinousPlan.

Notes

The decorator does not modify the function signature. It stores togglable and pausable as attributes on the function object (__togglable__ and __pausable__), to be retrieved later by create_plan_spec.

Source code in src/redsun/engine/actions.py
def continous(
    func: Callable[P, R_co] | None = None,
    /,
    *,
    togglable: bool = True,
    pausable: bool = False,
) -> Callable[[Callable[P, R_co]], ContinousPlan[P, R_co]] | ContinousPlan[P, R_co]:
    """Mark a plan as continuous.

    A *continuous* plan informs the view to provide UI controls that allow
    the user to start, stop, pause, and resume plan execution.

    Can be used with or without arguments:

    ```python
    @continous
    def my_plan() -> MsgGenerator[None]: ...


    @continous(togglable=True, pausable=True)
    def my_plan(detectors: Sequence[DetectorProtocol]) -> MsgGenerator[None]: ...
    ```

    Parameters
    ----------
    togglable : bool, optional
        Whether the plan runs as an infinite loop that the run engine can
        stop via a toggle button. Default is True.
    pausable : bool, optional
        Whether the plan can be paused and resumed by the run engine.
        Default is False.

    Returns
    -------
    ContinousPlan
        The decorated plan function, typed as a `ContinousPlan`.

    Notes
    -----
    The decorator does not modify the function signature. It stores
    ``togglable`` and ``pausable`` as attributes on the function object
    (``__togglable__`` and ``__pausable__``), to be retrieved later by
    `create_plan_spec`.
    """

    def decorator(func: Callable[P, R_co]) -> ContinousPlan[P, R_co]:
        setattr(func, "__togglable__", togglable)
        setattr(func, "__pausable__", pausable)
        return cast("ContinousPlan[P, R_co]", func)

    if func is None:
        return decorator

    return decorator(func)

Plan stubs

Custom Bluesky plan stubs for redsun plans.

These stubs extend the standard bluesky.plan_stubs with redsun-specific action-based flow control (wait_for_actions, read_while_waiting).

All functions are generator functions that yield Msg objects and are intended to be composed inside larger Bluesky plans via yield from.

wait_for_actions

wait_for_actions(
    events: Mapping[str, SRLatch],
    timeout: float = SIXTY_FPS,
    wait_for: Literal["set", "reset"] = "set",
) -> MsgGenerator[tuple[str, SRLatch]]

Wait for any of the given latches to change state.

Loops at timeout intervals until a latch transitions, then returns the name and latch that fired. Plan execution yields control on each iteration so background tasks continue running normally.

Parameters:

Name Type Description Default
events Mapping[str, SRLatch]

Mapping of action names to their SRLatch objects.

required
timeout float

Polling interval in seconds. Default is 1/60 s (60 Hz).

SIXTY_FPS
wait_for Literal['set', 'reset']

Whether to wait for a latch to be set or reset. Default is "set".

'set'

Returns:

Type Description
tuple[str, SRLatch]

The name and latch that changed state.

Source code in src/redsun/engine/plan_stubs.py
def wait_for_actions(
    events: Mapping[str, SRLatch],
    timeout: float = SIXTY_FPS,
    wait_for: Literal["set", "reset"] = "set",
) -> MsgGenerator[tuple[str, SRLatch]]:
    """Wait for any of the given latches to change state.

    Loops at *timeout* intervals until a latch transitions, then returns
    the name and latch that fired. Plan execution yields control on each
    iteration so background tasks continue running normally.

    Parameters
    ----------
    events : Mapping[str, SRLatch]
        Mapping of action names to their `SRLatch` objects.
    timeout : float, optional
        Polling interval in seconds. Default is 1/60 s (60 Hz).
    wait_for : Literal["set", "reset"], optional
        Whether to wait for a latch to be set or reset.
        Default is `"set"`.

    Returns
    -------
    tuple[str, SRLatch]
        The name and latch that changed state.
    """
    result: tuple[str, SRLatch] | None = None
    while result is None:
        yield from bps.checkpoint()
        result = yield Msg(
            "wait_for_actions", None, events, timeout=timeout, wait_for=wait_for
        )
    return result

describe

describe(
    obj: Readable[Any],
) -> MsgGenerator[dict[str, Descriptor]]

Gather the descriptor from a Readable device.

Parameters:

Name Type Description Default
obj Readable[Any]

The device to describe.

required

Returns:

Type Description
dict[str, Descriptor]

The descriptor dict returned by obj.describe().

Source code in src/redsun/engine/plan_stubs.py
def describe(
    obj: Readable[Any],
) -> MsgGenerator[dict[str, Descriptor]]:
    """Gather the descriptor from a `Readable` device.

    Parameters
    ----------
    obj : Readable[Any]
        The device to describe.

    Returns
    -------
    dict[str, Descriptor]
        The descriptor dict returned by ``obj.describe()``.
    """

    async def _describe() -> dict[str, Descriptor]:
        return await maybe_await(obj.describe())

    task: list[asyncio.Task[dict[str, Descriptor]]] = yield from bps.wait_for(
        [_describe]
    )
    result = task[0].result()
    return result

describe_collect

describe_collect(
    obj: Collectable,
) -> MsgGenerator[
    dict[str, Descriptor] | dict[str, dict[str, Descriptor]]
]

Gather descriptors from a Collectable device.

Parameters:

Name Type Description Default
obj Collectable

The device to describe.

required

Returns:

Type Description
dict[str, Descriptor] | dict[str, dict[str, Descriptor]]

The descriptor dict returned by obj.describe_collect().

Source code in src/redsun/engine/plan_stubs.py
def describe_collect(
    obj: Collectable,
) -> MsgGenerator[dict[str, Descriptor] | dict[str, dict[str, Descriptor]]]:
    """Gather descriptors from a `Collectable` device.

    Parameters
    ----------
    obj : Collectable
        The device to describe.

    Returns
    -------
    dict[str, Descriptor] | dict[str, dict[str, Descriptor]]
        The descriptor dict returned by ``obj.describe_collect()``.
    """

    async def _describe_collect() -> (
        dict[str, Descriptor] | dict[str, dict[str, Descriptor]]
    ):
        return await maybe_await(obj.describe_collect())

    task: list[
        asyncio.Task[dict[str, Descriptor] | dict[str, dict[str, Descriptor]]]
    ] = yield from bps.wait_for([_describe_collect])
    result = task[0].result()

    return result