Engine¶
Run engine¶
RunEngine
¶
Bases: RunEngine
The Run Engine execute messages and emits Documents.
This is a wrapper for the bluesky.run_engine.RunEngine class that
allows execution without blocking the main thread.
The main difference is that the __call__ method
is executed in a separate thread,
and it returns a concurrent.futures.Future object
representing the result of the plan execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
md
|
dict[str, Any]
|
The default is a standard Python dictionary, but fancier
objects can be used to store long-term history and persist
it between sessions. The standard configuration
instantiates a Run Engine with historydict.HistoryDict, a
simple interface to a sqlite file. Any object supporting
|
None
|
loop
|
AbstractEventLoop | None
|
An asyncio event loop to be used for executing plans. If not provided,
the RunEngine will create a new event loop using |
None
|
preprocessors
|
list
|
Generator functions that take in a plan (generator instance) and
modify its messages on the way out. Suitable examples include
the functions in the module |
None
|
md_validator
|
Callable[dict[str, Any], None]
|
a function that raises and prevents starting a run if it deems the metadata to be invalid or incomplete Function should raise if md is invalid. What that means is completely up to the user. The function's return value is ignored. |
None
|
md_normalizer
|
Callable[dict[str, Any], dict[str, Any]]
|
a function that, similar to md_validator, raises and prevents starting a run if it deems the metadata to be invalid or incomplete. If it succeeds, it returns the normalized/transformed version of the original metadata. Function should raise if md is invalid. What that means is completely up to the user. Expected return: normalized metadata |
None
|
scan_id_source
|
Callable[dict[str, Any], int | Awaitable[int]]
|
a (possibly async) function that will be used to calculate scan_id. Default is to increment scan_id by 1 each time. However you could pass in a customized function to get a scan_id from any source. Expected return: updated scan_id value |
default_scan_id_source
|
call_returns_result
|
bool
|
A flag that controls the return value of |
True
|
Attributes:
| Name | Type | Description |
|---|---|---|
md |
Direct access to the dict-like persistent storage described above |
|
record_interruptions |
False by default. Set to True to generate an extra event stream that records any interruptions (pauses, suspensions). |
|
state |
{'idle', 'running', 'paused'} |
|
suspenders |
Read-only collection of |
|
preprocessors |
list
|
Generator functions that take in a plan (generator instance) and
modify its messages on the way out. Suitable examples include
the functions in the module |
msg_hook |
Callable that receives all messages before they are processed
(useful for logging or other development purposes); expected
signature is |
|
state_hook |
Callable with signature |
|
waiting_hook |
Callable with signature |
|
ignore_callback_exceptions |
Boolean, False by default. |
|
call_returns_result |
Boolean, False by default. If False, RunEngine will return uuid list after running a plan. If True, RunEngine will return a RunEngineResult object that contains the plan result, error status, and uuid list. |
|
loop |
asyncio event loop
|
e.g., |
max_depth |
Maximum stack depth; set this to prevent users from calling the RunEngine inside a function (which can result in unexpected behavior and breaks introspection tools). Default is None. For built-in Python interpreter, set to 2. For IPython, set to 11 (tested on IPython 5.1.0; other versions may vary). |
|
pause_msg |
str
|
The message printed when a run is interrupted. This message
includes instructions of changing the state of the RunEngine.
It is set to |
commands |
The list of commands available to Msg. |
Source code in src/redsun/engine/_wrapper.py
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 | |
resume
¶
Resume the paused plan in a separate thread.
If the plan has been paused, the initial
future returned by __call__ will be set as completed.
With this method, the plan is resumed in a separate thread, and a new future is returned.
Returns:
| Type | Description |
|---|---|
``Future[RunEngineResult | tuple[str, ...]]``
|
Future object representing the result of the resumed plan. |
Source code in src/redsun/engine/_wrapper.py
Status
¶
Bases: Status
Track the status of a potentially-lengthy action like moving or triggering.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float | None
|
The amount of time to wait before marking the Status as failed. If
|
None
|
settle_time
|
float | None
|
The amount of time to wait between the caller specifying that the status has completed to running callbacks. Default is 0. |
0
|
Notes
Theory of operation:
This employs two threading.Event objects, one thread that runs for
(timeout + settle_time) seconds, and one thread that runs for
settle_time seconds (if settle_time is nonzero).
At init time, a timeout and settle_time are specified. A thread
is started, on which user callbacks, registered after init time via
add_callback, will eventually be run. The thread waits on an
Event be set or (timeout + settle_time) seconds to pass, whichever
happens first.
If (timeout + settle_time) expires and the Event has not
been set, an internal Exception is set to StatusTimeoutError, and a
second Event is set, marking the Status as done and failed. The
callbacks are run.
If a callback is registered after the Status is done, it will be run immediately.
If the first Event is set before (timeout + settle_time) expires, then the second Event is set and no internal Exception is set, marking the Status as done and successful. The callbacks are run.
There are two methods that directly set the first Event. One,
set_exception, sets it directly after setting the internal
Exception. The other, set_finished, starts a
threading.Timer that will set it after a delay (the settle_time).
One of these methods may be called, and at most once. If one is called
twice or if both are called, InvalidState is raised. If they are
called too late to prevent a StatusTimeoutError, they are ignored
but one call is still allowed. Thus, an external callback, e.g. pyepics,
may reports success or failure after the Status object has expired, but
to no effect because the callbacks have already been called and the
program has moved on.
Source code in src/redsun/engine/_status.py
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 | |
timeout
property
¶
The timeout for this action.
This is set when the Status is created, and it cannot be changed.
settle_time
property
¶
A delay between when set_finished is when the Status is done.
This is set when the Status is created, and it cannot be changed.
done
property
¶
Boolean indicating whether associated operation has completed.
This is set to True at init time or by calling
set_finished, set_exception.
Once True, it can never become False.
success
property
¶
Boolean indicating whether associated operation has completed.
This is set to True at init time or by calling
set_finished, set_exception
. Once True, it can never become False.
callbacks
property
¶
Callbacks to be run when the status is marked as finished.
set_exception
¶
Mark as finished but failed with the given Exception.
This method should generally not be called by the recipient of this Status object, but only by the object that created and returned it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
exc
|
BaseException
|
The exception that caused the failure. |
required |
Source code in src/redsun/engine/_status.py
set_finished
¶
Mark as finished successfully.
This method should generally not be called by the recipient of this Status object, but only by the object that created and returned it.
Source code in src/redsun/engine/_status.py
exception
¶
Return the exception raised by the action.
If the action has completed successfully, return None. If it has
finished in error, return the exception.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float | None
|
If None (default) wait indefinitely until the status finishes. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
Exception |
BaseException | None
|
The exception raised by the action. If the action has completed
successfully, return |
Raises:
| Type | Description |
|---|---|
WaitTimeoutError
|
If the status has not completed within |
Source code in src/redsun/engine/_status.py
wait
¶
Block until the action completes.
When the action has finished succesfully, return None. If the
action has failed, raise the exception.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float | None
|
If |
None
|
Raises:
| Type | Description |
|---|---|
WaitTimeoutError
|
If the status has not completed within |
StatusTimeoutError
|
If the status has failed because the timeout that it was initialized with has expired. |
Exception
|
This is |
Source code in src/redsun/engine/_status.py
add_callback
¶
Register a callback to be called once when the Status finishes.
The callback will be called exactly once. If the Status is finished before a callback is added, it will be called immediately. This is threadsafe. The callback will be called regardless of success of failure. The callback has access to this status object, so it can distinguish success or failure by inspecting the object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
callback
|
Callable[[Status], None]
|
required |
Source code in src/redsun/engine/_status.py
Actions¶
Engine actions: decorators and types for continuous, interactive plans.
A continuous plan is one that runs in an infinite loop until explicitly stopped, and may support pause/resume and in-flight actions (user-triggered side effects while the plan is running).
This module provides:
SRLatch— an asyncio set-reset latch used to synchronise plan execution with external signals.continous— a decorator that marks a plan function as continuous and records itstogglableandpausablecapabilities.Action— a dataclass carrying metadata (name, description, toggle state) for a single in-flight action.ContinousPlan— atyping.Protocolused for static typing and runtimeisinstancechecks on decorated plans.
Action
dataclass
¶
Metadata for an in-flight action on a continuous plan.
An Action is a user-triggerable side effect that can be fired while a
continuous plan is running. It encapsulates an SRLatch synchronisation
primitive so the plan can await the action being triggered.
Warning
The internal SRLatch is created lazily on first access of
event_map, so Action objects can be constructed without a running
event loop. The latch must only be accessed from within a plan.
Subclassable to add additional fields for domain-specific use cases.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Name of the action. |
required |
description
|
str
|
Brief description of the action, usable as UI tooltip. |
''
|
togglable
|
bool
|
Whether the action is togglable or not. |
False
|
toggle_states
|
tuple[str, str]
|
Labels for the toggle states (on, off). Only used if |
('On', 'Off')
|
Source code in src/redsun/engine/actions.py
description
class-attribute
instance-attribute
¶
Brief description of the action, usable as UI tooltip.
togglable
class-attribute
instance-attribute
¶
Whether the action is togglable or not.
toggle_states
class-attribute
instance-attribute
¶
Labels for the toggle states (on, off). Only used if togglable is True.
event_map
property
¶
Return the latch for this action as a single-entry dict keyed by name.
SRLatch
¶
An asyncio Event-like object that behaves as a set-reset latch.
Wraps two asyncio.Event objects to allow waiting for either the set
or the reset state of the latch. At construction the latch starts in
the reset state.
Source code in src/redsun/engine/actions.py
set
¶
Set the internal flag to True.
All coroutines waiting in wait_for_set are awakened.
No-op if the flag is already set.
Source code in src/redsun/engine/actions.py
reset
¶
Reset the internal flag to False.
All coroutines waiting in wait_for_reset are awakened.
No-op if the flag is already reset.
Source code in src/redsun/engine/actions.py
is_set
¶
wait_for_set
async
¶
Wait until the internal flag is set.
Returns immediately if the flag is already set; otherwise blocks
until another coroutine calls set.
Source code in src/redsun/engine/actions.py
wait_for_reset
async
¶
Wait until the internal flag is reset.
Returns immediately if the flag is already reset; otherwise blocks
until another coroutine calls reset.
Source code in src/redsun/engine/actions.py
ContinousPlan
¶
Bases: Protocol[P, R_co]
Protocol for plans decorated with continous.
Used both for static typing (as the return type of the continous
decorator) and for runtime isinstance checks:
Attributes:
| Name | Type | Description |
|---|---|---|
__togglable__ |
bool
|
Whether the plan is togglable (i.e. runs as an infinite loop that the run engine can stop). |
__pausable__ |
bool
|
Whether the plan can be paused and resumed by the run engine. |
Source code in src/redsun/engine/actions.py
continous
¶
continous(
func: Callable[P, R_co] | None = None,
/,
*,
togglable: bool = True,
pausable: bool = False,
) -> (
Callable[[Callable[P, R_co]], ContinousPlan[P, R_co]]
| ContinousPlan[P, R_co]
)
Mark a plan as continuous.
A continuous plan informs the view to provide UI controls that allow the user to start, stop, pause, and resume plan execution.
Can be used with or without arguments:
@continous
def my_plan() -> MsgGenerator[None]: ...
@continous(togglable=True, pausable=True)
def my_plan(detectors: Sequence[DetectorProtocol]) -> MsgGenerator[None]: ...
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
togglable
|
bool
|
Whether the plan runs as an infinite loop that the run engine can stop via a toggle button. Default is True. |
True
|
pausable
|
bool
|
Whether the plan can be paused and resumed by the run engine. Default is False. |
False
|
Returns:
| Type | Description |
|---|---|
ContinousPlan
|
The decorated plan function, typed as a |
Notes
The decorator does not modify the function signature. It stores
togglable and pausable as attributes on the function object
(__togglable__ and __pausable__), to be retrieved later by
create_plan_spec.
Source code in src/redsun/engine/actions.py
Plan stubs¶
Custom Bluesky plan stubs for redsun plans.
These stubs extend the standard bluesky.plan_stubs with redsun-specific
operations such as device-cache management (stash, clear_cache) and
action-based flow control (wait_for_actions, read_while_waiting).
All functions are generator functions that yield Msg objects and are
intended to be composed inside larger Bluesky plans via yield from.
set_property
¶
set_property(
obj: Movable[Any],
value: Any,
/,
propr: str,
timeout: float | None = None,
) -> MsgGenerator[Status]
Set a property of a Movable object and wait for completion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
obj
|
Movable[Any]
|
The movable object whose property will be set. |
required |
value
|
Any
|
The value to set. |
required |
propr
|
str
|
The property name to set (keyword-only). |
required |
timeout
|
float | None
|
Maximum time in seconds to wait for completion. None means wait indefinitely. Default is None. |
None
|
Yields:
| Type | Description |
|---|---|
Msg
|
A |
Returns:
| Type | Description |
|---|---|
Status
|
The status object returned by the |
Source code in src/redsun/engine/plan_stubs.py
wait_for_actions
¶
wait_for_actions(
events: Mapping[str, SRLatch],
timeout: float = 0.001,
wait_for: Literal["set", "reset"] = "set",
) -> MsgGenerator[tuple[str, SRLatch] | None]
Wait for any of the given latches to change state.
Plan execution blocks until one latch transitions; background tasks continue running normally.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
events
|
Mapping[str, SRLatch]
|
Mapping of action names to their |
required |
timeout
|
float
|
Maximum time in seconds to wait before returning None. Default is 0.001 seconds. |
0.001
|
wait_for
|
Literal['set', 'reset']
|
Whether to wait for a latch to be set or reset.
Default is |
'set'
|
Returns:
| Type | Description |
|---|---|
tuple[str, SRLatch] | None
|
The name and latch that changed state, or None if the timeout elapsed without any latch transitioning. |
Source code in src/redsun/engine/plan_stubs.py
read_while_waiting
¶
read_while_waiting(
objs: Sequence[Readable[Any]],
events: Mapping[str, SRLatch],
stream_name: str = "primary",
refresh_period: float = SIXTY_FPS,
wait_for: Literal["set", "reset"] = "set",
) -> MsgGenerator[tuple[str, SRLatch]]
Repeatedly trigger and read devices until an action latch changes state.
On each iteration the plan triggers and reads all objects in objs, then checks whether any latch in events has changed state. The loop repeats at refresh_period until a latch transitions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
objs
|
Sequence[Readable[Any]]
|
Devices to trigger and read on each iteration. |
required |
events
|
Mapping[str, SRLatch]
|
Mapping of action names to |
required |
stream_name
|
str
|
Name of the Bluesky stream to collect data into.
Default is |
'primary'
|
refresh_period
|
float
|
Polling period in seconds. Default is 1/60 s (60 Hz). |
SIXTY_FPS
|
wait_for
|
Literal['set', 'reset']
|
Whether to wait for a latch to be set or reset.
Default is |
'set'
|
Returns:
| Type | Description |
|---|---|
tuple[str, SRLatch]
|
The name and latch that unblocked the loop. |
Source code in src/redsun/engine/plan_stubs.py
read_and_stash
¶
read_and_stash(
objs: Sequence[Readable[Any]],
cache_objs: Sequence[HasCache],
*,
stream: str = "primary",
group: str | None = None,
wait: bool = False,
) -> MsgGenerator[dict[str, Reading[Any]]]
Trigger, read, and stash readings from one or more devices.
Triggers all Triggerable devices in objs, reads each device, stashes
the reading into the corresponding HasCache object in cache_objs, and
emits a Bluesky create/save pair to record the event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
objs
|
Sequence[Readable[Any]]
|
Devices to read from. |
required |
cache_objs
|
Sequence[HasCache]
|
Cache objects paired with objs (same order) to stash readings into. |
required |
stream
|
str
|
Bluesky stream name. Default is |
'primary'
|
group
|
str | None
|
Identifier for the stash group. Auto-generated if None. |
None
|
wait
|
bool
|
Whether to wait for all stash operations to complete before returning. Default is False. |
False
|
Returns:
| Type | Description |
|---|---|
dict[str, Reading[Any]]
|
Combined readings from all devices. |
Source code in src/redsun/engine/plan_stubs.py
stash
¶
stash(
obj: HasCache,
reading: dict[str, Reading[Any]],
*,
group: str | None,
wait: bool,
) -> MsgGenerator[None]
Stash a reading into a HasCache device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
obj
|
HasCache
|
The cache object to stash the reading into. |
required |
reading
|
dict[str, Reading[Any]]
|
The reading to stash, typically from |
required |
group
|
str | None
|
Identifier for the stash group. A unique id is generated if None. |
required |
wait
|
bool
|
Whether to wait for the stash operation to complete. |
required |
Source code in src/redsun/engine/plan_stubs.py
clear_cache
¶
clear_cache(
obj: HasCache,
*,
group: str | None = None,
wait: bool = False,
) -> MsgGenerator[None]
Clear the cache of a HasCache device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
obj
|
HasCache
|
The cache object to clear. |
required |
group
|
str | None
|
Identifier for the clear operation. Auto-generated if None. |
None
|
wait
|
bool
|
Whether to wait for the clear operation to complete. Default is False. |
False
|
Source code in src/redsun/engine/plan_stubs.py
describe
¶
Gather the descriptor from a Readable device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
obj
|
Readable[Any]
|
The device to describe. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Descriptor]
|
The descriptor dict returned by |
Source code in src/redsun/engine/plan_stubs.py
describe_collect
¶
describe_collect(
obj: Collectable,
) -> MsgGenerator[
dict[str, Descriptor] | dict[str, dict[str, Descriptor]]
]
Gather descriptors from a Collectable device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
obj
|
Collectable
|
The device to describe. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Descriptor] | dict[str, dict[str, Descriptor]]
|
The descriptor dict returned by |