Storage¶
Design inspiration
The storage architecture described here is heavily inspired by
ophyd-async, developed by the
Bluesky collaboration. The patterns — shared writer, path providers,
FrameSink, StorageProxy protocol — were adapted to fit the
sunflare/redsun container model.
The sunflare.storage subpackage provides the infrastructure for writing
detector frames to a persistent store. The design follows a small set of
principles:
- One shared writer per session — all devices write to the same store; each device owns its own array key within it.
- Storage is opt-in per device — devices that need storage declare a
StorageDescriptorfield; the baseDeviceclass carries no storage attribute. - The container injects the writer — devices never construct or look up a writer themselves; the application container builds one writer and injects it into every opted-in device at session start.
- Backend classes are internal — only the abstract
Writerand theStorageProxyprotocol are public; concrete backends (e.g.ZarrWriter) are selected and instantiated by the container from the session configuration. - Backend dependencies are optional extras —
acquire-zarris not a core dependency of sunflare. Installsunflare[zarr]to enable the Zarr backend.
Opting in to storage¶
A device signals that it needs storage by declaring a class-level StorageDescriptor:
import numpy as np
from sunflare.device import Device
from sunflare.storage import StorageDescriptor
class MyDetector(Device):
storage = StorageDescriptor()
def __init__(self, name: str, /) -> None:
super().__init__(name)
def prepare(self, capacity: int) -> None:
if self.storage is None:
raise RuntimeError("No storage backend configured for this session.")
self.storage.update_source(
self.name,
dtype=np.dtype("uint16"),
shape=(512, 512),
)
self._sink = self.storage.prepare(self.name, capacity=capacity)
The descriptor initialises to None; the container sets it to the shared writer
before any acquisition begins. Device code checks self.storage is None to handle
sessions that have no storage configured.
Acquisition lifecycle¶
Once the writer has been injected, a device interacts with storage through a fixed call sequence:
# 1. Register the data source (dtype, shape, optional backend-specific metadata)
self.storage.update_source(self.name, dtype=np.dtype("uint16"), shape=(512, 512))
# 2. Prepare for one acquisition; returns a FrameSink bound to this device
sink = self.storage.prepare(self.name, capacity=100)
# 3. Open the backend (called once, shared across all devices)
self.storage.kickoff()
# 4. Push frames — thread-safe, multiple sinks may write concurrently
sink.write(frame)
# 5. Signal completion for this device
sink.close()
FrameSink.close delegates to
Writer.complete. The backend is finalised
automatically once every active sink has been closed.
Path providers¶
The writer resolves store paths through a composable
PathProvider. A PathProvider is a callable
that accepts a device name and returns a PathInfo
describing where and how to write that device's data.
sunflare.storage ships three FilenameProvider
strategies that can be composed with
StaticPathProvider:
The StorageProxy protocol¶
Device code never holds a reference to a concrete backend class. It interacts
only through the StorageProxy protocol:
from sunflare.storage import StorageProxy
class StorageProxy(Protocol):
def update_source(self, name, dtype, shape, extra=None) -> None: ...
def prepare(self, name, capacity=0) -> FrameSink: ...
def kickoff(self) -> None: ...
def complete(self, name) -> None: ...
def get_indices_written(self, name=None) -> int: ...
def collect_stream_docs(self, name, indices_written) -> Iterator[StreamAsset]: ...
Writer satisfies this protocol structurally, so device
code remains independent of the concrete backend.
Tip
When testing devices in isolation, pass a MagicMock(spec=StorageProxy) as
the injected writer — no real backend is needed and all interactions are
captured for assertion.
Implementing a custom backend¶
To add a new storage backend, subclass Writer and
implement the four abstract members:
from sunflare.storage import Writer, FrameSink
import numpy.typing as npt
class MyWriter(Writer):
@property
def mimetype(self) -> str:
return "application/x-myformat"
def prepare(self, name: str, capacity: int = 0) -> FrameSink:
# backend-specific setup for this source ...
return super().prepare(name, capacity) # (1)
def kickoff(self) -> None:
if self.is_open:
return
# open your backend here ...
super().kickoff() # (2)
def _write_frame(self, name: str, frame: npt.NDArray) -> None:
# write one frame to the backend under the key for `name`
...
def _finalize(self) -> None:
# close the backend; called automatically when all sinks are done
...
super().prepare()resets per-source counters and returns the boundFrameSink— always call it.super().kickoff()setsis_open— always call it.
Warning
_write_frame is called by FrameSink.write
under the writer lock. Do not acquire the lock again inside _write_frame
or call any method that does.