Skip to content

Analysis core

These core types appear in velocity, diameter, sum intensity, and batch analysis signatures. They are included here so generated API pages can explain the objects passed into and returned from analysis code.

Bases: ABC

Base class for one ROI/channel analysis instance.

Derived classes define a stable analysis_name, optional detection_schema, and a concrete :meth:run method. The base class owns analysis identity, validated detection parameters, dirty state, serialization helpers, and common table utilities.

Detection parameters describe scientific behavior and are serialized with the analysis. Runtime execution options, such as multiprocessing worker counts, should live on derived classes and should not be serialized unless they change scientific results.

Parameters:

Name Type Description Default
channel int

Zero-based channel index for this analysis.

required
roi_id int

ROI identifier for this analysis.

required
detection_params dict[str, Any] | None

Optional detection parameter values. Missing values are filled from detection_schema defaults when available.

None
Source code in src/acqstore/acq_image/analysis/model.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
class BaseAnalysis(ABC):
    """Base class for one ROI/channel analysis instance.

    Derived classes define a stable ``analysis_name``, optional
    ``detection_schema``, and a concrete :meth:`run` method. The base class owns
    analysis identity, validated detection parameters, dirty state,
    serialization helpers, and common table utilities.

    Detection parameters describe scientific behavior and are serialized with
    the analysis. Runtime execution options, such as multiprocessing worker
    counts, should live on derived classes and should not be serialized unless
    they change scientific results.

    Args:
        channel: Zero-based channel index for this analysis.
        roi_id: ROI identifier for this analysis.
        detection_params: Optional detection parameter values. Missing values
            are filled from ``detection_schema`` defaults when available.
    """

    analysis_name: ClassVar[str]
    depends_on: ClassVar[tuple[str, ...]] = ()
    detection_schema: ClassVar[tuple[Any, ...]] = ()
    summary_columns: ClassVar[tuple[str, ...]] = ()
    analysis_version: ClassVar[int | float | str | None] = None
    exclusive_group: ClassVar[str | None] = None

    def __init__(
        self,
        *,
        channel: int,
        roi_id: int,
        detection_params: dict[str, Any] | None = None,
    ) -> None:
        self.key = AnalysisKey(
            analysis_name=self.analysis_name,
            channel=channel,
            roi_id=roi_id,
        )
        self.detection_params = self.get_default_detection_params()
        if detection_params is not None:
            self.validate_detection_params(detection_params)
            self.detection_params.update(detection_params)
        self.result = AnalysisResult()
        self._dirty = False

    def is_dirty(self) -> bool:
        """Return whether this analysis has unsaved changes.

        Returns:
            True if this analysis has unsaved changes.
        """
        return self._dirty

    def set_dirty(self) -> None:
        """Mark this analysis dirty.

        Returns:
            None.
        """
        self._dirty = True

    def set_clean(self) -> None:
        """Mark this analysis clean.

        Returns:
            None.
        """
        self._dirty = False

    @classmethod
    def get_detection_schema(cls) -> tuple[DetectionParamSchema, ...]:
        """Return detection parameter schema.

        Returns:
            Tuple of ``DetectionParamSchema`` entries.

        Raises:
            TypeError: If the class ``detection_schema`` contains non-schema
                entries.
        """
        schema: list[DetectionParamSchema] = []
        for entry in cls.detection_schema:
            if not isinstance(entry, DetectionParamSchema):
                raise TypeError(
                    f"{cls.__name__}.detection_schema must contain DetectionParamSchema "
                    f"entries, got: {type(entry).__name__}"
                )
            schema.append(entry)
        return tuple(schema)

    @classmethod
    def get_detection_schema_dataframe(cls) -> pd.DataFrame:
        """Return this analysis type's detection-parameter schema as a DataFrame.

        This is a scripting and documentation convenience that describes the
        full detection-parameter schema. It is available on every analysis type
        (for example ``RadonVelocityAnalysis``, ``DiameterAnalysis``,
        ``EventAnalysis``, ``HeartRateAnalysis``).

        Returns:
            DataFrame indexed by parameter ``name`` with one row per detection
            parameter. Columns are ``display_name``, ``type``, ``default``,
            ``choices``, ``unit``, ``editable``, ``visible``, ``methods``,
            ``category``, and ``description``. The DataFrame is empty (columns only) when the
            analysis declares no detection parameters.

        Raises:
            TypeError: If the class ``detection_schema`` contains non-schema
                entries.
        """
        columns = [
            "name",
            "display_name",
            "type",
            "default",
            "choices",
            "unit",
            "editable",
            "visible",
            "methods",
            "category",
            "description",
        ]
        rows = [
            {
                "name": entry.name,
                "display_name": entry.display_name,
                "type": entry.value_type.value,
                "default": entry.default,
                "choices": entry.choices,
                "unit": entry.unit,
                "editable": entry.editable,
                "visible": entry.visible,
                "methods": entry.methods,
                "category": entry.category.value if entry.category is not None else None,
                "description": entry.description,
            }
            for entry in cls.get_detection_schema()
        ]
        return pd.DataFrame(rows, columns=columns).set_index("name")

    def finalize_summary(self, summary: dict[str, Any]) -> dict[str, Any]:
        """Merge run metadata into a summary with metadata keys first.

        Args:
            summary: Analysis-local summary produced by the derived ``run``
                implementation before metadata is applied.

        Returns:
            Summary dictionary with ``analysis_date``, ``analysis_time``, and
            optional ``analysis_version`` prepended before remaining keys.
        """
        now = datetime.now()
        metadata: dict[str, Any] = {
            "analysis_date": now.strftime("%y%m%d"),
            "analysis_time": f"{now.strftime('%H:%M:%S')}.{now.microsecond // 1000:03d}",
        }
        if self.analysis_version is not None:
            metadata["analysis_version"] = self.analysis_version

        metadata_keys = set(RUN_SUMMARY_METADATA_KEYS)
        rest = {key: value for key, value in summary.items() if key not in metadata_keys}
        ordered: dict[str, Any] = {}
        ordered.update(metadata)
        ordered.update(rest)
        return ordered

    @classmethod
    def get_summary_columns(cls) -> tuple[str, ...]:
        """Return flat summary keys intended for analysis-pool tables.

        Derived analyses declare this class-level schema so collection-level
        pools can create stable DataFrame columns even when a given
        ``(channel, roi_id)`` has no completed analysis. Column names returned
        here are analysis-local names. Pool classes map them to unique pool
        column names, prepending the pool spec prefix unless a key already
        starts with that prefix.

        Returns:
            Tuple of stable analysis-local summary column names.
        """
        return tuple(str(column) for column in cls.summary_columns)

    def get_summary_values(self) -> dict[str, object]:
        """Return flat summary values for analysis-pool tables.

        The default implementation selects values from
        :attr:`AnalysisResult.summary` using :meth:`get_summary_columns` and
        fills missing keys with :data:`pandas.NA`. Analyses with nested summary
        dictionaries should override this method and still return exactly the
        keys declared by :meth:`get_summary_columns`.

        Returns:
            Mapping from analysis-local summary column name to scalar value.
        """
        return {
            key: self.result.summary.get(key, pd.NA)
            for key in self.get_summary_columns()
        }

    @classmethod
    def get_default_detection_params(cls) -> dict[str, Any]:
        """Return default detection parameters from ``detection_schema``.

        Returns:
            Mapping from parameter name to default value.

        Raises:
            ValueError: If the schema contains duplicate parameter names.
        """
        defaults: dict[str, Any] = {}
        for field_schema in cls.get_detection_schema():
            if field_schema.name in defaults:
                raise ValueError(
                    f"Duplicate detection param schema name: {field_schema.name!r}"
                )
            defaults[field_schema.name] = field_schema.default
        return defaults

    @classmethod
    def validate_detection_params(cls, params: dict[str, Any]) -> None:
        """Validate detection parameter mapping against schema.

        Args:
            params: Detection parameter mapping.

        Returns:
            None.

        Raises:
            KeyError: If any key is not present in the schema.
            TypeError: If any value has the wrong type.
            ValueError: If any enum value is not in ``choices``.
        """
        if not isinstance(params, dict):
            raise TypeError(f"detection_params must be dict, got: {type(params).__name__}")

        schema_by_name = {entry.name: entry for entry in cls.get_detection_schema()}
        for key, value in params.items():
            if key not in schema_by_name:
                raise KeyError(f"Unknown detection param: {key!r}")

            entry = schema_by_name[key]
            match entry.value_type:
                case DetectionValueType.INT:
                    if isinstance(value, bool) or not isinstance(value, int):
                        raise TypeError(f"{key!r} must be int, got: {type(value).__name__}")
                case DetectionValueType.FLOAT:
                    if isinstance(value, bool) or not isinstance(value, (int, float)):
                        raise TypeError(
                            f"{key!r} must be float or int, got: {type(value).__name__}"
                        )
                case DetectionValueType.BOOL:
                    if not isinstance(value, bool):
                        raise TypeError(f"{key!r} must be bool, got: {type(value).__name__}")
                case DetectionValueType.STR:
                    if not isinstance(value, str):
                        raise TypeError(f"{key!r} must be str, got: {type(value).__name__}")
                case DetectionValueType.ENUM:
                    if entry.choices is None:
                        raise ValueError(f"{key!r} has value_type=ENUM but no choices")
                case _:
                    raise ValueError(f"Unsupported detection value type: {entry.value_type!r}")

            if entry.choices is not None and value not in entry.choices:
                raise ValueError(f"{key!r} must be one of {entry.choices!r}, got: {value!r}")

    @abstractmethod
    def run(
        self,
        data_provider: AnalysisDataProvider,
        *,
        context: AnalysisRunContext | None = None,
        dependencies: dict[str, BaseAnalysis] | None = None,
    ) -> AnalysisResult:
        """Run analysis using a narrow data-provider API.

        Derived implementations should read pixels and physical units only
        through ``data_provider``. They should use ``context`` for progress and
        cancellation, respect dependency analyses when declared, store results in
        ``self.result``, and call ``self.set_dirty()`` after mutating results.

        Args:
            data_provider: Minimal image/header access provider.
            context: Optional runtime context for progress and cancellation.
            dependencies: Dependency analyses keyed by analysis name.

        Returns:
            Analysis result. Implementations usually return ``self.result``.
        """
        raise NotImplementedError

    def get_plot_data(self) -> AnalysisPlotData | None:
        """Return display-ready plot data for this analysis.

        Derived analyses override this when they have a canonical x/y plot.

        Returns:
            Plot data, or None when the analysis has no canonical plot.
        """
        return None

    def get_overlay_traces(self) -> tuple[AnalysisOverlayTraceData, ...]:
        """Return ROI-local trace overlays for raster viewers.

        Derived analyses override this when they expose edge or path overlays.
        Coordinates are ROI-local physical units; GUI layers translate to
        full-image Plotly coordinates.

        Returns:
            Tuple of overlay trace data. Empty when no overlays exist.
        """
        return ()

    def has_table(self) -> bool:
        """Return whether this analysis has table output.

        Returns:
            True if result table exists.
        """
        return self.result.table is not None

    def get_table_columns(self) -> list[str]:
        """Return result table column names.

        Returns:
            Column names, or an empty list when no table exists.
        """
        if self.result.table is None:
            return []
        return list(self.result.table.columns)

    def get_column(self, name: str) -> list[Any]:
        """Return one result table column as a list.

        Args:
            name: Column name.

        Returns:
            Column values as a list.

        Raises:
            ValueError: If this analysis has no table.
            KeyError: If the column does not exist.
        """
        if self.result.table is None:
            raise ValueError(f"Analysis {self.analysis_name!r} has no table")
        if name not in self.result.table.columns:
            raise KeyError(f"Column not found: {name!r}")
        return self.result.table[name].tolist()

    def table_with_bookkeeping(self) -> pd.DataFrame | None:
        """Return table with channel and ROI bookkeeping columns.

        Returns:
            DataFrame with ``channel`` and ``roi_id`` columns added, or None if
            the analysis has no table output.

        Raises:
            ValueError: If result table already contains reserved bookkeeping
                columns.
        """
        if self.result.table is None:
            return None

        reserved = {"channel", "roi_id"}
        overlap = reserved.intersection(self.result.table.columns)
        if overlap:
            raise ValueError(
                f"Analysis table already has reserved columns: {sorted(overlap)}"
            )

        df = self.result.table.copy()
        df.insert(0, "roi_id", self.key.roi_id)
        df.insert(0, "channel", self.key.channel)
        return df

    def to_json_dict(self) -> dict[str, Any]:
        """Return JSON-serializable analysis record.

        Returns:
            Dictionary containing identity, detection params, and summary.
        """
        return {
            "analysis_name": self.key.analysis_name,
            "channel": self.key.channel,
            "roi_id": self.key.roi_id,
            "detection_params": dict(self.detection_params),
            "summary": dict(self.result.summary),
        }

    def load_json_dict(self, record: dict[str, Any]) -> None:
        """Load detection params and summary from a JSON record.

        Args:
            record: Analysis record from source sidecar JSON.

        Returns:
            None.
        """
        self.detection_params = dict(record.get("detection_params", {}))
        self.result.summary = dict(record.get("summary", {}))

    def save_record_json(self, path: str | Path) -> None:
        """Save this analysis record to a standalone JSON file.

        This helper is useful in tests. Normal AcqImage save code should use
        ``AcqAnalysisSet.serialize_json_analysis()`` and merge records into the
        source sidecar JSON.

        Args:
            path: Output JSON file path.

        Returns:
            None.
        """
        Path(path).write_text(json.dumps(self.to_json_dict(), indent=2))

    def load_record_json(self, path: str | Path) -> None:
        """Load this analysis record from a standalone JSON file.

        Args:
            path: Input JSON file path.

        Returns:
            None.
        """
        self.load_json_dict(json.loads(Path(path).read_text()))

is_dirty

is_dirty() -> bool

Return whether this analysis has unsaved changes.

Returns:

Type Description
bool

True if this analysis has unsaved changes.

Source code in src/acqstore/acq_image/analysis/model.py
326
327
328
329
330
331
332
def is_dirty(self) -> bool:
    """Return whether this analysis has unsaved changes.

    Returns:
        True if this analysis has unsaved changes.
    """
    return self._dirty

set_dirty

set_dirty() -> None

Mark this analysis dirty.

Returns:

Type Description
None

None.

Source code in src/acqstore/acq_image/analysis/model.py
334
335
336
337
338
339
340
def set_dirty(self) -> None:
    """Mark this analysis dirty.

    Returns:
        None.
    """
    self._dirty = True

set_clean

set_clean() -> None

Mark this analysis clean.

Returns:

Type Description
None

None.

Source code in src/acqstore/acq_image/analysis/model.py
342
343
344
345
346
347
348
def set_clean(self) -> None:
    """Mark this analysis clean.

    Returns:
        None.
    """
    self._dirty = False

get_detection_schema classmethod

get_detection_schema() -> tuple[DetectionParamSchema, ...]

Return detection parameter schema.

Returns:

Type Description
tuple[DetectionParamSchema, ...]

Tuple of DetectionParamSchema entries.

Raises:

Type Description
TypeError

If the class detection_schema contains non-schema entries.

Source code in src/acqstore/acq_image/analysis/model.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
@classmethod
def get_detection_schema(cls) -> tuple[DetectionParamSchema, ...]:
    """Return detection parameter schema.

    Returns:
        Tuple of ``DetectionParamSchema`` entries.

    Raises:
        TypeError: If the class ``detection_schema`` contains non-schema
            entries.
    """
    schema: list[DetectionParamSchema] = []
    for entry in cls.detection_schema:
        if not isinstance(entry, DetectionParamSchema):
            raise TypeError(
                f"{cls.__name__}.detection_schema must contain DetectionParamSchema "
                f"entries, got: {type(entry).__name__}"
            )
        schema.append(entry)
    return tuple(schema)

get_detection_schema_dataframe classmethod

get_detection_schema_dataframe() -> pd.DataFrame

Return this analysis type's detection-parameter schema as a DataFrame.

This is a scripting and documentation convenience that describes the full detection-parameter schema. It is available on every analysis type (for example RadonVelocityAnalysis, DiameterAnalysis, EventAnalysis, HeartRateAnalysis).

Returns:

Type Description
DataFrame

DataFrame indexed by parameter name with one row per detection

DataFrame

parameter. Columns are display_name, type, default,

DataFrame

choices, unit, editable, visible, methods,

DataFrame

category, and description. The DataFrame is empty (columns only) when the

DataFrame

analysis declares no detection parameters.

Raises:

Type Description
TypeError

If the class detection_schema contains non-schema entries.

Source code in src/acqstore/acq_image/analysis/model.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
@classmethod
def get_detection_schema_dataframe(cls) -> pd.DataFrame:
    """Return this analysis type's detection-parameter schema as a DataFrame.

    This is a scripting and documentation convenience that describes the
    full detection-parameter schema. It is available on every analysis type
    (for example ``RadonVelocityAnalysis``, ``DiameterAnalysis``,
    ``EventAnalysis``, ``HeartRateAnalysis``).

    Returns:
        DataFrame indexed by parameter ``name`` with one row per detection
        parameter. Columns are ``display_name``, ``type``, ``default``,
        ``choices``, ``unit``, ``editable``, ``visible``, ``methods``,
        ``category``, and ``description``. The DataFrame is empty (columns only) when the
        analysis declares no detection parameters.

    Raises:
        TypeError: If the class ``detection_schema`` contains non-schema
            entries.
    """
    columns = [
        "name",
        "display_name",
        "type",
        "default",
        "choices",
        "unit",
        "editable",
        "visible",
        "methods",
        "category",
        "description",
    ]
    rows = [
        {
            "name": entry.name,
            "display_name": entry.display_name,
            "type": entry.value_type.value,
            "default": entry.default,
            "choices": entry.choices,
            "unit": entry.unit,
            "editable": entry.editable,
            "visible": entry.visible,
            "methods": entry.methods,
            "category": entry.category.value if entry.category is not None else None,
            "description": entry.description,
        }
        for entry in cls.get_detection_schema()
    ]
    return pd.DataFrame(rows, columns=columns).set_index("name")

finalize_summary

finalize_summary(summary: dict[str, Any]) -> dict[str, Any]

Merge run metadata into a summary with metadata keys first.

Parameters:

Name Type Description Default
summary dict[str, Any]

Analysis-local summary produced by the derived run implementation before metadata is applied.

required

Returns:

Type Description
dict[str, Any]

Summary dictionary with analysis_date, analysis_time, and

dict[str, Any]

optional analysis_version prepended before remaining keys.

Source code in src/acqstore/acq_image/analysis/model.py
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
def finalize_summary(self, summary: dict[str, Any]) -> dict[str, Any]:
    """Merge run metadata into a summary with metadata keys first.

    Args:
        summary: Analysis-local summary produced by the derived ``run``
            implementation before metadata is applied.

    Returns:
        Summary dictionary with ``analysis_date``, ``analysis_time``, and
        optional ``analysis_version`` prepended before remaining keys.
    """
    now = datetime.now()
    metadata: dict[str, Any] = {
        "analysis_date": now.strftime("%y%m%d"),
        "analysis_time": f"{now.strftime('%H:%M:%S')}.{now.microsecond // 1000:03d}",
    }
    if self.analysis_version is not None:
        metadata["analysis_version"] = self.analysis_version

    metadata_keys = set(RUN_SUMMARY_METADATA_KEYS)
    rest = {key: value for key, value in summary.items() if key not in metadata_keys}
    ordered: dict[str, Any] = {}
    ordered.update(metadata)
    ordered.update(rest)
    return ordered

get_summary_columns classmethod

get_summary_columns() -> tuple[str, ...]

Return flat summary keys intended for analysis-pool tables.

Derived analyses declare this class-level schema so collection-level pools can create stable DataFrame columns even when a given (channel, roi_id) has no completed analysis. Column names returned here are analysis-local names. Pool classes map them to unique pool column names, prepending the pool spec prefix unless a key already starts with that prefix.

Returns:

Type Description
tuple[str, ...]

Tuple of stable analysis-local summary column names.

Source code in src/acqstore/acq_image/analysis/model.py
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
@classmethod
def get_summary_columns(cls) -> tuple[str, ...]:
    """Return flat summary keys intended for analysis-pool tables.

    Derived analyses declare this class-level schema so collection-level
    pools can create stable DataFrame columns even when a given
    ``(channel, roi_id)`` has no completed analysis. Column names returned
    here are analysis-local names. Pool classes map them to unique pool
    column names, prepending the pool spec prefix unless a key already
    starts with that prefix.

    Returns:
        Tuple of stable analysis-local summary column names.
    """
    return tuple(str(column) for column in cls.summary_columns)

get_summary_values

get_summary_values() -> dict[str, object]

Return flat summary values for analysis-pool tables.

The default implementation selects values from :attr:AnalysisResult.summary using :meth:get_summary_columns and fills missing keys with :data:pandas.NA. Analyses with nested summary dictionaries should override this method and still return exactly the keys declared by :meth:get_summary_columns.

Returns:

Type Description
dict[str, object]

Mapping from analysis-local summary column name to scalar value.

Source code in src/acqstore/acq_image/analysis/model.py
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
def get_summary_values(self) -> dict[str, object]:
    """Return flat summary values for analysis-pool tables.

    The default implementation selects values from
    :attr:`AnalysisResult.summary` using :meth:`get_summary_columns` and
    fills missing keys with :data:`pandas.NA`. Analyses with nested summary
    dictionaries should override this method and still return exactly the
    keys declared by :meth:`get_summary_columns`.

    Returns:
        Mapping from analysis-local summary column name to scalar value.
    """
    return {
        key: self.result.summary.get(key, pd.NA)
        for key in self.get_summary_columns()
    }

get_default_detection_params classmethod

get_default_detection_params() -> dict[str, Any]

Return default detection parameters from detection_schema.

Returns:

Type Description
dict[str, Any]

Mapping from parameter name to default value.

Raises:

Type Description
ValueError

If the schema contains duplicate parameter names.

Source code in src/acqstore/acq_image/analysis/model.py
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
@classmethod
def get_default_detection_params(cls) -> dict[str, Any]:
    """Return default detection parameters from ``detection_schema``.

    Returns:
        Mapping from parameter name to default value.

    Raises:
        ValueError: If the schema contains duplicate parameter names.
    """
    defaults: dict[str, Any] = {}
    for field_schema in cls.get_detection_schema():
        if field_schema.name in defaults:
            raise ValueError(
                f"Duplicate detection param schema name: {field_schema.name!r}"
            )
        defaults[field_schema.name] = field_schema.default
    return defaults

validate_detection_params classmethod

validate_detection_params(params: dict[str, Any]) -> None

Validate detection parameter mapping against schema.

Parameters:

Name Type Description Default
params dict[str, Any]

Detection parameter mapping.

required

Returns:

Type Description
None

None.

Raises:

Type Description
KeyError

If any key is not present in the schema.

TypeError

If any value has the wrong type.

ValueError

If any enum value is not in choices.

Source code in src/acqstore/acq_image/analysis/model.py
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
@classmethod
def validate_detection_params(cls, params: dict[str, Any]) -> None:
    """Validate detection parameter mapping against schema.

    Args:
        params: Detection parameter mapping.

    Returns:
        None.

    Raises:
        KeyError: If any key is not present in the schema.
        TypeError: If any value has the wrong type.
        ValueError: If any enum value is not in ``choices``.
    """
    if not isinstance(params, dict):
        raise TypeError(f"detection_params must be dict, got: {type(params).__name__}")

    schema_by_name = {entry.name: entry for entry in cls.get_detection_schema()}
    for key, value in params.items():
        if key not in schema_by_name:
            raise KeyError(f"Unknown detection param: {key!r}")

        entry = schema_by_name[key]
        match entry.value_type:
            case DetectionValueType.INT:
                if isinstance(value, bool) or not isinstance(value, int):
                    raise TypeError(f"{key!r} must be int, got: {type(value).__name__}")
            case DetectionValueType.FLOAT:
                if isinstance(value, bool) or not isinstance(value, (int, float)):
                    raise TypeError(
                        f"{key!r} must be float or int, got: {type(value).__name__}"
                    )
            case DetectionValueType.BOOL:
                if not isinstance(value, bool):
                    raise TypeError(f"{key!r} must be bool, got: {type(value).__name__}")
            case DetectionValueType.STR:
                if not isinstance(value, str):
                    raise TypeError(f"{key!r} must be str, got: {type(value).__name__}")
            case DetectionValueType.ENUM:
                if entry.choices is None:
                    raise ValueError(f"{key!r} has value_type=ENUM but no choices")
            case _:
                raise ValueError(f"Unsupported detection value type: {entry.value_type!r}")

        if entry.choices is not None and value not in entry.choices:
            raise ValueError(f"{key!r} must be one of {entry.choices!r}, got: {value!r}")

run abstractmethod

run(
    data_provider: AnalysisDataProvider,
    *,
    context: AnalysisRunContext | None = None,
    dependencies: dict[str, BaseAnalysis] | None = None,
) -> AnalysisResult

Run analysis using a narrow data-provider API.

Derived implementations should read pixels and physical units only through data_provider. They should use context for progress and cancellation, respect dependency analyses when declared, store results in self.result, and call self.set_dirty() after mutating results.

Parameters:

Name Type Description Default
data_provider AnalysisDataProvider

Minimal image/header access provider.

required
context AnalysisRunContext | None

Optional runtime context for progress and cancellation.

None
dependencies dict[str, BaseAnalysis] | None

Dependency analyses keyed by analysis name.

None

Returns:

Type Description
AnalysisResult

Analysis result. Implementations usually return self.result.

Source code in src/acqstore/acq_image/analysis/model.py
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
@abstractmethod
def run(
    self,
    data_provider: AnalysisDataProvider,
    *,
    context: AnalysisRunContext | None = None,
    dependencies: dict[str, BaseAnalysis] | None = None,
) -> AnalysisResult:
    """Run analysis using a narrow data-provider API.

    Derived implementations should read pixels and physical units only
    through ``data_provider``. They should use ``context`` for progress and
    cancellation, respect dependency analyses when declared, store results in
    ``self.result``, and call ``self.set_dirty()`` after mutating results.

    Args:
        data_provider: Minimal image/header access provider.
        context: Optional runtime context for progress and cancellation.
        dependencies: Dependency analyses keyed by analysis name.

    Returns:
        Analysis result. Implementations usually return ``self.result``.
    """
    raise NotImplementedError

get_plot_data

get_plot_data() -> AnalysisPlotData | None

Return display-ready plot data for this analysis.

Derived analyses override this when they have a canonical x/y plot.

Returns:

Type Description
AnalysisPlotData | None

Plot data, or None when the analysis has no canonical plot.

Source code in src/acqstore/acq_image/analysis/model.py
573
574
575
576
577
578
579
580
581
def get_plot_data(self) -> AnalysisPlotData | None:
    """Return display-ready plot data for this analysis.

    Derived analyses override this when they have a canonical x/y plot.

    Returns:
        Plot data, or None when the analysis has no canonical plot.
    """
    return None

get_overlay_traces

get_overlay_traces() -> tuple[
    AnalysisOverlayTraceData, ...
]

Return ROI-local trace overlays for raster viewers.

Derived analyses override this when they expose edge or path overlays. Coordinates are ROI-local physical units; GUI layers translate to full-image Plotly coordinates.

Returns:

Type Description
tuple[AnalysisOverlayTraceData, ...]

Tuple of overlay trace data. Empty when no overlays exist.

Source code in src/acqstore/acq_image/analysis/model.py
583
584
585
586
587
588
589
590
591
592
593
def get_overlay_traces(self) -> tuple[AnalysisOverlayTraceData, ...]:
    """Return ROI-local trace overlays for raster viewers.

    Derived analyses override this when they expose edge or path overlays.
    Coordinates are ROI-local physical units; GUI layers translate to
    full-image Plotly coordinates.

    Returns:
        Tuple of overlay trace data. Empty when no overlays exist.
    """
    return ()

has_table

has_table() -> bool

Return whether this analysis has table output.

Returns:

Type Description
bool

True if result table exists.

Source code in src/acqstore/acq_image/analysis/model.py
595
596
597
598
599
600
601
def has_table(self) -> bool:
    """Return whether this analysis has table output.

    Returns:
        True if result table exists.
    """
    return self.result.table is not None

get_table_columns

get_table_columns() -> list[str]

Return result table column names.

Returns:

Type Description
list[str]

Column names, or an empty list when no table exists.

Source code in src/acqstore/acq_image/analysis/model.py
603
604
605
606
607
608
609
610
611
def get_table_columns(self) -> list[str]:
    """Return result table column names.

    Returns:
        Column names, or an empty list when no table exists.
    """
    if self.result.table is None:
        return []
    return list(self.result.table.columns)

get_column

get_column(name: str) -> list[Any]

Return one result table column as a list.

Parameters:

Name Type Description Default
name str

Column name.

required

Returns:

Type Description
list[Any]

Column values as a list.

Raises:

Type Description
ValueError

If this analysis has no table.

KeyError

If the column does not exist.

Source code in src/acqstore/acq_image/analysis/model.py
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
def get_column(self, name: str) -> list[Any]:
    """Return one result table column as a list.

    Args:
        name: Column name.

    Returns:
        Column values as a list.

    Raises:
        ValueError: If this analysis has no table.
        KeyError: If the column does not exist.
    """
    if self.result.table is None:
        raise ValueError(f"Analysis {self.analysis_name!r} has no table")
    if name not in self.result.table.columns:
        raise KeyError(f"Column not found: {name!r}")
    return self.result.table[name].tolist()

table_with_bookkeeping

table_with_bookkeeping() -> pd.DataFrame | None

Return table with channel and ROI bookkeeping columns.

Returns:

Type Description
DataFrame | None

DataFrame with channel and roi_id columns added, or None if

DataFrame | None

the analysis has no table output.

Raises:

Type Description
ValueError

If result table already contains reserved bookkeeping columns.

Source code in src/acqstore/acq_image/analysis/model.py
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
def table_with_bookkeeping(self) -> pd.DataFrame | None:
    """Return table with channel and ROI bookkeeping columns.

    Returns:
        DataFrame with ``channel`` and ``roi_id`` columns added, or None if
        the analysis has no table output.

    Raises:
        ValueError: If result table already contains reserved bookkeeping
            columns.
    """
    if self.result.table is None:
        return None

    reserved = {"channel", "roi_id"}
    overlap = reserved.intersection(self.result.table.columns)
    if overlap:
        raise ValueError(
            f"Analysis table already has reserved columns: {sorted(overlap)}"
        )

    df = self.result.table.copy()
    df.insert(0, "roi_id", self.key.roi_id)
    df.insert(0, "channel", self.key.channel)
    return df

to_json_dict

to_json_dict() -> dict[str, Any]

Return JSON-serializable analysis record.

Returns:

Type Description
dict[str, Any]

Dictionary containing identity, detection params, and summary.

Source code in src/acqstore/acq_image/analysis/model.py
658
659
660
661
662
663
664
665
666
667
668
669
670
def to_json_dict(self) -> dict[str, Any]:
    """Return JSON-serializable analysis record.

    Returns:
        Dictionary containing identity, detection params, and summary.
    """
    return {
        "analysis_name": self.key.analysis_name,
        "channel": self.key.channel,
        "roi_id": self.key.roi_id,
        "detection_params": dict(self.detection_params),
        "summary": dict(self.result.summary),
    }

load_json_dict

load_json_dict(record: dict[str, Any]) -> None

Load detection params and summary from a JSON record.

Parameters:

Name Type Description Default
record dict[str, Any]

Analysis record from source sidecar JSON.

required

Returns:

Type Description
None

None.

Source code in src/acqstore/acq_image/analysis/model.py
672
673
674
675
676
677
678
679
680
681
682
def load_json_dict(self, record: dict[str, Any]) -> None:
    """Load detection params and summary from a JSON record.

    Args:
        record: Analysis record from source sidecar JSON.

    Returns:
        None.
    """
    self.detection_params = dict(record.get("detection_params", {}))
    self.result.summary = dict(record.get("summary", {}))

save_record_json

save_record_json(path: str | Path) -> None

Save this analysis record to a standalone JSON file.

This helper is useful in tests. Normal AcqImage save code should use AcqAnalysisSet.serialize_json_analysis() and merge records into the source sidecar JSON.

Parameters:

Name Type Description Default
path str | Path

Output JSON file path.

required

Returns:

Type Description
None

None.

Source code in src/acqstore/acq_image/analysis/model.py
684
685
686
687
688
689
690
691
692
693
694
695
696
697
def save_record_json(self, path: str | Path) -> None:
    """Save this analysis record to a standalone JSON file.

    This helper is useful in tests. Normal AcqImage save code should use
    ``AcqAnalysisSet.serialize_json_analysis()`` and merge records into the
    source sidecar JSON.

    Args:
        path: Output JSON file path.

    Returns:
        None.
    """
    Path(path).write_text(json.dumps(self.to_json_dict(), indent=2))

load_record_json

load_record_json(path: str | Path) -> None

Load this analysis record from a standalone JSON file.

Parameters:

Name Type Description Default
path str | Path

Input JSON file path.

required

Returns:

Type Description
None

None.

Source code in src/acqstore/acq_image/analysis/model.py
699
700
701
702
703
704
705
706
707
708
def load_record_json(self, path: str | Path) -> None:
    """Load this analysis record from a standalone JSON file.

    Args:
        path: Input JSON file path.

    Returns:
        None.
    """
    self.load_json_dict(json.loads(Path(path).read_text()))

Bases: Protocol

Minimal data access surface needed by analyses.

Analysis implementations use this protocol instead of reaching into AcqImage internals. The contract is intentionally small: ROI-local image data and image physical spacing. This keeps analysis code reusable from GUI, batch, test, and notebook workflows.

Source code in src/acqstore/acq_image/analysis/data_provider.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class AnalysisDataProvider(Protocol):
    """Minimal data access surface needed by analyses.

    Analysis implementations use this protocol instead of reaching into
    ``AcqImage`` internals. The contract is intentionally small: ROI-local image
    data and image physical spacing. This keeps analysis code reusable from GUI,
    batch, test, and notebook workflows.
    """

    def get_roi_image(self, channel: int, roi_id: int) -> np.ndarray:
        """Return image data for one channel cropped to one ROI.

        Args:
            channel: Channel index.
            roi_id: ROI identifier.

        Returns:
            Two-dimensional ROI image data.
        """
        ...

    def get_image_physical_units(self) -> tuple[float, float]:
        """Return physical units for the 2D image plane.

        Returns:
            Per-pixel ``(step_y, step_x)`` for ``(Y, X)`` image data.
        """
        ...

get_roi_image

get_roi_image(channel: int, roi_id: int) -> np.ndarray

Return image data for one channel cropped to one ROI.

Parameters:

Name Type Description Default
channel int

Channel index.

required
roi_id int

ROI identifier.

required

Returns:

Type Description
ndarray

Two-dimensional ROI image data.

Source code in src/acqstore/acq_image/analysis/data_provider.py
28
29
30
31
32
33
34
35
36
37
38
def get_roi_image(self, channel: int, roi_id: int) -> np.ndarray:
    """Return image data for one channel cropped to one ROI.

    Args:
        channel: Channel index.
        roi_id: ROI identifier.

    Returns:
        Two-dimensional ROI image data.
    """
    ...

get_image_physical_units

get_image_physical_units() -> tuple[float, float]

Return physical units for the 2D image plane.

Returns:

Type Description
tuple[float, float]

Per-pixel (step_y, step_x) for (Y, X) image data.

Source code in src/acqstore/acq_image/analysis/data_provider.py
40
41
42
43
44
45
46
def get_image_physical_units(self) -> tuple[float, float]:
    """Return physical units for the 2D image plane.

    Returns:
        Per-pixel ``(step_y, step_x)`` for ``(Y, X)`` image data.
    """
    ...

Runtime context for progress reporting and cooperative cancellation.

Analysis code receives this object from GUI controllers, batch runners, or scripts. Derived analyses should call :meth:report_progress at natural milestones and :meth:raise_if_cancelled or :meth:is_cancelled inside long loops.

Parameters:

Name Type Description Default
progress_callback Callable[[float | None, str], None] | None

Optional callback receiving (fraction, message). fraction should be between 0 and 1 when known, or None when progress is indeterminate.

None
cancel_callback Callable[[], bool] | None

Optional callback returning True when cancellation has been requested.

None
Source code in src/acqstore/acq_image/analysis/model.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
@dataclass(slots=True)
class AnalysisRunContext:
    """Runtime context for progress reporting and cooperative cancellation.

    Analysis code receives this object from GUI controllers, batch runners, or
    scripts. Derived analyses should call :meth:`report_progress` at natural
    milestones and :meth:`raise_if_cancelled` or :meth:`is_cancelled` inside
    long loops.

    Args:
        progress_callback: Optional callback receiving ``(fraction, message)``.
            ``fraction`` should be between 0 and 1 when known, or ``None`` when
            progress is indeterminate.
        cancel_callback: Optional callback returning ``True`` when cancellation
            has been requested.
    """

    progress_callback: Callable[[float | None, str], None] | None = None
    cancel_callback: Callable[[], bool] | None = None

    def report_progress(self, fraction: float | None, message: str = "") -> None:
        """Report progress for a running analysis.

        Args:
            fraction: Fraction complete, or None if unknown.
            message: Human-readable progress message.

        Returns:
            None.
        """
        if self.progress_callback is not None:
            self.progress_callback(fraction, message)

    def is_cancelled(self) -> bool:
        """Return whether cancellation has been requested.

        Returns:
            True if cancellation has been requested.
        """
        return bool(self.cancel_callback is not None and self.cancel_callback())

    def raise_if_cancelled(self) -> None:
        """Raise if cancellation has been requested.

        Raises:
            AnalysisCancelled: If cancellation has been requested.
        """
        if self.is_cancelled():
            raise AnalysisCancelled("Analysis cancelled")

report_progress

report_progress(
    fraction: float | None, message: str = ''
) -> None

Report progress for a running analysis.

Parameters:

Name Type Description Default
fraction float | None

Fraction complete, or None if unknown.

required
message str

Human-readable progress message.

''

Returns:

Type Description
None

None.

Source code in src/acqstore/acq_image/analysis/model.py
195
196
197
198
199
200
201
202
203
204
205
206
def report_progress(self, fraction: float | None, message: str = "") -> None:
    """Report progress for a running analysis.

    Args:
        fraction: Fraction complete, or None if unknown.
        message: Human-readable progress message.

    Returns:
        None.
    """
    if self.progress_callback is not None:
        self.progress_callback(fraction, message)

is_cancelled

is_cancelled() -> bool

Return whether cancellation has been requested.

Returns:

Type Description
bool

True if cancellation has been requested.

Source code in src/acqstore/acq_image/analysis/model.py
208
209
210
211
212
213
214
def is_cancelled(self) -> bool:
    """Return whether cancellation has been requested.

    Returns:
        True if cancellation has been requested.
    """
    return bool(self.cancel_callback is not None and self.cancel_callback())

raise_if_cancelled

raise_if_cancelled() -> None

Raise if cancellation has been requested.

Raises:

Type Description
AnalysisCancelled

If cancellation has been requested.

Source code in src/acqstore/acq_image/analysis/model.py
216
217
218
219
220
221
222
223
def raise_if_cancelled(self) -> None:
    """Raise if cancellation has been requested.

    Raises:
        AnalysisCancelled: If cancellation has been requested.
    """
    if self.is_cancelled():
        raise AnalysisCancelled("Analysis cancelled")

Outputs from one completed or loaded analysis.

summary stores small JSON-serializable values that belong in an acquisition sidecar. table stores larger per-row results that can be saved to CSV and inspected in notebooks or GUI tables.

Parameters:

Name Type Description Default
summary dict[str, Any]

Small JSON-serializable result dictionary.

dict()
table DataFrame | None

Optional large tabular output. Tables produced by derived analyses must not include reserved bookkeeping columns channel or roi_id; those are added by :meth:BaseAnalysis.table_with_bookkeeping.

None
Source code in src/acqstore/acq_image/analysis/model.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
@dataclass(slots=True)
class AnalysisResult:
    """Outputs from one completed or loaded analysis.

    ``summary`` stores small JSON-serializable values that belong in an
    acquisition sidecar. ``table`` stores larger per-row results that can be
    saved to CSV and inspected in notebooks or GUI tables.

    Args:
        summary: Small JSON-serializable result dictionary.
        table: Optional large tabular output. Tables produced by derived
            analyses must not include reserved bookkeeping columns ``channel``
            or ``roi_id``; those are added by :meth:`BaseAnalysis.table_with_bookkeeping`.
    """

    summary: dict[str, Any] = field(default_factory=dict)
    table: pd.DataFrame | None = None