Coverage for polars_analysis / analysis / pedestal_analysis.py: 82%

139 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 13:37 -0400

1import logging 

2from typing import List, Literal, Optional, Union 

3 

4import numpy as np 

5import polars as pl 

6import scipy.signal as sps # type: ignore 

7from scipy.stats import gamma 

8 

9from polars_analysis.analysis import constants 

10from polars_analysis.plotting.helper import calc_gaussian, gauss 

11 

12# Instantiate logger 

13log = logging.getLogger(__name__) 

14 

15""" 

16Functions to calculate derived values for pedestal runs. 

17""" 

18 

19 

def next_power_of_2(x: int) -> int:
    """
    Utility function for determining if we should calculate coherent noise.

    Return the smallest power of two greater than or equal to x,
    e.g. 3->4, 4->4, 5->8, ..., 100->128, etc.
    """
    # (x - 1).bit_length() is the number of bits needed to hold x - 1,
    # so a left shift of 1 by that amount lands exactly on the next power of two.
    return 1 << (x - 1).bit_length()

27 

28 

def calc_coherent_noise(
    df: pl.DataFrame,
    min_channel: int,
    n_channels: int,
    run_number: int,
    board_id: str,
    measurement: int,
    pas_mode: int,
    gain: Literal["hi", "lo"],
    skip_channels: Optional[List[int]] = None,
    col: str = "samples",
) -> pl.DataFrame:
    """
    Calculate the coherent noise for a given measurement, gain, and channel range.

    Refer to Section 5 of this paper for a description of coherent noise:
    https://cds.cern.ch/record/683745/files/tilecal-98-168.pdf
    In short, the total noise in n channels can be broken down as

    .. math:: σ_n^2 = n * σ_{rnd}^2 + n^2 * σ_{coh}^2

    where σ_n is the total noise, σ_rnd is the random per channel noise,
    and sigma_coh is the coherent noise.
    In terms of the output DataFrame column names:

    * σ_n => tot_noise
    * σ_rnd => avg_noise
    * σ_coh => coh_noise

    :param df: The DataFrame to calculate the coherent noise from.
    :type df: pl.DataFrame
    :param min_channel: The minimum channel to calculate the coherent noise from.
    :type min_channel: int
    :param n_channels: The number of channels to calculate the coherent noise from.
    :type n_channels: int
    :param run_number: The run number to calculate the coherent noise from.
    :type run_number: int
    :param board_id: The board ID.
    :type board_id: str
    :param measurement: The measurement number to calculate the coherent noise from.
    :type measurement: int
    :param pas_mode: The PAS mode; carried through to the output row unchanged.
    :type pas_mode: int
    :param gain: The gain to calculate the coherent noise from.
    :type gain: Literal["hi", "lo"]
    :param skip_channels: Channels to exclude; only honoured for full-board
        calculations (min_channel == 0 and n_channels a multiple of 128).
    :type skip_channels: Optional[List[int]], optional
    :param col: The column to calculate the coherent noise from. Defaults to "samples".
    :type col: str, optional
    :return: A DataFrame with one row containing the coherent noise results. It has the columns:

    * ch_noise: The square root of the sum of squares of the channel noise
    * d_ch_noise: ch_noise but each error is divided by the number of samples
    * avg_noise: ch_noise divided by the square root of the number of channels
    * d_avg: d_ch_noise divided by the square root of the number of channels
    * data_sum: The sum of the baseline subtracted data, per sample rather than per channel
    * tot_noise: The standard deviation of data_sum
    * coh_noise: The coherent noise
    * pct_coh: The coherent noise expressed as a percentage of the average noise
    * d_coh: The coherent noise error
    * d_pct: The percent coherent noise error
    * gain: The measurement gain
    * min_channel: The minimum channel used for the calculation
    * n_channels: The number of channels used for the calculation

    :rtype: pl.DataFrame
    """
    # Keep only this run/measurement/gain, the requested channel window, and
    # rows that actually contain samples.
    filtered_df = df.filter(
        pl.col("run_number") == run_number,
        pl.col("measurement") == measurement,
        pl.col("gain") == gain,
        pl.col("channel").is_in(range(min_channel, min_channel + n_channels)),
        pl.col("samples").list.len() != 0,
    )

    # If we could do the same calculation with a smaller n_channels, return early
    # We do every other power of 2 (besides 64 -> 128)
    if next_power_of_2(filtered_df["channel"].unique().shape[0]) < n_channels // 2:
        log.debug("Returning early from coh_noise")
        return pl.DataFrame()

    # Channel exclusion is only applied for full-board calculations.
    if min_channel == 0 and n_channels % 128 == 0:
        if skip_channels is None:
            skip_channels = []
        present_channels = filtered_df["channel"].to_list()
        for i in skip_channels:
            if i not in present_channels:
                log.warning(f"Error, channel {i} already not present in df")

        if skip_channels:
            filtered_df = filtered_df.filter(~pl.col("channel").is_in(skip_channels))

    if filtered_df.is_empty():
        return pl.DataFrame()

    # Shortest sample list defines the common length so channels can be summed.
    n_samples = filtered_df.select(pl.col(col).list.len().min()).item()
    filtered_channels = filtered_df["channel"].unique().to_list()
    n_filtered_channels = len(filtered_channels)

    # Baseline-subtract each channel, truncate to the common length, and sum
    # across channels to get one per-sample trace.
    data_sum: np.ndarray = (
        filtered_df.select(
            pl.col(col).list.eval(pl.element() - pl.element().mean()).list.head(n_samples).list.to_array(n_samples)
        )
        .to_series()
        .to_numpy()
        .sum(axis=0)
    )
    # Even bin width targeting ~100 bins across the data range (minimum width 2).
    bin_width = 2 * max(1, round((max(data_sum) - min(data_sum)) / 100))
    data_sum_bins = np.arange(min(data_sum), max(data_sum) + bin_width / 2, bin_width)
    data_sum_hist, _ = np.histogram(data_sum, bins=data_sum_bins)

    results_df = (
        filtered_df.lazy()
        .select(
            ch_noise=pl.col(col).list.std().pow(2).sum().sqrt(),
            d_ch_noise=pl.col(col).list.eval(pl.element().std().pow(2) / pl.element().len()).list.first().sum().sqrt(),
            avg_noise=pl.col(col).list.std().pow(2).sum().sqrt() / np.sqrt(n_filtered_channels),
            d_avg=(pl.col(col).list.eval(pl.element().std().pow(2) / pl.element().len()).list.first().sum().sqrt())
            / np.sqrt(n_filtered_channels),
        )
        .with_columns(
            data_sum=pl.Series(name="data_sum", values=data_sum).implode(),
            data_sum_hist=pl.Series(name="data_sum_hist", values=data_sum_hist).implode(),
            data_sum_bins=pl.Series(name="data_sum_bins", values=data_sum_bins).implode(),
        )
        .with_columns(tot_noise=pl.col("data_sum").list.std())
        # σ_coh = sqrt(σ_n² - Σσ_ch²) / n, per the formula in the docstring.
        .with_columns(coh_noise=(pl.col("tot_noise") ** 2 - pl.col("ch_noise") ** 2).sqrt() / n_filtered_channels)
        .with_columns(
            pct_coh=100 * pl.col("coh_noise") / pl.col("avg_noise"),
            # Standard error propagation through the coh_noise expression.
            d_coh=(
                (pl.col("tot_noise") ** 2 / (np.sqrt(n_samples) * pl.col("coh_noise") * n_filtered_channels**2)) ** 2
                + (pl.col("d_ch_noise") * pl.col("ch_noise") / (pl.col("coh_noise") * n_filtered_channels**2)) ** 2
            ).sqrt(),
        )
        .with_columns(
            d_pct=pl.col("pct_coh")
            * ((pl.col("d_coh") / pl.col("coh_noise")) ** 2 + (pl.col("d_avg") / pl.col("avg_noise")) ** 2).sqrt()
        )
        .with_columns(
            pl.lit(run_number).alias("run_number"),
            pl.lit(board_id).alias("board_id"),
            pl.lit(pas_mode).alias("pas_mode"),
            pl.lit(measurement).alias("measurement"),
            pl.lit(gain).alias("gain"),
            min_channel=min_channel,  # this is a choice that only controls the plot file name
            n_channels=n_channels,  # this is a choice that only controls the plot file name
            channel_list=filtered_channels,
        )
        # The raw per-sample sum is large and already summarised by the histogram.
        .select(pl.exclude("data_sum"))
    )

    return results_df.collect()

177 

178 

def calc_correlation_matrix(
    df: pl.DataFrame,
    measurements: List[int],
    gain: Literal["hi", "lo"],
    col: str = "samples",
    multiple_boards: Optional[List[str]] = None,
) -> np.ndarray:
    """
    Calculate the correlation matrix for the given measurements, gain, and channel range.

    :param df: The DataFrame to calculate the correlation matrix from
    :type df: pl.DataFrame
    :param measurements: The measurement numbers to calculate the correlation matrix from;
        later measurements overwrite earlier ones for channels present in both
    :type measurements: List[int]
    :param gain: The gain to calculate the correlation matrix from
    :type gain: Literal["hi", "lo"]
    :param col: The column to calculate the correlation matrix from. Defaults to "samples".
    :type col: str, optional
    :param multiple_boards: List of board IDs only for multiple board runs
    :type multiple_boards: Optional[List[str]], optional
    :return: A correlation matrix (128 channels per board), filled for input channels,
        and padded with zeros.
    :rtype: npt.NDArray[np.float64]
    """
    log.debug("Computing correlation matrix")

    # Make a (128 * n_boards) channel matrix of zeros here and update with output below.
    nchan = 128 * len(multiple_boards) if multiple_boards is not None else 128
    tmp_matrix = np.zeros([nchan, nchan])

    for measurement in measurements:
        measurement_df = df.filter(
            pl.col("measurement") == measurement,
            pl.col("gain") == gain,
            pl.col("samples").list.len() != 0,
        )

        # Nothing to correlate for this measurement; leave its rows zeroed.
        if measurement_df.is_empty():
            continue

        # BUGFIX: width must come from the *filtered* frame. The unfiltered df can
        # still contain zero-length sample lists (excluded above), which would make
        # the minimum length 0 and break the slice/to_array below.
        width = measurement_df.select(pl.col(col).list.len().min()).item()
        matrix = (
            measurement_df.select(pl.col(col).list.slice(0, width).list.to_array(width).arr.to_struct())
            .unnest(col)
            .transpose()
            .corr()
            .to_numpy()
        )

        # An array of the actual channels available in the dataframe
        channel_mask = np.unique(measurement_df.select(pl.col("channel")).transpose().to_numpy().flatten())

        # Update return matrix
        tmp_matrix[np.ix_(channel_mask, channel_mask)] = matrix

    return tmp_matrix

231 

232 

def pipe_psd(df: pl.DataFrame, col: str = "samples") -> pl.DataFrame:
    """
    Calculate the PSD of the samples column as well as the frequency axis for the PSD.

    Also detects spectral peaks on the dBFS-scaled PSD, measured relative to a
    locally-smoothed baseline rather than an absolute threshold.

    :param df: The DataFrame to calculate the PSD from
    :type df: pl.DataFrame
    :param col: The column to calculate the PSD from. Defaults to "samples".
    :type col: str, optional
    :return: A DataFrame with the psd, freq, peaks, and peak_heights columns added
    :rtype: pl.DataFrame
    """
    log.debug("Calculating PSD")
    # Truncate every waveform to the shortest common length so rows stack into a 2D array.
    width = df.select(pl.col(col).list.len().min()).item()
    samples = df.select(pl.col(col).list.slice(0, width).list.to_array(width)).to_series().to_numpy()
    freq, psd = sps.welch(samples, fs=constants.FLX_FRQ_40MHZ, nperseg=2**10, axis=1, average="mean")

    # Remove DC component and last bin, which often shows peak or dip which is likely an artifact
    psd = psd[:, 1:-1]
    freq = freq[1:-1]

    # One identical frequency row per waveform, to match the psd column shape.
    freq = np.tile(freq, (psd.shape[0], 1))

    ## Find peaks in distribution

    # Convert to dBFS (full scale taken as the squared ADC code range)
    psd_dbfs = 10 * np.log10(np.array(psd) / (2**constants.ADC_BITS) ** 2)

    # Peak detection strategy for dB scale:
    # 1. Calculate a rolling mean to establish the local baseline
    # 2. Find peaks relative to the local baseline

    # Boxcar convolution: a uniform kernel computes a moving *average*
    # (the original comment said "median", which the code does not do)
    window_size = 7  # Should be odd; adjust based on your needs
    window: np.ndarray = np.ones(window_size) / window_size

    baseline = np.apply_along_axis(np.convolve, 1, psd_dbfs, window, mode="same")

    # At edges, use the first/last valid baseline value
    baseline[:, : window_size // 2] = np.repeat(baseline[:, window_size // 2, None], window_size // 2, axis=1)
    baseline[:, -window_size // 2 :] = np.repeat(
        baseline[:, -window_size // 2 - 1, None], np.abs(-window_size // 2), axis=1
    )

    # Calculate deviation from local baseline
    deviation = psd_dbfs - baseline

    # Define peak parameters
    min_peak_height = 0.5  # Minimum dB above local baseline
    min_peak_distance = int(constants.FFT_SIZE / 100)  # Minimum distance between peaks

    # numpy can't handle jagged return array
    # Find peaks relative to baseline
    peaks = []
    peak_heights = []
    for row in deviation:
        found_peaks, pkh_dict = sps.find_peaks(row, height=min_peak_height, distance=min_peak_distance)
        peaks.append(found_peaks)

        # Peak height in PSD
        # peak_heights.append([psd_dbfs[i][p] for p in found_peaks])
        # Maybe we want the height above baseline
        # "peak_heights" is in pkh_dict as long as height kwarg is passed
        peak_heights.append(pkh_dict["peak_heights"])  # type: ignore

    return df.with_columns(
        pl.Series(name="psd", values=psd, dtype=pl.List(pl.Float64)),
        pl.Series(name="freq", values=freq, dtype=pl.List(pl.Float64)),
        pl.Series(name="peaks", values=peaks, dtype=pl.List(pl.Float64)),
        pl.Series(name="peak_heights", values=peak_heights, dtype=pl.List(pl.Float64)),
    )

303 

304 

def calc_coherence(c1: int, c2: int, df: pl.DataFrame) -> Union[tuple, None]:
    """
    Compute the magnitude-squared coherence between the samples of two channels.

    :param c1: First channel number
    :param c2: Second channel number
    :param df: DataFrame containing a "channel" column and a "samples" list column
    :return: A (frequencies, coherence) tuple from scipy.signal.coherence, or
        None if either channel is missing from the DataFrame
    """
    # Bail out early if either channel is absent from the frame.
    for channel in (c1, c2):
        if channel not in df["channel"]:
            log.warning(f"Channel {channel} not found in dataframe")
            return None

    def _waveform(channel: int) -> np.ndarray:
        # First samples list for the requested channel, as a numpy array.
        return df.filter(pl.col("channel") == channel).select(pl.col("samples")).to_series()[0].to_numpy()

    return sps.coherence(
        _waveform(c1),
        _waveform(c2),
        fs=constants.FLX_FRQ_40MHZ,
        nperseg=2**10,
    )

322 

323 

def pipe_fft(df: pl.DataFrame, col: str = "samples") -> pl.DataFrame:
    """
    Calculate the FFT of the samples column as well as the frequency axis for the FFT

    :param df: The DataFrame to calculate the FFT from
    :type df: pl.DataFrame
    :param col: The column to calculate the FFT from. Defaults to "samples".
    :type col: str, optional
    :return: A DataFrame with the fft and freq columns added
    :rtype: pl.DataFrame
    """
    log.debug("Calculating FFT")
    # Truncate every waveform to the shortest common length so rows stack into a 2D array.
    width = df.select(pl.col(col).list.len().min()).item()
    fourier = np.fft.fft(
        df.select(pl.col(col).list.slice(0, width).list.to_array(width)).to_series().to_numpy(), axis=1
    )
    # Magnitude spectrum with the DC bin dropped.
    fourier = np.abs(fourier)[:, 1:]

    # NOTE(review): the frequency axis is built from the post-DC-removal length
    # (width - 1), not the original FFT length — confirm the intended bin alignment.
    freq = np.fft.fftfreq(fourier.shape[1], d=1 / constants.FLX_FRQ_40MHZ)
    freq = freq[: int(freq.shape[0] / 2)]
    # One identical frequency row per waveform, to match the fft column shape.
    freq = np.tile(freq, (fourier.shape[0], 1))

    # Keep the positive-frequency half and normalise each row to its own peak.
    fourier = fourier[:, : int(fourier.shape[1] / 2)]
    fourier = fourier / np.max(fourier, axis=1)[:, None]

    return df.with_columns(
        pl.Series(name="fft", values=fourier, dtype=pl.List(pl.Float64)),
        pl.Series(name="freq", values=freq, dtype=pl.List(pl.Float64)),
    )

353 

354 

def pipe_chi2(df: pl.DataFrame, col: str = "samples") -> pl.DataFrame:
    """
    Calculate the chi² per degree of freedom for each entry in the samples column,
    with all logic in-line.

    Steps:
    1. Convert the samples to a numpy array.
    2. Define histogram bins with a fixed bin width of 1.
    3. If there aren't enough bins, assign NaN.
    4. Compute the histogram and bin centers.
    5. Fit a Gaussian to the histogram using calc_gaussian and gauss.
    6. Calculate degrees of freedom (number of bins minus 3).
    7. Compute asymmetric Poisson errors using the gamma distribution.
    8. Calculate the chi² per degree of freedom.

    :param df: The input DataFrame.
    :param col: The column containing the sample data (default "samples").
    :return: A DataFrame with an added "chi2_dof" column.
    """
    log.debug("Calculating chi²/dof for the samples column")
    chi2_values: List[float] = []

    for samples in df[col]:
        s = np.asarray(samples)

        # Unit-width bins spanning the observed sample range (ADC codes are integers).
        bins = np.arange(s.min(), s.max() + 1, 1)
        if bins.size <= 1:
            # Degenerate case: all samples identical, nothing to fit.
            chi2_values.append(np.nan)
            continue

        hist, _ = np.histogram(s, bins=bins)
        centers = 0.5 * (bins[1:] + bins[:-1])

        # NOTE(review): assumes calc_gaussian returns a parameter vector with
        # mu at index 0, sigma at index 2, and N at index 4 — confirm against
        # polars_analysis.plotting.helper.
        fp = calc_gaussian(s, bins)
        gauss_fit = gauss(centers, mu=fp[0], sigma=fp[2], N=fp[4])
        # Three fitted parameters (mu, sigma, N) are subtracted from the bin count.
        dof = centers.size - 3

        # Compute asymmetric Poisson errors using the gamma distribution
        a = 0.32  # approximately 1 sigma
        err_up = gamma.ppf(1 - a / 2, gauss_fit + 1, scale=1) - gauss_fit
        err_dw = gamma_fit_err = gauss_fit - gamma.ppf(a / 2, gauss_fit, scale=1)
        residuals = hist - gauss_fit
        # Use the upper error where data overshoots the fit, lower error otherwise.
        err = np.where(residuals > 0, err_up, err_dw)

        chi2 = np.sum((residuals) ** 2 / (err**2)) / dof
        chi2_values.append(chi2)

    return df.with_columns(pl.Series("chi2_dof", chi2_values))

403 

404 

def pipe_autocorr(df: pl.DataFrame, col: str = "samples") -> pl.DataFrame:
    """
    Calculate the autocorrelation of the samples column.

    Each waveform is mean-subtracted, truncated to the shortest common length,
    auto-correlated, normalised to its maximum, and only the non-negative-lag
    half of the symmetric result is kept.

    :param df: The DataFrame to calculate the autocorrelation from
    :type df: pl.DataFrame
    :param col: The column to calculate the autocorrelation from. Defaults to "samples".
    :type col: str, optional
    :return: A DataFrame with the autocorr column added
    :rtype: pl.DataFrame
    """
    log.debug("Calculating autocorrelation")
    # Shortest sample list defines the common width so rows stack into a 2D array.
    width = df.select(pl.col(col).list.len().min()).item()
    data = (
        df.select(
            pl.col(col)
            .list.eval(pl.element() - pl.element().mean())
            .list.slice(0, width)
            .list.to_array(width)
            .arr.to_struct()
        )
        .unnest(col)
        .fill_null(0)
        .to_numpy()
    )
    # Vectorise full cross-correlation of each row with itself;
    # each output row has 2*width - 1 lag bins.
    correlate = np.vectorize(sps.correlate, signature="(n),(n)->(k)")
    result = correlate(data, data)
    # Normalise so the zero-lag peak is 1.
    result /= np.max(result, axis=1)[:, None]
    # Keep only lags >= 0 (the second half of the symmetric output).
    result = result[:, result.shape[1] // 2 :]

    return df.with_columns(pl.Series(name="autocorr", values=result, dtype=pl.List(pl.Float64)))

436 

437 

def expr_mean(col: str = "samples") -> pl.Expr:
    """
    Build an expression for the per-row mean of the samples column.

    :param col: The column name to calculate the mean of
    :type col: str, optional
    :return: The mean of the samples column
    :rtype: pl.Expr
    """
    samples = pl.col(col)
    return samples.list.mean()

448 

449 

def expr_rms(col: str = "samples") -> pl.Expr:
    """
    Build an expression for the root mean square (standard deviation) of the samples column.

    :param col: The column name to calculate the rms of
    :type col: str, optional
    :return: The rms of the samples column
    :rtype: pl.Expr
    """
    samples = pl.col(col)
    return samples.list.std()

460 

461 

def expr_max_min(col: str = "samples") -> pl.Expr:
    """
    Build an expression for the peak-to-peak range (max minus min) of the samples column.

    :param col: The column name to calculate the max-min of
    :type col: str, optional
    :return: The max-min of the samples column
    :rtype: pl.Expr
    """
    peak = pl.col(col).list.max()
    trough = pl.col(col).list.min()
    return peak - trough

472 

473 

def expr_psd(col: str = "fft") -> pl.Expr:
    """
    Build an expression for the power spectral density of the FFT column.

    :param col: The column to calculate the PSD from. Defaults to "fft".
    :type col: str, optional
    :return: The power spectral density of the FFT column
    :rtype: pl.Expr
    """
    # 20*log10(|X|) converts linear FFT magnitude to decibels.
    to_decibels = 20 * np.log10(pl.element())
    return pl.col(col).list.eval(to_decibels)

484 

485 

def expr_sinad(col: str = "fft") -> pl.Expr:
    """
    Build an expression for the signal-to-noise and distortion ratio of the FFT column.

    :param col: The column to calculate the SINAD from. Defaults to "fft".
    :type col: str, optional
    :return: The signal-to-noise and distortion ratio of the FFT column
    :rtype: pl.Expr
    """
    # Power of everything below full scale (the normalised signal bin equals 1).
    sub_unity_power = pl.element().filter(pl.element() < 1) ** 2
    total_noise = pl.col(col).list.eval(sub_unity_power).list.sum()
    return -10 * total_noise.log10()

496 

497 

def expr_enob(col: str = "sinad") -> pl.Expr:
    """
    Build an expression for the effective number of bits from the SINAD column.

    Uses the standard conversion ENOB = (SINAD - 1.76) / 6.02.

    :param col: The column to calculate the ENOB from. Defaults to "sinad".
    :type col: str, optional
    :return: The effective number of bits of the SINAD column
    :rtype: pl.Expr
    """
    sinad_db = pl.col(col)
    return (sinad_db - 1.76) / 6.02

508 

509 

def expr_snr(col: str = "fft") -> pl.Expr:
    """
    Build an expression for the signal-to-noise ratio of the FFT column.

    The three largest bins (taken as signal) are excluded by sorting and
    dropping the tail before summing the remaining power.

    :param col: The column to calculate the SNR from. Defaults to "fft".
    :type col: str, optional
    :return: The signal-to-noise ratio of the FFT column
    :rtype: pl.Expr
    """
    ordered = pl.col(col).list.sort()
    noise_power = ordered.list.eval(pl.element().slice(0, pl.element().len() - 3) ** 2)
    return -10 * noise_power.list.sum().log10()

522 

523 

def expr_sfdr(col: str = "fft") -> pl.Expr:
    """
    Build an expression for the spurious-free dynamic range of the FFT column.

    Removes the carrier (the maximum bin) and measures the largest remaining
    spur relative to it.

    :param col: The column to calculate the SFDR from. Defaults to "fft".
    :type col: str, optional
    :return: The spurious-free dynamic range of the FFT column
    :rtype: pl.Expr
    """
    spurs = pl.col(col).list.set_difference(pl.col(col).list.max())
    return -20 * spurs.list.max().log10()