Skip to content

kymflow_core.kym_flow_radon_gpt¤

Radon transform-based flow analysis for kymograph images.

This module implements a multiprocessing-capable flow analysis algorithm using Radon transforms to detect flow direction and velocity in kymograph data. The algorithm uses a sliding window approach with coarse and fine angle search to efficiently determine flow angles.

Classes¤

FlowCancelled ¤

Bases: Exception

Exception raised when flow analysis is cancelled.

This exception is raised when the analysis is cancelled via the is_cancelled callback during processing.

Source code in src/kymflow_core/kym_flow_radon_gpt.py
21
22
23
24
25
26
27
28
class FlowCancelled(Exception):
    """Exception raised when flow analysis is cancelled.

    This exception is raised when the analysis is cancelled via the
    is_cancelled callback during processing.
    """

    pass

Functions¤

mp_analyze_flow(data, windowsize, start_pixel=None, stop_pixel=None, *, verbose=False, progress_callback=None, progress_every=1, is_cancelled=None, use_multiprocessing=True, processes=None) ¤

Analyze blood flow in a kymograph using Radon transforms.

Performs a sliding window analysis along the time axis to detect flow direction and velocity. Uses a two-stage Radon transform approach: coarse search over 0-179 degrees, then fine refinement around the best angle.

Data convention

data is a 2D numpy array with shape (time, space), where: - axis 0 (index 0) is time (aka 'lines', 'line scans') - axis 1 (index 1) is space (aka 'pixels')

Algorithm
  • Use a sliding window along the time axis with 25% overlap.
  • For each window, run a coarse Radon transform over 0..179 degrees.
  • Find the angle that maximizes the variance in Radon space.
  • Refine around that angle with a fine grid (±2 degrees, 0.25 step).
  • Return best angles and associated fine spread.

Parameters:

Name Type Description Default
data ndarray

2D numpy array (time, space) containing the kymograph data.

required
windowsize int

Number of time lines per analysis window. Must be a multiple of 4 (stepsize is 25% of windowsize).

required
start_pixel Optional[int]

Start index in space dimension (axis 1), inclusive. If None, uses 0.

None
stop_pixel Optional[int]

Stop index in space dimension (axis 1), exclusive. If None, uses full width.

None
verbose bool

If True, prints timing and shape information to stdout.

False
progress_callback Optional[Callable[[int, int], Any]]

Optional callable(completed, total_windows) called periodically to report progress.

None
progress_every int

Emit progress every N completed windows. Defaults to 1.

1
is_cancelled Optional[Callable[[], bool]]

Optional callable() -> bool that returns True if computation should be cancelled.

None
use_multiprocessing bool

If True, uses multiprocessing.Pool for parallel computation. If False, runs sequentially. Defaults to True.

True
processes Optional[int]

Optional number of worker processes. If None, uses cpu_count() - 1 (minimum 1). Defaults to None.

None

Returns:

Type Description

Tuple containing: - thetas: 1D array (nsteps,) of best angle in degrees per window. - the_t: 1D array (nsteps,) of center time index for each window. - spread_matrix_fine: 2D array (nsteps, len(angles_fine)) of variance values for fine angles.

Raises:

Type Description
ValueError

If data is not 2D or windowsize is invalid.

FlowCancelled

If is_cancelled() returns True during processing.

Source code in src/kymflow_core/kym_flow_radon_gpt.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def mp_analyze_flow(
    data: np.ndarray,
    windowsize: int,
    start_pixel: Optional[int] = None,
    stop_pixel: Optional[int] = None,
    *,
    verbose: bool = False,
    progress_callback: Optional[Callable[[int, int], Any]] = None,
    progress_every: int = 1,
    is_cancelled: Optional[Callable[[], bool]] = None,
    use_multiprocessing: bool = True,
    processes: Optional[int] = None,
):
    """Analyze blood flow in a kymograph using Radon transforms.

    Performs a sliding window analysis along the time axis to detect flow
    direction and velocity. Uses a two-stage Radon transform approach:
    coarse search over 0-179 degrees, then fine refinement around the best
    angle.

    Data convention:
        data is a 2D numpy array with shape (time, space), where:
        - axis 0 (index 0) is time (aka 'lines', 'line scans')
        - axis 1 (index 1) is space (aka 'pixels')

    Algorithm:
        - Use a sliding window along the time axis with 25% overlap.
        - For each window, run a coarse Radon transform over 0..179 degrees.
        - Find the angle that maximizes the variance in Radon space.
        - Refine around that angle with a fine grid (±2 degrees, 0.25 step).
        - Return best angles and associated fine spread.

    Args:
        data: 2D numpy array (time, space) containing the kymograph data.
        windowsize: Number of time lines per analysis window. Must be a
            multiple of 4 (stepsize is 25% of windowsize).
        start_pixel: Start index in space dimension (axis 1), inclusive.
            If None, uses 0.
        stop_pixel: Stop index in space dimension (axis 1), exclusive.
            If None, uses full width.
        verbose: If True, prints timing and shape information to stdout.
        progress_callback: Optional callable(completed, total_windows) called
            periodically to report progress.
        progress_every: Emit progress every N completed windows. Defaults to 1.
        is_cancelled: Optional callable() -> bool that returns True if
            computation should be cancelled.
        use_multiprocessing: If True, uses multiprocessing.Pool for parallel
            computation. If False, runs sequentially. Defaults to True.
        processes: Optional number of worker processes. If None, uses
            cpu_count() - 1 (minimum 1). Defaults to None.

    Returns:
        Tuple containing:
            - thetas: 1D array (nsteps,) of best angle in degrees per window.
            - the_t: 1D array (nsteps,) of center time index for each window.
            - spread_matrix_fine: 2D array (nsteps, len(angles_fine)) of
              variance values for fine angles.

    Raises:
        ValueError: If data is not 2D or windowsize is invalid.
        FlowCancelled: If is_cancelled() returns True during processing.
    """
    start_sec = time.time()

    if data.ndim != 2:
        raise ValueError(f"data must be 2D (time, space); got shape {data.shape}")

    # time axis = 0, space axis = 1
    n_time = data.shape[0]
    n_space = data.shape[1]

    stepsize = int(0.25 * windowsize)
    if stepsize <= 0:
        raise ValueError(f"windowsize too small to compute stepsize: {windowsize}")

    nsteps = math.floor(n_time / stepsize) - 3
    if nsteps <= 0:
        raise ValueError(
            f"Invalid nsteps={nsteps}. Check windowsize={windowsize} and data.shape={data.shape}"
        )

    if start_pixel is None:
        start_pixel = 0
    if stop_pixel is None:
        stop_pixel = n_space

    # Coarse and fine angle grids (degrees)
    angles = np.arange(180, dtype=np.float32)  # 0..179 degrees
    fine_step = 0.25
    angles_fine = np.arange(-2.0, 2.0 + fine_step, fine_step, dtype=np.float32)

    # Outputs
    thetas = np.zeros(nsteps, dtype=np.float32)
    the_t = np.ones(nsteps, dtype=np.float32) * np.nan
    spread_matrix_fine = np.zeros((nsteps, len(angles_fine)), dtype=np.float32)

    if verbose:
        print(f"data shape (time, space): {data.shape}")
        print(f"  windowsize: {windowsize}, stepsize: {stepsize}")
        print(f"  n_time: {n_time}, n_space: {n_space}, nsteps: {nsteps}")
        print(f"  start_pixel: {start_pixel}, stop_pixel: {stop_pixel}")

    completed = 0
    last_emit = 0

    def cancelled() -> bool:
        return bool(is_cancelled and is_cancelled())

    def maybe_progress():
        nonlocal last_emit, completed
        if progress_callback is None:
            return
        if (completed - last_emit) >= max(1, progress_every):
            try:
                progress_callback(completed, nsteps)
            except Exception:
                # Swallow progress errors; they shouldn't kill the computation.
                pass
            last_emit = completed

    # --- Multiprocessing path ---
    if use_multiprocessing and nsteps > 1:
        proc_count = processes or (os.cpu_count() or 1) - 1
        proc_count = max(1, proc_count)

        with Pool(processes=proc_count) as pool:
            result_objs = []

            # Enqueue all windows
            for k in range(nsteps):
                if cancelled():
                    pool.terminate()
                    pool.join()
                    raise FlowCancelled(
                        "Flow analysis cancelled before submitting all windows."
                    )

                # Center time index for this window
                the_t[k] = 1 + k * stepsize + windowsize / 2.0

                t_start = k * stepsize
                t_stop = k * stepsize + windowsize
                data_window = data[t_start:t_stop, start_pixel:stop_pixel]

                params = (data_window, angles, angles_fine)
                result = pool.apply_async(radon_worker, params)
                result_objs.append(result)

            # Collect results
            for k, result in enumerate(result_objs):
                if cancelled():
                    pool.terminate()
                    pool.join()
                    raise FlowCancelled(
                        "Flow analysis cancelled while processing windows."
                    )

                worker_theta, worker_spread_fine = result.get()
                thetas[k] = worker_theta
                spread_matrix_fine[k, :] = worker_spread_fine

                completed += 1
                maybe_progress()

    # --- Single-process path (debug / small data) ---
    else:
        for k in range(nsteps):
            if cancelled():
                raise FlowCancelled("Flow analysis cancelled (single-process mode).")

            the_t[k] = 1 + k * stepsize + windowsize / 2.0

            t_start = k * stepsize
            t_stop = k * stepsize + windowsize
            data_window = data[t_start:t_stop, start_pixel:stop_pixel]

            worker_theta, worker_spread_fine = radon_worker(
                data_window, angles, angles_fine
            )
            thetas[k] = worker_theta
            spread_matrix_fine[k, :] = worker_spread_fine

            completed += 1
            maybe_progress()

    # Final progress update
    if progress_callback is not None:
        try:
            progress_callback(nsteps, nsteps)
        except Exception:
            pass

    if verbose:
        stop_sec = time.time()
        print(f"Flow analysis took {round(stop_sec - start_sec, 2)} seconds")

    return thetas, the_t, spread_matrix_fine

radon_worker(data_window, angles, angles_fine) ¤

Calculate flow angle for a single time window using Radon transforms.

This function is designed to be used as a multiprocessing worker. It performs a two-stage Radon transform: first a coarse search over all angles, then a fine search around the best coarse angle.

Parameters:

Name Type Description Default
data_window ndarray

2D numpy array (time, space) for this window slice. Time axis is 0, space axis is 1. The mean is subtracted before processing.

required
angles ndarray

1D array of coarse angles in degrees (typically 0-179).

required
angles_fine ndarray

1D array of fine angle offsets in degrees, typically small values around 0 (e.g., -2 to +2 degrees in 0.25 degree steps).

required

Returns:

Type Description
Tuple[float, ndarray]

Tuple containing: - Best angle in degrees (float) for this window. - 1D array of variance values for each fine angle.

Source code in src/kymflow_core/kym_flow_radon_gpt.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def radon_worker(
    data_window: np.ndarray,
    angles: np.ndarray,
    angles_fine: np.ndarray,
) -> Tuple[float, np.ndarray]:
    """Calculate flow angle for a single time window using Radon transforms.

    This function is designed to be used as a multiprocessing worker. It
    performs a two-stage Radon transform: first a coarse search over all
    angles, then a fine search around the best coarse angle.

    Args:
        data_window: 2D numpy array (time, space) for this window slice.
            Time axis is 0, space axis is 1. The mean is subtracted before
            processing.
        angles: 1D array of coarse angles in degrees (typically 0-179).
        angles_fine: 1D array of fine angle offsets in degrees, typically
            small values around 0 (e.g., -2 to +2 degrees in 0.25 degree steps).

    Returns:
        Tuple containing:
            - Best angle in degrees (float) for this window.
            - 1D array of variance values for each fine angle.
    """
    # Ensure float for radon + mean subtraction
    data_window = data_window.astype(np.float32, copy=False)

    # Subtract mean over entire window
    mean_val = float(np.mean(data_window))
    data_window = data_window - mean_val

    # Coarse radon transform
    # radon will return shape (len(time_projection), len(angles))
    radon_coarse = radon(data_window, theta=angles, circle=False)
    spread_coarse = np.var(radon_coarse, axis=0)  # variance per angle

    # Coarse maximum
    max_idx = int(np.argmax(spread_coarse))
    coarse_theta = float(angles[max_idx])

    # Fine search around coarse max
    fine_angles = coarse_theta + angles_fine
    radon_fine = radon(data_window, theta=fine_angles, circle=False)
    spread_fine = np.var(radon_fine, axis=0)

    fine_idx = int(np.argmax(spread_fine))
    best_theta = coarse_theta + float(angles_fine[fine_idx])

    return best_theta, spread_fine
All material is Copyright 2011-2023 Robert H. Cudmore