Coverage for polars_analysis / analysis / pedestal_analysis.py: 82%
139 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 13:37 -0400
1import logging
2from typing import List, Literal, Optional, Union
4import numpy as np
5import polars as pl
6import scipy.signal as sps # type: ignore
7from scipy.stats import gamma
9from polars_analysis.analysis import constants
10from polars_analysis.plotting.helper import calc_gaussian, gauss
12# Instantiate logger
13log = logging.getLogger(__name__)
15"""
16Functions to calculate derived values for pedestal runs.
17"""
20def next_power_of_2(x: int) -> int:
21 """
22 Utility function for determining if we should calculate coherent noise.
23 Calculate the nearest larger power of 2.
24 e.g. 3->4, 4->4, 5->8, ..., 100->128, etc.
25 """
26 return 2 ** (x - 1).bit_length()
def calc_coherent_noise(
    df: pl.DataFrame,
    min_channel: int,
    n_channels: int,
    run_number: int,
    board_id: str,
    measurement: int,
    pas_mode: int,
    gain: Literal["hi", "lo"],
    skip_channels: Optional[List[int]] = None,
    col: str = "samples",
) -> pl.DataFrame:
    """
    Calculate the coherent noise for a given measurement, gain, and channel range.

    Refer to Section 5 of this paper for a description of coherent noise:
    https://cds.cern.ch/record/683745/files/tilecal-98-168.pdf
    In short, the total noise in n channels can be broken down as

    .. math:: σ_n^2 = n * σ_{rnd}^2 + n^2 * σ_{coh}^2

    where σ_n is the total noise, σ_rnd is the random per channel noise,
    and sigma_coh is the coherent noise.
    In terms of the output DataFrame column names:

    * σ_n => tot_noise
    * σ_rnd => avg_noise
    * σ_coh => coh_noise

    :param df: The DataFrame to calculate the coherent noise from.
    :type df: pl.DataFrame
    :param min_channel: The minimum channel to calculate the coherent noise from.
    :type min_channel: int
    :param n_channels: The number of channels to calculate the coherent noise from.
    :type n_channels: int
    :param run_number: The run number to calculate the coherent noise from.
    :type run_number: int
    :param board_id: The board ID.
    :type board_id: str
    :param measurement: The measurement number to calculate the coherent noise from.
    :type measurement: int
    :param pas_mode: The PAS mode; recorded as a passthrough column in the output.
    :type pas_mode: int
    :param gain: The gain to calculate the coherent noise from.
    :type gain: Literal["hi", "lo"]
    :param skip_channels: Channels to exclude from the calculation. Only honoured
        for full-board ranges (min_channel == 0 and n_channels a multiple of 128).
    :type skip_channels: Optional[List[int]], optional
    :param col: The column to calculate the coherent noise from. Defaults to "samples".
    :type col: str, optional
    :return: A DataFrame with one row containing the coherent noise results. It has the columns:

        * ch_noise: The square root of the sum of squares of the channel noise
        * d_ch_noise: ch_noise but each error is divided by the number of samples
        * avg_noise: ch_noise divided by the square root of the number of channels
        * d_avg: d_ch_noise divided by the square root of the number of channels
        * data_sum: The sum of the baseline subtracted data, per sample rather than per channel
        * tot_noise: The standard deviation of data_sum
        * coh_noise: The coherent noise
        * pct_coh: The coherent noise expressed as a percentage of the average noise
        * d_coh: The coherent noise error
        * d_pct: The percent coherent noise error
        * gain: The measurement gain
        * min_channel: The minimum channel used for the calculation
        * n_channels: The number of channels used for the calculation

    :rtype: pl.DataFrame
    """
    # Restrict to the requested run/measurement/gain and channel window,
    # dropping rows whose sample list is empty.
    filtered_df = df.filter(
        pl.col("run_number") == run_number,
        pl.col("measurement") == measurement,
        pl.col("gain") == gain,
        pl.col("channel").is_in(range(min_channel, min_channel + n_channels)),
        pl.col("samples").list.len() != 0,
    )

    # If we could do the same calculation with a smaller n_channels, return early
    # We do every other power of 2 (besides 64 -> 128)
    if next_power_of_2(filtered_df["channel"].unique().shape[0]) < n_channels // 2:
        log.debug("Returning early from coh_noise")
        return pl.DataFrame()

    # skip_channels is only applied for whole-board ranges; warn (but continue)
    # when a requested skip channel is already absent from the data.
    if min_channel == 0 and n_channels % 128 == 0:
        if skip_channels is None:
            skip_channels = []
        present_channels = filtered_df["channel"].to_list()
        for i in skip_channels:
            if i not in present_channels:
                log.warning(f"Error, channel {i} already not present in df")

        if skip_channels:
            filtered_df = filtered_df.filter(~pl.col("channel").is_in(skip_channels))

    if filtered_df.is_empty():
        return pl.DataFrame()

    # Truncate every channel's waveform to the shortest one so they align per sample.
    n_samples = filtered_df.select(pl.col(col).list.len().min()).item()
    filtered_channels = filtered_df["channel"].unique().to_list()
    n_filtered_channels = len(filtered_channels)

    # Per-sample sum of baseline-subtracted (mean-subtracted) waveforms over channels.
    data_sum: np.ndarray = (
        filtered_df.select(
            pl.col(col).list.eval(pl.element() - pl.element().mean()).list.head(n_samples).list.to_array(n_samples)
        )
        .to_series()
        .to_numpy()
        .sum(axis=0)
    )
    # Histogram data_sum with an even bin width targeting ~50 bins.
    bin_width = 2 * max(1, round((max(data_sum) - min(data_sum)) / 100))
    data_sum_bins = np.arange(min(data_sum), max(data_sum) + bin_width / 2, bin_width)
    data_sum_hist, _ = np.histogram(data_sum, bins=data_sum_bins)

    # Build the one-row result lazily; each with_columns stage depends on the
    # columns produced by the previous stage.
    results_df = (
        filtered_df.lazy()
        .select(
            # Quadrature sum of per-channel standard deviations.
            ch_noise=pl.col(col).list.std().pow(2).sum().sqrt(),
            d_ch_noise=pl.col(col).list.eval(pl.element().std().pow(2) / pl.element().len()).list.first().sum().sqrt(),
            avg_noise=pl.col(col).list.std().pow(2).sum().sqrt() / np.sqrt(n_filtered_channels),
            d_avg=(pl.col(col).list.eval(pl.element().std().pow(2) / pl.element().len()).list.first().sum().sqrt())
            / np.sqrt(n_filtered_channels),
        )
        .with_columns(
            # implode() packs each numpy array into a single list cell on the one-row frame.
            data_sum=pl.Series(name="data_sum", values=data_sum).implode(),
            data_sum_hist=pl.Series(name="data_sum_hist", values=data_sum_hist).implode(),
            data_sum_bins=pl.Series(name="data_sum_bins", values=data_sum_bins).implode(),
        )
        .with_columns(tot_noise=pl.col("data_sum").list.std())
        # Invert σ_n² = n·σ_rnd² + n²·σ_coh² for σ_coh (ch_noise² already equals n·σ_rnd²).
        .with_columns(coh_noise=(pl.col("tot_noise") ** 2 - pl.col("ch_noise") ** 2).sqrt() / n_filtered_channels)
        .with_columns(
            pct_coh=100 * pl.col("coh_noise") / pl.col("avg_noise"),
            # Error propagation for coh_noise from tot_noise and ch_noise terms.
            d_coh=(
                (pl.col("tot_noise") ** 2 / (np.sqrt(n_samples) * pl.col("coh_noise") * n_filtered_channels**2)) ** 2
                + (pl.col("d_ch_noise") * pl.col("ch_noise") / (pl.col("coh_noise") * n_filtered_channels**2)) ** 2
            ).sqrt(),
        )
        .with_columns(
            d_pct=pl.col("pct_coh")
            * ((pl.col("d_coh") / pl.col("coh_noise")) ** 2 + (pl.col("d_avg") / pl.col("avg_noise")) ** 2).sqrt()
        )
        # Passthrough metadata columns identifying this measurement.
        .with_columns(
            pl.lit(run_number).alias("run_number"),
            pl.lit(board_id).alias("board_id"),
            pl.lit(pas_mode).alias("pas_mode"),
            pl.lit(measurement).alias("measurement"),
            pl.lit(gain).alias("gain"),
            min_channel=min_channel,  # this is a choice that only controls the plot file name
            n_channels=n_channels,  # this is a choice that only controls the plot file name
            channel_list=filtered_channels,
        )
        # Drop the raw data_sum list; the histogram and bins are kept instead.
        .select(pl.exclude("data_sum"))
    )

    return results_df.collect()
def calc_correlation_matrix(
    df: pl.DataFrame,
    measurements: List[int],
    gain: Literal["hi", "lo"],
    col: str = "samples",
    multiple_boards: Optional[List[str]] = None,
) -> np.ndarray:
    """
    Calculate the correlation matrix for the given measurements, gain, and channel range.

    :param df: The DataFrame to calculate the correlation matrix from
    :type df: pl.DataFrame
    :param measurements: The measurement numbers to calculate the correlation matrix from;
        each measurement's channels are written into the same output matrix
    :type measurements: List[int]
    :param gain: The gain to calculate the correlation matrix from
    :type gain: Literal["hi", "lo"]
    :param col: The column to calculate the correlation matrix from. Defaults to "samples".
    :type col: str, optional
    :param multiple_boards: List of board IDs only for multiple board runs
    :type multiple_boards: Optional[List[str]], optional
    :return: A square correlation matrix (128 channels per board), filled for input
        channels and padded with zeros elsewhere.
    :rtype: npt.NDArray[np.float64]
    """
    log.debug("Computing correlation matrix")

    # Make a (128 * n_boards) channel matrix of zeros here and update with output below.
    nchan = 128 * len(multiple_boards) if multiple_boards is not None else 128
    tmp_matrix = np.zeros([nchan, nchan])

    for measurement in measurements:
        measurement_df = df.filter(
            pl.col("measurement") == measurement,
            pl.col("gain") == gain,
            pl.col("samples").list.len() != 0,
        )
        # Nothing to correlate for this measurement; leave its rows zero.
        if measurement_df.is_empty():
            continue

        # BUG FIX: the width must come from the *filtered* frame. The full df can
        # still contain zero-length sample lists (excluded above), which would make
        # the minimum width 0 and truncate every waveform to nothing.
        width = measurement_df.select(pl.col(col).list.len().min()).item()
        matrix = (
            measurement_df.select(pl.col(col).list.slice(0, width).list.to_array(width).arr.to_struct())
            .unnest(col)
            .transpose()
            .corr()
            .to_numpy()
        )

        # An array of the actual channels available in the dataframe
        channel_mask = np.unique(measurement_df.select(pl.col("channel")).transpose().to_numpy().flatten())

        # Update return matrix at the rows/columns of the present channels
        tmp_matrix[np.ix_(channel_mask, channel_mask)] = matrix

    return tmp_matrix
def pipe_psd(df: pl.DataFrame, col: str = "samples") -> pl.DataFrame:
    """
    Calculate the PSD of the samples column as well as the frequency axis for the PSD.

    Also performs peak detection on the dB-scale PSD against a rolling-mean
    local baseline.

    :param df: The DataFrame to calculate the PSD from
    :type df: pl.DataFrame
    :param col: The column to calculate the PSD from. Defaults to "samples".
    :type col: str, optional
    :return: A DataFrame with the psd, freq, peaks, and peak_heights columns added
    :rtype: pl.DataFrame
    """
    log.debug("Calculating PSD")
    # Truncate every waveform to the shortest one so they stack into a 2D array.
    width = df.select(pl.col(col).list.len().min()).item()
    samples = df.select(pl.col(col).list.slice(0, width).list.to_array(width)).to_series().to_numpy()
    freq, psd = sps.welch(samples, fs=constants.FLX_FRQ_40MHZ, nperseg=2**10, axis=1, average="mean")

    # Remove DC component and last bin, which often shows peak or dip which is likely an artifact
    psd = psd[:, 1:-1]
    freq = freq[1:-1]

    # One frequency row per PSD row so both can become list columns below.
    freq = np.tile(freq, (psd.shape[0], 1))

    ## Find peaks in distribution

    # Convert to dBFS
    psd_dbfs = 10 * np.log10(np.array(psd) / (2**constants.ADC_BITS) ** 2)

    # Peak detection strategy for dB scale:
    # 1. Calculate a rolling mean to establish the local baseline
    # 2. Find peaks relative to the local baseline

    # Use convolution for the rolling-mean estimation
    window_size = 7  # Should be odd; adjust based on your needs
    window: np.ndarray = np.ones(window_size) / window_size

    baseline = np.apply_along_axis(np.convolve, 1, psd_dbfs, window, mode="same")

    # At the edges the 'same'-mode convolution is contaminated by zero padding,
    # so overwrite the half-window at each end with the first/last valid value.
    # BUG FIX: the right edge previously used floor division of a negative
    # number (-window_size // 2 == -(half + 1)), overwriting one extra column
    # and sourcing the fill from one column too deep. Both edges are now
    # symmetric: exactly `half` columns replaced from the adjacent valid value.
    half = window_size // 2
    baseline[:, :half] = np.repeat(baseline[:, half, None], half, axis=1)
    baseline[:, -half:] = np.repeat(baseline[:, -half - 1, None], half, axis=1)

    # Calculate deviation from local baseline
    deviation = psd_dbfs - baseline

    # Define peak parameters
    min_peak_height = 0.5  # Minimum dB above local baseline
    min_peak_distance = int(constants.FFT_SIZE / 100)  # Minimum distance between peaks

    # numpy can't handle a jagged return array, so collect per-row python lists.
    peaks = []
    peak_heights = []
    for row in deviation:
        found_peaks, pkh_dict = sps.find_peaks(row, height=min_peak_height, distance=min_peak_distance)
        peaks.append(found_peaks)
        # "peak_heights" is present in pkh_dict whenever the height kwarg is
        # passed; these are heights above the local baseline, not absolute PSD.
        peak_heights.append(pkh_dict["peak_heights"])  # type: ignore

    return df.with_columns(
        pl.Series(name="psd", values=psd, dtype=pl.List(pl.Float64)),
        pl.Series(name="freq", values=freq, dtype=pl.List(pl.Float64)),
        pl.Series(name="peaks", values=peaks, dtype=pl.List(pl.Float64)),
        pl.Series(name="peak_heights", values=peak_heights, dtype=pl.List(pl.Float64)),
    )
def calc_coherence(c1: int, c2: int, df: pl.DataFrame) -> Union[tuple, None]:
    """
    Calculate the spectral coherence between two channels' sample waveforms.

    Uses scipy.signal.coherence with nperseg=2**10 at the 40 MHz sampling
    frequency from constants.

    :param c1: The first channel number
    :type c1: int
    :param c2: The second channel number
    :type c2: int
    :param df: DataFrame with "channel" and "samples" columns to read from
    :type df: pl.DataFrame
    :return: A tuple (frequency axis, coherence values), or None when either
        channel is not present in the DataFrame.
    :rtype: Union[tuple, None]
    """
    # Check if channels are in DF
    if c1 not in df["channel"]:
        log.warning(f"Channel {c1} not found in dataframe")
        return None
    if c2 not in df["channel"]:
        log.warning(f"Channel {c2} not found in dataframe")
        return None

    # Only the first row per channel is used for the coherence estimate.
    f, Cxy = sps.coherence(
        df.filter(pl.col("channel") == c1).select(pl.col("samples")).to_series()[0].to_numpy(),
        df.filter(pl.col("channel") == c2).select(pl.col("samples")).to_series()[0].to_numpy(),
        fs=constants.FLX_FRQ_40MHZ,
        nperseg=2**10,
    )

    return (f, Cxy)
def pipe_fft(df: pl.DataFrame, col: str = "samples") -> pl.DataFrame:
    """
    Calculate the FFT of the samples column as well as the frequency axis for the FFT.

    The DC bin is dropped, only the positive-frequency half is kept, and each
    row's spectrum is normalised to its own maximum.

    :param df: The DataFrame to calculate the FFT from
    :type df: pl.DataFrame
    :param col: The column to calculate the FFT from. Defaults to "samples".
    :type col: str, optional
    :return: A DataFrame with the fft and freq columns added
    :rtype: pl.DataFrame
    """
    log.debug("Calculating FFT")
    # Truncate to the shortest waveform so all rows stack into one 2D array.
    min_len = df.select(pl.col(col).list.len().min()).item()
    waveforms = df.select(pl.col(col).list.slice(0, min_len).list.to_array(min_len)).to_series().to_numpy()

    # Magnitude spectrum with the DC bin removed.
    spectrum = np.abs(np.fft.fft(waveforms, axis=1))[:, 1:]
    n_rows, n_bins = spectrum.shape

    # Positive-frequency axis, tiled to one row per waveform.
    freq_axis = np.fft.fftfreq(n_bins, d=1 / constants.FLX_FRQ_40MHZ)
    freq_axis = np.tile(freq_axis[: n_bins // 2], (n_rows, 1))

    # Keep the positive-frequency half and normalise each row to its peak.
    spectrum = spectrum[:, : n_bins // 2]
    spectrum = spectrum / np.max(spectrum, axis=1)[:, None]

    return df.with_columns(
        pl.Series(name="fft", values=spectrum, dtype=pl.List(pl.Float64)),
        pl.Series(name="freq", values=freq_axis, dtype=pl.List(pl.Float64)),
    )
def pipe_chi2(df: pl.DataFrame, col: str = "samples") -> pl.DataFrame:
    """
    Calculate the chi² per degree of freedom for each entry in the samples column,
    with all logic in-line.

    Steps:
    1. Convert the samples to a numpy array.
    2. Define histogram bins with a fixed bin width of 1.
    3. If there aren't enough bins for a constrained fit, assign NaN.
    4. Compute the histogram and bin centers.
    5. Fit a Gaussian to the histogram using calc_gaussian and gauss.
    6. Calculate degrees of freedom (number of bins minus the 3 fit parameters).
    7. Compute asymmetric Poisson errors using the gamma distribution.
    8. Calculate the chi² per degree of freedom.

    :param df: The input DataFrame.
    :param col: The column containing the sample data (default "samples").
    :return: A DataFrame with an added "chi2_dof" column.
    """
    log.debug("Calculating chi²/dof for the samples column")
    chi2_values = []

    for samples in df[col]:
        s = np.asarray(samples)

        bins = np.arange(s.min(), s.max() + 1, 1)
        # dof = (number of bin centers) - 3 fit parameters. BUG FIX: the old
        # guard (bins.size <= 1) still allowed dof <= 0 for narrow sample
        # spans (<= 4 bins), which divided by zero or produced a negative
        # chi²; report NaN for any unconstrained fit instead.
        dof = bins.size - 1 - 3
        if dof <= 0:
            chi2_values.append(np.nan)
            continue

        hist, _ = np.histogram(s, bins=bins)
        centers = 0.5 * (bins[1:] + bins[:-1])

        fp = calc_gaussian(s, bins)
        gauss_fit = gauss(centers, mu=fp[0], sigma=fp[2], N=fp[4])

        # Compute asymmetric Poisson errors using the gamma distribution
        a = 0.32  # approximately 1 sigma
        err_up = gamma.ppf(1 - a / 2, gauss_fit + 1, scale=1) - gauss_fit
        err_dw = gauss_fit - gamma.ppf(a / 2, gauss_fit, scale=1)
        residuals = hist - gauss_fit
        # Use the upward error where the data overshoots the fit, downward otherwise.
        err = np.where(residuals > 0, err_up, err_dw)

        chi2 = np.sum((residuals) ** 2 / (err**2)) / dof
        chi2_values.append(chi2)

    return df.with_columns(pl.Series("chi2_dof", chi2_values))
def pipe_autocorr(df: pl.DataFrame, col: str = "samples") -> pl.DataFrame:
    """
    Calculate the autocorrelation of the samples column.

    Each waveform is mean-subtracted, correlated with itself, normalised to
    its peak (zero lag), and truncated to non-negative lags.

    :param df: The DataFrame to calculate the autocorrelation from
    :type df: pl.DataFrame
    :param col: The column to calculate the autocorrelation from. Defaults to "samples".
    :type col: str, optional
    :return: A DataFrame with the autocorr column added
    :rtype: pl.DataFrame
    """
    log.debug("Calculating autocorrelation")
    min_len = df.select(pl.col(col).list.len().min()).item()
    # Mean-subtract each list, truncate to a common length, and expand into a
    # rectangular (n_rows, min_len) numpy array via struct columns.
    centered = (
        df.select(
            pl.col(col)
            .list.eval(pl.element() - pl.element().mean())
            .list.slice(0, min_len)
            .list.to_array(min_len)
            .arr.to_struct()
        )
        .unnest(col)
        .fill_null(0)
        .to_numpy()
    )

    # Full autocorrelation of every row with itself, normalised to its maximum.
    acorr = np.stack([sps.correlate(row, row) for row in centered])
    acorr = acorr / np.max(acorr, axis=1)[:, None]
    # Keep the second half: lags >= 0.
    acorr = acorr[:, acorr.shape[1] // 2 :]

    return df.with_columns(pl.Series(name="autocorr", values=acorr, dtype=pl.List(pl.Float64)))
def expr_mean(col: str = "samples") -> pl.Expr:
    """
    Build an expression for the per-row mean of a list column.

    :param col: The column name to calculate the mean of
    :type col: str, optional
    :return: The mean of the samples column
    :rtype: pl.Expr
    """
    samples = pl.col(col)
    return samples.list.mean()
def expr_rms(col: str = "samples") -> pl.Expr:
    """
    Build an expression for the root mean square (standard deviation) of a list column.

    :param col: The column name to calculate the rms of
    :type col: str, optional
    :return: The rms of the samples column
    :rtype: pl.Expr
    """
    samples = pl.col(col)
    return samples.list.std()
def expr_max_min(col: str = "samples") -> pl.Expr:
    """
    Build an expression for the peak-to-peak range (max minus min) of a list column.

    :param col: The column name to calculate the max-min of
    :type col: str, optional
    :return: The max-min of the samples column
    :rtype: pl.Expr
    """
    highest = pl.col(col).list.max()
    lowest = pl.col(col).list.min()
    return highest - lowest
def expr_psd(col: str = "fft") -> pl.Expr:
    """
    Build an expression for the power spectral density (in dB) of the FFT column.

    :param col: The column to calculate the PSD from. Defaults to "fft".
    :type col: str, optional
    :return: The power spectral density of the FFT column
    :rtype: pl.Expr
    """
    # 20*log10 per element, expressed via polars' native log10
    return pl.col(col).list.eval(pl.element().log10() * 20)
def expr_sinad(col: str = "fft") -> pl.Expr:
    """
    Build an expression for the signal-to-noise and distortion ratio of the FFT column.

    :param col: The column to calculate the SINAD from. Defaults to "fft".
    :type col: str, optional
    :return: The signal-to-noise and distortion ratio of the FFT column
    :rtype: pl.Expr
    """
    # Total power of every bin below the normalised signal peak (value < 1)
    noise_and_distortion = pl.col(col).list.eval(pl.element().filter(pl.element() < 1) ** 2).list.sum()
    return noise_and_distortion.log10() * -10
def expr_enob(col: str = "sinad") -> pl.Expr:
    """
    Build an expression for the effective number of bits from the SINAD column.

    :param col: The column to calculate the ENOB from. Defaults to "sinad".
    :type col: str, optional
    :return: The effective number of bits of the SINAD column
    :rtype: pl.Expr
    """
    # Standard ENOB formula: (SINAD_dB - 1.76) / 6.02
    sinad_db = pl.col(col)
    return (sinad_db - 1.76) / 6.02
def expr_snr(col: str = "fft") -> pl.Expr:
    """
    Build an expression for the signal-to-noise ratio of the FFT column.

    :param col: The column to calculate the SNR from. Defaults to "fft".
    :type col: str, optional
    :return: The signal-to-noise ratio of the FFT column
    :rtype: pl.Expr
    """
    # Sort the bins and sum the power of everything except the top 3 (the
    # signal peak and its immediate leakage).
    noise_power = (
        pl.col(col)
        .list.sort()
        .list.eval(pl.element().slice(0, pl.element().len() - 3) ** 2)
        .list.sum()
    )
    return noise_power.log10() * -10
def expr_sfdr(col: str = "fft") -> pl.Expr:
    """
    Build an expression for the spurious-free dynamic range of the FFT column.

    :param col: The column to calculate the SFDR from. Defaults to "fft".
    :type col: str, optional
    :return: The spurious-free dynamic range of the FFT column
    :rtype: pl.Expr
    """
    # Drop the signal peak (the list maximum); the largest remaining bin is
    # the biggest spur.
    spurs = pl.col(col).list.set_difference(pl.col(col).list.max())
    return spurs.list.max().log10() * -20