Engine
Run engine
RunEngine
Bases: RunEngine
The Run Engine executes messages and emits Documents.
This is a wrapper for the bluesky.run_engine.RunEngine class that
allows execution without blocking the main thread.
The main difference is that the __call__ method
is executed in a separate thread,
and it returns a concurrent.futures.Future object
representing the result of the plan execution.
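The non-blocking call pattern described above can be illustrated with the standard library alone. This is a minimal sketch, not the redsun implementation: `ThreadedRunner` and `fake_plan` are hypothetical stand-ins, and the only thing shown is that calling the object returns a `concurrent.futures.Future` immediately while the work runs in another thread.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class ThreadedRunner:
    """Hypothetical stand-in that runs a callable off the calling thread."""

    def __init__(self) -> None:
        self._executor = ThreadPoolExecutor(max_workers=1)

    def __call__(self, plan, *args) -> Future:
        # submit() returns a Future right away; the plan runs in a worker thread.
        return self._executor.submit(plan, *args)


def fake_plan(n: int) -> tuple[str, ...]:
    # Stand-in for plan execution; the identifiers are made up.
    return tuple(f"run-{i}" for i in range(n))


runner = ThreadedRunner()
future = runner(fake_plan, 2)
# The caller blocks only when it explicitly asks for the result.
result = future.result(timeout=5)
```

A GUI thread would typically attach a callback with `future.add_done_callback(...)` instead of calling `result()`, so that it never blocks.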
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| md | `dict[str, Any]` | The default is a standard Python dictionary, but fancier objects can be used to store long-term history and persist it between sessions. The standard configuration instantiates a Run Engine with historydict.HistoryDict, a simple interface to a SQLite file. Any object supporting | `None` |
| loop | `AbstractEventLoop` | An asyncio event loop to be used for executing plans. If not provided, the RunEngine will create a new event loop using | `get_shared_loop()` |
| preprocessors | `list` | Generator functions that take in a plan (generator instance) and modify its messages on the way out. Suitable examples include the functions in the module | `None` |
| md_validator | `Callable[dict[str, Any], None]` | A function that raises, preventing a run from starting, if it deems the metadata invalid or incomplete. What counts as invalid is entirely up to the user. The function's return value is ignored. | `None` |
| md_normalizer | `Callable[dict[str, Any], dict[str, Any]]` | A function that, like md_validator, raises and prevents a run from starting if it deems the metadata invalid or incomplete. On success, it returns the normalized (transformed) version of the original metadata. What counts as invalid is entirely up to the user. | `None` |
| scan_id_source | `Callable[dict[str, Any], int \| Awaitable[int]]` | A (possibly async) function used to calculate scan_id. The default increments scan_id by 1 each time, but a custom function can obtain a scan_id from any source. Expected return: the updated scan_id value. | `default_scan_id_source` |
| call_returns_result | `bool` | A flag that controls the return value of | `True` |
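To make the expected shapes of `md_validator`, `md_normalizer`, and `scan_id_source` concrete, here is a minimal sketch of three callables matching the signatures in the table. The function names, the required keys (`operator`, `sample`), and the step-by-two numbering policy are all assumptions chosen for illustration; none of them come from redsun or bluesky.

```python
from typing import Any


def require_operator(md: dict[str, Any]) -> None:
    """Validator sketch: raise if the metadata lacks an 'operator' key."""
    if "operator" not in md:
        raise ValueError("metadata must include an 'operator' entry")


def normalize_sample(md: dict[str, Any]) -> dict[str, Any]:
    """Normalizer sketch: raise on missing data, else return a cleaned copy."""
    if "sample" not in md:
        raise ValueError("metadata must include a 'sample' entry")
    normalized = dict(md)  # leave the caller's dictionary untouched
    normalized["sample"] = str(md["sample"]).strip().lower()
    return normalized


def step_by_two(md: dict[str, Any]) -> int:
    """Scan-id source sketch: advance the stored scan_id by two."""
    return md.get("scan_id", 0) + 2
```

Under the parameter names documented above, these would presumably be passed as `md_validator=require_operator`, `md_normalizer=normalize_sample`, and `scan_id_source=step_by_two` when constructing the engine.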
Attributes:
| Name | Type | Description |
|---|---|---|
| md | | Direct access to the dict-like persistent storage described above. |
| record_interruptions | | False by default. Set to True to generate an extra event stream that records any interruptions (pauses, suspensions). |
| state | | {'idle', 'running', 'paused'} |
| suspenders | | Read-only collection of |
| preprocessors | `list` | Generator functions that take in a plan (generator instance) and modify its messages on the way out. Suitable examples include the functions in the module |
| msg_hook | | Callable that receives all messages before they are processed (useful for logging or other development purposes); expected signature is |
| state_hook | | Callable with signature |
| waiting_hook | | Callable with signature |
| ignore_callback_exceptions | | Boolean, False by default. |
| call_returns_result | | Boolean, False by default. If False, RunEngine will return uuid list after running a plan. If True, RunEngine will return a RunEngineResult object that contains the plan result, error status, and uuid list. |
| loop | asyncio event loop | e.g., |
| max_depth | | Maximum stack depth; set this to prevent users from calling the RunEngine inside a function (which can result in unexpected behavior and breaks introspection tools). Default is None. For the built-in Python interpreter, set to 2. For IPython, set to 11 (tested on IPython 5.1.0; other versions may vary). |
| pause_msg | `str` | The message printed when a run is interrupted. This message includes instructions for changing the state of the RunEngine. It is set to |
| commands | | The list of commands available to Msg. |
Source code in src/redsun/engine/_wrapper.py
resume
Resume the paused plan in a separate thread.
When a plan is paused, the future originally returned by __call__ is marked as completed.
This method resumes the plan in a separate thread and returns a new future.
Returns:
| Type | Description |
|---|---|
| `Future[RunEngineResult \| tuple[str, ...]]` | Future object representing the result of the resumed plan. |
Source code in src/redsun/engine/_wrapper.py
Actions
Engine actions: decorators and types for continuous, interactive plans.
A continuous plan is one that runs in an infinite loop until explicitly stopped, and may support pause/resume and in-flight actions (user-triggered side effects while the plan is running).
This module provides:
- SRLatch: an asyncio set-reset latch used to synchronise plan execution with external signals.
- continous: a decorator that marks a plan function as continuous and records its togglable and pausable capabilities.
- Action: a dataclass carrying metadata (name, description, toggle state) for a single in-flight action.
- ContinousPlan: a typing.Protocol used for static typing and runtime isinstance checks on decorated plans.
Action (dataclass)
Metadata for an in-flight action on a continuous plan.
An Action is a user-triggerable side effect that can be fired while a
continuous plan is running. It encapsulates an SRLatch synchronisation
primitive so the plan can await the action being triggered.
Warning
The internal SRLatch is created lazily on first access of
event_map, so Action objects can be constructed without a running
event loop. The latch must only be accessed from within a plan.
Subclassable to add additional fields for domain-specific use cases.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | `str` | Name of the action. |
| description | `str` | Brief description of the action, usable as UI tooltip. |
| togglable | `bool` | Whether the action is togglable or not. |
| toggle_states | `tuple[str, str]` | Labels for the toggle states (on, off). Only used if |
Source code in src/redsun/engine/actions.py
event_map (property)
Return the latch for this action as a single-entry dict keyed by name.
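The lazy-creation behaviour described in the warning above can be sketched with a plain dataclass. This is an illustration under stated assumptions, not the redsun source: `ActionSketch` is a hypothetical name, `asyncio.Event` stands in for SRLatch, and the default values of the fields are guesses.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field


@dataclass
class ActionSketch:
    """Hypothetical stand-in for Action; field defaults are assumptions."""

    name: str
    description: str = ""
    togglable: bool = False
    toggle_states: tuple[str, str] = ("On", "Off")
    # Excluded from __init__, so nothing loop-related happens at construction.
    _latch: asyncio.Event | None = field(default=None, init=False, repr=False)

    @property
    def event_map(self) -> dict[str, asyncio.Event]:
        if self._latch is None:
            # Created on first access only (asyncio.Event replaces SRLatch here).
            self._latch = asyncio.Event()
        return {self.name: self._latch}


action = ActionSketch(name="snap", description="Take a snapshot")
created_eagerly = action._latch is not None
mapping = action.event_map
```

Repeated access returns the same underlying object, so a plan and the code that triggers the action observe one shared latch.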
SRLatch
An asyncio Event-like object that behaves as a set-reset latch.
Wraps two asyncio.Event objects to allow waiting for either the set
or the reset state of the latch. At construction the latch starts in
the reset state.
Source code in src/redsun/engine/actions.py
set
Set the internal flag to True.
All coroutines waiting in wait_for_set are awakened.
No-op if the flag is already set.
Source code in src/redsun/engine/actions.py
reset
Reset the internal flag to False.
All coroutines waiting in wait_for_reset are awakened.
No-op if the flag is already reset.
Source code in src/redsun/engine/actions.py
is_set
wait_for_set (async)
Wait until the internal flag is set.
Returns immediately if the flag is already set; otherwise blocks
until another coroutine calls set.
Source code in src/redsun/engine/actions.py
wait_for_reset (async)
Wait until the internal flag is reset.
Returns immediately if the flag is already reset; otherwise blocks
until another coroutine calls reset.
Source code in src/redsun/engine/actions.py
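The set-reset behaviour documented for SRLatch can be approximated with two `asyncio.Event` objects, as the description suggests. The class below is a minimal sketch written from that description only; `LatchSketch` is a hypothetical name and the real implementation may differ in its details.

```python
import asyncio


class LatchSketch:
    """Hypothetical set-reset latch built from two asyncio.Event objects."""

    def __init__(self) -> None:
        self._set_event = asyncio.Event()
        self._reset_event = asyncio.Event()
        self._reset_event.set()  # the latch starts in the reset state

    def set(self) -> None:
        if self._set_event.is_set():
            return  # no-op if already set
        self._reset_event.clear()
        self._set_event.set()  # wakes every wait_for_set() waiter

    def reset(self) -> None:
        if self._reset_event.is_set():
            return  # no-op if already reset
        self._set_event.clear()
        self._reset_event.set()  # wakes every wait_for_reset() waiter

    def is_set(self) -> bool:
        return self._set_event.is_set()

    async def wait_for_set(self) -> None:
        await self._set_event.wait()

    async def wait_for_reset(self) -> None:
        await self._reset_event.wait()


async def demo() -> list:
    latch = LatchSketch()
    log = []

    async def waiter() -> None:
        await latch.wait_for_set()
        log.append("set seen")

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # let the waiter start and block on the latch
    log.append("setting")
    latch.set()
    await task
    latch.reset()
    await latch.wait_for_reset()  # returns immediately: already reset
    log.append("reset seen")
    return log


events = asyncio.run(demo())
```

Using two events rather than one is what lets a coroutine wait for the transition back to the reset state, which a single `asyncio.Event` cannot express.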
ContinousPlan
Bases: Protocol[P, R_co]
Protocol for plans decorated with continous.
Used both for static typing (as the return type of the continous
decorator) and for runtime isinstance checks:
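A runtime check of this kind can be sketched with `typing.runtime_checkable`. The protocol below, `PlanLike`, is a hypothetical stand-in that only mirrors the two documented attributes; it is not the ContinousPlan definition, and the attributes are set by hand rather than by the decorator.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class PlanLike(Protocol):
    """Hypothetical protocol mirroring the documented attributes."""

    __togglable__: bool
    __pausable__: bool

    def __call__(self, *args: Any, **kwargs: Any) -> Any: ...


def plain_plan():
    yield "msg"


def marked_plan():
    yield "msg"


# Set manually here; in redsun the decorator is documented to do this.
marked_plan.__togglable__ = True
marked_plan.__pausable__ = False

is_marked = isinstance(marked_plan, PlanLike)
is_plain = isinstance(plain_plan, PlanLike)
```

The check only verifies that the members exist on the object; it does not validate their types or values.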
Attributes:
| Name | Type | Description |
|---|---|---|
| `__togglable__` | `bool` | Whether the plan is togglable (i.e. runs as an infinite loop that the run engine can stop). |
| `__pausable__` | `bool` | Whether the plan can be paused and resumed by the run engine. |
Source code in src/redsun/engine/actions.py
continous
continous(
func: Callable[P, R_co] | None = None,
/,
*,
togglable: bool = True,
pausable: bool = False,
) -> (
Callable[[Callable[P, R_co]], ContinousPlan[P, R_co]]
| ContinousPlan[P, R_co]
)
Mark a plan as continuous.
A continuous plan informs the view to provide UI controls that allow the user to start, stop, pause, and resume plan execution.
Can be used with or without arguments:
@continous
def my_plan() -> MsgGenerator[None]: ...
@continous(togglable=True, pausable=True)
def my_plan(detectors: Sequence[DetectorProtocol]) -> MsgGenerator[None]: ...
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| togglable | `bool` | Whether the plan runs as an infinite loop that the run engine can stop via a toggle button. | `True` |
| pausable | `bool` | Whether the plan can be paused and resumed by the run engine. | `False` |

Returns:
| Type | Description |
|---|---|
| `ContinousPlan` | The decorated plan function, typed as a |
Notes
The decorator does not modify the function signature. It stores
togglable and pausable as attributes on the function object
(__togglable__ and __pausable__), to be retrieved later by
create_plan_spec.
Source code in src/redsun/engine/actions.py
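The dual calling convention (with or without arguments) and the attribute-stamping behaviour described in the notes follow a common decorator pattern. The sketch below reproduces that pattern under a hypothetical name, `continuous_sketch`; it is written from the documented signature and notes, not copied from the redsun source.

```python
from __future__ import annotations

from typing import Any, Callable


def continuous_sketch(
    func: Callable[..., Any] | None = None,
    /,
    *,
    togglable: bool = True,
    pausable: bool = False,
):
    """Hypothetical decorator: stamps capability flags on the function."""

    def mark(f: Callable[..., Any]) -> Callable[..., Any]:
        # The function itself is returned unchanged, so its signature is kept.
        f.__togglable__ = togglable
        f.__pausable__ = pausable
        return f

    if func is None:
        return mark  # used as @continuous_sketch(...)
    return mark(func)  # used as bare @continuous_sketch


@continuous_sketch
def live():
    yield "msg"


@continuous_sketch(togglable=True, pausable=True)
def live_pausable():
    yield "msg"
```

Making `func` positional-only keeps the keyword namespace free for options, so a plan can never be mistaken for a configuration argument.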
Plan stubs
Custom Bluesky plan stubs for redsun plans.
These stubs extend the standard bluesky.plan_stubs with redsun-specific
action-based flow control (wait_for_actions, read_while_waiting).
All functions are generator functions that yield Msg objects and are
intended to be composed inside larger Bluesky plans via yield from.
wait_for_actions
wait_for_actions(
events: Mapping[str, SRLatch],
timeout: float = SIXTY_FPS,
wait_for: Literal["set", "reset"] = "set",
) -> MsgGenerator[tuple[str, SRLatch]]
Wait for any of the given latches to change state.
Loops at timeout intervals until a latch transitions, then returns the name and latch that fired. Plan execution yields control on each iteration so background tasks continue running normally.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| events | `Mapping[str, SRLatch]` | Mapping of action names to their | required |
| timeout | `float` | Polling interval in seconds. Default is 1/60 s (60 Hz). | `SIXTY_FPS` |
| wait_for | `Literal['set', 'reset']` | Whether to wait for a latch to be set or reset. Default is 'set'. | `'set'` |

Returns:
| Type | Description |
|---|---|
| `tuple[str, SRLatch]` | The name and latch that changed state. |
Source code in src/redsun/engine/plan_stubs.py
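The polling behaviour described for wait_for_actions can be mimicked in plain asyncio. This is only an analogy: the real stub is a Bluesky plan that yields Msg objects, whereas the hypothetical coroutine `wait_for_any` below uses `asyncio.Event` in place of SRLatch and sleeps between checks.

```python
from __future__ import annotations

import asyncio
from typing import Mapping


async def wait_for_any(
    events: Mapping[str, asyncio.Event],
    timeout: float = 1 / 60,
) -> tuple[str, asyncio.Event]:
    """Hypothetical analogue: poll until one of the events is set."""
    while True:
        for name, event in events.items():
            if event.is_set():
                return name, event
        # Sleeping hands control back to the loop so other tasks keep running.
        await asyncio.sleep(timeout)


async def demo() -> str:
    events = {"snap": asyncio.Event(), "stop": asyncio.Event()}
    # Simulate a user triggering the "stop" action shortly after start.
    asyncio.get_running_loop().call_later(0.05, events["stop"].set)
    name, _ = await wait_for_any(events)
    return name


fired = asyncio.run(demo())
```

A short polling interval keeps the response to a user action fast while still letting background work proceed between checks.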
describe
Gather the descriptor from a Readable device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| obj | `Readable[Any]` | The device to describe. | required |

Returns:
| Type | Description |
|---|---|
| `dict[str, Descriptor]` | The descriptor dict returned by |
Source code in src/redsun/engine/plan_stubs.py
describe_collect
describe_collect(
obj: Collectable,
) -> MsgGenerator[
dict[str, Descriptor] | dict[str, dict[str, Descriptor]]
]
Gather descriptors from a Collectable device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| obj | `Collectable` | The device to describe. | required |

Returns:
| Type | Description |
|---|---|
| `dict[str, Descriptor] \| dict[str, dict[str, Descriptor]]` | The descriptor dict returned by |