Skip to content

Sum intensity analysis

Bases: BaseAnalysis

Measure normalized line intensity and detect peaks/events.

The analysis runs on a full-resolution rectangular ROI crop. Rows are interpreted as time and columns as distance. The raw row-sum trace is stored for debugging, but peak detection uses normalized intensity sum_intensity / image.shape[1] so ROIs with different spatial widths are more comparable.

Parameters:

Name Type Description Default
channel int

Zero-based channel index for analysis.

required
roi_id int

ROI identifier for analysis.

required
detection_params dict[str, object] | None

Optional detection parameters. Missing values are filled from detection_schema defaults.

None
Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
@register_analysis_class
class SumIntensityAnalysis(BaseAnalysis):
    """Measure normalized line intensity and detect peaks/events.

    The analysis runs on a full-resolution rectangular ROI crop. Rows are
    interpreted as time and columns as distance. The raw row-sum trace is stored
    for debugging, but peak detection uses normalized intensity
    ``sum_intensity / image.shape[1]`` so ROIs with different spatial widths are
    more comparable.

    Args:
        channel: Zero-based channel index for analysis.
        roi_id: ROI identifier for analysis.
        detection_params: Optional detection parameters. Missing values are
            filled from ``detection_schema`` defaults.
    """

    analysis_name = "sum_intensity"
    analysis_version = 1
    summary_columns = (
        "analysis_date",
        "analysis_time",
        "analysis_version",
        "status",
        "num_timepoints",
        "num_peaks",
        "num_space_pixels",
        "seconds_per_line",
        "f0_baseline",
        "baseline_method",
        "baseline_percentile",
        "manual_f0_baseline",
        "detrend_method",
        "detection_method",
        "detection_source",
        "peak_search_window_ms",
        "width_search_window_ms",
        "baseline_window_ms",
        "peak_amplitude_mean",
        "peak_amplitude_median",
        "errors",
    )
    detection_schema = (
        DetectionParamSchema(
            name="window_radius_points",
            display_name="Window Radius",
            value_type=DetectionValueType.INT,
            default=6,
            unit="points",
            description="Radius around each time row used for rolling row-sum averaging.",
            category=DetectionParamCategory.PREPROCESSING,
        ),
        DetectionParamSchema(
            name="filter_method",
            display_name="Filter Method",
            value_type=DetectionValueType.ENUM,
            default="median",
            choices=("none", "median"),
            description="Optional pre-detection trace filter applied to normalized intensity.",
            category=DetectionParamCategory.PREPROCESSING,
        ),
        DetectionParamSchema(
            name="median_filter_kernel_points",
            display_name="Median Filter Kernel",
            value_type=DetectionValueType.INT,
            default=3,
            unit="points",
            description="Median filter kernel size in time points. Even values are rounded up.",
            methods=("median",),
            category=DetectionParamCategory.PREPROCESSING,
        ),
        DetectionParamSchema(
            name="detrend_method",
            display_name="Detrend Method",
            value_type=DetectionValueType.ENUM,
            default="single_exponential",
            choices=("none", "single_exponential"),
            description="Optional bleach-trend removal before detection.",
            category=DetectionParamCategory.PREPROCESSING,
        ),
        DetectionParamSchema(
            name="baseline_method",
            display_name="F0 Baseline Method",
            value_type=DetectionValueType.ENUM,
            default="percentile",
            choices=("percentile", "manual"),
            description="Method used to estimate scalar F0 for delta-F over F0.",
            category=DetectionParamCategory.PREPROCESSING,
        ),
        DetectionParamSchema(
            name="baseline_percentile",
            display_name="F0 Baseline Percentile",
            value_type=DetectionValueType.FLOAT,
            default=20.0,
            unit="percentile",
            description="Percentile of the filtered and detrended trace used as F0.",
            methods=("percentile",),
            category=DetectionParamCategory.PREPROCESSING,
        ),
        DetectionParamSchema(
            name="manual_f0_baseline",
            display_name="Manual F0 Baseline",
            value_type=DetectionValueType.FLOAT,
            default=1.0,
            description=(
                "User-supplied scalar F0 used when baseline_method is manual. "
                "Units are the same as the filtered/detrended normalized intensity trace."
            ),
            methods=("manual",),
            category=DetectionParamCategory.PREPROCESSING,
        ),
        DetectionParamSchema(
            name="baseline_window_ms",
            display_name="Baseline Window",
            value_type=DetectionValueType.FLOAT,
            default=100.0,
            unit="ms",
            description=(
                "Backward window before each detected onset used to calculate "
                "event-local baseline_mean, baseline_std, and prominence."
            ),
            category=DetectionParamCategory.PREPROCESSING,
        ),
        DetectionParamSchema(
            name="baseline_min_value",
            display_name="F0 Minimum Value",
            value_type=DetectionValueType.FLOAT,
            default=1e-12,
            description="Small positive floor used to avoid division by zero in df/f0.",
            visible=False,
            category=DetectionParamCategory.PREPROCESSING,
        ),
        DetectionParamSchema(
            name="detection_method",
            display_name="Detection Method",
            value_type=DetectionValueType.ENUM,
            default="derivative_threshold",
            choices=("derivative_threshold", "absolute_threshold"),
            description="Peak onset detector.",
            category=DetectionParamCategory.PEAK_DETECTION,
        ),
        DetectionParamSchema(
            name="polarity",
            display_name="Polarity",
            value_type=DetectionValueType.ENUM,
            default="positive",
            choices=("positive", "negative"),
            description="Expected peak polarity in the detection signal.",
            category=DetectionParamCategory.PEAK_DETECTION,
        ),
        DetectionParamSchema(
            name="detection_source",
            display_name="Detection Source",
            value_type=DetectionValueType.ENUM,
            default=SumIntensityTraceKey.DF_F_SIGNAL.value,
            choices=(
                SumIntensityTraceKey.SUM_INTENSITY.value,
                SumIntensityTraceKey.NORM_SUM_INTENSITY.value,
                SumIntensityTraceKey.FILTERED_NORM_SUM_INTENSITY.value,
                SumIntensityTraceKey.DETRENDED_NORM_SUM_INTENSITY.value,
                SumIntensityTraceKey.DF_F_SIGNAL.value,
            ),
            description=(
                "Continuous trace used for onset detection. Derivative-threshold "
                "detection uses the time derivative of this selected trace."
            ),
            category=DetectionParamCategory.PEAK_DETECTION,
        ),
        DetectionParamSchema(
            name="absolute_threshold",
            display_name="Absolute Threshold",
            value_type=DetectionValueType.FLOAT,
            default=0.0,
            description="Detection-signal threshold used by absolute_threshold detection.",
            methods=("absolute_threshold",),
            category=DetectionParamCategory.PEAK_DETECTION,
        ),
        DetectionParamSchema(
            name="derivative_threshold_per_sec",
            display_name="Derivative Threshold",
            value_type=DetectionValueType.FLOAT,
            default=3.0,
            unit="1/s",
            description="Derivative threshold in selected detection-source units per second.",
            methods=("derivative_threshold",),
            category=DetectionParamCategory.PEAK_DETECTION,
        ),
        DetectionParamSchema(
            name="refractory_period_ms",
            display_name="Refractory Period",
            value_type=DetectionValueType.FLOAT,
            default=10.0,
            unit="ms",
            description="Minimum accepted onset-to-onset interval.",
            category=DetectionParamCategory.PEAK_DETECTION,
        ),
        DetectionParamSchema(
            name="peak_search_window_ms",
            display_name="Peak Search Window",
            value_type=DetectionValueType.FLOAT,
            default=50.0,
            unit="ms",
            description="Forward search window used to refine peak index after onset.",
            category=DetectionParamCategory.PEAK_DETECTION,
        ),
        DetectionParamSchema(
            name="width_search_window_ms",
            display_name="Width Search Window",
            value_type=DetectionValueType.FLOAT,
            default=750.0,
            unit="ms",
            description=(
                "Maximum forward search from the refined peak to find falling-side "
                "fractional width crossings. Missing crossings within this window "
                "are stored as level-crossing failures."
            ),
            category=DetectionParamCategory.PEAK_DETECTION,
        ),
        DetectionParamSchema(
            name="level_fractions",
            display_name="Level Fractions",
            value_type=DetectionValueType.STR,
            default="0.1,0.2,0.5,0.8,0.9",
            description="Comma-separated peak-amplitude fractions for width measurements.",
            visible=False,
            category=DetectionParamCategory.PEAK_DETECTION,
        ),
    )

    @classmethod
    def get_detection_presets(cls) -> tuple[SumIntensityDetectionPreset, ...]:
        """Return built-in detection presets for this analysis type.

        Returns:
            Tuple of immutable preset descriptors in stable UI order.
        """
        return list_sum_intensity_detection_presets()

    @classmethod
    def get_detection_preset(
        cls,
        name: SumIntensityPresetName | str,
    ) -> SumIntensityDetectionPreset:
        """Return one built-in detection preset.

        Args:
            name: Preset enum value or its string value.

        Returns:
            Matching preset descriptor.

        Raises:
            KeyError: If ``name`` is not a built-in preset.
        """
        return get_sum_intensity_detection_preset(name)

    @classmethod
    def get_detection_preset_params(
        cls,
        name: SumIntensityPresetName | str,
    ) -> dict[str, object]:
        """Return a copied detection-parameter mapping for one preset.

        Args:
            name: Preset enum value or its string value.

        Returns:
            Complete detection-parameter dictionary suitable for constructing or
            updating a ``SumIntensityAnalysis`` instance.

        Raises:
            KeyError: If ``name`` is not a built-in preset.
        """
        params = get_sum_intensity_detection_preset_params(name)
        cls.validate_detection_params(params)
        return params

    @classmethod
    def get_feature_schema(cls) -> tuple[SumIntensityFeatureSchema, ...]:
        """Return event-level result feature schema entries.

        Returns:
            Tuple of feature schema records in stable report order.
        """
        return get_sum_intensity_feature_schema()

    @classmethod
    def get_feature_schema_dataframe(cls) -> pd.DataFrame:
        """Return event-level result feature schema as a DataFrame.

        Returns:
            DataFrame with one row per documented event-level feature.
        """
        return get_sum_intensity_feature_schema_dataframe()

    def __init__(
        self,
        *,
        channel: int,
        roi_id: int,
        detection_params: dict[str, object] | None = None,
    ) -> None:
        """Create a sum-intensity analysis instance.

        Args:
            channel: Channel index for analysis.
            roi_id: ROI identifier for analysis.
            detection_params: Optional detection parameters.
        """
        super().__init__(channel=channel, roi_id=roi_id, detection_params=detection_params)

    def run(
        self,
        data_provider: AnalysisDataProvider,
        *,
        context: AnalysisRunContext | None = None,
        dependencies: dict[str, BaseAnalysis] | None = None,
    ) -> AnalysisResult:
        """Run sum-intensity peak detection on one ROI crop.

        Args:
            data_provider: Provider for ROI image data and physical spacing.
                ``get_roi_image`` must return a 2D ``(time, space)`` array.
            context: Optional progress/cancellation context.
            dependencies: Unused for sum-intensity analysis.

        Returns:
            Populated analysis result. The result table includes normalized
            intensity, detection signal, onset mask, and peak mask columns.
        """
        _ = dependencies
        context = context or AnalysisRunContext()
        context.raise_if_cancelled()
        context.report_progress(0.0, "Loading ROI image")
        image = data_provider.get_roi_image(channel=self.key.channel, roi_id=self.key.roi_id)
        physical_units = data_provider.get_image_physical_units()
        context.report_progress(0.25, "Running sum intensity analysis")
        result = run_sum_intensity(
            image,
            detection_params=self.detection_params,
            physical_units=physical_units,
        )
        context.raise_if_cancelled()
        self.result.summary = self.finalize_summary(result.summary)
        self.result.table = result.table
        self.set_dirty()
        context.report_progress(1.0, "Sum intensity analysis complete")
        return self.result

    @classmethod
    def get_pool_summary_columns(cls) -> tuple[str, ...]:
        """Return scalar summary columns for sum-intensity pool tables.

        This additive pool-facing API keeps the existing rich summary and JSON
        APIs unchanged while exposing table-safe scalar columns. List-like
        summary values are flattened for collection-level DataFrame caches.

        Returns:
            Tuple of scalar summary column names in stable order.
        """
        columns: list[str] = []
        for column in cls.get_summary_columns():
            if column == "errors":
                columns.extend(["error_count", "errors_text"])
            else:
                columns.append(column)
        return tuple(columns)

    def get_pool_summary_values(self) -> dict[str, object]:
        """Return scalar summary values for sum-intensity pool tables.

        Returns:
            Mapping whose keys match :meth:`get_pool_summary_columns`. Values
            are scalar objects suitable for pandas table cells.
        """
        values: dict[str, object] = {}
        for column in self.get_summary_columns():
            if column == "errors":
                errors = self.result.summary.get("errors", ())
                if errors is pd.NA or errors is None:
                    error_values: tuple[str, ...] = ()
                elif isinstance(errors, (list, tuple)):
                    error_values = tuple(str(item) for item in errors)
                else:
                    error_values = (str(errors),)
                values["error_count"] = len(error_values)
                values["errors_text"] = "; ".join(error_values)
            else:
                values[column] = self.result.summary.get(column, pd.NA)
        return values

    @classmethod
    def get_pool_peak_columns(cls) -> tuple[str, ...]:
        """Return scalar peak-row columns for sum-intensity pool tables.

        Returns:
            Tuple of flattened peak-event column names. Feature columns are
            generated from :meth:`get_feature_schema` so future event-level
            features propagate to the pool API without pool changes.
        """
        base_columns = (
            "peak_id",
            "peak_status",
            "peak_warning_count",
            "peak_warnings_text",
            "onset_index",
            "onset_time_sec",
            "onset_value",
            "peak_index",
            "peak_time_sec",
            "peak_value",
            "peak_amplitude",
            "peak_detection_method",
            "onset_to_onset_interval_sec",
            "peak_to_peak_interval_sec",
        )
        feature_columns: list[str] = []
        for schema in cls.get_feature_schema():
            feature_columns.extend(
                [
                    schema.name,
                    f"{schema.name}_status",
                    f"{schema.name}_reason",
                ]
            )
        width_columns: list[str] = []
        for level in PeakWidthLevel:
            prefix = level.value
            width_columns.extend(
                [
                    f"{prefix}_value",
                    f"{prefix}_left_index",
                    f"{prefix}_right_index",
                    f"{prefix}_points",
                    f"{prefix}_sec",
                    f"{prefix}_status",
                ]
            )
        return base_columns + tuple(feature_columns) + tuple(width_columns)

    def get_pool_peak_rows(self) -> tuple[dict[str, object], ...]:
        """Return flattened scalar rows for detected sum-intensity peaks.

        Returns:
            Tuple of dictionaries, one per detected peak. Empty when the
            analysis has no peak events. Each dictionary has exactly the keys
            returned by :meth:`get_pool_peak_columns`.
        """
        return tuple(self._pool_peak_row(event) for event in self.get_peak_events())

    def get_peak_events(self) -> tuple[PeakEvent, ...]:
        """Return parsed peak-event records from the result summary.

        Returns:
            Tuple of peak events. Empty when analysis has not been run or when
            no peaks were detected.
        """
        records = self.result.summary.get("peak_events", ())
        return tuple(PeakEvent.from_json_dict(dict(record)) for record in records)

    @classmethod
    def _pool_peak_row(cls, event: PeakEvent) -> dict[str, object]:
        row: dict[str, object] = {
            "peak_id": int(event.peak_id),
            "peak_status": str(event.status),
            "peak_warning_count": len(event.warnings),
            "peak_warnings_text": "; ".join(str(item) for item in event.warnings),
            "onset_index": int(event.onset_index),
            "onset_time_sec": float(event.onset_time_sec),
            "onset_value": float(event.onset_value),
            "peak_index": pd.NA if event.peak_index is None else int(event.peak_index),
            "peak_time_sec": _pool_optional_float(event.peak_time_sec),
            "peak_value": _pool_optional_float(event.peak_value),
            "peak_amplitude": _pool_optional_float(event.peak_amplitude),
            "peak_detection_method": str(event.detection_method),
            "onset_to_onset_interval_sec": _pool_optional_float(
                event.onset_to_onset_interval_sec
            ),
            "peak_to_peak_interval_sec": _pool_optional_float(
                event.peak_to_peak_interval_sec
            ),
        }
        for schema in cls.get_feature_schema():
            feature = getattr(event, schema.name)
            row.update(_pool_feature_values(schema.name, feature))
        crossings_by_prefix = {
            _pool_width_prefix(crossing): crossing for crossing in event.level_crossings
        }
        for level in PeakWidthLevel:
            row.update(_pool_width_values(level.value, crossings_by_prefix.get(level.value)))
        return {column: row.get(column, pd.NA) for column in cls.get_pool_peak_columns()}

    def get_trace(self, key: SumIntensityTraceKey) -> ResultTrace:
        """Return one named continuous trace.

        Args:
            key: Trace key to retrieve.

        Returns:
            Result trace with ``time_sec`` as x values.

        Raises:
            ValueError: If analysis has no result table.
            KeyError: If the requested trace column is missing.
        """
        if self.result.table is None:
            raise ValueError("analysis has no result table")
        table = self.result.table
        if "time_sec" not in table.columns:
            raise KeyError("Sum intensity trace requires 'time_sec' column")
        if key.value not in table.columns:
            raise KeyError(f"Sum intensity trace column is missing: {key.value!r}")
        definition = _TRACE_DEFINITIONS[key]
        return ResultTrace(
            key=key,
            name=definition["display_name"],
            x=table["time_sec"].to_numpy(dtype=float),
            y=table[key.value].to_numpy(dtype=float),
            x_label="Time (s)",
            y_label=definition["y_label"],
            metadata={
                "description": definition["description"],
                "units": definition["units"],
            },
        )

    def get_event_points(self, key: SumIntensityEventPointKey) -> ResultPoints:
        """Return sparse event marker points.

        Args:
            key: Event point collection to retrieve.

        Returns:
            Result points for plotting event markers.

        Raises:
            KeyError: If the point key is unknown.
        """
        events = self.get_peak_events()
        if key == SumIntensityEventPointKey.ONSETS:
            return ResultPoints(
                key=key,
                name="Onsets",
                x=_optional_event_array(events, "onset_time_sec"),
                y=_optional_event_array(events, "onset_value"),
                x_label="Time (s)",
                y_label="Detection source",
                metadata={"description": "Accepted onset threshold crossings."},
            )
        if key == SumIntensityEventPointKey.PEAKS:
            return ResultPoints(
                key=key,
                name="Peaks",
                x=_optional_event_array(events, "peak_time_sec"),
                y=_optional_event_array(events, "peak_value"),
                x_label="Time (s)",
                y_label="Detection source",
                metadata={"description": "Refined peak locations."},
            )
        raise KeyError(f"Unknown sum-intensity event point key: {key!r}")

    def get_width_trace(
        self,
        peak_width_level: PeakWidthLevel | None = None,
    ) -> ResultTrace | tuple[ResultTrace, ...]:
        """Return NaN-separated width segment traces.

        Args:
            peak_width_level: Specific width level to return. When None, traces
                for all standard width levels are returned.

        Returns:
            One ``ResultTrace`` when ``peak_width_level`` is supplied, otherwise
            a tuple of traces for all levels.

        Raises:
            KeyError: If a requested width level is unknown.
        """
        if peak_width_level is None:
            return tuple(self.get_width_trace(level) for level in PeakWidthLevel)
        fraction = WIDTH_LEVEL_FRACTIONS[peak_width_level]
        x_values: list[float] = []
        y_values: list[float] = []
        for event in self.get_peak_events():
            crossing = _find_event_crossing(event, fraction)
            if crossing is None or crossing.status != "ok":
                continue
            if crossing.left_index is None or crossing.right_index is None or crossing.value is None:
                continue
            seconds_per_line = float(
                self.result.summary.get(SumIntensitySummaryKey.SECONDS_PER_LINE.value, 1.0)
            )
            x_values.extend(
                [
                    float(crossing.left_index) * seconds_per_line,
                    float(crossing.right_index) * seconds_per_line,
                    float("nan"),
                ]
            )
            y_values.extend([float(crossing.value), float(crossing.value), float("nan")])
        return ResultTrace(
            key=peak_width_level.value,
            name=f"Peak {peak_width_level.value.replace('_', ' ')}",
            x=np.asarray(x_values, dtype=float),
            y=np.asarray(y_values, dtype=float),
            x_label="Time (s)",
            y_label="Detection source",
            metadata={
                "fraction": fraction,
                "trace_type": "width_segments",
                "connectgaps": False,
            },
        )

    def get_summary_value(self, key: SumIntensitySummaryKey) -> object:
        """Return one named summary value.

        Args:
            key: Summary key to retrieve.

        Returns:
            Stored summary value, or None when the key is absent.
        """
        return self.result.summary.get(key.value)

    def get_plot_data(self) -> AnalysisPlotData | None:
        """Return canonical df/f0 plot data.

        Returns:
            Plot data using ``time_sec`` for the x axis and ``df_f_signal`` for
            the y axis. Returns None when the analysis has no table output yet.

        Raises:
            KeyError: If the table is present but missing required columns.
        """
        if self.result.table is None:
            return None
        trace = self.get_trace(SumIntensityTraceKey.DF_F_SIGNAL)
        return AnalysisPlotData(
            x=tuple(float(value) for value in trace.x.tolist()),
            y=tuple(float(value) for value in trace.y.tolist()),
            x_label=trace.x_label,
            y_label=trace.y_label,
            series_name=trace.name,
        )

get_detection_presets classmethod

get_detection_presets() -> tuple[
    SumIntensityDetectionPreset, ...
]

Return built-in detection presets for this analysis type.

Returns:

Type Description
tuple[SumIntensityDetectionPreset, ...]

Tuple of immutable preset descriptors in stable UI order.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
275
276
277
278
279
280
281
282
@classmethod
def get_detection_presets(cls) -> tuple[SumIntensityDetectionPreset, ...]:
    """Return built-in detection presets for this analysis type.

    Returns:
        Tuple of immutable preset descriptors in stable UI order.
    """
    return list_sum_intensity_detection_presets()

get_detection_preset classmethod

get_detection_preset(
    name: SumIntensityPresetName | str,
) -> SumIntensityDetectionPreset

Return one built-in detection preset.

Parameters:

Name Type Description Default
name SumIntensityPresetName | str

Preset enum value or its string value.

required

Returns:

Type Description
SumIntensityDetectionPreset

Matching preset descriptor.

Raises:

Type Description
KeyError

If name is not a built-in preset.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
@classmethod
def get_detection_preset(
    cls,
    name: SumIntensityPresetName | str,
) -> SumIntensityDetectionPreset:
    """Return one built-in detection preset.

    Args:
        name: Preset enum value or its string value.

    Returns:
        Matching preset descriptor.

    Raises:
        KeyError: If ``name`` is not a built-in preset.
    """
    return get_sum_intensity_detection_preset(name)

get_detection_preset_params classmethod

get_detection_preset_params(
    name: SumIntensityPresetName | str,
) -> dict[str, object]

Return a copied detection-parameter mapping for one preset.

Parameters:

Name Type Description Default
name SumIntensityPresetName | str

Preset enum value or its string value.

required

Returns:

Type Description
dict[str, object]

Complete detection-parameter dictionary suitable for constructing or

dict[str, object]

updating a SumIntensityAnalysis instance.

Raises:

Type Description
KeyError

If name is not a built-in preset.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
@classmethod
def get_detection_preset_params(
    cls,
    name: SumIntensityPresetName | str,
) -> dict[str, object]:
    """Return a copied detection-parameter mapping for one preset.

    Args:
        name: Preset enum value or its string value.

    Returns:
        Complete detection-parameter dictionary suitable for constructing or
        updating a ``SumIntensityAnalysis`` instance.

    Raises:
        KeyError: If ``name`` is not a built-in preset.
    """
    params = get_sum_intensity_detection_preset_params(name)
    cls.validate_detection_params(params)
    return params

get_feature_schema classmethod

get_feature_schema() -> tuple[
    SumIntensityFeatureSchema, ...
]

Return event-level result feature schema entries.

Returns:

Type Description
tuple[SumIntensityFeatureSchema, ...]

Tuple of feature schema records in stable report order.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
323
324
325
326
327
328
329
330
@classmethod
def get_feature_schema(cls) -> tuple[SumIntensityFeatureSchema, ...]:
    """Return event-level result feature schema entries.

    Returns:
        Tuple of feature schema records in stable report order.
    """
    return get_sum_intensity_feature_schema()

get_feature_schema_dataframe classmethod

get_feature_schema_dataframe() -> pd.DataFrame

Return event-level result feature schema as a DataFrame.

Returns:

Type Description
DataFrame

DataFrame with one row per documented event-level feature.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
332
333
334
335
336
337
338
339
@classmethod
def get_feature_schema_dataframe(cls) -> pd.DataFrame:
    """Return event-level result feature schema as a DataFrame.

    Returns:
        DataFrame with one row per documented event-level feature.
    """
    return get_sum_intensity_feature_schema_dataframe()

run

run(
    data_provider: AnalysisDataProvider,
    *,
    context: AnalysisRunContext | None = None,
    dependencies: dict[str, BaseAnalysis] | None = None,
) -> AnalysisResult

Run sum-intensity peak detection on one ROI crop.

Parameters:

Name Type Description Default
data_provider AnalysisDataProvider

Provider for ROI image data and physical spacing. get_roi_image must return a 2D (time, space) array.

required
context AnalysisRunContext | None

Optional progress/cancellation context.

None
dependencies dict[str, BaseAnalysis] | None

Unused for sum-intensity analysis.

None

Returns:

Type Description
AnalysisResult

Populated analysis result. The result table includes normalized

AnalysisResult

intensity, detection signal, onset mask, and peak mask columns.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
def run(
    self,
    data_provider: AnalysisDataProvider,
    *,
    context: AnalysisRunContext | None = None,
    dependencies: dict[str, BaseAnalysis] | None = None,
) -> AnalysisResult:
    """Run sum-intensity peak detection on one ROI crop.

    Args:
        data_provider: Provider for ROI image data and physical spacing.
            ``get_roi_image`` must return a 2D ``(time, space)`` array.
        context: Optional progress/cancellation context.
        dependencies: Unused for sum-intensity analysis.

    Returns:
        Populated analysis result. The result table includes normalized
        intensity, detection signal, onset mask, and peak mask columns.
    """
    _ = dependencies
    context = context or AnalysisRunContext()
    context.raise_if_cancelled()
    context.report_progress(0.0, "Loading ROI image")
    image = data_provider.get_roi_image(channel=self.key.channel, roi_id=self.key.roi_id)
    physical_units = data_provider.get_image_physical_units()
    context.report_progress(0.25, "Running sum intensity analysis")
    result = run_sum_intensity(
        image,
        detection_params=self.detection_params,
        physical_units=physical_units,
    )
    context.raise_if_cancelled()
    self.result.summary = self.finalize_summary(result.summary)
    self.result.table = result.table
    self.set_dirty()
    context.report_progress(1.0, "Sum intensity analysis complete")
    return self.result

get_pool_summary_columns classmethod

get_pool_summary_columns() -> tuple[str, ...]

Return scalar summary columns for sum-intensity pool tables.

This additive pool-facing API keeps the existing rich summary and JSON APIs unchanged while exposing table-safe scalar columns. List-like summary values are flattened for collection-level DataFrame caches.

Returns:

Type Description
tuple[str, ...]

Tuple of scalar summary column names in stable order.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
@classmethod
def get_pool_summary_columns(cls) -> tuple[str, ...]:
    """Return scalar summary columns for sum-intensity pool tables.

    This additive pool-facing API keeps the existing rich summary and JSON
    APIs unchanged while exposing table-safe scalar columns. List-like
    summary values are flattened for collection-level DataFrame caches.

    Returns:
        Tuple of scalar summary column names in stable order.
    """
    columns: list[str] = []
    for column in cls.get_summary_columns():
        if column == "errors":
            columns.extend(["error_count", "errors_text"])
        else:
            columns.append(column)
    return tuple(columns)

get_pool_summary_values

get_pool_summary_values() -> dict[str, object]

Return scalar summary values for sum-intensity pool tables.

Returns:

Type Description
dict[str, object]

Mapping whose keys match :meth:get_pool_summary_columns. Values

dict[str, object]

are scalar objects suitable for pandas table cells.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
def get_pool_summary_values(self) -> dict[str, object]:
    """Return scalar summary values for sum-intensity pool tables.

    Returns:
        Mapping whose keys match :meth:`get_pool_summary_columns`. Values
        are scalar objects suitable for pandas table cells.
    """
    values: dict[str, object] = {}
    for column in self.get_summary_columns():
        if column == "errors":
            errors = self.result.summary.get("errors", ())
            if errors is pd.NA or errors is None:
                error_values: tuple[str, ...] = ()
            elif isinstance(errors, (list, tuple)):
                error_values = tuple(str(item) for item in errors)
            else:
                error_values = (str(errors),)
            values["error_count"] = len(error_values)
            values["errors_text"] = "; ".join(error_values)
        else:
            values[column] = self.result.summary.get(column, pd.NA)
    return values

get_pool_peak_columns classmethod

get_pool_peak_columns() -> tuple[str, ...]

Return scalar peak-row columns for sum-intensity pool tables.

Returns:

Type Description
str

Tuple of flattened peak-event column names. Feature columns are

...

generated from :meth:get_feature_schema so future event-level

tuple[str, ...]

features propagate to the pool API without pool changes.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
@classmethod
def get_pool_peak_columns(cls) -> tuple[str, ...]:
    """Return scalar peak-row columns for sum-intensity pool tables.

    Returns:
        Tuple of flattened peak-event column names. Feature columns are
        generated from :meth:`get_feature_schema` so future event-level
        features propagate to the pool API without pool changes.
    """
    base_columns = (
        "peak_id",
        "peak_status",
        "peak_warning_count",
        "peak_warnings_text",
        "onset_index",
        "onset_time_sec",
        "onset_value",
        "peak_index",
        "peak_time_sec",
        "peak_value",
        "peak_amplitude",
        "peak_detection_method",
        "onset_to_onset_interval_sec",
        "peak_to_peak_interval_sec",
    )
    feature_columns: list[str] = []
    for schema in cls.get_feature_schema():
        feature_columns.extend(
            [
                schema.name,
                f"{schema.name}_status",
                f"{schema.name}_reason",
            ]
        )
    width_columns: list[str] = []
    for level in PeakWidthLevel:
        prefix = level.value
        width_columns.extend(
            [
                f"{prefix}_value",
                f"{prefix}_left_index",
                f"{prefix}_right_index",
                f"{prefix}_points",
                f"{prefix}_sec",
                f"{prefix}_status",
            ]
        )
    return base_columns + tuple(feature_columns) + tuple(width_columns)

get_pool_peak_rows

get_pool_peak_rows() -> tuple[dict[str, object], ...]

Return flattened scalar rows for detected sum-intensity peaks.

Returns:

Type Description
dict[str, object]

Tuple of dictionaries, one per detected peak. Empty when the

...

analysis has no peak events. Each dictionary has exactly the keys

tuple[dict[str, object], ...]

returned by :meth:get_pool_peak_columns.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
486
487
488
489
490
491
492
493
494
def get_pool_peak_rows(self) -> tuple[dict[str, object], ...]:
    """Return flattened scalar rows for detected sum-intensity peaks.

    Returns:
        Tuple of dictionaries, one per detected peak. Empty when the
        analysis has no peak events. Each dictionary has exactly the keys
        returned by :meth:`get_pool_peak_columns`.
    """
    return tuple(self._pool_peak_row(event) for event in self.get_peak_events())

get_peak_events

get_peak_events() -> tuple[PeakEvent, ...]

Return parsed peak-event records from the result summary.

Returns:

Type Description
PeakEvent

Tuple of peak events. Empty when analysis has not been run or when

...

no peaks were detected.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
496
497
498
499
500
501
502
503
504
def get_peak_events(self) -> tuple[PeakEvent, ...]:
    """Return parsed peak-event records from the result summary.

    Returns:
        Tuple of peak events. Empty when analysis has not been run or when
        no peaks were detected.
    """
    records = self.result.summary.get("peak_events", ())
    return tuple(PeakEvent.from_json_dict(dict(record)) for record in records)

get_trace

get_trace(key: SumIntensityTraceKey) -> ResultTrace

Return one named continuous trace.

Parameters:

Name Type Description Default
key SumIntensityTraceKey

Trace key to retrieve.

required

Returns:

Type Description
ResultTrace

Result trace with time_sec as x values.

Raises:

Type Description
ValueError

If analysis has no result table.

KeyError

If the requested trace column is missing.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
def get_trace(self, key: SumIntensityTraceKey) -> ResultTrace:
    """Return one named continuous trace.

    Args:
        key: Trace key to retrieve.

    Returns:
        Result trace with ``time_sec`` as x values.

    Raises:
        ValueError: If analysis has no result table.
        KeyError: If the requested trace column is missing.
    """
    if self.result.table is None:
        raise ValueError("analysis has no result table")
    table = self.result.table
    if "time_sec" not in table.columns:
        raise KeyError("Sum intensity trace requires 'time_sec' column")
    if key.value not in table.columns:
        raise KeyError(f"Sum intensity trace column is missing: {key.value!r}")
    definition = _TRACE_DEFINITIONS[key]
    return ResultTrace(
        key=key,
        name=definition["display_name"],
        x=table["time_sec"].to_numpy(dtype=float),
        y=table[key.value].to_numpy(dtype=float),
        x_label="Time (s)",
        y_label=definition["y_label"],
        metadata={
            "description": definition["description"],
            "units": definition["units"],
        },
    )

get_event_points

get_event_points(
    key: SumIntensityEventPointKey,
) -> ResultPoints

Return sparse event marker points.

Parameters:

Name Type Description Default
key SumIntensityEventPointKey

Event point collection to retrieve.

required

Returns:

Type Description
ResultPoints

Result points for plotting event markers.

Raises:

Type Description
KeyError

If the point key is unknown.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
def get_event_points(self, key: SumIntensityEventPointKey) -> ResultPoints:
    """Return sparse event marker points.

    Args:
        key: Event point collection to retrieve.

    Returns:
        Result points for plotting event markers.

    Raises:
        KeyError: If the point key is unknown.
    """
    events = self.get_peak_events()
    if key == SumIntensityEventPointKey.ONSETS:
        return ResultPoints(
            key=key,
            name="Onsets",
            x=_optional_event_array(events, "onset_time_sec"),
            y=_optional_event_array(events, "onset_value"),
            x_label="Time (s)",
            y_label="Detection source",
            metadata={"description": "Accepted onset threshold crossings."},
        )
    if key == SumIntensityEventPointKey.PEAKS:
        return ResultPoints(
            key=key,
            name="Peaks",
            x=_optional_event_array(events, "peak_time_sec"),
            y=_optional_event_array(events, "peak_value"),
            x_label="Time (s)",
            y_label="Detection source",
            metadata={"description": "Refined peak locations."},
        )
    raise KeyError(f"Unknown sum-intensity event point key: {key!r}")

get_width_trace

get_width_trace(
    peak_width_level: PeakWidthLevel | None = None,
) -> ResultTrace | tuple[ResultTrace, ...]

Return NaN-separated width segment traces.

Parameters:

Name Type Description Default
peak_width_level PeakWidthLevel | None

Specific width level to return. When None, traces for all standard width levels are returned.

None

Returns:

Type Description
ResultTrace | tuple[ResultTrace, ...]

One ResultTrace when peak_width_level is supplied, otherwise

ResultTrace | tuple[ResultTrace, ...]

a tuple of traces for all levels.

Raises:

Type Description
KeyError

If a requested width level is unknown.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
def get_width_trace(
    self,
    peak_width_level: PeakWidthLevel | None = None,
) -> ResultTrace | tuple[ResultTrace, ...]:
    """Return NaN-separated width segment traces.

    Args:
        peak_width_level: Specific width level to return. When None, traces
            for all standard width levels are returned.

    Returns:
        One ``ResultTrace`` when ``peak_width_level`` is supplied, otherwise
        a tuple of traces for all levels.

    Raises:
        KeyError: If a requested width level is unknown.
    """
    if peak_width_level is None:
        return tuple(self.get_width_trace(level) for level in PeakWidthLevel)
    fraction = WIDTH_LEVEL_FRACTIONS[peak_width_level]
    x_values: list[float] = []
    y_values: list[float] = []
    for event in self.get_peak_events():
        crossing = _find_event_crossing(event, fraction)
        if crossing is None or crossing.status != "ok":
            continue
        if crossing.left_index is None or crossing.right_index is None or crossing.value is None:
            continue
        seconds_per_line = float(
            self.result.summary.get(SumIntensitySummaryKey.SECONDS_PER_LINE.value, 1.0)
        )
        x_values.extend(
            [
                float(crossing.left_index) * seconds_per_line,
                float(crossing.right_index) * seconds_per_line,
                float("nan"),
            ]
        )
        y_values.extend([float(crossing.value), float(crossing.value), float("nan")])
    return ResultTrace(
        key=peak_width_level.value,
        name=f"Peak {peak_width_level.value.replace('_', ' ')}",
        x=np.asarray(x_values, dtype=float),
        y=np.asarray(y_values, dtype=float),
        x_label="Time (s)",
        y_label="Detection source",
        metadata={
            "fraction": fraction,
            "trace_type": "width_segments",
            "connectgaps": False,
        },
    )

get_summary_value

get_summary_value(key: SumIntensitySummaryKey) -> object

Return one named summary value.

Parameters:

Name Type Description Default
key SumIntensitySummaryKey

Summary key to retrieve.

required

Returns:

Type Description
object

Stored summary value, or None when the key is absent.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
660
661
662
663
664
665
666
667
668
669
def get_summary_value(self, key: SumIntensitySummaryKey) -> object:
    """Return one named summary value.

    Args:
        key: Summary key to retrieve.

    Returns:
        Stored summary value, or None when the key is absent.
    """
    return self.result.summary.get(key.value)

get_plot_data

get_plot_data() -> AnalysisPlotData | None

Return canonical df/f0 plot data.

Returns:

Type Description
AnalysisPlotData | None

Plot data using time_sec for the x axis and df_f_signal for

AnalysisPlotData | None

the y axis. Returns None when the analysis has no table output yet.

Raises:

Type Description
KeyError

If the table is present but missing required columns.

Source code in src/acqstore/acq_image/analysis/sum_intensity_analysis/sum_intensity_analysis.py
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
def get_plot_data(self) -> AnalysisPlotData | None:
    """Return canonical df/f0 plot data.

    Returns:
        Plot data using ``time_sec`` for the x axis and ``df_f_signal`` for
        the y axis. Returns None when the analysis has no table output yet.

    Raises:
        KeyError: If the table is present but missing required columns.
    """
    if self.result.table is None:
        return None
    trace = self.get_trace(SumIntensityTraceKey.DF_F_SIGNAL)
    return AnalysisPlotData(
        x=tuple(float(value) for value in trace.x.tolist()),
        y=tuple(float(value) for value in trace.y.tolist()),
        x_label=trace.x_label,
        y_label=trace.y_label,
        series_name=trace.name,
    )