Skip to content

Batch analysis

Run one analysis strategy over many acquisition files.

File-level work uses a ThreadPoolExecutor. Each file is independent; inner analysis code may also use multiprocessing or threads as configured by the concrete strategy. This preserves the same scientific analysis path used by one-file GUI and scripting workflows while reducing wall-clock time for large file collections.

Parameters:

Name Type Description Default
files Sequence['AcqImage']

Acquisition images to process.

required
strategy BatchAnalysisStrategy

Per-file analysis strategy.

required
max_parallel_files int

Maximum concurrent file workers. Values smaller than one are clamped to one.

4
Source code in src/acqstore/acq_image/analysis/batch/acq_analysis_batch.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class AcqAnalysisBatch:
    """Run one analysis strategy over many acquisition files.

    File-level work uses a ``ThreadPoolExecutor``. Each file is independent;
    inner analysis code may also use multiprocessing or threads as configured by
    the concrete strategy. This preserves the same scientific analysis path used
    by one-file GUI and scripting workflows while reducing wall-clock time for
    large file collections.

    Args:
        files: Acquisition images to process.
        strategy: Per-file analysis strategy.
        max_parallel_files: Maximum concurrent file workers. Values smaller
            than one are clamped to one.
    """

    def __init__(
        self,
        files: Sequence['AcqImage'],
        strategy: BatchAnalysisStrategy,
        *,
        max_parallel_files: int = 4,
    ) -> None:
        self._files = list(files)
        self._strategy = strategy
        self._max_parallel_files = max(1, int(max_parallel_files))

    def run(
        self,
        *,
        cancel_event: threading.Event | None = None,
        on_file_result: Callable[[BatchFileResult], None] | None = None,
    ) -> list[BatchFileResult]:
        """Run the batch and return results in input order.

        Args:
            cancel_event: Optional shared cancellation event. If omitted, a new
                unset event is used.
            on_file_result: Optional callback invoked as each file completes.

        Returns:
            Per-file results in the same order as input files.
        """
        if not self._files:
            return []

        cancel_event = cancel_event or threading.Event()
        workers = min(self._max_parallel_files, len(self._files))
        results: list[BatchFileResult | None] = [None] * len(self._files)

        with ThreadPoolExecutor(max_workers=workers) as executor:
            future_to_index = {
                executor.submit(
                    self._strategy.process_file,
                    acq_image,
                    cancel_event=cancel_event,
                ): index
                for index, acq_image in enumerate(self._files)
            }
            for future in as_completed(future_to_index):
                index = future_to_index[future]
                try:
                    result = future.result()
                except Exception as exc:  # pragma: no cover - defensive fallback
                    acq_image = self._files[index]
                    result = BatchFileResult(
                        file_path=str(acq_image.path),
                        analysis_name="unknown",
                        channel=-1,
                        roi_id=None,
                        outcome=BatchFileOutcome.FAILED,
                        message=repr(exc),
                    )
                results[index] = result
                if on_file_result is not None:
                    on_file_result(result)

        return [result for result in results if result is not None]

run

run(
    *,
    cancel_event: Event | None = None,
    on_file_result: Callable[[BatchFileResult], None]
    | None = None,
) -> list[BatchFileResult]

Run the batch and return results in input order.

Parameters:

Name Type Description Default
cancel_event Event | None

Optional shared cancellation event. If omitted, a new unset event is used.

None
on_file_result Callable[[BatchFileResult], None] | None

Optional callback invoked as each file completes.

None

Returns:

Type Description
list[BatchFileResult]

Per-file results in the same order as input files.

Source code in src/acqstore/acq_image/analysis/batch/acq_analysis_batch.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def run(
    self,
    *,
    cancel_event: threading.Event | None = None,
    on_file_result: Callable[[BatchFileResult], None] | None = None,
) -> list[BatchFileResult]:
    """Run the batch and return results in input order.

    Args:
        cancel_event: Optional shared cancellation event. If omitted, a new
            unset event is used.
        on_file_result: Optional callback invoked as each file completes.

    Returns:
        Per-file results in the same order as input files.
    """
    if not self._files:
        return []

    cancel_event = cancel_event or threading.Event()
    workers = min(self._max_parallel_files, len(self._files))
    results: list[BatchFileResult | None] = [None] * len(self._files)

    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_index = {
            executor.submit(
                self._strategy.process_file,
                acq_image,
                cancel_event=cancel_event,
            ): index
            for index, acq_image in enumerate(self._files)
        }
        for future in as_completed(future_to_index):
            index = future_to_index[future]
            try:
                result = future.result()
            except Exception as exc:  # pragma: no cover - defensive fallback
                acq_image = self._files[index]
                result = BatchFileResult(
                    file_path=str(acq_image.path),
                    analysis_name="unknown",
                    channel=-1,
                    roi_id=None,
                    outcome=BatchFileOutcome.FAILED,
                    message=repr(exc),
                )
            results[index] = result
            if on_file_result is not None:
                on_file_result(result)

    return [result for result in results if result is not None]

Run the same Radon velocity analysis over a list of files.

Parameters:

Name Type Description Default
channel int

Explicit channel index used for every file.

required
roi_mode RoiBatchMode

How each file's target ROI is selected.

required
roi_id int | None

Required ROI identifier for ANALYZE_EXISTING_ROI mode.

required
detection_params dict[str, object]

Detection parameters passed to each new analysis.

required
use_multiprocessing bool

Whether the per-file Radon analysis may use multiprocessing. This parameter is stored for API clarity; the current wrapper uses RadonVelocityAnalysis defaults until the analysis wrapper exposes runtime execution options.

True
processes int | None

Optional number of worker processes for the per-file Radon algorithm. None means the algorithm chooses a CPU-count based default. This parameter is stored for API clarity; the current wrapper uses RadonVelocityAnalysis defaults until execution options are exposed.

None
Source code in src/acqstore/acq_image/analysis/batch/radon_velocity_batch_strategy.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
class RadonVelocityBatchStrategy:
    """Run the same Radon velocity analysis over a list of files.

    Args:
        channel: Explicit channel index used for every file.
        roi_mode: How each file's target ROI is selected.
        roi_id: Required ROI identifier for ``ANALYZE_EXISTING_ROI`` mode.
        detection_params: Detection parameters passed to each new analysis.
        use_multiprocessing: Whether the per-file Radon analysis may use
            multiprocessing. This parameter is stored for API clarity; the
            current wrapper uses ``RadonVelocityAnalysis`` defaults until the
            analysis wrapper exposes runtime execution options.
        processes: Optional number of worker processes for the per-file Radon
            algorithm. ``None`` means the algorithm chooses a CPU-count based
            default. This parameter is stored for API clarity; the current
            wrapper uses ``RadonVelocityAnalysis`` defaults until execution
            options are exposed.
    """

    analysis_name = RadonVelocityAnalysis.analysis_name
    kind = AnalysisBatchKind.RADON_VELOCITY

    def __init__(
        self,
        *,
        channel: int,
        roi_mode: RoiBatchMode,
        roi_id: int | None,
        detection_params: dict[str, object],
        use_multiprocessing: bool = True,
        processes: int | None = None,
    ) -> None:
        self._channel = int(channel)
        self._roi_mode = RoiBatchMode(roi_mode)
        self._roi_id = None if roi_id is None else int(roi_id)
        self._detection_params = dict(detection_params)
        self._use_multiprocessing = bool(use_multiprocessing)
        self._processes = processes
        RadonVelocityAnalysis.validate_detection_params(self._detection_params)
        if self._roi_mode is RoiBatchMode.ANALYZE_EXISTING_ROI and self._roi_id is None:
            raise ValueError("roi_id is required for ANALYZE_EXISTING_ROI mode")

    def process_file(
        self,
        acq_image: 'AcqImage',
        *,
        cancel_event: threading.Event,
    ) -> BatchFileResult:
        """Run Radon velocity analysis for one file.

        Args:
            acq_image: Acquisition image to analyze.
            cancel_event: Shared cancellation event.

        Returns:
            Per-file batch result. The ``AcqImage`` is mutated on success, but
            it is not saved.
        """
        if cancel_event.is_set():
            return self._result(acq_image, None, BatchFileOutcome.CANCELLED, "cancelled")

        target_roi_id = self._resolve_roi_id(acq_image)
        if target_roi_id is None:
            return self._result(
                acq_image,
                None,
                BatchFileOutcome.SKIPPED_MISSING_ROI,
                f"ROI {self._roi_id} not in file",
            )

        key = AnalysisKey(self.analysis_name, self._channel, target_roi_id)
        acq_image.analysis_set.remove(key)

        try:
            analysis = acq_image.analysis_set.create(
                self.analysis_name,
                channel=self._channel,
                roi_id=target_roi_id,
                detection_params=self._detection_params,
            )
            if isinstance(analysis, RadonVelocityAnalysis):
                analysis.set_execution_options(
                    use_multiprocessing=self._use_multiprocessing,
                    processes=self._processes,
                )
            context = AnalysisRunContext(cancel_callback=cancel_event.is_set)
            acq_image.analysis_set.run_analysis(analysis.key, context=context)
        except AnalysisCancelled:
            return self._result(acq_image, target_roi_id, BatchFileOutcome.CANCELLED, "cancelled")
        except AnalysisExclusionError as exc:
            return self._result(acq_image, target_roi_id, BatchFileOutcome.SKIPPED_CONFLICT, str(exc))
        except Exception as exc:
            return self._result(acq_image, target_roi_id, BatchFileOutcome.FAILED, repr(exc))

        return self._result(acq_image, target_roi_id, BatchFileOutcome.OK, "ok")

    def _resolve_roi_id(self, acq_image: 'AcqImage') -> int | None:
        """Resolve the target ROI identifier for one file.

        Args:
            acq_image: Acquisition image to analyze.

        Returns:
            ROI identifier, or None when an existing ROI is required but absent.
        """
        if self._roi_mode is RoiBatchMode.ADD_NEW_ROI:
            roi = acq_image.rois.create_rect_roi()
            return roi.roi_id

        assert self._roi_id is not None
        if not acq_image.rois.has_roi(self._roi_id):
            return None
        return self._roi_id

    def _result(
        self,
        acq_image: 'AcqImage',
        roi_id: int | None,
        outcome: BatchFileOutcome,
        message: str,
    ) -> BatchFileResult:
        """Build a per-file batch result.

        Args:
            acq_image: Acquisition image that was processed.
            roi_id: ROI identifier used for analysis, if any.
            outcome: Batch outcome.
            message: Human-readable outcome message.

        Returns:
            Batch file result.
        """
        return BatchFileResult(
            file_path=str(acq_image.path),
            analysis_name=self.analysis_name,
            channel=self._channel,
            roi_id=roi_id,
            outcome=outcome,
            message=message,
        )

process_file

process_file(
    acq_image: 'AcqImage', *, cancel_event: Event
) -> BatchFileResult

Run Radon velocity analysis for one file.

Parameters:

Name Type Description Default
acq_image 'AcqImage'

Acquisition image to analyze.

required
cancel_event Event

Shared cancellation event.

required

Returns:

Type Description
BatchFileResult

Per-file batch result. The AcqImage is mutated on success, but

BatchFileResult

it is not saved.

Source code in src/acqstore/acq_image/analysis/batch/radon_velocity_batch_strategy.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def process_file(
    self,
    acq_image: 'AcqImage',
    *,
    cancel_event: threading.Event,
) -> BatchFileResult:
    """Run Radon velocity analysis for one file.

    Args:
        acq_image: Acquisition image to analyze.
        cancel_event: Shared cancellation event.

    Returns:
        Per-file batch result. The ``AcqImage`` is mutated on success, but
        it is not saved.
    """
    if cancel_event.is_set():
        return self._result(acq_image, None, BatchFileOutcome.CANCELLED, "cancelled")

    target_roi_id = self._resolve_roi_id(acq_image)
    if target_roi_id is None:
        return self._result(
            acq_image,
            None,
            BatchFileOutcome.SKIPPED_MISSING_ROI,
            f"ROI {self._roi_id} not in file",
        )

    key = AnalysisKey(self.analysis_name, self._channel, target_roi_id)
    acq_image.analysis_set.remove(key)

    try:
        analysis = acq_image.analysis_set.create(
            self.analysis_name,
            channel=self._channel,
            roi_id=target_roi_id,
            detection_params=self._detection_params,
        )
        if isinstance(analysis, RadonVelocityAnalysis):
            analysis.set_execution_options(
                use_multiprocessing=self._use_multiprocessing,
                processes=self._processes,
            )
        context = AnalysisRunContext(cancel_callback=cancel_event.is_set)
        acq_image.analysis_set.run_analysis(analysis.key, context=context)
    except AnalysisCancelled:
        return self._result(acq_image, target_roi_id, BatchFileOutcome.CANCELLED, "cancelled")
    except AnalysisExclusionError as exc:
        return self._result(acq_image, target_roi_id, BatchFileOutcome.SKIPPED_CONFLICT, str(exc))
    except Exception as exc:
        return self._result(acq_image, target_roi_id, BatchFileOutcome.FAILED, repr(exc))

    return self._result(acq_image, target_roi_id, BatchFileOutcome.OK, "ok")

Run diameter analysis over one file at a time for AcqAnalysisBatch.

The strategy intentionally uses the same AcqAnalysisSet.create and AcqAnalysisSet.run_analysis path as single-file GUI analysis. It only chooses the target ROI, configures runtime execution options, and converts exceptions/cancellation into per-file batch results.

Parameters:

Name Type Description Default
channel int

Explicit channel index used for every file.

required
roi_mode RoiBatchMode

How each file's target ROI is selected.

required
roi_id int | None

Required ROI identifier for ANALYZE_EXISTING_ROI mode.

required
detection_params dict[str, object]

Detection parameters passed to each new analysis.

required
use_threads bool

Whether each per-file diameter analysis may use threads.

True
max_workers int | None

Optional worker count for each per-file diameter analysis.

None
Source code in src/acqstore/acq_image/analysis/batch/diameter_batch_strategy.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class DiameterBatchStrategy:
    """Run diameter analysis over one file at a time for ``AcqAnalysisBatch``.

    The strategy intentionally uses the same ``AcqAnalysisSet.create`` and
    ``AcqAnalysisSet.run_analysis`` path as single-file GUI analysis. It only
    chooses the target ROI, configures runtime execution options, and converts
    exceptions/cancellation into per-file batch results.

    Args:
        channel: Explicit channel index used for every file.
        roi_mode: How each file's target ROI is selected.
        roi_id: Required ROI identifier for ``ANALYZE_EXISTING_ROI`` mode.
        detection_params: Detection parameters passed to each new analysis.
        use_threads: Whether each per-file diameter analysis may use threads.
        max_workers: Optional worker count for each per-file diameter analysis.
    """

    analysis_name = DiameterAnalysis.analysis_name
    kind = AnalysisBatchKind.DIAMETER

    def __init__(
        self,
        *,
        channel: int,
        roi_mode: RoiBatchMode,
        roi_id: int | None,
        detection_params: dict[str, object],
        use_threads: bool = True,
        max_workers: int | None = None,
    ) -> None:
        self._channel = int(channel)
        self._roi_mode = RoiBatchMode(roi_mode)
        self._roi_id = None if roi_id is None else int(roi_id)
        self._detection_params = dict(detection_params)
        self._use_threads = bool(use_threads)
        self._max_workers = max_workers
        DiameterAnalysis.validate_detection_params(self._detection_params)
        if self._roi_mode is RoiBatchMode.ANALYZE_EXISTING_ROI and self._roi_id is None:
            raise ValueError("roi_id is required for ANALYZE_EXISTING_ROI mode")

    def process_file(
        self,
        acq_image: AcqImage,
        *,
        cancel_event: threading.Event,
    ) -> BatchFileResult:
        """Run diameter analysis for one file.

        Args:
            acq_image: Acquisition image to analyze.
            cancel_event: Shared cancellation event.

        Returns:
            Per-file batch result. The ``AcqImage`` is mutated on success, but
            it is not saved.
        """
        if cancel_event.is_set():
            return self._result(acq_image, None, BatchFileOutcome.CANCELLED, "cancelled")

        target_roi_id = self._resolve_roi_id(acq_image)
        if target_roi_id is None:
            return self._result(
                acq_image,
                None,
                BatchFileOutcome.SKIPPED_MISSING_ROI,
                f"ROI {self._roi_id} not in file",
            )

        key = AnalysisKey(self.analysis_name, self._channel, target_roi_id)
        acq_image.analysis_set.remove(key)

        try:
            analysis = acq_image.analysis_set.create(
                self.analysis_name,
                channel=self._channel,
                roi_id=target_roi_id,
                detection_params=self._detection_params,
            )
            if isinstance(analysis, DiameterAnalysis):
                analysis.set_execution_options(
                    use_threads=self._use_threads,
                    max_workers=self._max_workers,
                )
            context = AnalysisRunContext(cancel_callback=cancel_event.is_set)
            acq_image.analysis_set.run_analysis(analysis.key, context=context)
        except AnalysisCancelled:
            return self._result(acq_image, target_roi_id, BatchFileOutcome.CANCELLED, "cancelled")
        except AnalysisExclusionError as exc:
            return self._result(acq_image, target_roi_id, BatchFileOutcome.SKIPPED_CONFLICT, str(exc))
        except Exception as exc:
            return self._result(acq_image, target_roi_id, BatchFileOutcome.FAILED, repr(exc))

        return self._result(acq_image, target_roi_id, BatchFileOutcome.OK, "ok")

    def _resolve_roi_id(self, acq_image: AcqImage) -> int | None:
        """Resolve the target ROI identifier for one file.

        Args:
            acq_image: Acquisition image to analyze.

        Returns:
            ROI identifier, or None when an existing ROI is required but absent.
        """
        if self._roi_mode is RoiBatchMode.ADD_NEW_ROI:
            roi = acq_image.rois.create_rect_roi()
            return roi.roi_id

        assert self._roi_id is not None
        if not acq_image.rois.has_roi(self._roi_id):
            return None
        return self._roi_id

    def _result(
        self,
        acq_image: AcqImage,
        roi_id: int | None,
        outcome: BatchFileOutcome,
        message: str,
    ) -> BatchFileResult:
        """Build a per-file batch result.

        Args:
            acq_image: Acquisition image that was processed.
            roi_id: ROI identifier used for analysis, if any.
            outcome: Batch outcome.
            message: Human-readable outcome message.

        Returns:
            Batch file result.
        """
        return BatchFileResult(
            file_path=str(acq_image.path),
            analysis_name=self.analysis_name,
            channel=self._channel,
            roi_id=roi_id,
            outcome=outcome,
            message=message,
        )

process_file

process_file(
    acq_image: AcqImage, *, cancel_event: Event
) -> BatchFileResult

Run diameter analysis for one file.

Parameters:

Name Type Description Default
acq_image AcqImage

Acquisition image to analyze.

required
cancel_event Event

Shared cancellation event.

required

Returns:

Type Description
BatchFileResult

Per-file batch result. The AcqImage is mutated on success, but

BatchFileResult

it is not saved.

Source code in src/acqstore/acq_image/analysis/batch/diameter_batch_strategy.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def process_file(
    self,
    acq_image: AcqImage,
    *,
    cancel_event: threading.Event,
) -> BatchFileResult:
    """Run diameter analysis for one file.

    Args:
        acq_image: Acquisition image to analyze.
        cancel_event: Shared cancellation event.

    Returns:
        Per-file batch result. The ``AcqImage`` is mutated on success, but
        it is not saved.
    """
    if cancel_event.is_set():
        return self._result(acq_image, None, BatchFileOutcome.CANCELLED, "cancelled")

    target_roi_id = self._resolve_roi_id(acq_image)
    if target_roi_id is None:
        return self._result(
            acq_image,
            None,
            BatchFileOutcome.SKIPPED_MISSING_ROI,
            f"ROI {self._roi_id} not in file",
        )

    key = AnalysisKey(self.analysis_name, self._channel, target_roi_id)
    acq_image.analysis_set.remove(key)

    try:
        analysis = acq_image.analysis_set.create(
            self.analysis_name,
            channel=self._channel,
            roi_id=target_roi_id,
            detection_params=self._detection_params,
        )
        if isinstance(analysis, DiameterAnalysis):
            analysis.set_execution_options(
                use_threads=self._use_threads,
                max_workers=self._max_workers,
            )
        context = AnalysisRunContext(cancel_callback=cancel_event.is_set)
        acq_image.analysis_set.run_analysis(analysis.key, context=context)
    except AnalysisCancelled:
        return self._result(acq_image, target_roi_id, BatchFileOutcome.CANCELLED, "cancelled")
    except AnalysisExclusionError as exc:
        return self._result(acq_image, target_roi_id, BatchFileOutcome.SKIPPED_CONFLICT, str(exc))
    except Exception as exc:
        return self._result(acq_image, target_roi_id, BatchFileOutcome.FAILED, repr(exc))

    return self._result(acq_image, target_roi_id, BatchFileOutcome.OK, "ok")