Engine

Run engine

RunEngine

Bases: RunEngine

The Run Engine executes messages and emits Documents.

This is a wrapper for the bluesky.run_engine.RunEngine class that allows execution without blocking the main thread. The main difference is that the __call__ method is executed in a separate thread, and it returns a concurrent.futures.Future object representing the result of the plan execution.
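As a rough illustration of the calling convention described above, the sketch below uses only the standard library. `BlockingEngine` and `NonBlockingEngine` are hypothetical stand-ins (neither is part of redsun or bluesky): the subclass submits the parent's `__call__` to a single-worker `ThreadPoolExecutor`, so the caller receives a `concurrent.futures.Future` immediately instead of blocking.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


class BlockingEngine:
    """Hypothetical stand-in for a blocking engine (not the bluesky API)."""

    def __call__(self, plan):
        # Consume the plan step by step, blocking the calling thread.
        results = []
        for step in plan:
            time.sleep(0.01)  # pretend each message takes some time
            results.append(step)
        return tuple(results)


class NonBlockingEngine(BlockingEngine):
    """Runs the parent's __call__ on a worker thread and returns a Future."""

    def __init__(self):
        self._executor = ThreadPoolExecutor(max_workers=1)

    def __call__(self, plan) -> Future:
        return self._executor.submit(super().__call__, plan)


engine = NonBlockingEngine()
future = engine(iter(["open_run", "read", "close_run"]))
# The caller is free to do other work here; result() blocks until done.
outcome = future.result(timeout=5)
```

With a single worker, a second call submitted while the first is still running would simply queue behind it.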

Parameters:

Name Type Description Default
md dict[str, Any]

The default is a standard Python dictionary, but fancier objects can be used to store long-term history and persist it between sessions. The standard configuration instantiates a Run Engine with historydict.HistoryDict, a simple interface to a sqlite file. Any object supporting __getitem__, __setitem__, and clear will work.

None
loop AbstractEventLoop | None

An asyncio event loop to be used for executing plans, e.g. the result of asyncio.get_event_loop() or asyncio.new_event_loop(). If not provided, the RunEngine will create a new event loop using asyncio.new_event_loop().

None
preprocessors list

Generator functions that take in a plan (generator instance) and modify its messages on the way out. Suitable examples include the functions in the module bluesky.preprocessors with names ending in 'wrapper'. Functions are composed in order: the preprocessors [f, g] are applied like f(g(plan)).

None
md_validator Callable[dict[str, Any], None]

A function that raises, and thereby prevents a run from starting, if it deems the metadata invalid or incomplete. What counts as invalid is entirely up to the user. The function's return value is ignored.

None
md_normalizer Callable[dict[str, Any], dict[str, Any]]

A function that, like md_validator, raises and prevents a run from starting if it deems the metadata invalid or incomplete. On success it returns the normalized (transformed) version of the original metadata. What counts as invalid is entirely up to the user. Expected return: normalized metadata.

None
scan_id_source Callable[dict[str, Any], int | Awaitable[int]]

A (possibly async) function used to compute scan_id. The default increments scan_id by 1 each time, but a custom function can be passed to obtain a scan_id from any source. Expected return: the updated scan_id value.

default_scan_id_source
call_returns_result bool

A flag that controls the return value of __call__. If True, the RunEngine returns a RunEngineResult object that contains information about the plan that was run. If False, it returns a tuple of uids. Either value is wrapped in the returned Future and is accessible via future.result(). Defaults to True.

True
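To make the three callable parameters above more concrete, here is a minimal sketch of functions matching the documented signatures. The function names and the metadata keys (`operator`, `sample`) are assumptions chosen for illustration; only the shapes (dict in, `None` / dict / int out) come from the parameter descriptions.

```python
from typing import Any


def require_operator(md: dict[str, Any]) -> None:
    """Validator: raise if the (assumed) 'operator' key is missing or empty."""
    if not md.get("operator"):
        raise ValueError("metadata must include a non-empty 'operator'")


def normalize_sample(md: dict[str, Any]) -> dict[str, Any]:
    """Normalizer: validate, then return a transformed copy of the metadata."""
    if "sample" not in md:
        raise KeyError("metadata must include 'sample'")
    normalized = dict(md)  # leave the caller's dict untouched
    normalized["sample"] = str(md["sample"]).strip().lower()
    return normalized


def next_scan_id(md: dict[str, Any]) -> int:
    """Scan-id source: continue from the last recorded scan_id, if any."""
    return int(md.get("scan_id", 0)) + 1


md = {"operator": "alice", "sample": "  HeLa ", "scan_id": 41}
require_operator(md)          # passes silently; the return value is ignored
clean = normalize_sample(md)  # transformed copy
new_id = next_scan_id(md)
```

Callables like these would be passed as `md_validator=`, `md_normalizer=` and `scan_id_source=` when constructing the engine.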

Attributes:

Name Type Description
md

Direct access to the dict-like persistent storage described above

record_interruptions

False by default. Set to True to generate an extra event stream that records any interruptions (pauses, suspensions).

state

{'idle', 'running', 'paused'}

suspenders

Read-only collection of bluesky.suspenders.SuspenderBase objects which can suspend and resume execution; see related methods.

preprocessors list

Generator functions that take in a plan (generator instance) and modify its messages on the way out. Suitable examples include the functions in the module bluesky.preprocessors with names ending in 'wrapper'. Functions are composed in order: the preprocessors [f, g] are applied like f(g(plan)).

msg_hook

Callable that receives all messages before they are processed (useful for logging or other development purposes); expected signature is f(msg) where msg is a bluesky.Msg, a kind of namedtuple; default is None.

state_hook

Callable with signature f(new_state, old_state) that will be called whenever the RunEngine's state attribute is updated; default is None

waiting_hook

Callable with signature f(status_object) that will be called whenever the RunEngine is waiting for long-running commands (trigger, set, kickoff, complete) to complete. This hook is useful to incorporate a progress bar.

ignore_callback_exceptions

Boolean, False by default.

call_returns_result

Boolean, True by default in this wrapper. If False, the RunEngine returns a tuple of uids after running a plan. If True, it returns a RunEngineResult object that contains the plan result, error status, and uids.

loop asyncio event loop

e.g., asyncio.get_event_loop() or asyncio.new_event_loop()

max_depth

Maximum stack depth; set this to prevent users from calling the RunEngine inside a function (which can result in unexpected behavior and breaks introspection tools). Default is None. For built-in Python interpreter, set to 2. For IPython, set to 11 (tested on IPython 5.1.0; other versions may vary).

pause_msg str

The message printed when a run is interrupted. This message includes instructions for changing the state of the RunEngine. Upstream it defaults to bluesky.run_engine.PAUSE_MSG; this wrapper overrides it with an empty string at construction time, and it can be modified as needed.

commands

The list of commands available to Msg.
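The composition order documented for `preprocessors` (`[f, g]` applied as `f(g(plan))`) can be sketched with plain generators. Here messages are ordinary strings rather than `Msg` objects, and `tag_wrapper` and `compose` are hypothetical helpers written for this example; the sketch illustrates only the ordering, not how bluesky applies preprocessors internally.

```python
def tag_wrapper(tag):
    """Build a hypothetical preprocessor that prefixes each message with `tag`."""

    def wrapper(plan):
        for msg in plan:
            yield f"{tag}:{msg}"

    return wrapper


def compose(preprocessors, plan):
    """Apply [f, g] as f(g(plan)): the last preprocessor wraps the plan first."""
    for preprocessor in reversed(preprocessors):
        plan = preprocessor(plan)
    return plan


f = tag_wrapper("f")
g = tag_wrapper("g")
plan = iter(["read", "save"])
messages = list(compose([f, g], plan))
```

Because `g` wraps the plan first, its prefix sits closest to the original message and `f` sees `g`'s output.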

Source code in src/redsun/engine/_wrapper.py
class RunEngine(BlueskyRunEngine):
    """The Run Engine executes messages and emits Documents.

    This is a wrapper for the `bluesky.run_engine.RunEngine` class that
    allows execution without blocking the main thread.
    The main difference is that the ``__call__`` method
    is executed in a separate thread,
    and it returns a concurrent.futures.Future object
    representing the result of the plan execution.

    Parameters
    ----------
    md : dict[str, Any], optional
        The default is a standard Python dictionary, but fancier
        objects can be used to store long-term history and persist
        it between sessions. The standard configuration
        instantiates a Run Engine with historydict.HistoryDict, a
        simple interface to a sqlite file. Any object supporting
        `__getitem__`, `__setitem__`, and `clear` will work.

    loop: asyncio.AbstractEventLoop, optional
        An asyncio event loop to be used for executing plans, e.g. the result
        of ``asyncio.get_event_loop()`` or ``asyncio.new_event_loop()``.
        If not provided, the RunEngine will create a new event loop using
        ``asyncio.new_event_loop()``.

    preprocessors : list, optional
        Generator functions that take in a plan (generator instance) and
        modify its messages on the way out. Suitable examples include
        the functions in the module ``bluesky.preprocessors`` with names ending in
        'wrapper'.  Functions are composed in order: the preprocessors
        ``[f, g]`` are applied like ``f(g(plan))``.

    md_validator : Callable[dict[str, Any], None], optional
        A function that raises, and thereby prevents a run from starting,
        if it deems the metadata invalid or incomplete. What counts as
        invalid is entirely up to the user. The function's return value
        is ignored.

    md_normalizer : Callable[dict[str, Any], dict[str, Any]], optional
        A function that, like md_validator, raises and prevents a run from
        starting if it deems the metadata invalid or incomplete.
        On success it returns the normalized (transformed) version of
        the original metadata. What counts as invalid is entirely up to
        the user.
        Expected return: normalized metadata

    scan_id_source : Callable[dict[str, Any], int | Awaitable[int]], optional
        A (possibly async) function used to compute scan_id.
        The default increments scan_id by 1 each time, but a custom
        function can be passed to obtain a scan_id from any source.
        Expected return: updated scan_id value

    call_returns_result : bool, default True
        A flag that controls the return value of ``__call__``.
        If ``True``, the ``RunEngine`` will return a :class:`RunEngineResult`
        object that contains information about the plan that was run.
        If ``False``, the ``RunEngine`` will return a tuple of uids.
        The potential return value is encapsulated in the returned Future object,
        accessible via ``future.result()``.
        Defaults to ``True``.


    Attributes
    ----------
    md
        Direct access to the dict-like persistent storage described above

    record_interruptions
        False by default. Set to True to generate an extra event stream
        that records any interruptions (pauses, suspensions).

    state
        {'idle', 'running', 'paused'}

    suspenders
        Read-only collection of `bluesky.suspenders.SuspenderBase` objects
        which can suspend and resume execution; see related methods.

    preprocessors : list
        Generator functions that take in a plan (generator instance) and
        modify its messages on the way out. Suitable examples include
        the functions in the module ``bluesky.preprocessors`` with names ending in
        'wrapper'.  Functions are composed in order: the preprocessors
        ``[f, g]`` are applied like ``f(g(plan))``.

    msg_hook
        Callable that receives all messages before they are processed
        (useful for logging or other development purposes); expected
        signature is ``f(msg)`` where ``msg`` is a ``bluesky.Msg``, a
        kind of namedtuple; default is None.

    state_hook
        Callable with signature ``f(new_state, old_state)`` that will be
        called whenever the RunEngine's state attribute is updated; default
        is None

    waiting_hook
        Callable with signature ``f(status_object)`` that will be called
        whenever the RunEngine is waiting for long-running commands
        (trigger, set, kickoff, complete) to complete. This hook is useful to
        incorporate a progress bar.

    ignore_callback_exceptions
        Boolean, False by default.

    call_returns_result
        Boolean, True by default in this wrapper. If False, RunEngine will
        return a tuple of uids after running a plan. If True, RunEngine will
        return a RunEngineResult object that contains the plan result,
        error status, and uids.

    loop : asyncio event loop
        e.g., ``asyncio.get_event_loop()`` or ``asyncio.new_event_loop()``

    max_depth
        Maximum stack depth; set this to prevent users from calling the
        RunEngine inside a function (which can result in unexpected
        behavior and breaks introspection tools). Default is None.
        For built-in Python interpreter, set to 2. For IPython, set to 11
        (tested on IPython 5.1.0; other versions may vary).

    pause_msg : str
        The message printed when a run is interrupted. This message
        includes instructions for changing the state of the RunEngine.
        Upstream it defaults to ``bluesky.run_engine.PAUSE_MSG``; this
        wrapper overrides it with an empty string, and it can be
        modified as needed.

    commands:
        The list of commands available to Msg.

    """

    def __init__(
        self,
        md: dict[str, Any] | None = None,
        *,
        loop: asyncio.AbstractEventLoop | None = None,
        preprocessors: list[Preprocessor] | None = None,
        md_validator: MDValidator | None = None,
        md_normalizer: MDNormalizer | None = None,
        scan_id_source: MDScanIDSource | None = default_scan_id_source,
        call_returns_result: bool = True,
    ):
        # single worker thread on which plans are executed
        self._executor = ThreadPoolExecutor(max_workers=1)

        super().__init__(
            md=md,
            loop=loop,
            preprocessors=preprocessors,
            md_validator=md_validator,
            md_normalizer=md_normalizer,
            scan_id_source=scan_id_source,  # type: ignore[arg-type]
            call_returns_result=call_returns_result,
            # force the context_managers to be empty,
            # otherwise the RunEngine will try to use the
            # SignalHandler context manager
            context_managers=[],
        )

        # override pause message to be an empty string
        self.pause_msg = ""

        # register custom commands
        self._command_registry.update(
            {
                "wait_for_actions": self._wait_for_actions,
                "stash": self._stash,
                "clear_cache": self._clear_cache,
            }
        )

    def __call__(  # type: ignore[override]
        self,
        plan: Iterable[Msg],
        subs: Subscribers | None = None,
        /,
        **metadata_kw: Any,
    ) -> Future[RunEngineResult | tuple[str, ...]]:
        """Execute a plan.

        Any keyword arguments will be interpreted as metadata and recorded with
        any run(s) created by executing the plan. Notice that the plan
        (required) and extra subscriptions (optional) must be given as
        positional arguments.

        Parameters
        ----------
        plan : typing.Iterable[`bluesky.utils.Msg`]
            A generator that yields ``Msg`` objects (or an iterable that
            returns such a generator).
        subs : `bluesky.utils.Subscribers`, optional (positional only)
            Temporary subscriptions (a.k.a. callbacks) to be used on this run.
            For convenience, any of the following are accepted:

            * a callable, which will be subscribed to 'all'
            * a list of callables, which again will be subscribed to 'all'
            * a dictionary, mapping specific subscriptions to callables or
              lists of callables; valid keys are {'all', 'start', 'stop',
              'event', 'descriptor'}

        Returns
        -------
        Future[RunEngineResult | tuple[str, ...]]
            Future object representing the result of the plan execution.

        The result contained in the future is either:
        uids : tuple
            tuple of uids (i.e. RunStart Document uids) of run(s)
            if :attr:`RunEngine._call_returns_result` is ``False``
        result : :class:`RunEngineResult`
            if :attr:`RunEngine._call_returns_result` is ``True``
        """
        return self._executor.submit(
            super().__call__,
            plan,
            subs,
            **metadata_kw,
        )

    def resume(self) -> Future[RunEngineResult | tuple[str, ...]]:
        """Resume the paused plan in a separate thread.

        If the plan has been paused, the initial
        future returned by ``__call__`` will be set as completed.

        With this method, the plan is resumed in a separate thread,
        and a new future is returned.

        Returns
        -------
        ``Future[RunEngineResult | tuple[str, ...]]``
            Future object representing the result of the resumed plan.
        """
        return self._executor.submit(super().resume)

    async def _wait_for_actions(self, msg: Msg) -> tuple[str, SRLatch] | None:
        """Instruct the run engine to wait for any of the given latches to be set or reset.

        Parameters
        ----------
        msg: Msg
            The message containing the latches to wait for.
            Packs a map of SRLatch in `msg.args` and a timeout in `msg.kwargs`.

            Expected message format:

            Msg("wait_for_actions", None, latches, timeout=timeout, wait_for="set")

        Returns
        -------
        tuple[str, SRLatch] | None
            A tuple containing the name and the latch that was set/reset to unblock the plan;
            None if timeout occurred before any latch changed state.
        """
        latch_map: Mapping[str, SRLatch] = msg.args[0]
        timeout: float | None = msg.kwargs.get("timeout", None)
        wait_for: Literal["set", "reset"] = msg.kwargs.get("wait_for", "set")

        # Create one task per latch, named after its key in the map, so that
        # the completed task can be traced back to its latch
        if wait_for == "set":
            latch_tasks = {
                asyncio.create_task(latch.wait_for_set(), name=name)
                for name, latch in latch_map.items()
            }
        else:
            latch_tasks = {
                asyncio.create_task(latch.wait_for_reset(), name=name)
                for name, latch in latch_map.items()
            }

        done, pending = await asyncio.wait(
            latch_tasks, return_when=asyncio.FIRST_COMPLETED, timeout=timeout
        )

        # Cancel all pending tasks
        for task in pending:
            task.cancel()

        # Return the latch that changed state
        if not done:
            return None
        completed_task = done.pop()
        task_name = completed_task.get_name()
        return task_name, latch_map[task_name]

    async def _stash(self: RunEngine, msg: Msg) -> Status:
        """Instruct the run engine to stash the given readings in the model cache.

        Parameters
        ----------
        msg: Msg
            The message containing the readings to stash.
            Expected message format:

            Msg("stash", obj, readings, group=group)

            - obj: the model object to stash the readings for (implements `HasCache` protocol).
            - readings: a dict of readings to stash in the model cache.
            - group: required keyword argument; the returned status is added to this group.
        """
        obj = msg.obj
        if not isinstance(obj, HasCache):
            raise TypeError(
                f"Object {obj} does not support caching. It must implement the HasCache protocol."
            )
        if len(msg.args) != 1:
            raise RuntimeError(
                "Expected only one positional argument in the message after the object."
            )
        readings: dict[str, Reading[Any]] = msg.args[0]
        group: str | None = dict(msg.kwargs).get("group", None)
        if not group:
            raise RuntimeError("Expected a 'group' keyword argument in the message.")
        status = obj.stash(readings)

        self._add_status_to_group(obj, status, group, action="stash")

        return status

    async def _clear_cache(self, msg: Msg) -> Status:
        """Instruct the run engine to clear the model cache.

        Parameters
        ----------
        msg: Msg
            The message containing the object whose cache to clear.
            Expected message format:

            Msg("clear_cache", obj, name, group=group)

            - obj: the model object to clear the cache for (implements `HasCache` protocol).
            - name: the device name associated with the cache to clear.
            - group: required keyword argument; the returned status is added to this group.
        """
        obj = msg.obj
        if not isinstance(obj, HasCache):
            raise TypeError(
                f"Object {obj} does not support caching. It must implement the HasCache protocol."
            )
        group: str | None = dict(msg.kwargs).get("group", None)
        if not group:
            raise RuntimeError("Expected a 'group' keyword argument in the message.")
        status = obj.clear()

        self._add_status_to_group(obj, status, group, action="clear")

        return status
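
The waiting strategy used by `_wait_for_actions` above (one named task per latch, `asyncio.wait` with `FIRST_COMPLETED`, then cancelling the rest) can be tried in isolation. The sketch below is a simplified illustration that substitutes plain `asyncio.Event` objects for `SRLatch`, whose API is not reproduced here; `wait_for_first` is a hypothetical helper, not part of redsun.

```python
import asyncio


async def wait_for_first(events, timeout=None):
    """Return (name, event) for the first event set, or None on timeout."""
    # One task per event; the task name records which event it watches.
    tasks = {
        asyncio.create_task(event.wait(), name=name)
        for name, event in events.items()
    }
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED, timeout=timeout
    )
    # Stop watching the events that did not fire.
    for task in pending:
        task.cancel()
    if not done:
        return None
    name = done.pop().get_name()
    return name, events[name]


async def main():
    events = {"start": asyncio.Event(), "stop": asyncio.Event()}
    # Set "stop" shortly after we begin waiting.
    asyncio.get_running_loop().call_later(0.05, events["stop"].set)
    first = await wait_for_first(events, timeout=2.0)
    # Nothing ever sets this event, so the wait times out.
    nothing = await wait_for_first({"idle": asyncio.Event()}, timeout=0.05)
    return first, nothing, events


first, nothing, events = asyncio.run(main())
```

If several events fire at the same moment, `done` may hold more than one task and `done.pop()` picks one arbitrarily, which is also how the method above behaves.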

resume

resume() -> Future[RunEngineResult | tuple[str, ...]]

Resume the paused plan in a separate thread.

If the plan has been paused, the initial future returned by __call__ will be set as completed.

With this method, the plan is resumed in a separate thread, and a new future is returned.

Returns:

Type Description
Future[RunEngineResult | tuple[str, ...]]

Future object representing the result of the resumed plan.

Source code in src/redsun/engine/_wrapper.py
def resume(self) -> Future[RunEngineResult | tuple[str, ...]]:
    """Resume the paused plan in a separate thread.

    If the plan has been paused, the initial
    future returned by ``__call__`` will be set as completed.

    With this method, the plan is resumed in a separate thread,
    and a new future is returned.

    Returns
    -------
    ``Future[RunEngineResult | tuple[str, ...]]``
        Future object representing the result of the resumed plan.
    """
    return self._executor.submit(super().resume)

Status

Bases: Status

Track the status of a potentially-lengthy action like moving or triggering.

Parameters:

Name Type Description Default
timeout float | None

The amount of time to wait before marking the Status as failed. If None (default) wait forever. It is strongly encouraged to set a finite timeout. If settle_time below is set, that time is added to the effective timeout.

None
settle_time float | None

The amount of time to wait between the caller marking the status as completed and the callbacks being run. Default is 0.

0
Notes

Theory of operation:

This employs two threading.Event objects, one thread that runs for (timeout + settle_time) seconds, and one thread that runs for settle_time seconds (if settle_time is nonzero).

At init time, a timeout and settle_time are specified. A thread is started, on which user callbacks, registered after init time via add_callback, will eventually be run. The thread waits for an Event to be set or (timeout + settle_time) seconds to pass, whichever happens first.

If (timeout + settle_time) expires and the Event has not been set, an internal Exception is set to StatusTimeoutError, and a second Event is set, marking the Status as done and failed. The callbacks are run.

If a callback is registered after the Status is done, it will be run immediately.

If the first Event is set before (timeout + settle_time) expires, then the second Event is set and no internal Exception is set, marking the Status as done and successful. The callbacks are run.

There are two methods that directly set the first Event. One, set_exception, sets it directly after setting the internal Exception. The other, set_finished, starts a threading.Timer that will set it after a delay (the settle_time). One of these methods may be called, and at most once. If one is called twice or if both are called, InvalidState is raised. If they are called too late to prevent a StatusTimeoutError, they are ignored but one call is still allowed. Thus, an external callback, e.g. pyepics, may report success or failure after the Status object has expired, but to no effect because the callbacks have already been called and the program has moved on.
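The two-Event scheme described in these notes can be condensed into a small standard-library sketch. `MiniStatus` is a hypothetical, heavily simplified illustration and not the class documented here: it omits the `InvalidState` bookkeeping and logging, and it uses the built-in `TimeoutError` in place of `StatusTimeoutError` and `WaitTimeoutError`.

```python
import threading


class MiniStatus:
    """Simplified illustration of the two-Event scheme (not the real class)."""

    def __init__(self, timeout=None, settle_time=0.0):
        self._timeout = timeout
        self._settle_time = settle_time
        self._settled_event = threading.Event()  # first Event: set externally
        self._done_event = threading.Event()     # second Event: marks "done"
        self._lock = threading.Lock()
        self._exception = None
        self._callbacks = []
        threading.Thread(target=self._run_callbacks, daemon=True).start()

    def _run_callbacks(self):
        limit = None if self._timeout is None else self._timeout + self._settle_time
        if not self._settled_event.wait(limit):
            # Timed out before anyone reported completion.
            with self._lock:
                if self._exception is None:
                    self._exception = TimeoutError("status timed out")
        with self._lock:
            self._done_event.set()
            callbacks = list(self._callbacks)
        for cb in callbacks:
            cb(self)

    def add_callback(self, cb):
        with self._lock:
            if not self._done_event.is_set():
                self._callbacks.append(cb)
                return
        cb(self)  # already done: run immediately

    def set_finished(self):
        if self._settle_time > 0:
            threading.Timer(self._settle_time, self._settled_event.set).start()
        else:
            self._settled_event.set()

    def set_exception(self, exc):
        with self._lock:
            self._exception = exc
        self._settled_event.set()

    def wait(self, timeout=None):
        if not self._done_event.wait(timeout):
            raise TimeoutError("status has not completed yet")
        if self._exception is not None:
            raise self._exception


# Successful completion: the callback runs on the background thread.
called = threading.Event()
ok = MiniStatus(timeout=1.0)
ok.add_callback(lambda status: called.set())
ok.set_finished()
ok.wait(timeout=2.0)
called.wait(timeout=2.0)

# Nobody reports completion, so the internal timeout marks it as failed.
late = MiniStatus(timeout=0.05)
try:
    late.wait(timeout=2.0)
    timed_out = False
except TimeoutError:
    timed_out = True
```

Keeping "settled" and "done" as separate Events is what lets the settle time and the timeout be handled on the same background thread that later runs the callbacks.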

Source code in src/redsun/engine/_status.py
class Status(PStatus):
    """
    Track the status of a potentially-lengthy action like moving or triggering.

    Parameters
    ----------
    timeout: float, optional
        The amount of time to wait before marking the Status as failed.  If
        ``None`` (default) wait forever. It is strongly encouraged to set a
        finite timeout.  If settle_time below is set, that time is added to the
        effective timeout.
    settle_time: float, optional
        The amount of time to wait between the caller marking the status as
        completed and the callbacks being run. Default is 0.

    Notes
    -----
    Theory of operation:

    This employs two ``threading.Event`` objects, one thread that runs for
    (timeout + settle_time) seconds, and one thread that runs for
    settle_time seconds (if settle_time is nonzero).

    At __init__ time, a *timeout* and *settle_time* are specified. A thread
    is started, on which user callbacks, registered after __init__ time via
    [`add_callback`](), will eventually be run. The thread waits for an
    Event to be set or (timeout + settle_time) seconds to pass, whichever
    happens first.

    If (timeout + settle_time) expires and the Event has not
    been set, an internal Exception is set to ``StatusTimeoutError``, and a
    second Event is set, marking the Status as done and failed. The
    callbacks are run.

    If a callback is registered after the Status is done, it will be run
    immediately.

    If the first Event is set before (timeout + settle_time) expires,
    then the second Event is set and no internal Exception is set, marking
    the Status as done and successful. The callbacks are run.

    There are two methods that directly set the first Event. One,
    [`set_exception`](), sets it directly after setting the internal
    Exception.  The other, [`set_finished`](), starts a
    ``threading.Timer`` that will set it after a delay (the settle_time).
    One of these methods may be called, and at most once. If one is called
    twice or if both are called, ``InvalidState`` is raised. If they are
    called too late to prevent a ``StatusTimeoutError``, they are ignored
    but one call is still allowed. Thus, an external callback, e.g. pyepics,
    may report success or failure after the Status object has expired, but
    to no effect because the callbacks have already been called and the
    program has moved on.
    """

    def __init__(self, *, timeout: float | None = None, settle_time: float | None = 0):
        super().__init__()
        self._logger = logging.getLogger("redsun")
        self._tname = None
        self._lock = threading.RLock()
        self._event = threading.Event()  # state associated with done-ness
        self._settled_event = threading.Event()
        # "Externally initiated" means set_finished() or set_exception(exc) was
        # called, as opposed to completion via an internal timeout.
        self._externally_initiated_completion_lock = threading.Lock()
        self._externally_initiated_completion = False
        self._callbacks: deque[Callable[[Status], None]] = deque()
        self._exception: BaseException | None = None

        if settle_time is None:
            settle_time = 0.0

        self._settle_time = float(settle_time)

        if timeout is not None:
            timeout = float(timeout)
        self._timeout = timeout

        self._callback_thread = threading.Thread(
            target=self._run_callbacks, daemon=True, name=self._tname
        )
        self._callback_thread.start()

    @property
    def timeout(self) -> float | None:
        """The timeout for this action.

        This is set when the Status is created, and it cannot be changed.
        """
        return self._timeout

    @property
    def settle_time(self) -> float:
        """A delay between when [`set_finished`]() is called and when the Status is done.

        This is set when the Status is created, and it cannot be changed.
        """
        return self._settle_time

    @property
    def done(self) -> bool:
        """Boolean indicating whether associated operation has completed.

        This becomes True after [`set_finished`]() (plus any settle time)
        or [`set_exception`]() is called, or when the timeout expires.
        Once True, it can never become False.
        """
        return self._event.is_set()

    @property
    def success(self) -> bool:
        """Boolean indicating whether associated operation has completed successfully.

        This is True only when the Status is done and no exception has been
        set, neither via [`set_exception`]() nor by a timeout.
        """
        """
        return self.done and self._exception is None

    def _handle_failure(self) -> None:
        """Do something if an exception occurred during the action."""
        # TODO: implement this; maybe ophyd has a good example
        pass

    def _settled(self) -> None:
        """Connect to this when status has completed and settled."""
        # TODO: implement this; maybe ophyd has a good example
        pass

    def _run_callbacks(self) -> None:
        """Set the Event and run the callbacks."""
        if self.timeout is None:
            timeout = None
        else:
            timeout = self.timeout + self.settle_time
        if not self._settled_event.wait(timeout):
            # We have timed out. It's possible that set_finished() has already
            # been called but we got here before the settle_time timer expired.
            # And it's possible that in the space between the above
            # statement timing out and grabbing the lock just below,
            # set_exception(exc) has been called. Both of these possibilities
            # are accounted for.
            self._logger.warning("%r has timed out", self)
            with self._externally_initiated_completion_lock:
                # Set the exception and mark the Status as done, unless
                # set_exception(exc) was called externally before we grabbed
                # the lock.
                if self._exception is None:
                    exc = StatusTimeoutError(
                        f"Status {self!r} failed to complete in specified timeout."
                    )
                    self._exception = exc
        # Mark this as "settled".
        try:
            self._settled()
        except Exception:
            # No alternative but to log this. We can't supersede set_exception,
            # and we have to continue and run the callbacks.
            self._logger.exception("%r encountered error during _settled()", self)
        # Now we know whether we have succeeded or failed, either by
        # timeout above or by set_exception(exc), so we can set the Event that
        # will mark this Status as done.
        with self._lock:
            self._event.set()
        if self._exception is not None:
            try:
                self._handle_failure()
            except Exception:
                self._logger.exception(
                    "%r encountered an error during _handle_failure()", self
                )
        # The callbacks have access to self, from which they can distinguish
        # success or failure.
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                self._logger.exception(
                    "An error was raised on a background thread while "
                    "running the callback %r(%r).",
                    cb,
                    self,
                )
        self._callbacks.clear()

    def set_exception(self, exc: BaseException) -> None:
        """Mark as finished but failed with the given Exception.

        This method should generally not be called by the *recipient* of this
        Status object, but only by the object that created and returned it.

        Parameters
        ----------
        exc: BaseException
            The exception that caused the failure.
        """
        # Since we rely on this being raise-able later, check proactively to
        # avoid potentially very confusing failures.
        if not isinstance(exc, BaseException):
            # Note that although Python allows `raise Exception` or
            # `raise Exception()`, only instances are accepted here.
            raise ValueError(f"Expected an Exception, got {exc!r}")

        # Ban certain Timeout subclasses that have special significance. This
        # would probably never come up except due to some rare user error, but
        # if it did it could be very confusing indeed!
        for exc_class in (StatusTimeoutError, WaitTimeoutError):
            if isinstance(exc, exc_class):
                raise ValueError(
                    f"{exc_class} has special significance and cannot be set "
                    "as the exception. Use a plain TimeoutError or some other "
                    "subclass thereof."
                )

        with self._externally_initiated_completion_lock:
            if self._externally_initiated_completion:
                raise InvalidState(
                    "Either set_finished() or set_exception() has "
                    f"already been called on {self!r}"
                )
            self._externally_initiated_completion = True
            if isinstance(self._exception, StatusTimeoutError):
                # We have already timed out.
                return
            self._exception = exc
            self._settled_event.set()

    def set_finished(self) -> None:
        """Mark as finished successfully.

        This method should generally not be called by the *recipient* of this
        Status object, but only by the object that created and returned it.
        """
        with self._externally_initiated_completion_lock:
            if self._externally_initiated_completion:
                raise InvalidState(
                    "Either set_finished() or set_exception() has "
                    f"already been called on {self!r}"
                )
            self._externally_initiated_completion = True
        # Note that in either case, the callbacks themselves are run from the
        # same thread. This just sets an Event, either from this thread (the
        # one calling set_finished) or the thread created below.
        if self.settle_time > 0:
            threading.Timer(self.settle_time, self._settled_event.set).start()
        else:
            self._settled_event.set()

    def exception(self, timeout: float | None = None) -> BaseException | None:
        """Return the exception raised by the action.

        If the action has completed successfully, return ``None``. If it has
        finished in error, return the exception.

        Parameters
        ----------
        timeout: float | None, optional
            If None (default) wait indefinitely until the status finishes.

        Returns
        -------
        Exception:
            The exception raised by the action. If the action has completed
            successfully, return ``None``.

        Raises
        ------
        WaitTimeoutError
            If the status has not completed within ``timeout`` (starting from
            when this method was called, not from the beginning of the action).
        """
        if not self._event.wait(timeout=timeout):
            raise WaitTimeoutError("Status has not completed yet.")
        return self._exception

    def wait(self, timeout: float | None = None) -> None:
        """Block until the action completes.

        When the action has finished successfully, return ``None``. If the
        action has failed, raise the exception.

        Parameters
        ----------
        timeout: ``float``, optional
            If ``None`` (default) wait indefinitely until the status finishes.

        Raises
        ------
        WaitTimeoutError
            If the status has not completed within ``timeout`` (starting from
            when this method was called, not from the beginning of the action).
        StatusTimeoutError
            If the status has failed because the *timeout* that it was
            initialized with has expired.
        Exception
            This is ``status.exception()``, raised if the status has finished
            with an error.  This may include ``TimeoutError``, which
            indicates that the action itself raised ``TimeoutError``, distinct
            from ``WaitTimeoutError`` above.
        """
        if not self._event.wait(timeout=timeout):
            raise WaitTimeoutError("Status has not completed yet.")
        if self._exception is not None:
            raise self._exception

    @property
    def callbacks(self) -> deque[Callable[[Status], None]]:
        """Callbacks to be run when the status is marked as finished."""
        return self._callbacks

    def add_callback(self, callback: Callable[[Status], None]) -> None:
        """Register a callback to be called once when the Status finishes.

        The callback will be called exactly once. If the Status is finished
        before a callback is added, it will be called immediately. This is
        threadsafe.
        The callback will be called regardless of success or failure. The
        callback has access to this status object, so it can distinguish success
        or failure by inspecting the object.

        Parameters
        ----------
        callback: ``Callable[[Status], None]``
            The callback to be called when the status is marked as finished.

            ```python
            def callback(status: Status) -> None:
                # Do something with the status object
                ...
            ```
        """
        with self._lock:
            if self.done:
                # Call it once and do not hold a reference to it.
                callback(self)
            else:
                # Hold a strong reference to this. In other contexts we tend to
                # hold weak references to callbacks, but this is a single-shot
                # callback, so we will hold a strong reference until we call it,
                # and then clear this cache to drop the reference(s).
                self._callbacks.append(callback)

timeout property

timeout: float | None

The timeout for this action.

This is set when the Status is created, and it cannot be changed.

settle_time property

settle_time: float

A delay between when set_finished is called and when the Status is done.

This is set when the Status is created, and it cannot be changed.

done property

done: bool

Boolean indicating whether the associated operation has completed.

This is set to True at init time or by calling set_finished or set_exception. Once True, it can never become False.

success property

success: bool

Boolean indicating whether the associated operation has completed successfully.

This can be True only once the Status is done, and a Status completed through set_exception is not successful. Once True, it can never become False.

callbacks property

callbacks: deque[Callable[[Status], None]]

Callbacks to be run when the status is marked as finished.

set_exception

set_exception(exc: BaseException) -> None

Mark as finished but failed with the given Exception.

This method should generally not be called by the recipient of this Status object, but only by the object that created and returned it.

Parameters:

Name Type Description Default
exc BaseException

The exception that caused the failure.

required
Source code in src/redsun/engine/_status.py
def set_exception(self, exc: BaseException) -> None:
    """Mark as finished but failed with the given Exception.

    This method should generally not be called by the *recipient* of this
    Status object, but only by the object that created and returned it.

    Parameters
    ----------
    exc: BaseException
        The exception that caused the failure.
    """
    # Since we rely on this being raise-able later, check proactively to
    # avoid potentially very confusing failures.
    if not (isinstance(exc, BaseException)):
        # Note that Python allows `raise Exception` or `raise Exception()`,
        # but only an exception instance is accepted here.
        raise ValueError(f"Expected an Exception, got {exc!r}")

    # Ban certain Timeout subclasses that have special significance. This
    # would probably never come up except due to some rare user error, but
    # if it did it could be very confusing indeed!
    for exc_class in (StatusTimeoutError, WaitTimeoutError):
        if isinstance(exc, exc_class):
            raise ValueError(
                f"{exc_class} has special significance and cannot be set "
                "as the exception. Use a plain TimeoutError or some other "
                "subclass thereof."
            )

    with self._externally_initiated_completion_lock:
        if self._externally_initiated_completion:
            raise InvalidState(
                "Either set_finished() or set_exception() has "
                f"already been called on {self!r}"
            )
        self._externally_initiated_completion = True
        if isinstance(self._exception, StatusTimeoutError):
            # We have already timed out.
            return
        self._exception = exc
        self._settled_event.set()
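
The checks that set_exception runs before storing the exception can be sketched in isolation. This is a minimal sketch, not redsun code: the two exception classes are stand-ins defined here as TimeoutError subclasses (an assumption suggested by the error message, not confirmed on this page), and `validate_exception` and `is_accepted` are hypothetical helpers.

```python
class StatusTimeoutError(TimeoutError):
    """Stand-in: the Status's own timeout expired (base class is assumed)."""


class WaitTimeoutError(TimeoutError):
    """Stand-in: a call to wait() or exception() timed out (base class is assumed)."""


def validate_exception(exc: object) -> BaseException:
    """Hypothetical helper mirroring the checks made before storing ``exc``."""
    # Only exception instances pass; an exception class is rejected.
    if not isinstance(exc, BaseException):
        raise ValueError(f"Expected an Exception, got {exc!r}")
    # The reserved timeout types cannot be set by the caller.
    if isinstance(exc, (StatusTimeoutError, WaitTimeoutError)):
        raise ValueError(f"{type(exc).__name__} has special significance")
    return exc


def is_accepted(exc: object) -> bool:
    """Report whether ``exc`` would pass the validation above."""
    try:
        validate_exception(exc)
    except ValueError:
        return False
    return True


print(is_accepted(RuntimeError("motor stalled")))     # an ordinary instance
print(is_accepted(TimeoutError("device timed out")))  # a plain TimeoutError
print(is_accepted(RuntimeError))                      # a class, not an instance
print(is_accepted(WaitTimeoutError("reserved")))      # a reserved timeout type
```

Under these assumptions the first two calls are accepted and the last two are rejected, which matches the advice in the error message to use a plain TimeoutError instead of the reserved subclasses.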

set_finished

set_finished() -> None

Mark as finished successfully.

This method should generally not be called by the recipient of this Status object, but only by the object that created and returned it.

Source code in src/redsun/engine/_status.py
def set_finished(self) -> None:
    """Mark as finished successfully.

    This method should generally not be called by the *recipient* of this
    Status object, but only by the object that created and returned it.
    """
    with self._externally_initiated_completion_lock:
        if self._externally_initiated_completion:
            raise InvalidState(
                "Either set_finished() or set_exception() has "
                f"already been called on {self!r}"
            )
        self._externally_initiated_completion = True
    # Note that in either case, the callbacks themselves are run from the
    # same thread. This just sets an Event, either from this thread (the
    # one calling set_finished) or the thread created below.
    if self.settle_time > 0:
        threading.Timer(self.settle_time, self._settled_event.set).start()
    else:
        self._settled_event.set()
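
The two ideas in set_finished, a one-shot completion guard and a delayed settle, can be tried out with a small stand-in. The `OneShot` class below is hypothetical and uses RuntimeError where the listing raises InvalidState; it is a sketch of the pattern, not the Status implementation.

```python
import threading


class OneShot:
    """Hypothetical stand-in for the completion guard; not redsun's Status."""

    def __init__(self, settle_time: float = 0.0) -> None:
        self.settle_time = settle_time
        self._lock = threading.Lock()
        self._completed = False
        self._settled_event = threading.Event()

    def set_finished(self) -> None:
        with self._lock:
            if self._completed:
                # A second completion attempt is a programming error.
                raise RuntimeError("completion was already requested")
            self._completed = True
        if self.settle_time > 0:
            # Set the event from a timer thread once the settle time elapses.
            threading.Timer(self.settle_time, self._settled_event.set).start()
        else:
            self._settled_event.set()


status = OneShot(settle_time=0.2)
status.set_finished()
settled_immediately = status._settled_event.is_set()
settled_eventually = status._settled_event.wait(timeout=5.0)

try:
    status.set_finished()
    second_call_rejected = False
except RuntimeError:
    second_call_rejected = True
```

With a positive settle time the event is not yet set when set_finished returns; it is set by the timer thread shortly afterwards, and a second call is refused either way.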

exception

exception(
    timeout: float | None = None,
) -> BaseException | None

Return the exception raised by the action.

If the action has completed successfully, return None. If it has finished in error, return the exception.

Parameters:

Name Type Description Default
timeout float | None

If None (default) wait indefinitely until the status finishes.

None

Returns:

Name Type Description
Exception BaseException | None

The exception raised by the action. If the action has completed successfully, return None.

Raises:

Type Description
WaitTimeoutError

If the status has not completed within timeout (starting from when this method was called, not from the beginning of the action).

Source code in src/redsun/engine/_status.py
def exception(self, timeout: float | None = None) -> BaseException | None:
    """Return the exception raised by the action.

    If the action has completed successfully, return ``None``. If it has
    finished in error, return the exception.

    Parameters
    ----------
    timeout: float | None, optional
        If None (default) wait indefinitely until the status finishes.

    Returns
    -------
    Exception:
        The exception raised by the action. If the action has completed
        successfully, return ``None``.

    Raises
    ------
    WaitTimeoutError
        If the status has not completed within ``timeout`` (starting from
        when this method was called, not from the beginning of the action).
    """
    if not self._event.wait(timeout=timeout):
        raise WaitTimeoutError("Status has not completed yet.")
    return self._exception

wait

wait(timeout: float | None = None) -> None

Block until the action completes.

When the action has finished successfully, return None. If the action has failed, raise the exception.

Parameters:

Name Type Description Default
timeout float | None

If None (default) wait indefinitely until the status finishes.

None

Raises:

Type Description
WaitTimeoutError

If the status has not completed within timeout (starting from when this method was called, not from the beginning of the action).

StatusTimeoutError

If the status has failed because the timeout that it was initialized with has expired.

Exception

This is status.exception(), raised if the status has finished with an error. This may include TimeoutError, which indicates that the action itself raised TimeoutError, distinct from WaitTimeoutError above.

Source code in src/redsun/engine/_status.py
def wait(self, timeout: float | None = None) -> None:
    """Block until the action completes.

    When the action has finished successfully, return ``None``. If the
    action has failed, raise the exception.

    Parameters
    ----------
    timeout: ``float``, optional
        If ``None`` (default) wait indefinitely until the status finishes.

    Raises
    ------
    WaitTimeoutError
        If the status has not completed within ``timeout`` (starting from
        when this method was called, not from the beginning of the action).
    StatusTimeoutError
        If the status has failed because the *timeout* that it was
        initialized with has expired.
    Exception
        This is ``status.exception()``, raised if the status has finished
        with an error.  This may include ``TimeoutError``, which
        indicates that the action itself raised ``TimeoutError``, distinct
        from ``WaitTimeoutError`` above.
    """
    if not self._event.wait(timeout=timeout):
        raise WaitTimeoutError("Status has not completed yet.")
    if self._exception is not None:
        raise self._exception
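
The difference between exception() and wait() is easiest to see side by side: both block on the same event, but only wait() re-raises the stored exception. The sketch below uses a hypothetical `MiniStatus` stand-in with an illustrative `finish` helper; neither is part of redsun.

```python
import threading
from typing import Optional


class WaitTimeoutError(TimeoutError):
    """Stand-in for the error raised when waiting takes too long."""


class MiniStatus:
    """Hypothetical stand-in exposing only wait() and exception()."""

    def __init__(self) -> None:
        self._event = threading.Event()
        self._exception: Optional[BaseException] = None

    def finish(self, exc: Optional[BaseException] = None) -> None:
        # Illustrative helper: record the outcome and release all waiters.
        self._exception = exc
        self._event.set()

    def exception(self, timeout: Optional[float] = None) -> Optional[BaseException]:
        if not self._event.wait(timeout=timeout):
            raise WaitTimeoutError("Status has not completed yet.")
        return self._exception

    def wait(self, timeout: Optional[float] = None) -> None:
        if not self._event.wait(timeout=timeout):
            raise WaitTimeoutError("Status has not completed yet.")
        if self._exception is not None:
            raise self._exception


# A status that is still pending: waiting with a timeout gives up.
pending = MiniStatus()
try:
    pending.wait(timeout=0.01)
    outcome = "finished"
except WaitTimeoutError:
    outcome = "still running"

# A status that failed: exception() returns the error, wait() raises it.
failed = MiniStatus()
failed.finish(ValueError("bad position"))
returned = failed.exception()
try:
    failed.wait()
    raised = None
except ValueError as err:
    raised = err
```

A WaitTimeoutError therefore says nothing about the action itself, only that the caller stopped waiting before the status completed.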

add_callback

add_callback(callback: Callable[[Status], None]) -> None

Register a callback to be called once when the Status finishes.

The callback will be called exactly once. If the Status is finished before a callback is added, it will be called immediately. This is threadsafe. The callback will be called regardless of success or failure. The callback has access to this status object, so it can distinguish success or failure by inspecting the object.

Parameters:

Name Type Description Default
callback Callable[[Status], None]

The callback to be called when the status is marked as finished.

def callback(status: Status) -> None:
    # Do something with the status object
    ...
required
Source code in src/redsun/engine/_status.py
def add_callback(self, callback: Callable[[Status], None]) -> None:
    """Register a callback to be called once when the Status finishes.

    The callback will be called exactly once. If the Status is finished
    before a callback is added, it will be called immediately. This is
    threadsafe.
    The callback will be called regardless of success or failure. The
    callback has access to this status object, so it can distinguish success
    or failure by inspecting the object.

    Parameters
    ----------
    callback: ``Callable[[Status], None]``
        The callback to be called when the status is marked as finished.

        ```python
        def callback(status: Status) -> None:
            # Do something with the status object
            ...
        ```
    """
    with self._lock:
        if self.done:
            # Call it once and do not hold a reference to it.
            callback(self)
        else:
            # Hold a strong reference to this. In other contexts we tend to
            # hold weak references to callbacks, but this is a single-shot
            # callback, so we will hold a strong reference until we call it,
            # and then clear this cache to drop the reference(s).
            self._callbacks.append(callback)
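
The "queue it, or call it right away" behaviour can be sketched with a stand-in that models only the callback bookkeeping. `CallbackStatus` and its `finish` helper are hypothetical, and the use of a re-entrant lock is an assumption made so that a callback may safely register another callback.

```python
import threading
from collections import deque


class CallbackStatus:
    """Hypothetical stand-in; only the callback bookkeeping is modelled."""

    def __init__(self) -> None:
        self._lock = threading.RLock()  # assumption: re-entrant lock
        self._callbacks = deque()
        self.done = False

    def add_callback(self, callback) -> None:
        with self._lock:
            if self.done:
                # Already finished: call once, keep no reference.
                callback(self)
            else:
                # Not finished yet: keep a strong reference until it runs.
                self._callbacks.append(callback)

    def finish(self) -> None:
        # Illustrative helper: mark done, then run and drop queued callbacks.
        with self._lock:
            self.done = True
            while self._callbacks:
                self._callbacks.popleft()(self)


calls = []
status = CallbackStatus()
status.add_callback(lambda s: calls.append("queued"))
calls_before_finish = list(calls)  # nothing has run yet

status.finish()
status.add_callback(lambda s: calls.append("late"))  # runs immediately
```

Each callback runs exactly once in this sketch: the first when the status finishes, the second at registration time because the status is already done.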

Actions

Engine actions: decorators and types for continuous, interactive plans.

A continuous plan is one that runs in an infinite loop until explicitly stopped, and may support pause/resume and in-flight actions (user-triggered side effects while the plan is running).

This module provides:

  • SRLatch — an asyncio set-reset latch used to synchronise plan execution with external signals.
  • continous — a decorator that marks a plan function as continuous and records its togglable and pausable capabilities.
  • Action — a dataclass carrying metadata (name, description, toggle state) for a single in-flight action.
  • ContinousPlan — a typing.Protocol used for static typing and runtime isinstance checks on decorated plans.

Action dataclass

Metadata for an in-flight action on a continuous plan.

An Action is a user-triggerable side effect that can be fired while a continuous plan is running. It encapsulates an SRLatch synchronisation primitive so the plan can await the action being triggered.

Warning

The internal SRLatch is created lazily on first access of event_map, so Action objects can be constructed without a running event loop. The latch must only be accessed from within a plan.

Subclassable to add additional fields for domain-specific use cases.

Parameters:

Name Type Description Default
name str

Name of the action.

required
description str

Brief description of the action, usable as UI tooltip.

''
togglable bool

Whether the action is togglable or not.

False
toggle_states tuple[str, str]

Labels for the toggle states (on, off). Only used if togglable is True.

('On', 'Off')
Source code in src/redsun/engine/actions.py
@dataclass(kw_only=True)
class Action:
    """Metadata for an in-flight action on a continuous plan.

    An `Action` is a user-triggerable side effect that can be fired while a
    continuous plan is running.  It encapsulates an `SRLatch` synchronisation
    primitive so the plan can ``await`` the action being triggered.

    !!! warning
        The internal `SRLatch` is created lazily on first access of
        `event_map`, so `Action` objects can be constructed without a running
        event loop.  The latch must only be accessed from within a plan.

    Subclassable to add additional fields for domain-specific use cases.
    """

    name: str
    """Name of the action."""

    description: str = field(default="")
    """Brief description of the action, usable as UI tooltip."""

    togglable: bool = field(default=False)
    """Whether the action is togglable or not."""

    toggle_states: tuple[str, str] = field(default=("On", "Off"))
    """Labels for the toggle states (on, off). Only used if `togglable` is True."""

    _latch: SRLatch | None = field(init=False, default=None, repr=False)

    @property
    def event_map(self) -> dict[str, SRLatch]:
        """Return the latch for this action as a single-entry dict keyed by name."""
        if not self._latch:
            self._latch = SRLatch()
        return {self.name: self._latch}

name instance-attribute

name: str

Name of the action.

description class-attribute instance-attribute

description: str = field(default='')

Brief description of the action, usable as UI tooltip.

togglable class-attribute instance-attribute

togglable: bool = field(default=False)

Whether the action is togglable or not.

toggle_states class-attribute instance-attribute

toggle_states: tuple[str, str] = field(
    default=("On", "Off")
)

Labels for the toggle states (on, off). Only used if togglable is True.

event_map property

event_map: dict[str, SRLatch]

Return the latch for this action as a single-entry dict keyed by name.

SRLatch

An asyncio Event-like object that behaves as a set-reset latch.

Wraps two asyncio.Event objects to allow waiting for either the set or the reset state of the latch. At construction the latch starts in the reset state.

Source code in src/redsun/engine/actions.py
class SRLatch:
    """An asyncio Event-like object that behaves as a set-reset latch.

    Wraps two `asyncio.Event` objects to allow waiting for either the *set*
    or the *reset* state of the latch.  At construction the latch starts in
    the **reset** state.
    """

    def __init__(self) -> None:
        self._flag: bool = False
        self._set_event: asyncio.Event = asyncio.Event()
        self._reset_event: asyncio.Event = asyncio.Event()
        self._reset_event.set()

    def set(self) -> None:
        """Set the internal flag to True.

        All coroutines waiting in `wait_for_set` are awakened.
        No-op if the flag is already set.
        """
        if not self._flag:
            self._flag = True
            self._set_event.set()
            self._reset_event.clear()

    def reset(self) -> None:
        """Reset the internal flag to False.

        All coroutines waiting in `wait_for_reset` are awakened.
        No-op if the flag is already reset.
        """
        if self._flag:
            self._flag = False
            self._reset_event.set()
            self._set_event.clear()

    def is_set(self) -> bool:
        """Return True if the internal flag is set, False otherwise."""
        return self._flag

    async def wait_for_set(self) -> None:
        """Wait until the internal flag is set.

        Returns immediately if the flag is already set; otherwise blocks
        until another coroutine calls `set`.
        """
        if self._flag:
            return
        await self._set_event.wait()

    async def wait_for_reset(self) -> None:
        """Wait until the internal flag is reset.

        Returns immediately if the flag is already reset; otherwise blocks
        until another coroutine calls `reset`.
        """
        if not self._flag:
            return
        await self._reset_event.wait()

set

set() -> None

Set the internal flag to True.

All coroutines waiting in wait_for_set are awakened. No-op if the flag is already set.

Source code in src/redsun/engine/actions.py
def set(self) -> None:
    """Set the internal flag to True.

    All coroutines waiting in `wait_for_set` are awakened.
    No-op if the flag is already set.
    """
    if not self._flag:
        self._flag = True
        self._set_event.set()
        self._reset_event.clear()

reset

reset() -> None

Reset the internal flag to False.

All coroutines waiting in wait_for_reset are awakened. No-op if the flag is already reset.

Source code in src/redsun/engine/actions.py
def reset(self) -> None:
    """Reset the internal flag to False.

    All coroutines waiting in `wait_for_reset` are awakened.
    No-op if the flag is already reset.
    """
    if self._flag:
        self._flag = False
        self._reset_event.set()
        self._set_event.clear()

is_set

is_set() -> bool

Return True if the internal flag is set, False otherwise.

Source code in src/redsun/engine/actions.py
def is_set(self) -> bool:
    """Return True if the internal flag is set, False otherwise."""
    return self._flag

wait_for_set async

wait_for_set() -> None

Wait until the internal flag is set.

Returns immediately if the flag is already set; otherwise blocks until another coroutine calls set.

Source code in src/redsun/engine/actions.py
async def wait_for_set(self) -> None:
    """Wait until the internal flag is set.

    Returns immediately if the flag is already set; otherwise blocks
    until another coroutine calls `set`.
    """
    if self._flag:
        return
    await self._set_event.wait()

wait_for_reset async

wait_for_reset() -> None

Wait until the internal flag is reset.

Returns immediately if the flag is already reset; otherwise blocks until another coroutine calls reset.

Source code in src/redsun/engine/actions.py
async def wait_for_reset(self) -> None:
    """Wait until the internal flag is reset.

    Returns immediately if the flag is already reset; otherwise blocks
    until another coroutine calls `reset`.
    """
    if not self._flag:
        return
    await self._reset_event.wait()
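
A short asyncio session shows the latch in use. The `Latch` class below is a condensed copy of the SRLatch logic listed above so that the example runs on its own; `demo` and `waiter` are illustrative names.

```python
import asyncio


class Latch:
    """Condensed copy of the SRLatch logic listed above."""

    def __init__(self) -> None:
        self._flag = False
        self._set_event = asyncio.Event()
        self._reset_event = asyncio.Event()
        self._reset_event.set()  # the latch starts in the reset state

    def set(self) -> None:
        if not self._flag:
            self._flag = True
            self._set_event.set()
            self._reset_event.clear()

    def reset(self) -> None:
        if self._flag:
            self._flag = False
            self._reset_event.set()
            self._set_event.clear()

    async def wait_for_set(self) -> None:
        if not self._flag:
            await self._set_event.wait()

    async def wait_for_reset(self) -> None:
        if self._flag:
            await self._reset_event.wait()


async def demo() -> list:
    latch = Latch()
    log = []

    async def waiter() -> None:
        await latch.wait_for_set()
        log.append("saw set")
        await latch.wait_for_reset()
        log.append("saw reset")

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # let the waiter start and block on the latch
    latch.set()
    await asyncio.sleep(0)  # give the waiter a chance to observe the set
    latch.reset()
    await task
    return log


log = asyncio.run(demo())
```

The waiter observes the two transitions in order, which is the property that lets a plan wait for a toggle to be switched on and later for it to be switched off.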

ContinousPlan

Bases: Protocol[P, R_co]

Protocol for plans decorated with continous.

Used both for static typing (as the return type of the continous decorator) and for runtime isinstance checks:

if isinstance(f, ContinousPlan):
    print(f.__togglable__, f.__pausable__)

Attributes:

Name Type Description
__togglable__ bool

Whether the plan is togglable (i.e. runs as an infinite loop that the run engine can stop).

__pausable__ bool

Whether the plan can be paused and resumed by the run engine.

Source code in src/redsun/engine/actions.py
@runtime_checkable
class ContinousPlan(Protocol[P, R_co]):
    """Protocol for plans decorated with `continous`.

    Used both for static typing (as the return type of the `continous`
    decorator) and for runtime ``isinstance`` checks:

    ```python
    if isinstance(f, ContinousPlan):
        print(f.__togglable__, f.__pausable__)
    ```

    Attributes
    ----------
    __togglable__ : bool
        Whether the plan is togglable (i.e. runs as an infinite loop that
        the run engine can stop).
    __pausable__ : bool
        Whether the plan can be paused and resumed by the run engine.
    """

    __togglable__: bool
    __pausable__: bool

    @abstractmethod
    def __call__(  # noqa: D102
        self, *args: P.args, **kwargs: P.kwargs
    ) -> R_co:  # pragma: no cover - protocol
        ...

continous

continous(
    func: Callable[P, R_co],
) -> ContinousPlan[P, R_co]
continous(
    *, togglable: bool = True, pausable: bool = False
) -> Callable[[Callable[P, R_co]], ContinousPlan[P, R_co]]
continous(
    func: Callable[P, R_co] | None = None,
    /,
    *,
    togglable: bool = True,
    pausable: bool = False,
) -> (
    Callable[[Callable[P, R_co]], ContinousPlan[P, R_co]]
    | ContinousPlan[P, R_co]
)

Mark a plan as continuous.

A continuous plan informs the view to provide UI controls that allow the user to start, stop, pause, and resume plan execution.

Can be used with or without arguments:

@continous
def my_plan() -> MsgGenerator[None]: ...


@continous(togglable=True, pausable=True)
def my_plan(detectors: Sequence[DetectorProtocol]) -> MsgGenerator[None]: ...

Parameters:

Name Type Description Default
togglable bool

Whether the plan runs as an infinite loop that the run engine can stop via a toggle button. Default is True.

True
pausable bool

Whether the plan can be paused and resumed by the run engine. Default is False.

False

Returns:

Type Description
ContinousPlan

The decorated plan function, typed as a ContinousPlan.

Notes

The decorator does not modify the function signature. It stores togglable and pausable as attributes on the function object (__togglable__ and __pausable__), to be retrieved later by create_plan_spec.

Source code in src/redsun/engine/actions.py
def continous(
    func: Callable[P, R_co] | None = None,
    /,
    *,
    togglable: bool = True,
    pausable: bool = False,
) -> Callable[[Callable[P, R_co]], ContinousPlan[P, R_co]] | ContinousPlan[P, R_co]:
    """Mark a plan as continuous.

    A *continuous* plan informs the view to provide UI controls that allow
    the user to start, stop, pause, and resume plan execution.

    Can be used with or without arguments:

    ```python
    @continous
    def my_plan() -> MsgGenerator[None]: ...


    @continous(togglable=True, pausable=True)
    def my_plan(detectors: Sequence[DetectorProtocol]) -> MsgGenerator[None]: ...
    ```

    Parameters
    ----------
    togglable : bool, optional
        Whether the plan runs as an infinite loop that the run engine can
        stop via a toggle button. Default is True.
    pausable : bool, optional
        Whether the plan can be paused and resumed by the run engine.
        Default is False.

    Returns
    -------
    ContinousPlan
        The decorated plan function, typed as a `ContinousPlan`.

    Notes
    -----
    The decorator does not modify the function signature. It stores
    ``togglable`` and ``pausable`` as attributes on the function object
    (``__togglable__`` and ``__pausable__``), to be retrieved later by
    `create_plan_spec`.
    """

    def decorator(func: Callable[P, R_co]) -> ContinousPlan[P, R_co]:
        setattr(func, "__togglable__", togglable)
        setattr(func, "__pausable__", pausable)
        return cast("ContinousPlan[P, R_co]", func)

    if func is None:
        return decorator

    return decorator(func)
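
The effect of the decorator, and the runtime isinstance check it enables, can be reproduced with standard-library typing only. This is a sketch under stated assumptions: `FlaggedPlan` is a hypothetical, non-generic stand-in for ContinousPlan, and `mark_continuous` follows the listing above without its type parameters.

```python
from typing import Any, Callable, Protocol, runtime_checkable


@runtime_checkable
class FlaggedPlan(Protocol):
    """Hypothetical, non-generic stand-in for ContinousPlan."""

    __togglable__: bool
    __pausable__: bool

    def __call__(self, *args: Any, **kwargs: Any) -> Any: ...


def mark_continuous(func=None, /, *, togglable=True, pausable=False):
    """Stand-in decorator following the listing above, without the typing."""

    def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
        setattr(f, "__togglable__", togglable)
        setattr(f, "__pausable__", pausable)
        return f  # same function object, now carrying two attributes

    if func is None:
        return decorator  # used as @mark_continuous(...)
    return decorator(func)  # used as bare @mark_continuous


@mark_continuous
def live():
    yield "msg"


@mark_continuous(pausable=True)
def live_pausable():
    yield "msg"


def one_shot():
    yield "msg"


flags = {
    f.__name__: (f.__togglable__, f.__pausable__) for f in (live, live_pausable)
}
checks = [isinstance(f, FlaggedPlan) for f in (live, live_pausable, one_shot)]
```

Both decorated functions satisfy the protocol because the two attributes are present, while the undecorated `one_shot` does not. The runtime check only tests that the members exist; it does not validate their types.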

Plan stubs

Custom Bluesky plan stubs for redsun plans.

These stubs extend the standard bluesky.plan_stubs with redsun-specific operations such as device-cache management (stash, clear_cache) and action-based flow control (wait_for_actions, read_while_waiting).

All functions are generator functions that yield Msg objects and are intended to be composed inside larger Bluesky plans via yield from.

set_property

set_property(
    obj: Movable[Any],
    value: Any,
    /,
    propr: str,
    timeout: float | None = None,
) -> MsgGenerator[Status]

Set a property of a Movable object and wait for completion.

Parameters:

Name Type Description Default
obj Movable[Any]

The movable object whose property will be set.

required
value Any

The value to set.

required
propr str

The name of the property to set. Unlike obj and value, which are positional-only, it may also be passed by keyword.

required
timeout float | None

Maximum time in seconds to wait for completion. None means wait indefinitely. Default is None.

None

Yields:

Type Description
Msg

A set message followed by a wait message.

Returns:

Type Description
Status

The status object returned by the set operation.

Source code in src/redsun/engine/plan_stubs.py
def set_property(
    obj: Movable[Any],
    value: Any,
    /,
    propr: str,
    timeout: float | None = None,
) -> MsgGenerator[Status]:
    """Set a property of a `Movable` object and wait for completion.

    Parameters
    ----------
    obj : Movable[Any]
        The movable object whose property will be set.
    value : Any
        The value to set.
    propr : str
        The name of the property to set. Unlike ``obj`` and ``value``, which
        are positional-only, it may also be passed by keyword.
    timeout : float | None, optional
        Maximum time in seconds to wait for completion.
        None means wait indefinitely. Default is None.

    Yields
    ------
    Msg
        A ``set`` message followed by a ``wait`` message.

    Returns
    -------
    Status
        The status object returned by the ``set`` operation.
    """
    group = str(uuid.uuid4())
    status: Status = yield Msg("set", obj, value, group=group, propr=propr)
    yield Msg("wait", None, group=group, timeout=timeout)
    return status
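
How the stub talks to the run engine becomes clearer when the generator is driven by hand. The sketch below avoids importing bluesky: `Msg` is a namedtuple stand-in with a similar shape, `msg` is a hypothetical constructor helper, and the device name, property name, and status value are made up for the example.

```python
import uuid
from collections import namedtuple

# Stand-in for bluesky's Msg, so the sketch runs without bluesky installed.
Msg = namedtuple("Msg", ["command", "obj", "args", "kwargs"])


def msg(command, obj, *args, **kwargs):
    """Hypothetical helper that packs arguments the way the stand-in expects."""
    return Msg(command, obj, args, kwargs)


def set_property_sketch(obj, value, /, propr, timeout=None):
    """Same message sequence as the stub above, built on the stand-in Msg."""
    group = str(uuid.uuid4())
    status = yield msg("set", obj, value, group=group, propr=propr)
    yield msg("wait", None, group=group, timeout=timeout)
    return status


plan = set_property_sketch("camera", 0.02, propr="exposure", timeout=1.0)
first = next(plan)                 # the "set" message
second = plan.send("fake-status")  # the engine replies with the status object
try:
    plan.send(None)                # reply to "wait"; the plan then returns
    result = None
except StopIteration as stop:
    result = stop.value            # the value returned by the plan
```

The value sent back in reply to the "set" message is what the stub eventually returns, and both messages share one freshly generated group identifier, so the wait applies only to this set operation.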

wait_for_actions

wait_for_actions(
    events: Mapping[str, SRLatch],
    timeout: float = 0.001,
    wait_for: Literal["set", "reset"] = "set",
) -> MsgGenerator[tuple[str, SRLatch] | None]

Wait for any of the given latches to change state.

Plan execution blocks until one latch transitions or timeout elapses; background tasks continue running normally.

Parameters:

Name Type Description Default
events Mapping[str, SRLatch]

Mapping of action names to their SRLatch objects.

required
timeout float

Maximum time in seconds to wait before returning None. Default is 0.001 seconds.

0.001
wait_for Literal['set', 'reset']

Whether to wait for a latch to be set or reset. Default is "set".

'set'

Returns:

Type Description
tuple[str, SRLatch] | None

The name and latch that changed state, or None if the timeout elapsed without any latch transitioning.

Source code in src/redsun/engine/plan_stubs.py
def wait_for_actions(
    events: Mapping[str, SRLatch],
    timeout: float = 0.001,
    wait_for: Literal["set", "reset"] = "set",
) -> MsgGenerator[tuple[str, SRLatch] | None]:
    """Wait for any of the given latches to change state.

    Plan execution blocks until one latch transitions or *timeout* elapses;
    background tasks continue running normally.

    Parameters
    ----------
    events : Mapping[str, SRLatch]
        Mapping of action names to their `SRLatch` objects.
    timeout : float, optional
        Maximum time in seconds to wait before returning None.
        Default is 0.001 seconds.
    wait_for : Literal["set", "reset"], optional
        Whether to wait for a latch to be set or reset.
        Default is ``"set"``.

    Returns
    -------
    tuple[str, SRLatch] | None
        The name and latch that changed state, or None if the timeout
        elapsed without any latch transitioning.
    """
    ret: tuple[str, SRLatch] | None = yield Msg(
        "wait_for_actions", None, events, timeout=timeout, wait_for=wait_for
    )
    return ret
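
The stub only yields a message; the engine-side handling is not shown on this page. One way such a "first to fire, or time out" wait could be written with asyncio is sketched below. Everything here is an assumption made for illustration: `first_event` is a hypothetical helper, and plain asyncio.Event objects stand in for SRLatch.

```python
import asyncio
from typing import Mapping, Optional, Tuple


async def first_event(
    events: Mapping[str, asyncio.Event], timeout: float
) -> Optional[Tuple[str, asyncio.Event]]:
    """Hypothetical helper: return the first (name, event) set, or None."""
    tasks = {
        asyncio.ensure_future(event.wait()): (name, event)
        for name, event in events.items()
    }
    done, pending = await asyncio.wait(
        list(tasks), timeout=timeout, return_when=asyncio.FIRST_COMPLETED
    )
    # Stop waiting on the events that did not fire, and let them unwind.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    if not done:
        return None  # the timeout elapsed first
    # If several fired at once, an arbitrary one of them is reported.
    return tasks[next(iter(done))]


async def demo():
    events = {"snap": asyncio.Event(), "stop": asyncio.Event()}
    timed_out = await first_event(events, timeout=0.001)
    # Fire "stop" shortly after the second wait begins.
    asyncio.get_running_loop().call_later(0.01, events["stop"].set)
    fired = await first_event(events, timeout=1.0)
    return timed_out, fired[0]


timed_out, fired_name = asyncio.run(demo())
```

The first call returns None because nothing fires within the short timeout, which corresponds to the None branch of the stub's return type; the second returns the name of the event that fired.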

read_while_waiting

read_while_waiting(
    objs: Sequence[Readable[Any]],
    events: Mapping[str, SRLatch],
    stream_name: str = "primary",
    refresh_period: float = SIXTY_FPS,
    wait_for: Literal["set", "reset"] = "set",
) -> MsgGenerator[tuple[str, SRLatch]]

Repeatedly trigger and read devices until an action latch changes state.

On each iteration the plan sets a checkpoint, waits up to refresh_period for any latch in events to change state, then triggers and reads all objects in objs. The loop repeats until a latch transitions, so one final reading is taken after the latch changes state.

Parameters:

Name Type Description Default
objs Sequence[Readable[Any]]

Devices to trigger and read on each iteration.

required
events Mapping[str, SRLatch]

Mapping of action names to SRLatch objects to monitor.

required
stream_name str

Name of the Bluesky stream to collect data into. Default is "primary".

'primary'
refresh_period float

Polling period in seconds. Default is 1/60 s (60 Hz).

SIXTY_FPS
wait_for Literal['set', 'reset']

Whether to wait for a latch to be set or reset. Default is "set".

'set'

Returns:

Type Description
tuple[str, SRLatch]

The name and latch that unblocked the loop.

Source code in src/redsun/engine/plan_stubs.py
def read_while_waiting(
    objs: Sequence[Readable[Any]],
    events: Mapping[str, SRLatch],
    stream_name: str = "primary",
    refresh_period: float = SIXTY_FPS,
    wait_for: Literal["set", "reset"] = "set",
) -> MsgGenerator[tuple[str, SRLatch]]:
    """Repeatedly trigger and read devices until an action latch changes state.

    On each iteration the plan waits up to *refresh_period* for any latch in
    *events* to change state, then triggers and reads all objects in *objs*.
    The loop repeats until a latch transitions, so one final reading is taken
    after the transition.

    Parameters
    ----------
    objs : Sequence[Readable[Any]]
        Devices to trigger and read on each iteration.
    events : Mapping[str, SRLatch]
        Mapping of action names to `SRLatch` objects to monitor.
    stream_name : str, optional
        Name of the Bluesky stream to collect data into.
        Default is ``"primary"``.
    refresh_period : float, optional
        Polling period in seconds. Default is 1/60 s (60 Hz).
    wait_for : Literal["set", "reset"], optional
        Whether to wait for a latch to be set or reset.
        Default is ``"set"``.

    Returns
    -------
    tuple[str, SRLatch]
        The name and latch that unblocked the loop.
    """
    event: tuple[str, SRLatch] | None = None
    while event is None:
        yield from bps.checkpoint()
        event = yield from wait_for_actions(
            events, timeout=refresh_period, wait_for=wait_for
        )
        yield from bps.trigger_and_read(objs, name=stream_name)
    return event
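
The loop order in the source above can be sketched without Bluesky. This is a minimal stand-alone illustration, not the library's implementation: `poll_until`, `check`, and `read` are hypothetical names, with `check` standing in for `wait_for_actions` (which returns None on timeout) and `read` standing in for `bps.trigger_and_read`.

```python
from __future__ import annotations

from collections.abc import Callable


def poll_until(
    check: Callable[[], str | None],
    read: Callable[[], None],
) -> tuple[str, int]:
    """Check first, then read, and repeat until check() returns a name."""
    event: str | None = None
    reads = 0
    while event is None:
        event = check()  # hypothetical stand-in for wait_for_actions(...)
        read()           # hypothetical stand-in for bps.trigger_and_read(...)
        reads += 1
    return event, reads


# Two timeouts (None) followed by a latch transition named "stop".
answers = iter([None, None, "stop"])
readings: list[str] = []

event, reads = poll_until(lambda: next(answers), lambda: readings.append("frame"))
```

Because the read comes after the check inside the loop body, the iteration in which the latch fires still produces a reading, which is why three frames are collected for two timeouts.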

read_and_stash

read_and_stash(
    objs: Sequence[Readable[Any]],
    cache_objs: Sequence[HasCache],
    *,
    stream: str = "primary",
    group: str | None = None,
    wait: bool = False,
) -> MsgGenerator[dict[str, Reading[Any]]]

Trigger, read, and stash readings from one or more devices.

Triggers all Triggerable devices in objs, reads each device, stashes the reading into the corresponding HasCache object in cache_objs, and emits a Bluesky create/save pair to record the event.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| objs | Sequence[Readable[Any]] | Devices to read from. | required |
| cache_objs | Sequence[HasCache] | Cache objects paired with objs (same order) to stash readings into. | required |
| stream | str | Bluesky stream name. Default is "primary". | 'primary' |
| group | str \| None | Identifier for the stash group. If None, each stash call generates its own identifier. | None |
| wait | bool | Whether to wait for all stash operations to complete before returning. Default is False. | False |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Reading[Any]] | Combined readings from all devices. |

Source code in src/redsun/engine/plan_stubs.py
def read_and_stash(
    objs: Sequence[Readable[Any]],
    cache_objs: Sequence[HasCache],
    *,
    stream: str = "primary",
    group: str | None = None,
    wait: bool = False,
) -> MsgGenerator[dict[str, Reading[Any]]]:
    """Trigger, read, and stash readings from one or more devices.

    Triggers all `Triggerable` devices in *objs*, reads each device, stashes
    the reading into the corresponding `HasCache` object in *cache_objs*, and
    emits a Bluesky ``create``/``save`` pair to record the event.

    Parameters
    ----------
    objs : Sequence[Readable[Any]]
        Devices to read from.
    cache_objs : Sequence[HasCache]
        Cache objects paired with *objs* (same order) to stash readings into.
    stream : str, optional
        Bluesky stream name. Default is ``"primary"``.
    group : str | None, optional
        Identifier for the stash group. If None, each stash call generates
        its own identifier.
    wait : bool, optional
        Whether to wait for all stash operations to complete before
        returning. Default is False.

    Returns
    -------
    dict[str, Reading[Any]]
        Combined readings from all devices.
    """

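    def inner_trigger() -> MsgGenerator[None]:
        # Trigger every Triggerable device under one group, then wait once
        # for the whole group to finish.
        grp = short_uid("trigger")
        no_wait = True
        for obj in objs:
            if isinstance(obj, Triggerable):
                no_wait = False
                yield from bps.trigger(obj, group=grp)
        if not no_wait:
            yield from bps.wait(group=grp)

    ret: dict[str, Reading[Any]] = {}

    # Skip the trigger phase entirely when no device is Triggerable.
    if any(isinstance(obj, Triggerable) for obj in objs):
        yield from inner_trigger()

    # Open the event, then read each device and stash its reading in the
    # cache object at the same position.
    yield from bps.create(stream)
    for obj, cache_obj in zip(objs, cache_objs):
        reading = yield from bps.read(obj)
        yield from stash(cache_obj, reading, group=group, wait=wait)
        ret.update(reading)

    # Close the event and return the merged readings.
    yield from bps.save()
    return ret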

stash

stash(
    obj: HasCache,
    reading: dict[str, Reading[Any]],
    *,
    group: str | None,
    wait: bool,
) -> MsgGenerator[None]

Stash a reading into a HasCache device.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| obj | HasCache | The cache object to stash the reading into. | required |
| reading | dict[str, Reading[Any]] | The reading to stash, typically from bps.read. | required |
| group | str \| None | Identifier for the stash group. A unique id is generated if None. | required |
| wait | bool | Whether to wait for the stash operation to complete. | required |

Source code in src/redsun/engine/plan_stubs.py
def stash(
    obj: HasCache,
    reading: dict[str, Reading[Any]],
    *,
    group: str | None,
    wait: bool,
) -> MsgGenerator[None]:
    """Stash a reading into a `HasCache` device.

    Parameters
    ----------
    obj : HasCache
        The cache object to stash the reading into.
    reading : dict[str, Reading[Any]]
        The reading to stash, typically from `bps.read`.
    group : str | None
        Identifier for the stash group. A unique id is generated if None.
    wait : bool
        Whether to wait for the stash operation to complete.
    """
    if not group:
        group = short_uid("stash")

    yield Msg("stash", obj, reading, group=group)
    if wait:
        yield from bps.wait(group=group)

clear_cache

clear_cache(
    obj: HasCache,
    *,
    group: str | None = None,
    wait: bool = False,
) -> MsgGenerator[None]

Clear the cache of a HasCache device.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| obj | HasCache | The cache object to clear. | required |
| group | str \| None | Identifier for the clear operation. Auto-generated if None. | None |
| wait | bool | Whether to wait for the clear operation to complete. Default is False. | False |

Source code in src/redsun/engine/plan_stubs.py
def clear_cache(
    obj: HasCache, *, group: str | None = None, wait: bool = False
) -> MsgGenerator[None]:
    """Clear the cache of a `HasCache` device.

    Parameters
    ----------
    obj : HasCache
        The cache object to clear.
    group : str | None, optional
        Identifier for the clear operation. Auto-generated if None.
    wait : bool, optional
        Whether to wait for the clear operation to complete.
        Default is False.
    """
    if not group:
        group = short_uid("clear_cache")

    yield Msg("clear_cache", obj, group=group)
    if wait:
        yield from bps.wait(group=group)
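
Both stash and clear_cache follow the same shape: fill in a group identifier when none is given, emit one message tagged with that group, and optionally emit a wait on the same group. The sketch below reproduces that shape with stand-in types so it runs without Bluesky. `FakeMsg` and `grouped_action` are hypothetical names, and the identifier format is an assumption rather than what `short_uid` produces.

```python
from __future__ import annotations

import uuid
from collections.abc import Generator
from typing import Any, NamedTuple


class FakeMsg(NamedTuple):
    """Hypothetical stand-in for a plan message."""

    command: str
    obj: Any
    kwargs: dict[str, Any]


def grouped_action(
    command: str, obj: Any, *, group: str | None = None, wait: bool = False
) -> Generator[FakeMsg, None, None]:
    # `not group` is true for None and for "", so both get a fresh identifier.
    if not group:
        group = f"{command}-{uuid.uuid4().hex[:6]}"  # assumed id format
    yield FakeMsg(command, obj, {"group": group})
    if wait:
        # The wait refers to the same group as the action it follows.
        yield FakeMsg("wait", None, {"group": group})


with_wait = list(grouped_action("clear_cache", "cache0", wait=True))
without_wait = list(grouped_action("clear_cache", "cache0"))
```

Passing an explicit group lets a caller issue several actions and wait for all of them once, instead of waiting after each one.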

describe

describe(
    obj: Readable[Any],
) -> MsgGenerator[dict[str, Descriptor]]

Gather the descriptor from a Readable device.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| obj | Readable[Any] | The device to describe. | required |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Descriptor] | The descriptor dict returned by obj.describe(). |

Source code in src/redsun/engine/plan_stubs.py
def describe(
    obj: Readable[Any],
) -> MsgGenerator[dict[str, Descriptor]]:
    """Gather the descriptor from a `Readable` device.

    Parameters
    ----------
    obj : Readable[Any]
        The device to describe.

    Returns
    -------
    dict[str, Descriptor]
        The descriptor dict returned by ``obj.describe()``.
    """

    async def _describe() -> dict[str, Descriptor]:
        return await maybe_await(obj.describe())

    task: list[asyncio.Task[dict[str, Descriptor]]] = yield from bps.wait_for(
        [_describe]
    )
    result = task[0].result()
    return result
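
The source above wraps `obj.describe()` in `maybe_await` so that synchronous and asynchronous devices can be handled alike. A minimal sketch of that idea follows, assuming the helper simply awaits the value when it is awaitable and returns it unchanged otherwise. `maybe_await_sketch`, `SyncDevice`, and `AsyncDevice` are hypothetical names, not the library's API.

```python
import asyncio
import inspect
from typing import Any


async def maybe_await_sketch(value: Any) -> Any:
    """Await `value` if it is awaitable, otherwise return it as-is (assumed behavior)."""
    if inspect.isawaitable(value):
        return await value
    return value


class SyncDevice:
    def describe(self) -> dict[str, dict[str, Any]]:
        return {"x": {"source": "sync", "dtype": "number", "shape": []}}


class AsyncDevice:
    async def describe(self) -> dict[str, dict[str, Any]]:
        return {"y": {"source": "async", "dtype": "number", "shape": []}}


async def gather_descriptors() -> list[dict[str, dict[str, Any]]]:
    # The same call site works for both device flavors.
    return [await maybe_await_sketch(dev.describe()) for dev in (SyncDevice(), AsyncDevice())]


sync_desc, async_desc = asyncio.run(gather_descriptors())
```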

describe_collect

describe_collect(
    obj: Collectable,
) -> MsgGenerator[
    dict[str, Descriptor] | dict[str, dict[str, Descriptor]]
]

Gather descriptors from a Collectable device.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| obj | Collectable | The device to describe. | required |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Descriptor] \| dict[str, dict[str, Descriptor]] | The descriptor dict returned by obj.describe_collect(). |

Source code in src/redsun/engine/plan_stubs.py
def describe_collect(
    obj: Collectable,
) -> MsgGenerator[dict[str, Descriptor] | dict[str, dict[str, Descriptor]]]:
    """Gather descriptors from a `Collectable` device.

    Parameters
    ----------
    obj : Collectable
        The device to describe.

    Returns
    -------
    dict[str, Descriptor] | dict[str, dict[str, Descriptor]]
        The descriptor dict returned by ``obj.describe_collect()``.
    """

    async def _describe_collect() -> (
        dict[str, Descriptor] | dict[str, dict[str, Descriptor]]
    ):
        return await maybe_await(obj.describe_collect())

    task: list[
        asyncio.Task[dict[str, Descriptor] | dict[str, dict[str, Descriptor]]]
    ] = yield from bps.wait_for([_describe_collect])
    result = task[0].result()

    return result