Storage

Writer

Bases: ABC, Loggable

Abstract base class for storage backend writers.

Writers are long-lived singletons, created once when the application starts and reused across every acquisition. The class-level registry is keyed by a composite key (name, mimetype) where:

  • mimetype identifies the storage format and determines which Writer subclass handles serialisation (e.g. "application/x-zarr" maps to ZarrWriter).
  • name is a store group name that distinguishes multiple independent stores that share the same format. Use "default" for the common single-store case. This is not a device name — many devices may contribute sources to the same writer by referencing the same (name, mimetype) key.

Example keys:

("default", "application/x-zarr")  # the normal single-store case
("live", "application/x-zarr")     # a separate live-preview store
("default", "application/x-hdf5")  # a different format entirely

The store URI is set by the Presenter layer via set_uri. Subclasses should override set_uri to perform any backend-specific path translation and must call super().set_uri(uri); changing the URI never touches the registry.

Call order per acquisition:

  1. set_uri(uri)
    • called by a presenter before the plan
  2. prepare(name, data_key, dtype, shape, capacity)
    • called by each device in its own prepare()
    • returns a FrameSink
  3. kickoff()
    • opens the backend
  4. sink.write(frame)
    • push frames (thread-safe)
  5. sink.close()
    • signals completion for this source
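The call order above can be walked through with a toy in-memory writer. `ToyWriter` and `ToySink` are hypothetical stand-ins written for this sketch; they imitate the guards described on this page (step 2 uses the `prepare` method documented further down) but are not the redsun implementation.

```python
import threading


class ToySink:
    """Hypothetical stand-in for FrameSink: forwards frames to its writer."""

    def __init__(self, writer: "ToyWriter", name: str) -> None:
        self._writer = writer
        self._name = name

    def write(self, frame: list[int]) -> None:
        self._writer.write_frame(self._name, frame)

    def close(self) -> None:
        self._writer.complete(self._name)


class ToyWriter:
    """Hypothetical in-memory writer that follows the five-step call order."""

    def __init__(self) -> None:
        self.uri = ""
        self.is_open = False
        self.frames: dict[str, list[list[int]]] = {}
        self._done: dict[str, bool] = {}
        self._lock = threading.Lock()

    def set_uri(self, uri: str) -> None:
        if self.is_open:
            raise RuntimeError("cannot change URI while open")
        self.uri = uri

    def prepare(self, name: str) -> ToySink:
        if self.is_open:
            raise RuntimeError("cannot register a source while open")
        self.frames[name] = []
        self._done[name] = False
        return ToySink(self, name)

    def kickoff(self) -> None:
        if not self.uri:
            raise RuntimeError("set_uri() must be called before kickoff()")
        self.is_open = True

    def write_frame(self, name: str, frame: list[int]) -> None:
        with self._lock:  # serialise writes coming from different threads
            self.frames[name].append(frame)

    def complete(self, name: str) -> None:
        with self._lock:
            self._done[name] = True
            if all(self._done.values()):
                self.is_open = False  # the last source closes the backend


writer = ToyWriter()
writer.set_uri("file:///tmp/scan_00001")  # 1. presenter sets the URI
sink = writer.prepare("camera")           # 2. device registers in prepare()
writer.kickoff()                          # 3. backend is opened
for i in range(3):
    sink.write([i, i + 1])                # 4. frames are pushed
sink.close()                              # 5. source signals completion
```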

Subclasses must implement the following methods and properties:

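  • mimetype: MIME type string; the base-class property already returns _class_mimetype(), so any override must return the same value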
  • kickoff
    • open the backend
    • must call super().kickoff() to set is_open and enforce the URI guard
  • _on_prepare (private)
    • backend-specific setup after a source is registered (e.g. pre-declare Zarr array dimensions); called at the end of prepare
  • _write_frame
    • write one frame to the backend
  • _finalize
    • close the backend when all sources are done
  • _class_mimetype (class method)
    • return the MIME type string for this subclass; used for registry keys
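A reduced skeleton may help to see how these hooks fit together. `MiniWriter` is a hypothetical, heavily simplified stand-in for the base class (it omits the registry, the sources, and the locking), and `ListWriter` with its made-up MIME type is a toy backend; only the hook names are taken from the list above.

```python
import abc


class MiniWriter(abc.ABC):
    """Hypothetical, heavily reduced stand-in for the Writer base class."""

    def __init__(self) -> None:
        self.is_open = False

    @classmethod
    @abc.abstractmethod
    def _class_mimetype(cls) -> str: ...

    @property
    def mimetype(self) -> str:
        return self._class_mimetype()

    @abc.abstractmethod
    def kickoff(self) -> None:
        self.is_open = True  # shared bookkeeping lives in the base

    @abc.abstractmethod
    def _on_prepare(self, name: str) -> None: ...

    @abc.abstractmethod
    def _write_frame(self, name: str, frame: bytes) -> None: ...

    @abc.abstractmethod
    def _finalize(self) -> None: ...


class ListWriter(MiniWriter):
    """Toy backend that keeps frames in Python lists."""

    def __init__(self) -> None:
        super().__init__()
        self.arrays: dict[str, list[bytes]] = {}

    @classmethod
    def _class_mimetype(cls) -> str:
        return "application/x-toy-list"  # made-up MIME type

    def kickoff(self) -> None:
        super().kickoff()  # lets the base set is_open

    def _on_prepare(self, name: str) -> None:
        self.arrays[name] = []  # pre-declare per-source storage

    def _write_frame(self, name: str, frame: bytes) -> None:
        self.arrays[name].append(frame)

    def _finalize(self) -> None:
        self.is_open = False


w = ListWriter()
w._on_prepare("camera")
w.kickoff()
w._write_frame("camera", b"\x00\x01")
w._finalize()
```

Because every hook is abstract, instantiating `MiniWriter` directly raises `TypeError`; only a subclass that provides all of them can be created.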

Parameters:

  • name (str, required): Store group name. See class docstring for semantics.

Source code in src/redsun/storage/_base.py
class Writer(abc.ABC, Loggable):
    """Abstract base class for storage backend writers.

    Writers are long-lived singletons, created once when the application
    starts and reused across every acquisition.  The class-level registry
    is keyed by a **composite key** ``(name, mimetype)`` where:

    - ``mimetype`` identifies the storage format and determines which
      ``Writer`` subclass handles serialisation (e.g.
      ``"application/x-zarr"`` maps to ``ZarrWriter``).
    - ``name`` is a **store group name** that distinguishes multiple
      independent stores that share the same format.  Use ``"default"``
      for the common single-store case.  This is *not* a device name —
      many devices may contribute sources to the same writer by
      referencing the same ``(name, mimetype)`` key.

    Example keys:

        ("default", "application/x-zarr")  # the normal single-store case
        ("live", "application/x-zarr")     # a separate live-preview store
        ("default", "application/x-hdf5")  # a different format entirely

    URI setting is controlled by the Presenter layer
    via [`set_uri`][redsun.storage.Writer.set_uri].
    Subclasses must override ``set_uri`` to perform any backend-specific path
    translation without disturbing the registry.

    Call order per acquisition:

    1. ``set_uri(uri)``
        - called by a presenter before the plan
    2. ``prepare(name, data_key, dtype, shape, capacity)``
        - called by each device in its own ``prepare()``
        - returns a [`FrameSink`][redsun.storage.FrameSink]
    3. ``kickoff()``
        - opens the backend
    4. ``sink.write(frame)``
        - push frames (thread-safe)
    5. ``sink.close()``
        - signals completion for this source

    Subclasses must implement the following methods and properties:

    - [`mimetype`][redsun.storage.Writer.mimetype] — MIME type string
    - [`kickoff`][redsun.storage.Writer.kickoff]
        - open the backend
        - must call ``super().kickoff()`` to set ``is_open`` and enforce the URI guard
    - [`_on_prepare`][redsun.storage.Writer._on_prepare] (private)
        - backend-specific setup after a source is registered (e.g. pre-declare Zarr array
          dimensions); called at the end of [`prepare`][redsun.storage.Writer.prepare]
    - [`_write_frame`][redsun.storage.Writer._write_frame]
        - write one frame to the backend
    - [`_finalize`][redsun.storage.Writer._finalize]
        - close the backend when all sources are done
    - [`_class_mimetype`][redsun.storage.Writer._class_mimetype] (class method)
        - return the MIME type string for this subclass; used for registry keys

    Parameters
    ----------
    name : str
        Store group name.  See class docstring for semantics.
    """

    _registry: dict[_WriterKey, "Writer"] = {}
    _registry_lock: th.Lock = th.Lock()

    def __init__(self, name: str) -> None:
        self._name = name
        self._uri: str = ""
        self._lock = th.Lock()
        self._is_open = False
        self._sources: dict[str, SourceInfo] = {}
        self._metadata: dict[str, dict[str, Any]] = {}

    @classmethod
    def get(cls: type[_W], name: str = "default") -> _W:
        """Return the singleton writer for *(name, cls.mimetype)*.

        Creates a new instance on first call; returns the existing one
        on every subsequent call for the same ``(name, mimetype)`` pair.
        The URI is *not* set here — call
        [`set_uri`][redsun.storage.Writer.set_uri] separately.

        This method is normally not called directly by devices or
        application code.  Use
        [`make_writer`][redsun.storage.make_writer] instead, which
        resolves the correct subclass from the mimetype string.

        Parameters
        ----------
        name : str
            Store group name.  Defaults to ``"default"``.

        Returns
        -------
        Writer
            Singleton instance for ``(name, cls.mimetype)``.

        Raises
        ------
        TypeError
            If the existing registry entry is not an instance of ``cls``.
        """
        # The registry key is built from the class-level _class_mimetype()
        # (e.g. ZarrWriter returns "application/x-zarr"), so no instance
        # is needed to look up or create the singleton.
        key: _WriterKey = (name, cls._class_mimetype())
        with cls._registry_lock:
            if key not in cls._registry:
                cls._registry[key] = cls(name)
            instance = cls._registry[key]
        if not isinstance(instance, cls):
            raise TypeError(
                f"Registry entry for {key!r} is {type(instance).__name__!r}, "
                f"expected {cls.__name__!r}"
            )
        return instance

    @classmethod
    def release(cls, name: str = "default") -> None:
        """Remove the registry entry for *(name, cls.mimetype)*.

        Called by a presenter at application shutdown.
        Devices should not call this directly.

        Parameters
        ----------
        name : str
            Store group name.  Defaults to ``"default"``.
        """
        key: _WriterKey = (name, cls._class_mimetype())
        with cls._registry_lock:
            cls._registry.pop(key, None)

    @classmethod
    @abc.abstractmethod
    def _class_mimetype(cls) -> str:
        """Return the MIME type string for this subclass.

        Used by [`get`][redsun.storage.Writer.get] and
        [`release`][redsun.storage.Writer.release] to build the registry
        key before any instance exists.  Must return the same value as
        the instance property [`mimetype`][redsun.storage.Writer.mimetype].

        Subclasses implement this as a one-liner::

            @classmethod
            def _class_mimetype(cls) -> str:
                return "application/x-zarr"
        """
        ...

    @property
    def is_open(self) -> bool:
        """Return whether the backend is currently open."""
        return self._is_open

    @property
    def uri(self) -> str:
        """The current store URI.

        Empty string until
        [`set_uri`][redsun.storage.Writer.set_uri] has been called.
        """
        return self._uri

    @property
    def mimetype(self) -> str:
        """The MIME type string for this backend.

        Must return the same value as
        [`_class_mimetype`][redsun.storage.Writer._class_mimetype].
        """
        return self._class_mimetype()

    def set_uri(self, uri: str) -> None:
        """Update the store URI for the next acquisition.

        Called by a presenter before each acquisition and
        whenever the user changes the output directory.  The writer
        must not be open when this is called.

        Subclasses should override this to perform any backend-specific
        path translation (e.g. rebuilding ``StreamSettings.store_path``
        for ``ZarrWriter``) and must call ``super().set_uri(uri)``.

        Parameters
        ----------
        uri : str
            New store URI (e.g. ``"file:///data/2026_02_25/scan_00001"``).

        Raises
        ------
        RuntimeError
            If the writer is currently open.
        """
        if self._is_open:
            raise RuntimeError(
                f"Cannot change URI on writer ({self._name!r}, {self.mimetype!r}) "
                "while it is open."
            )
        self._uri = uri
        self.logger.debug(f"URI updated to {uri!r}")

    def prepare(
        self,
        name: str,
        data_key: str,
        dtype: np.dtype[np.generic],
        shape: tuple[int, ...],
        capacity: int = 0,
    ) -> FrameSink:
        """Register a data source and return a ready ``FrameSink``.

        Called by each device inside its own ``prepare()`` method, once
        per acquisition.  Replaces the former two-step
        ``update_source`` + ``prepare`` sequence.

        Safe to call multiple times on the same source name — counters
        are reset on each call, making it suitable for repeated
        acquisitions without recreating the writer.

        Parameters
        ----------
        name : str
            Source name, typically the device name.  Multiple devices
            may register distinct source names on the same writer.
        data_key : str
            Bluesky data key used in stream documents.
        dtype : np.dtype[np.generic]
            NumPy data type of the frames.
        shape : tuple[int, ...]
            Shape of each individual frame.
        capacity : int
            Maximum number of frames to accept.  ``0`` means unlimited.

        Returns
        -------
        FrameSink
            Bound sink ready to accept frames via ``sink.write(frame)``.

        Raises
        ------
        RuntimeError
            If the writer is currently open.
        """
        if self._is_open:
            raise RuntimeError(
                f"Cannot register source {name!r} on writer "
                f"({self._name!r}, {self.mimetype!r}) while it is open."
            )
        self._sources[name] = SourceInfo(
            name=name,
            dtype=dtype,
            shape=shape,
            data_key=data_key,
            capacity=capacity,
        )
        self.logger.debug(
            f"Registered source {name!r} — shape={shape}, capacity={capacity}"
        )
        self._on_prepare(name)
        return FrameSink(self, name)

    @abc.abstractmethod
    def _on_prepare(self, name: str) -> None:
        """Backend-specific hook called after a source is registered.

        Invoked at the end of [`prepare`][redsun.storage.Writer.prepare]
        once ``self._sources[name]`` is fully populated.  Subclasses use
        this to pre-declare backend structures — for example, ``ZarrWriter``
        builds its ``ArraySettings`` here from the source dtype and shape.

        Parameters
        ----------
        name : str
            The source name just registered.
        """
        ...

    def get_indices_written(self, name: str | None = None) -> int:
        """Return the number of frames written for a source.

        Parameters
        ----------
        name : str | None
            Source name.  If ``None``, returns the minimum across all
            registered sources (useful for synchronisation checks).

        Raises
        ------
        KeyError
            If *name* is not registered.
        """
        if name is None:
            if not self._sources:
                return 0
            return min(s.frames_written for s in self._sources.values())
        if name not in self._sources:
            raise KeyError(f"Unknown source {name!r}")
        return self._sources[name].frames_written

    def reset_collection_state(self, name: str) -> None:
        """Reset the stream-document counters for *name*.

        Parameters
        ----------
        name : str
            Source name to reset.
        """
        source = self._sources[name]
        source.collection_counter = 0
        source.stream_resource_uid = str(uuid.uuid4())

    @abc.abstractmethod
    def kickoff(self) -> None:
        """Open the storage backend for a new acquisition.

        Subclasses must call ``super().kickoff()`` to set
        [`is_open`][redsun.storage.Writer.is_open] and to enforce the
        URI guard.  Subsequent calls while already open must be no-ops.

        Raises
        ------
        RuntimeError
            If [`uri`][redsun.storage.Writer.uri] has not been set yet.
        """
        if not self._uri:
            clear_metadata()
            raise RuntimeError(
                f"Writer ({self._name!r}, {self.mimetype!r}) has no URI. "
                "A presenter must call set_uri() before kickoff()."
            )
        if not self._is_open:
            self._metadata = snapshot_metadata()
            self._is_open = True

    def complete(self, name: str) -> None:
        """Mark acquisition complete for source *name*.

        Called automatically by [`FrameSink.close`][redsun.storage.FrameSink.close].
        When the last registered source calls ``complete``, the backend
        is finalised and ``is_open`` is reset.  The writer instance
        remains in the registry and is ready for the next acquisition.

        Parameters
        ----------
        name : str
            Source name signalling completion.
        """
        self._sources[name].completed = True
        if all(s.completed for s in self._sources.values()):
            self._finalize()
            self._is_open = False
            clear_metadata()
            self.logger.debug("All sources complete; backend finalised.")

    def clear_sources(self) -> None:
        """Remove all registered sources.

        A presenter in charge of monitoring
        writing progress should take care to call this
        after each plan is finished.
        """
        self._sources.clear()
        self.logger.debug("Source cache reset.")

    @abc.abstractmethod
    def _write_frame(self, name: str, frame: npt.NDArray[np.generic]) -> None:
        """Write one frame to the backend.

        Called by [`FrameSink.write`][redsun.storage.FrameSink.write]
        under the writer lock.

        Parameters
        ----------
        name : str
            Source name.
        frame : npt.NDArray[np.generic]
            Frame data to write.
        """
        ...

    @abc.abstractmethod
    def _finalize(self) -> None:
        """Close the backend after all sources have completed."""
        ...

    def collect_stream_docs(
        self,
        name: str,
        indices_written: int,
    ) -> Iterator[StreamAsset]:
        """Yield ``StreamResource`` and ``StreamDatum`` documents for *name*.

        Parameters
        ----------
        name : str
            Source name.
        indices_written : int
            Number of frames to report in this call.

        Yields
        ------
        StreamAsset
            Tuples of ``("stream_resource", doc)`` or
            ``("stream_datum", doc)``.

        Raises
        ------
        KeyError
            If *name* is not registered.
        """
        if name not in self._sources:
            raise KeyError(f"Unknown source {name!r}")

        source = self._sources[name]

        if indices_written == 0:
            return

        frames_to_report = min(indices_written, source.frames_written)

        if source.collection_counter >= frames_to_report:
            return

        if source.collection_counter == 0:
            stream_resource: StreamResource = {
                "data_key": source.data_key,
                "mimetype": self.mimetype,
                "parameters": {"array_name": source.name},
                "uid": source.stream_resource_uid,
                "uri": self.uri,
            }
            yield ("stream_resource", stream_resource)

        stream_datum: StreamDatum = {
            "descriptor": "",
            "indices": {"start": source.collection_counter, "stop": frames_to_report},
            "seq_nums": {"start": 0, "stop": 0},
            "stream_resource": source.stream_resource_uid,
            "uid": f"{source.stream_resource_uid}/{source.collection_counter}",
        }
        yield ("stream_datum", stream_datum)

        source.collection_counter = frames_to_report
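The index bookkeeping in collect_stream_docs, at the end of the listing above, can be traced with a reduced sketch. `Source` and `collect` are hypothetical stand-ins that keep only the counters, and the document payloads are trimmed to the fields that matter here.

```python
from collections.abc import Iterator


class Source:
    """Hypothetical stand-in for the per-source bookkeeping."""

    def __init__(self) -> None:
        self.frames_written = 0
        self.collection_counter = 0


def collect(source: Source, indices_written: int) -> Iterator[tuple[str, dict]]:
    """Yield trimmed-down documents for frames not reported yet."""
    if indices_written == 0:
        return
    stop = min(indices_written, source.frames_written)
    if source.collection_counter >= stop:
        return  # nothing new since the last call
    if source.collection_counter == 0:
        yield ("stream_resource", {"uid": "demo"})  # only on the first report
    yield ("stream_datum", {"start": source.collection_counter, "stop": stop})
    source.collection_counter = stop


src = Source()
src.frames_written = 5
first = list(collect(src, 5))
src.frames_written = 8
second = list(collect(src, 10))  # capped at the 8 frames actually written
third = list(collect(src, 10))   # no new frames, so no documents
```

Each call reports a half-open range that starts where the previous one stopped, so consecutive stream datum documents never overlap.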

is_open property

is_open: bool

Return whether the backend is currently open.

uri property

uri: str

The current store URI.

Empty string until set_uri has been called.

mimetype property

mimetype: str

The MIME type string for this backend.

Must return the same value as _class_mimetype.

get classmethod

get(name: str = 'default') -> _W

Return the singleton writer for (name, cls.mimetype).

Creates a new instance on first call; returns the existing one on every subsequent call for the same (name, mimetype) pair. The URI is not set here — call set_uri separately.

This method is normally not called directly by devices or application code. Use make_writer instead, which resolves the correct subclass from the mimetype string.

Parameters:

  • name (str, default 'default'): Store group name.

Returns:

  • Writer: Singleton instance for (name, cls.mimetype).

Raises:

  • TypeError: If the existing registry entry is not an instance of cls.
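One plausible way to hit the TypeError is two subclasses that report the same MIME type and therefore compete for the same registry key. The sketch below assumes that scenario; `Base`, `ZarrLike`, `Impostor`, and the `MIMETYPE` attribute are hypothetical names used only for illustration.

```python
import threading


class Base:
    """Hypothetical stand-in: one registry shared by all subclasses."""

    _registry: dict[tuple[str, str], "Base"] = {}
    _registry_lock = threading.Lock()
    MIMETYPE = ""

    @classmethod
    def get(cls, name: str = "default"):
        key = (name, cls.MIMETYPE)
        with cls._registry_lock:
            if key not in cls._registry:
                cls._registry[key] = cls()
            instance = cls._registry[key]
        if not isinstance(instance, cls):
            raise TypeError(
                f"{key!r} holds {type(instance).__name__}, expected {cls.__name__}"
            )
        return instance


class ZarrLike(Base):
    MIMETYPE = "application/x-zarr"


class Impostor(Base):
    MIMETYPE = "application/x-zarr"  # clashes with ZarrLike on purpose


first = ZarrLike.get()
same = ZarrLike.get()  # second call returns the existing singleton
try:
    Impostor.get()     # key already holds a ZarrLike instance
    clash = None
except TypeError as exc:
    clash = str(exc)
```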

Source code in src/redsun/storage/_base.py
@classmethod
def get(cls: type[_W], name: str = "default") -> _W:
    """Return the singleton writer for *(name, cls.mimetype)*.

    Creates a new instance on first call; returns the existing one
    on every subsequent call for the same ``(name, mimetype)`` pair.
    The URI is *not* set here — call
    [`set_uri`][redsun.storage.Writer.set_uri] separately.

    This method is normally not called directly by devices or
    application code.  Use
    [`make_writer`][redsun.storage.make_writer] instead, which
    resolves the correct subclass from the mimetype string.

    Parameters
    ----------
    name : str
        Store group name.  Defaults to ``"default"``.

    Returns
    -------
    Writer
        Singleton instance for ``(name, cls.mimetype)``.

    Raises
    ------
    TypeError
        If the existing registry entry is not an instance of ``cls``.
    """
    # The registry key is built from the class-level _class_mimetype()
    # (e.g. ZarrWriter returns "application/x-zarr"), so no instance
    # is needed to look up or create the singleton.
    key: _WriterKey = (name, cls._class_mimetype())
    with cls._registry_lock:
        if key not in cls._registry:
            cls._registry[key] = cls(name)
        instance = cls._registry[key]
    if not isinstance(instance, cls):
        raise TypeError(
            f"Registry entry for {key!r} is {type(instance).__name__!r}, "
            f"expected {cls.__name__!r}"
        )
    return instance

release classmethod

release(name: str = 'default') -> None

Remove the registry entry for (name, cls.mimetype).

Called by a presenter at application shutdown. Devices should not call this directly.

Parameters:

  • name (str, default 'default'): Store group name.

Source code in src/redsun/storage/_base.py
@classmethod
def release(cls, name: str = "default") -> None:
    """Remove the registry entry for *(name, cls.mimetype)*.

    Called by a presenter at application shutdown.
    Devices should not call this directly.

    Parameters
    ----------
    name : str
        Store group name.  Defaults to ``"default"``.
    """
    key: _WriterKey = (name, cls._class_mimetype())
    with cls._registry_lock:
        cls._registry.pop(key, None)

_class_mimetype abstractmethod classmethod

_class_mimetype() -> str

Return the MIME type string for this subclass.

Used by get and release to build the registry key before any instance exists. Must return the same value as the instance property mimetype.

Subclasses implement this as a one-liner:

@classmethod
def _class_mimetype(cls) -> str:
    return "application/x-zarr"
Source code in src/redsun/storage/_base.py
@classmethod
@abc.abstractmethod
def _class_mimetype(cls) -> str:
    """Return the MIME type string for this subclass.

    Used by [`get`][redsun.storage.Writer.get] and
    [`release`][redsun.storage.Writer.release] to build the registry
    key before any instance exists.  Must return the same value as
    the instance property [`mimetype`][redsun.storage.Writer.mimetype].

    Subclasses implement this as a one-liner::

        @classmethod
        def _class_mimetype(cls) -> str:
            return "application/x-zarr"
    """
    ...

set_uri

set_uri(uri: str) -> None

Update the store URI for the next acquisition.

Called by a presenter before each acquisition and whenever the user changes the output directory. The writer must not be open when this is called.

Subclasses should override this to perform any backend-specific path translation (e.g. rebuilding StreamSettings.store_path for ZarrWriter) and must call super().set_uri(uri).

Parameters:

  • uri (str, required): New store URI (e.g. "file:///data/2026_02_25/scan_00001").

Raises:

  • RuntimeError: If the writer is currently open.
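A subclass override might look like the sketch below, which translates a file URI into a local path after delegating to the base guard. `BaseWriter`, `PathWriter`, and the `store_path` attribute are hypothetical, and rejecting non-file schemes is an assumption of this example rather than documented behaviour.

```python
from urllib.parse import urlparse
from urllib.request import url2pathname


class BaseWriter:
    """Hypothetical stand-in holding only the URI guard."""

    def __init__(self) -> None:
        self.uri = ""
        self.is_open = False

    def set_uri(self, uri: str) -> None:
        if self.is_open:
            raise RuntimeError("Cannot change URI while the writer is open.")
        self.uri = uri


class PathWriter(BaseWriter):
    """Toy backend that needs a local filesystem path, not a URI."""

    def __init__(self) -> None:
        super().__init__()
        self.store_path = ""

    def set_uri(self, uri: str) -> None:
        super().set_uri(uri)  # guard first; raises if the writer is open
        parsed = urlparse(uri)
        if parsed.scheme != "file":
            # Assumption of this sketch: only file:// URIs are supported.
            raise ValueError(f"unsupported scheme: {parsed.scheme!r}")
        self.store_path = url2pathname(parsed.path)


w = PathWriter()
w.set_uri("file:///data/2026_02_25/scan_00001")
```

Calling `super().set_uri(uri)` before the translation means a rejected call leaves both `uri` and `store_path` untouched.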

Source code in src/redsun/storage/_base.py
def set_uri(self, uri: str) -> None:
    """Update the store URI for the next acquisition.

    Called by a presenter before each acquisition and
    whenever the user changes the output directory.  The writer
    must not be open when this is called.

    Subclasses should override this to perform any backend-specific
    path translation (e.g. rebuilding ``StreamSettings.store_path``
    for ``ZarrWriter``) and must call ``super().set_uri(uri)``.

    Parameters
    ----------
    uri : str
        New store URI (e.g. ``"file:///data/2026_02_25/scan_00001"``).

    Raises
    ------
    RuntimeError
        If the writer is currently open.
    """
    if self._is_open:
        raise RuntimeError(
            f"Cannot change URI on writer ({self._name!r}, {self.mimetype!r}) "
            "while it is open."
        )
    self._uri = uri
    self.logger.debug(f"URI updated to {uri!r}")

prepare

prepare(
    name: str,
    data_key: str,
    dtype: dtype[generic],
    shape: tuple[int, ...],
    capacity: int = 0,
) -> FrameSink

Register a data source and return a ready FrameSink.

Called by each device inside its own prepare() method, once per acquisition. Replaces the former two-step update_source + prepare sequence.

Safe to call multiple times on the same source name — counters are reset on each call, making it suitable for repeated acquisitions without recreating the writer.

Parameters:

  • name (str, required): Source name, typically the device name. Multiple devices may register distinct source names on the same writer.
  • data_key (str, required): Bluesky data key used in stream documents.
  • dtype (dtype[generic], required): NumPy data type of the frames.
  • shape (tuple[int, ...], required): Shape of each individual frame.
  • capacity (int, default 0): Maximum number of frames to accept. 0 means unlimited.

Returns:

  • FrameSink: Bound sink ready to accept frames via sink.write(frame).

Raises:

  • RuntimeError: If the writer is currently open.
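The counter reset on re-registration follows from replacing the stored entry with a fresh one. The sketch below assumes a `SourceInfo` dataclass whose field names mirror those used in the source listing; the default values, including the freshly generated UID, are assumptions of this example.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class SourceInfo:
    """Hypothetical stand-in; defaults are assumed, not taken from redsun."""

    name: str
    data_key: str
    shape: tuple[int, ...]
    capacity: int = 0
    frames_written: int = 0
    collection_counter: int = 0
    completed: bool = False
    stream_resource_uid: str = field(default_factory=lambda: str(uuid.uuid4()))


sources: dict[str, SourceInfo] = {}


def prepare(name: str, data_key: str, shape: tuple[int, ...], capacity: int = 0) -> SourceInfo:
    """Overwrite any earlier entry so every acquisition starts from zero."""
    sources[name] = SourceInfo(name, data_key, shape, capacity)
    return sources[name]


first = prepare("camera", "camera:buffer", (512, 512))
first.frames_written = 100  # pretend an acquisition ran
second = prepare("camera", "camera:buffer", (512, 512))
```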

Source code in src/redsun/storage/_base.py
def prepare(
    self,
    name: str,
    data_key: str,
    dtype: np.dtype[np.generic],
    shape: tuple[int, ...],
    capacity: int = 0,
) -> FrameSink:
    """Register a data source and return a ready ``FrameSink``.

    Called by each device inside its own ``prepare()`` method, once
    per acquisition.  Replaces the former two-step
    ``update_source`` + ``prepare`` sequence.

    Safe to call multiple times on the same source name — counters
    are reset on each call, making it suitable for repeated
    acquisitions without recreating the writer.

    Parameters
    ----------
    name : str
        Source name, typically the device name.  Multiple devices
        may register distinct source names on the same writer.
    data_key : str
        Bluesky data key used in stream documents.
    dtype : np.dtype[np.generic]
        NumPy data type of the frames.
    shape : tuple[int, ...]
        Shape of each individual frame.
    capacity : int
        Maximum number of frames to accept.  ``0`` means unlimited.

    Returns
    -------
    FrameSink
        Bound sink ready to accept frames via ``sink.write(frame)``.

    Raises
    ------
    RuntimeError
        If the writer is currently open.
    """
    if self._is_open:
        raise RuntimeError(
            f"Cannot register source {name!r} on writer "
            f"({self._name!r}, {self.mimetype!r}) while it is open."
        )
    self._sources[name] = SourceInfo(
        name=name,
        dtype=dtype,
        shape=shape,
        data_key=data_key,
        capacity=capacity,
    )
    self.logger.debug(
        f"Registered source {name!r} — shape={shape}, capacity={capacity}"
    )
    self._on_prepare(name)
    return FrameSink(self, name)

_on_prepare abstractmethod

_on_prepare(name: str) -> None

Backend-specific hook called after a source is registered.

Invoked at the end of prepare once self._sources[name] is fully populated. Subclasses use this to pre-declare backend structures — for example, ZarrWriter builds its ArraySettings here from the source dtype and shape.

Parameters:

  • name (str, required): The source name just registered.

Source code in src/redsun/storage/_base.py
@abc.abstractmethod
def _on_prepare(self, name: str) -> None:
    """Backend-specific hook called after a source is registered.

    Invoked at the end of [`prepare`][redsun.storage.Writer.prepare]
    once ``self._sources[name]`` is fully populated.  Subclasses use
    this to pre-declare backend structures — for example, ``ZarrWriter``
    builds its ``ArraySettings`` here from the source dtype and shape.

    Parameters
    ----------
    name : str
        The source name just registered.
    """
    ...

get_indices_written

get_indices_written(name: str | None = None) -> int

Return the number of frames written for a source.

Parameters:

  • name (str | None, default None): Source name. If None, returns the minimum across all registered sources (useful for synchronisation checks).

Raises:

  • KeyError: If name is not registered.
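The minimum is the useful aggregate because the slowest source bounds how many frames every source has written. A minimal sketch with made-up counts, where `frames_written` and `indices_written` are hypothetical names:

```python
from typing import Optional

# Made-up per-source frame counts.
frames_written = {"camera_a": 120, "camera_b": 117}


def indices_written(name: Optional[str] = None) -> int:
    if name is None:
        # The slowest source bounds what all sources have written so far.
        return min(frames_written.values(), default=0)
    if name not in frames_written:
        raise KeyError(f"Unknown source {name!r}")
    return frames_written[name]


slowest = indices_written()
single = indices_written("camera_a")
```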

Source code in src/redsun/storage/_base.py
def get_indices_written(self, name: str | None = None) -> int:
    """Return the number of frames written for a source.

    Parameters
    ----------
    name : str | None
        Source name.  If ``None``, returns the minimum across all
        registered sources (useful for synchronisation checks).

    Raises
    ------
    KeyError
        If *name* is not registered.
    """
    if name is None:
        if not self._sources:
            return 0
        return min(s.frames_written for s in self._sources.values())
    if name not in self._sources:
        raise KeyError(f"Unknown source {name!r}")
    return self._sources[name].frames_written

reset_collection_state

reset_collection_state(name: str) -> None

Reset the stream-document counters for name.

Parameters:

  • name (str, required): Source name to reset.

Source code in src/redsun/storage/_base.py
def reset_collection_state(self, name: str) -> None:
    """Reset the stream-document counters for *name*.

    Parameters
    ----------
    name : str
        Source name to reset.
    """
    source = self._sources[name]
    source.collection_counter = 0
    source.stream_resource_uid = str(uuid.uuid4())

kickoff abstractmethod

kickoff() -> None

Open the storage backend for a new acquisition.

Subclasses must call super().kickoff() to set is_open and to enforce the URI guard. Subsequent calls while already open must be no-ops.

Raises:

  • RuntimeError: If uri has not been set yet.
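A subclass can satisfy both requirements (call the base, stay a no-op when already open) along the lines of this sketch. `BaseWriter` and `CountingWriter` are hypothetical, and the `open_calls` counter merely stands in for opening a real backend.

```python
class BaseWriter:
    """Hypothetical stand-in with the URI guard and is_open bookkeeping."""

    def __init__(self) -> None:
        self.uri = ""
        self.is_open = False

    def kickoff(self) -> None:
        if not self.uri:
            raise RuntimeError("set_uri() must be called before kickoff()")
        self.is_open = True


class CountingWriter(BaseWriter):
    """Toy backend that counts how often it really opens."""

    def __init__(self) -> None:
        super().__init__()
        self.open_calls = 0

    def kickoff(self) -> None:
        if self.is_open:
            return             # already open: repeated kickoff is a no-op
        super().kickoff()      # URI guard and is_open bookkeeping
        self.open_calls += 1   # stands in for opening the real backend


w = CountingWriter()
try:
    w.kickoff()                # no URI yet
    guard_hit = False
except RuntimeError:
    guard_hit = True
w.uri = "file:///tmp/scan_00001"
w.kickoff()
w.kickoff()                    # second call changes nothing
```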

Source code in src/redsun/storage/_base.py
@abc.abstractmethod
def kickoff(self) -> None:
    """Open the storage backend for a new acquisition.

    Subclasses must call ``super().kickoff()`` to set
    [`is_open`][redsun.storage.Writer.is_open] and to enforce the
    URI guard.  Subsequent calls while already open must be no-ops.

    Raises
    ------
    RuntimeError
        If [`uri`][redsun.storage.Writer.uri] has not been set yet.
    """
    if not self._uri:
        clear_metadata()
        raise RuntimeError(
            f"Writer ({self._name!r}, {self.mimetype!r}) has no URI. "
            "A presenter must call set_uri() before kickoff()."
        )
    if not self._is_open:
        self._metadata = snapshot_metadata()
        self._is_open = True

complete

complete(name: str) -> None

Mark acquisition complete for source name.

Called automatically by FrameSink.close. When the last registered source calls complete, the backend is finalised and is_open is reset. The writer instance remains in the registry and is ready for the next acquisition.

Parameters:

Name Type Description Default
name str

Source name signalling completion.

required
Source code in src/redsun/storage/_base.py
def complete(self, name: str) -> None:
    """Mark acquisition complete for source *name*.

    Called automatically by [`FrameSink.close`][redsun.storage.FrameSink.close].
    When the last registered source calls ``complete``, the backend
    is finalised and ``is_open`` is reset.  The writer instance
    remains in the registry and is ready for the next acquisition.

    Parameters
    ----------
    name : str
        Source name signalling completion.
    """
    self._sources[name].completed = True
    if all(s.completed for s in self._sources.values()):
        self._finalize()
        self._is_open = False
        clear_metadata()
        self.logger.debug("All sources complete; backend finalised.")
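
A minimal sketch of the "last source finalises" rule, assuming a hypothetical `CountingWriter` that tracks only completion flags. The source names are invented for illustration.

```python
class CountingWriter:
    """Hypothetical stand-in tracking completion flags per source."""

    def __init__(self, names: list[str]) -> None:
        self._completed = {name: False for name in names}
        self.is_open = True
        self.finalize_calls = 0

    def complete(self, name: str) -> None:
        self._completed[name] = True
        # Finalise only once every registered source has reported completion.
        if all(self._completed.values()):
            self.finalize_calls += 1
            self.is_open = False


writer = CountingWriter(["camera", "spectrometer"])
writer.complete("camera")
open_after_first = writer.is_open  # still open: one source is pending
writer.complete("spectrometer")    # last source triggers finalisation
```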

clear_sources

clear_sources() -> None

Remove all registered sources.

A presenter in charge of monitoring writing progress should take care to call this after each plan is finished.

Source code in src/redsun/storage/_base.py
def clear_sources(self) -> None:
    """Remove all registered sources.

    A presenter in charge of monitoring
    writing progress should take care to call this
    after each plan is finished.
    """
    self._sources.clear()
    self.logger.debug("Source cache reset.")

_write_frame abstractmethod

_write_frame(name: str, frame: NDArray[generic]) -> None

Write one frame to the backend.

Called by FrameSink.write under the writer lock.

Parameters:

Name Type Description Default
name str

Source name.

required
frame NDArray[generic]

Frame data to write.

required
Source code in src/redsun/storage/_base.py
@abc.abstractmethod
def _write_frame(self, name: str, frame: npt.NDArray[np.generic]) -> None:
    """Write one frame to the backend.

    Called by [`FrameSink.write`][redsun.storage.FrameSink.write]
    under the writer lock.

    Parameters
    ----------
    name : str
        Source name.
    frame : npt.NDArray[np.generic]
        Frame data to write.
    """
    ...

_finalize abstractmethod

_finalize() -> None

Close the backend after all sources have completed.

Source code in src/redsun/storage/_base.py
@abc.abstractmethod
def _finalize(self) -> None:
    """Close the backend after all sources have completed."""
    ...

collect_stream_docs

collect_stream_docs(
    name: str, indices_written: int
) -> Iterator[StreamAsset]

Yield StreamResource and StreamDatum documents for name.

Parameters:

Name Type Description Default
name str

Source name.

required
indices_written int

Cumulative frame count to report up to; capped at the number of frames actually written for the source.

required

Yields:

Type Description
StreamAsset

Tuples of ("stream_resource", doc) or ("stream_datum", doc).

Raises:

Type Description
KeyError

If name is not registered.

Source code in src/redsun/storage/_base.py
def collect_stream_docs(
    self,
    name: str,
    indices_written: int,
) -> Iterator[StreamAsset]:
    """Yield ``StreamResource`` and ``StreamDatum`` documents for *name*.

    Parameters
    ----------
    name : str
        Source name.
    indices_written : int
        Cumulative frame count to report up to; capped at the number
        of frames actually written for the source.

    Yields
    ------
    StreamAsset
        Tuples of ``("stream_resource", doc)`` or
        ``("stream_datum", doc)``.

    Raises
    ------
    KeyError
        If *name* is not registered.
    """
    if name not in self._sources:
        raise KeyError(f"Unknown source {name!r}")

    source = self._sources[name]

    if indices_written == 0:
        return

    frames_to_report = min(indices_written, source.frames_written)

    if source.collection_counter >= frames_to_report:
        return

    if source.collection_counter == 0:
        stream_resource: StreamResource = {
            "data_key": source.data_key,
            "mimetype": self.mimetype,
            "parameters": {"array_name": source.name},
            "uid": source.stream_resource_uid,
            "uri": self.uri,
        }
        yield ("stream_resource", stream_resource)

    stream_datum: StreamDatum = {
        "descriptor": "",
        "indices": {"start": source.collection_counter, "stop": frames_to_report},
        "seq_nums": {"start": 0, "stop": 0},
        "stream_resource": source.stream_resource_uid,
        "uid": f"{source.stream_resource_uid}/{source.collection_counter}",
    }
    yield ("stream_datum", stream_datum)

    source.collection_counter = frames_to_report
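
The index arithmetic is easier to follow in isolation. The sketch below keeps only the counters and the `indices` field; `_Source` and `collect` are hypothetical simplifications, and the document payloads are left empty on purpose.

```python
from collections.abc import Iterator
from dataclasses import dataclass


@dataclass
class _Source:
    """Hypothetical stand-in for the per-source bookkeeping record."""

    frames_written: int = 0
    collection_counter: int = 0


def collect(source: _Source, indices_written: int) -> Iterator[tuple[str, dict]]:
    """Reproduce only the index arithmetic of the method above."""
    frames_to_report = min(indices_written, source.frames_written)
    if frames_to_report == 0 or source.collection_counter >= frames_to_report:
        return
    if source.collection_counter == 0:
        yield ("stream_resource", {})  # emitted once, before the first datum
    yield (
        "stream_datum",
        {"indices": {"start": source.collection_counter, "stop": frames_to_report}},
    )
    source.collection_counter = frames_to_report


source = _Source(frames_written=5)
first = list(collect(source, 3))   # resource + datum covering [0, 3)
source.frames_written = 8
second = list(collect(source, 8))  # datum covering [3, 8)
third = list(collect(source, 8))   # nothing new to report
```

Each call picks up where the previous one stopped, and a repeated call with no new frames yields nothing.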

FrameSink

Device-facing handle for pushing frames to a storage backend.

Returned by Writer.register. Devices write frames by calling write; the sink routes each frame to the backend and updates the frame counter atomically.

Calling close signals that no more frames will arrive from this source and triggers backend finalisation once all sources are complete.

Parameters:

Name Type Description Default
writer Writer

The writer that owns this sink.

required
name str

Source name this sink is bound to.

required
Source code in src/redsun/storage/_base.py
class FrameSink:
    """Device-facing handle for pushing frames to a storage backend.

    Returned by [`Writer.register`][redsun.storage.Writer.register].
    Devices write frames by calling [`write`][redsun.storage.FrameSink.write];
    the sink routes each frame to the backend and updates the frame counter
    atomically.

    Calling [`close`][redsun.storage.FrameSink.close] signals that no more
    frames will arrive from this source and triggers backend finalisation
    once all sources are complete.

    Parameters
    ----------
    writer : Writer
        The writer that owns this sink.
    name : str
        Source name this sink is bound to.
    """

    def __init__(self, writer: Writer, name: str) -> None:
        self._writer = writer
        self._name = name

    def write(self, frame: npt.NDArray[np.generic]) -> None:
        """Push ``frame`` to the storage backend.

        Thread-safe.

        Parameters
        ----------
        frame : npt.NDArray[np.generic]
            Array data to write. dtype and shape must match those declared
            in [`Writer.register`][redsun.storage.Writer.register].
        """
        with self._writer._lock:
            self._writer._write_frame(self._name, frame)
            self._writer._sources[self._name].frames_written += 1

    def close(self) -> None:
        """Signal that no more frames will be written from this sink.

        Delegates to [`Writer.complete`][redsun.storage.Writer.complete].
        """
        self._writer.complete(self._name)

write

write(frame: NDArray[generic]) -> None

Push frame to the storage backend.

Thread-safe.

Parameters:

Name Type Description Default
frame NDArray[generic]

Array data to write. dtype and shape must match those declared in Writer.register.

required
Source code in src/redsun/storage/_base.py
def write(self, frame: npt.NDArray[np.generic]) -> None:
    """Push ``frame`` to the storage backend.

    Thread-safe.

    Parameters
    ----------
    frame : npt.NDArray[np.generic]
        Array data to write. dtype and shape must match those declared
        in [`Writer.register`][redsun.storage.Writer.register].
    """
    with self._writer._lock:
        self._writer._write_frame(self._name, frame)
        self._writer._sources[self._name].frames_written += 1
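
The lock-then-count pattern can be exercised from several threads. `ListWriter` and `Sink` are hypothetical stand-ins that store integers instead of arrays; only the locking structure mirrors the code above.

```python
import threading


class ListWriter:
    """Hypothetical writer that appends frames to a list under a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.frames: list[int] = []
        self.frames_written = 0


class Sink:
    """Hypothetical sink mirroring the lock-then-count pattern above."""

    def __init__(self, writer: ListWriter) -> None:
        self._writer = writer

    def write(self, frame: int) -> None:
        with self._writer._lock:
            # The write and the counter update happen as one atomic step.
            self._writer.frames.append(frame)
            self._writer.frames_written += 1


writer = ListWriter()
sink = Sink(writer)


def push_frames(count: int) -> None:
    for i in range(count):
        sink.write(i)


threads = [threading.Thread(target=push_frames, args=(250,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because both the backend write and the counter increment sit inside the same `with` block, the counter and the stored frames cannot drift apart.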

close

close() -> None

Signal that no more frames will be written from this sink.

Delegates to Writer.complete.

Source code in src/redsun/storage/_base.py
def close(self) -> None:
    """Signal that no more frames will be written from this sink.

    Delegates to [`Writer.complete`][redsun.storage.Writer.complete].
    """
    self._writer.complete(self._name)

Path providers

PathInfo dataclass

Where and how a storage backend should write data for one device.

Note

For local file storage, the store_uri must be converted to a concrete filesystem path before use. This is the responsibility of the backend writer.

Source code in src/redsun/storage/_path.py
@dataclass
class PathInfo:
    """Where and how a storage backend should write data for one device.

    !!! note

        For local file storage, the `store_uri` must be converted to
        a concrete filesystem path before use. This is the responsibility
        of the backend writer.
    """

    store_uri: str
    """URI of the storage root."""
    array_key: str
    """Key (array name) within the store for this device's data."""

    capacity: int = 0
    """Capacity in frames.  ``0`` means unlimited."""

    mimetype_hint: str = "application/x-zarr"
    """MIME type hint for the backend.  Consumers may use this to select the correct reader."""

    extra: dict[str, Any] = field(default_factory=dict)
    """Optional backend-specific metadata (e.g. OME-Zarr axis labels, physical units).  Base writers ignore this field."""

store_uri instance-attribute

store_uri: str

URI of the storage root.

array_key instance-attribute

array_key: str

Key (array name) within the store for this device's data.

capacity class-attribute instance-attribute

capacity: int = 0

Capacity in frames. 0 means unlimited.

mimetype_hint class-attribute instance-attribute

mimetype_hint: str = 'application/x-zarr'

MIME type hint for the backend. Consumers may use this to select the correct reader.

extra class-attribute instance-attribute

extra: dict[str, Any] = field(default_factory=dict)

Optional backend-specific metadata (e.g. OME-Zarr axis labels, physical units). Base writers ignore this field.

FilenameProvider

Bases: Protocol

Callable that produces a filename stem for a given key.

Source code in src/redsun/storage/_path.py
@runtime_checkable
class FilenameProvider(Protocol):
    """Callable that produces a filename stem for a given key."""

    def __call__(self, key: str | None = None, group: str | None = None) -> str:
        """Return a filename stem for *key* and *group*.

        Parameters
        ----------
        key : str | None
            Discriminator passed by the caller — typically a plan name
            (when called from a presenter) or a device name (when called
            from a writer).  Implementations may ignore it if the filename
            is key-agnostic.
        group : str | None
            Writer group name (e.g. ``'default'``).  When provided,
            implementations should embed it in the filename so that
            outputs from different writer groups do not collide.

        Returns
        -------
        str
            A filename stem without extension.
        """
        ...

__call__

__call__(
    key: str | None = None, group: str | None = None
) -> str

Return a filename stem for key and group.

Parameters:

Name Type Description Default
key str | None

Discriminator passed by the caller — typically a plan name (when called from a presenter) or a device name (when called from a writer). Implementations may ignore it if the filename is key-agnostic.

None
group str | None

Writer group name (e.g. 'default'). When provided, implementations should embed it in the filename so that outputs from different writer groups do not collide.

None

Returns:

Type Description
str

A filename stem without extension.

Source code in src/redsun/storage/_path.py
def __call__(self, key: str | None = None, group: str | None = None) -> str:
    """Return a filename stem for *key* and *group*.

    Parameters
    ----------
    key : str | None
        Discriminator passed by the caller — typically a plan name
        (when called from a presenter) or a device name (when called
        from a writer).  Implementations may ignore it if the filename
        is key-agnostic.
    group : str | None
        Writer group name (e.g. ``'default'``).  When provided,
        implementations should embed it in the filename so that
        outputs from different writer groups do not collide.

    Returns
    -------
    str
        A filename stem without extension.
    """
    ...
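
As an illustration, any callable with a matching signature satisfies the protocol. The sketch below redefines the protocol locally so it runs standalone; `StemProvider` is a hypothetical implementation, not part of redsun.

```python
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class FilenameProvider(Protocol):
    """Local copy of the protocol shape, so the sketch runs standalone."""

    def __call__(
        self, key: Optional[str] = None, group: Optional[str] = None
    ) -> str: ...


class StemProvider:
    """Hypothetical provider joining key and group with an underscore."""

    def __call__(
        self, key: Optional[str] = None, group: Optional[str] = None
    ) -> str:
        parts = [key or "default"]
        if group:
            parts.append(group)  # embed the group so stores do not collide
        return "_".join(parts)


provider = StemProvider()
stem_plain = provider("snap")
stem_grouped = provider("snap", group="live")
conforms = isinstance(provider, FilenameProvider)
```

Note that an `isinstance` check against a `runtime_checkable` protocol only confirms that `__call__` is present; it does not validate the signature.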

PathProvider

Bases: Protocol

Callable that produces PathInfo for a given key.

Source code in src/redsun/storage/_path.py
@runtime_checkable
class PathProvider(Protocol):
    """Callable that produces [`PathInfo`][redsun.storage.PathInfo] for a given key."""

    def __call__(self, key: str | None = None, group: str | None = None) -> PathInfo:
        """Return path information for *key* and *group*.

        Parameters
        ----------
        key : str | None
            Discriminator passed by the caller — typically a plan name
            (when called from a presenter) or a device name (when called
            from a writer).
        group : str | None
            Writer group name (e.g. ``'default'``).  When provided,
            the path should reflect the group so that outputs from
            different writer groups do not collide.

        Returns
        -------
        [`PathInfo`][redsun.storage.PathInfo]
            Complete path and storage metadata.
        """
        ...

__call__

__call__(
    key: str | None = None, group: str | None = None
) -> PathInfo

Return path information for key and group.

Parameters:

Name Type Description Default
key str | None

Discriminator passed by the caller — typically a plan name (when called from a presenter) or a device name (when called from a writer).

None
group str | None

Writer group name (e.g. 'default'). When provided, the path should reflect the group so that outputs from different writer groups do not collide.

None

Returns:

Type Description
PathInfo

Complete path and storage metadata.

Source code in src/redsun/storage/_path.py
def __call__(self, key: str | None = None, group: str | None = None) -> PathInfo:
    """Return path information for *key* and *group*.

    Parameters
    ----------
    key : str | None
        Discriminator passed by the caller — typically a plan name
        (when called from a presenter) or a device name (when called
        from a writer).
    group : str | None
        Writer group name (e.g. ``'default'``).  When provided,
        the path should reflect the group so that outputs from
        different writer groups do not collide.

    Returns
    -------
    [`PathInfo`][redsun.storage.PathInfo]
        Complete path and storage metadata.
    """
    ...
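
A minimal sketch of a custom provider, assuming a trimmed local copy of `PathInfo` with only two fields. `FixedRootProvider` and the root URI are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PathInfo:
    """Trimmed local copy of PathInfo; only two fields are kept here."""

    store_uri: str
    array_key: str


class FixedRootProvider:
    """Hypothetical provider placing every store under one root URI."""

    def __init__(self, root_uri: str) -> None:
        self._root = root_uri.rstrip("/")

    def __call__(
        self, key: Optional[str] = None, group: Optional[str] = None
    ) -> PathInfo:
        resolved = key or "default"
        # Include the group in the path so writer groups stay separate.
        stem = f"{resolved}_{group}" if group else resolved
        return PathInfo(store_uri=f"{self._root}/{stem}", array_key=resolved)


provider = FixedRootProvider("file:///data/session/")
info = provider("snap", group="live")
```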

SessionPathProvider

Bases: PathProvider

Provides structured, session-scoped paths with per-key auto-increment counters.

Produces URIs of the form::

file:///<base_dir>/<session>/<YYYY_MM_DD>/<key>_<counter>

where <key> is the value passed to __call__ (e.g. the plan name), and <counter> is a zero-padded integer that increments independently for each distinct key. Calling with key=None uses "default" as the key.

The date segment is fixed at construction time so that a session started just before midnight does not split its files across two date directories.

Parameters:

Name Type Description Default
base_dir Path | None

Root directory for all output files. Defaults to ~/redsun-storage.

None
session str

Session name, used as the second path segment. Defaults to "default".

'default'
max_digits int

Zero-padding width for the counter. Defaults to 5.

5
mimetype_hint str

MIME type hint forwarded to PathInfo.

'application/x-zarr'
capacity int

Default frame capacity forwarded to PathInfo.

0

Examples:

    provider = SessionPathProvider(base_dir=Path("/data"), session="exp1")
    info = provider("live_stream")
    info.store_uri                                  # 'file:///data/exp1/2026_02_24/live_stream_00000'
    provider("live_stream").store_uri               # 'file:///data/exp1/2026_02_24/live_stream_00001'
    provider("snap").store_uri                      # 'file:///data/exp1/2026_02_24/snap_00000'
    provider("live_stream", group="cam").store_uri  # 'file:///data/exp1/2026_02_24/live_stream_cam_00000'
Source code in src/redsun/storage/_path.py
class SessionPathProvider(PathProvider):
    """Provides structured, session-scoped paths with per-key auto-increment counters.

    Produces URIs of the form::

        file:///<base_dir>/<session>/<YYYY_MM_DD>/<key>_<counter>

    where ``<key>`` is the value passed to [`__call__`][redsun.storage.SessionPathProvider.__call__] (e.g. the plan
    name), and ``<counter>`` is a zero-padded integer that increments
    independently for each distinct ``key``.  Calling with ``key=None``
    uses ``"default"`` as the key.

    The date segment is fixed at construction time so that a session
    started just before midnight does not split its files across two
    date directories.

    Parameters
    ----------
    base_dir : Path | None
        Root directory for all output files.
        Defaults to ``~/redsun-storage``.
    session : str
        Session name, used as the second path segment.
        Defaults to ``"default"``.
    max_digits : int
        Zero-padding width for the counter.  Defaults to ``5``.
    mimetype_hint : str
        MIME type hint forwarded to [`PathInfo`][redsun.storage.PathInfo].
    capacity : int
        Default frame capacity forwarded to [`PathInfo`][redsun.storage.PathInfo].

    Examples
    --------
    ```python
        provider = SessionPathProvider(base_dir=Path("/data"), session="exp1")
        info = provider("live_stream")
        info.store_uri                                  # 'file:///data/exp1/2026_02_24/live_stream_00000'
        provider("live_stream").store_uri               # 'file:///data/exp1/2026_02_24/live_stream_00001'
        provider("snap").store_uri                      # 'file:///data/exp1/2026_02_24/snap_00000'
        provider("live_stream", group="cam").store_uri  # 'file:///data/exp1/2026_02_24/live_stream_cam_00000'
    ```
    """

    def __init__(
        self,
        base_dir: Path | None = None,
        session: str = "default",
        max_digits: int = 5,
        mimetype_hint: str = "application/x-zarr",
        capacity: int = 0,
    ) -> None:
        self._base_dir = (
            base_dir if base_dir is not None else Path.home() / "redsun-storage"
        )
        self._session = session
        self._max_digits = max_digits
        self._mimetype_hint = mimetype_hint
        self._capacity = capacity
        self._date = datetime.datetime.now().strftime("%Y_%m_%d")
        self._counters: dict[str, int] = self._scan_existing()

    @property
    def session(self) -> str:
        """The active session name."""
        return self._session

    @session.setter
    def session(self, value: str) -> None:
        """Update the session name and rescan the new session directory.

        Counters are rebuilt from whatever already exists under
        ``<base_dir>/<value>/<date>/`` so that numbering continues
        correctly if the session was used in a previous run.
        """
        self._session = value
        self._time_resolved_path = self._base_dir / self._session / self._date
        self._time_resolved_path.mkdir(parents=True, exist_ok=True)
        self._counters = self._scan_existing()

    @property
    def base_dir(self) -> Path:
        """The root output directory."""
        return self._base_dir

    @base_dir.setter
    def base_dir(self, value: Path) -> None:
        """Update the root output directory and rescan.

        Counters are rebuilt from whatever already exists under
        ``<value>/<session>/<date>/`` so that numbering continues
        correctly if the directory was used in a previous run.
        """
        self._base_dir = value
        self._time_resolved_path = self._base_dir / self._session / self._date
        self._time_resolved_path.mkdir(parents=True, exist_ok=True)
        self._counters = self._scan_existing()

    def _scan_existing(self) -> dict[str, int]:
        """Scan the current date directory and return counters initialised from existing directories.

        Looks for directories directly inside
        ``<base_dir>/<session>/<date>/`` whose names match
        ``<key>_<N>`` (last ``_``-delimited segment is a pure integer).
        For each key, the counter is set to ``max(N) + 1`` so the next
        call produces a name that does not collide with existing data.

        Entries whose names cannot be parsed are silently ignored.
        If the directory does not exist yet, an empty dict is returned.

        Returns
        -------
        dict[str, int]
            Mapping of key to next counter value.
        """
        directory = self._base_dir / self._session / self._date
        counters: dict[str, int] = {}
        if not directory.is_dir():
            return counters
        for entry in directory.iterdir():
            if not entry.is_dir():
                continue
            name = entry.name
            parts = name.rsplit("_", 1)
            if len(parts) != 2:
                continue
            key, suffix = parts
            if not suffix.isdigit():
                continue
            n = int(suffix)
            if n + 1 > counters.get(key, 0):
                counters[key] = n + 1
        return counters

    def __call__(self, key: str | None = None, group: str | None = None) -> PathInfo:
        """Return a fresh [`PathInfo`][redsun.storage.PathInfo] for *key* and advance its counter.

        Parameters
        ----------
        key : str | None
            Discriminator for the counter bucket — typically a plan name
            (e.g. ``"live_stream"``, ``"snap"``).
            ``None`` maps to ``"default"``.
        group : str | None
            Writer group name (e.g. ``"default"``).  When provided, the
            filename becomes ``<key>_<group>_<counter>`` and the counter
            is tracked independently per ``(key, group)`` pair so that
            different writer groups never collide.

        Returns
        -------
        PathInfo
            Path rooted at
            ``<base_dir>/<session>/<YYYY_MM_DD>/<key>[_<group>]_<counter>``.
        """
        resolved_key = key or "default"
        bucket = f"{resolved_key}_{group}" if group else resolved_key
        current = self._counters.get(bucket, 0)

        if len(str(current)) > self._max_digits:
            raise ValueError(
                f"Counter for key {bucket!r} exceeded "
                f"maximum of {self._max_digits} digits"
            )

        padded = f"{current:0{self._max_digits}}"
        stem = (
            f"{resolved_key}_{group}_{padded}" if group else f"{resolved_key}_{padded}"
        )
        directory = self._base_dir / self._session / self._date
        store_uri = f"file://{directory.as_posix()}/{stem}"

        self._counters[bucket] = current + 1

        return PathInfo(
            store_uri=store_uri,
            array_key=resolved_key,
            capacity=self._capacity,
            mimetype_hint=self._mimetype_hint,
        )
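
The counter recovery performed by `_scan_existing` can be tried on plain names without touching the filesystem. `scan_counters` is a hypothetical helper that applies the same parsing rule to a list of directory names; the names are invented.

```python
def scan_counters(names: list[str]) -> dict[str, int]:
    """Derive next-counter values from directory names, as _scan_existing does."""
    counters: dict[str, int] = {}
    for name in names:
        key, sep, suffix = name.rpartition("_")
        if not sep or not suffix.isdigit():
            continue  # unparseable names are ignored
        counters[key] = max(counters.get(key, 0), int(suffix) + 1)
    return counters


existing = ["snap_00000", "snap_00004", "live_stream_cam_00001", "notes", "raw_data"]
counters = scan_counters(existing)
next_snap = f"snap_{counters.get('snap', 0):05}"
```

Grouped stems such as `live_stream_cam_00001` parse to the key `live_stream_cam`, which matches the `<key>_<group>` bucket name used by `__call__`.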

__call__

__call__(
    key: str | None = None, group: str | None = None
) -> PathInfo

Return a fresh PathInfo for key and advance its counter.

Parameters:

Name Type Description Default
key str | None

Discriminator for the counter bucket — typically a plan name (e.g. "live_stream", "snap"). None maps to "default".

None
group str | None

Writer group name (e.g. "default"). When provided, the filename becomes <key>_<group>_<counter> and the counter is tracked independently per (key, group) pair so that different writer groups never collide.

None

Returns:

Type Description
PathInfo

Path rooted at <base_dir>/<session>/<YYYY_MM_DD>/<key>[_<group>]_<counter>.

Source code in src/redsun/storage/_path.py
def __call__(self, key: str | None = None, group: str | None = None) -> PathInfo:
    """Return a fresh [`PathInfo`][redsun.storage.PathInfo] for *key* and advance its counter.

    Parameters
    ----------
    key : str | None
        Discriminator for the counter bucket — typically a plan name
        (e.g. ``"live_stream"``, ``"snap"``).
        ``None`` maps to ``"default"``.
    group : str | None
        Writer group name (e.g. ``"default"``).  When provided, the
        filename becomes ``<key>_<group>_<counter>`` and the counter
        is tracked independently per ``(key, group)`` pair so that
        different writer groups never collide.

    Returns
    -------
    PathInfo
        Path rooted at
        ``<base_dir>/<session>/<YYYY_MM_DD>/<key>[_<group>]_<counter>``.
    """
    resolved_key = key or "default"
    bucket = f"{resolved_key}_{group}" if group else resolved_key
    current = self._counters.get(bucket, 0)

    if len(str(current)) > self._max_digits:
        raise ValueError(
            f"Counter for key {bucket!r} exceeded "
            f"maximum of {self._max_digits} digits"
        )

    padded = f"{current:0{self._max_digits}}"
    stem = (
        f"{resolved_key}_{group}_{padded}" if group else f"{resolved_key}_{padded}"
    )
    directory = self._base_dir / self._session / self._date
    store_uri = f"file://{directory.as_posix()}/{stem}"

    self._counters[bucket] = current + 1

    return PathInfo(
        store_uri=store_uri,
        array_key=resolved_key,
        capacity=self._capacity,
        mimetype_hint=self._mimetype_hint,
    )
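
The bucket and zero-padding logic can be sketched as a free function. `next_stem` is a hypothetical helper that takes the counter dict explicitly instead of reading it from the provider instance.

```python
from typing import Optional


def next_stem(
    counters: dict[str, int],
    key: Optional[str] = None,
    group: Optional[str] = None,
    max_digits: int = 5,
) -> str:
    """Return the next stem and advance the bucket counter in place."""
    resolved = key or "default"
    bucket = f"{resolved}_{group}" if group else resolved
    current = counters.get(bucket, 0)
    if len(str(current)) > max_digits:
        raise ValueError(f"Counter for key {bucket!r} exceeded {max_digits} digits")
    counters[bucket] = current + 1
    return f"{bucket}_{current:0{max_digits}}"


counters: dict[str, int] = {}
stems = [
    next_stem(counters, "live_stream"),
    next_stem(counters, "live_stream"),
    next_stem(counters, "snap"),
    next_stem(counters, "live_stream", group="cam"),
    next_stem(counters),
]
```

The guard compares the number of digits rather than the value, so with `max_digits=5` the counter is rejected once it reaches 100000.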

Metadata

Module-level metadata registry for device contributions.

Non-imaging devices (motors, lights, etc.) call register_metadata during their prepare() method to contribute acquisition-time metadata. Writer backends snapshot the registry at Writer.kickoff and clear it when the last source completes via Writer.complete.

register_metadata

register_metadata(
    name: str, metadata: dict[str, Any]
) -> None

Stage metadata for device name.

Parameters:

Name Type Description Default
name str

Device name used as the registry key.

required
metadata dict[str, Any]

JSON-serializable metadata contributed by the device.

required
Source code in src/redsun/storage/metadata.py
def register_metadata(name: str, metadata: dict[str, Any]) -> None:
    """Stage metadata for device *name*.

    Parameters
    ----------
    name : str
        Device name used as the registry key.
    metadata : dict[str, Any]
        JSON-serializable metadata contributed by the device.
    """
    _registry[name] = metadata

snapshot_metadata

snapshot_metadata() -> dict[str, dict[str, Any]]

Return a copy of the current registry.

Source code in src/redsun/storage/metadata.py
def snapshot_metadata() -> dict[str, dict[str, Any]]:
    """Return a copy of the current registry."""
    return copy.deepcopy(_registry)

clear_metadata

clear_metadata() -> None

Clear the registry.

Called by Writer.complete after the last source finalises, and by Writer.kickoff when it raises because no URI has been set.

Source code in src/redsun/storage/metadata.py
def clear_metadata() -> None:
    """Clear the registry.

    Called by [`Writer.complete`][redsun.storage.Writer.complete] after the last
    source finalises, and by [`Writer.kickoff`][redsun.storage.Writer.kickoff]
    when it raises because no URI has been set.
    """
    _registry.clear()

Factory & utilities

PrepareInfo dataclass

Plan-time information passed to device prepare() methods.

Warning

These are still experimental. New fields may be added or existing fields may change.

Source code in src/redsun/storage/__init__.py
@dataclass
class PrepareInfo:
    """Plan-time information passed to device ``prepare()`` methods.

    !!! warning

        These are still experimental. New fields may be added
        or existing fields may change.

    """

    capacity: int = 0
    """Number of frames to prepare for.  ``0`` means unlimited."""

    write_forever: bool = False
    """Whether the device should prepare to write indefinitely (e.g. for live streaming)."""

capacity class-attribute instance-attribute

capacity: int = 0

Number of frames to prepare for. 0 means unlimited.

write_forever class-attribute instance-attribute

write_forever: bool = False

Whether the device should prepare to write indefinitely (e.g. for live streaming).

make_writer

make_writer(mimetype: str, name: str = 'default') -> Writer

Return the singleton writer for (name, mimetype).

Delegates to the appropriate Writer.get, depending on mimetype, so that all devices get the same writer instance for the same (name, mimetype).

Parameters:

Name Type Description Default
mimetype str

Backend format. Currently only "application/x-zarr" is supported.

required
name str

Store group name. All devices that should write into the same physical store must use the same name. Defaults to "default", which is correct for the common single-store case.

'default'

Returns:

Type Description
Writer

Singleton writer instance for (name, mimetype).

Raises:

Type Description
ValueError

If mimetype is not a recognised format.

Source code in src/redsun/storage/device.py
def make_writer(
    mimetype: str,
    name: str = "default",
) -> Writer:
    """Return the singleton writer for *(name, mimetype)*.

    Delegates to the appropriate
    [Writer.get][redsun.storage.Writer.get], depending on *mimetype*,
    so that all devices get the same writer instance for the same *(name, mimetype)*.

    Parameters
    ----------
    mimetype : str
        Backend format.  Currently only ``"application/x-zarr"`` is
        supported.
    name : str
        Store group name.  All devices that should write into the same
        physical store must use the same name.  Defaults to
        ``"default"``, which is correct for the common single-store
        case.

    Returns
    -------
    Writer
        Singleton writer instance for *(name, mimetype)*.

    Raises
    ------
    ValueError
        If *mimetype* is not a recognised format.
    """
    # TODO: this try catch is wrong, need rethinking
    try:
        if mimetype == "application/x-zarr":
            return ZarrWriter.get(name)
        raise ValueError(f"Unsupported mimetype: {mimetype!r}")
    except ImportError as e:
        raise ValueError(f"Cannot create writer for mimetype {mimetype!r}") from e
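
The singleton behaviour can be sketched with a stand-in registry. The `Writer.get` body below is an assumption (its implementation is not shown in this section); only the `(name, mimetype)` key and the dispatch in `make_writer` follow the text above.

```python
import threading


class Writer:
    """Hypothetical stand-in with a registry keyed by (name, mimetype)."""

    _registry: dict[tuple[str, str], "Writer"] = {}
    _registry_lock = threading.Lock()
    mimetype = "application/x-zarr"

    def __init__(self, name: str) -> None:
        self.name = name

    @classmethod
    def get(cls, name: str = "default") -> "Writer":
        key = (name, cls.mimetype)
        with cls._registry_lock:
            # Create on first request, then hand back the same instance.
            if key not in cls._registry:
                cls._registry[key] = cls(name)
            return cls._registry[key]


def make_writer(mimetype: str, name: str = "default") -> Writer:
    if mimetype == Writer.mimetype:
        return Writer.get(name)
    raise ValueError(f"Unsupported mimetype: {mimetype!r}")


camera_writer = make_writer("application/x-zarr")
stage_writer = make_writer("application/x-zarr")
live_writer = make_writer("application/x-zarr", name="live")
```

Two devices that pass the same `(name, mimetype)` receive the same object, while a different group name yields a separate writer.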

get_available_writers

get_available_writers() -> dict[str, dict[str, Writer]]

Return all registered writers grouped by mimetype.

Provides a presenter with a view of the current writer registry, grouped for convenient iteration by format. The outer key is the mimetype string (e.g. "application/x-zarr"); the inner key is the store group name (e.g. "default").

Returns:

Type Description
dict[str, dict[str, Writer]]

{mimetype: {group_name: writer}}

Source code in src/redsun/storage/presenter.py
def get_available_writers() -> dict[str, dict[str, Writer]]:
    """Return all registered writers grouped by mimetype.

    Provides a presenter with a view of the current writer
    registry, grouped for convenient iteration by format.  The outer
    key is the mimetype string (e.g. ``"application/x-zarr"``); the
    inner key is the store group name (e.g. ``"default"``).

    Returns
    -------
    dict[str, dict[str, Writer]]
        ``{mimetype: {group_name: writer}}``
    """
    result: dict[str, dict[str, Writer]] = {}
    with Writer._registry_lock:
        for (name, mimetype), writer in Writer._registry.items():
            result.setdefault(mimetype, {})[name] = writer
    return result
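
The grouping step can be tried in isolation. The sketch below is a simplified illustration rather than the library code: the registry is a plain dictionary, the writers are replaced by placeholder strings, and `group_by_mimetype` is a hypothetical helper name.

```python
# Placeholder registry: the keys follow the (name, mimetype) convention,
# the string values stand in for writer instances.
registry = {
    ("default", "application/x-zarr"): "zarr-default-writer",
    ("live", "application/x-zarr"): "zarr-live-writer",
    ("default", "application/x-hdf5"): "hdf5-default-writer",
}


def group_by_mimetype(flat: dict) -> dict:
    """Regroup a {(name, mimetype): writer} mapping by mimetype."""
    grouped: dict = {}
    for (name, mimetype), writer in flat.items():
        # setdefault creates the inner dict the first time a mimetype is seen.
        grouped.setdefault(mimetype, {})[name] = writer
    return grouped


grouped = group_by_mimetype(registry)

# A presenter-style loop: iterate formats first, then store groups.
labels = [
    f"{mimetype} / {group_name}"
    for mimetype, writers in grouped.items()
    for group_name in writers
]
```

The result is a new dictionary, so adding or removing keys in it does not alter the registry; the values themselves are still the shared writer objects.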

from_uri

from_uri(uri: str) -> str

Convert a URI to a filesystem path if local, otherwise return as-is.

Parameters:

Name Type Description Default
uri str

The URI to convert.

required

Returns:

Type Description
str

The filesystem path if the URI is a local file URI, otherwise the original URI.

Source code in src/redsun/storage/utils.py
from urllib.parse import urlparse
from urllib.request import url2pathname


def from_uri(uri: str) -> str:
    """Convert a URI to a filesystem path if local, otherwise return as-is.

    Parameters
    ----------
    uri : str
        The URI to convert.

    Returns
    -------
    str
        The filesystem path if the URI is a local file URI, otherwise the original URI.
    """
    parsed = urlparse(uri)
    if parsed.scheme == "file":
        return url2pathname(parsed.path)
    return uri
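
A short usage sketch may help show which inputs are converted and which pass through. The function body is repeated here only so that the example runs on its own; the example URIs are made up for illustration.

```python
from urllib.parse import urlparse
from urllib.request import url2pathname


def from_uri(uri: str) -> str:
    """Same logic as above, repeated so the example is self-contained."""
    parsed = urlparse(uri)
    if parsed.scheme == "file":
        return url2pathname(parsed.path)
    return uri


# A local file URI: the scheme is dropped and percent-escapes are decoded,
# so "%20" becomes a space in the resulting path.
local_path = from_uri("file:///tmp/my%20run.zarr")

# A remote URI has a non-"file" scheme and is returned unchanged.
remote_uri = from_uri("s3://bucket/run.zarr")

# A bare path has no scheme at all, so it is also returned unchanged.
bare_path = from_uri("/tmp/run.zarr")
```

The exact form of `local_path` depends on the platform, because `url2pathname` produces a path in the local operating system's syntax.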

Protocols

Storage-level protocols for redsun.

Defines structural protocols that devices can implement to declare an association with a storage writer.

HasWriter

Bases: Protocol

Protocol for devices that are paired with a storage writer.

Implementing this protocol allows the storage presenter to discover the writer associated with a device automatically, without requiring explicit wiring in the application configuration.

Source code in src/redsun/storage/protocols.py
@runtime_checkable
class HasWriter(Protocol):
    """Protocol for devices that are paired with a storage writer.

    Implementing this protocol allows the storage presenter to discover
    the writer associated with a device automatically, without requiring
    explicit wiring in the application configuration.
    """

    def get_writer(self) -> Writer:
        """Return the writer associated with this device.

        Returns
        -------
        Writer
            The storage writer paired with this device.
        """
        ...

get_writer

get_writer() -> Writer

Return the writer associated with this device.

Returns:

Type Description
Writer

The storage writer paired with this device.

Source code in src/redsun/storage/protocols.py
def get_writer(self) -> Writer:
    """Return the writer associated with this device.

    Returns
    -------
    Writer
        The storage writer paired with this device.
    """
    ...
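
Because the protocol is decorated with `runtime_checkable`, discovery can be done with a plain `isinstance` check. The sketch below is an assumption about how a presenter might use it, not the redsun implementation: `Writer` is an empty placeholder class, and `Camera`, `Shutter`, and the `devices` mapping are hypothetical.

```python
from typing import Protocol, runtime_checkable


class Writer:
    """Empty placeholder for the real Writer class (assumption)."""


@runtime_checkable
class HasWriter(Protocol):
    def get_writer(self) -> Writer: ...


class Camera:
    """Hypothetical device that satisfies HasWriter structurally."""

    def __init__(self, writer: Writer) -> None:
        self._writer = writer

    def get_writer(self) -> Writer:
        return self._writer


class Shutter:
    """Hypothetical device with no associated writer."""


devices = {"camera": Camera(Writer()), "shutter": Shutter()}

# Keep only the devices that expose get_writer(); no inheritance is needed.
writers = {
    name: device.get_writer()
    for name, device in devices.items()
    if isinstance(device, HasWriter)
}
```

Note that a runtime `isinstance` check against a protocol only verifies that a `get_writer` attribute exists; it does not validate the method's signature or return type.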