Coverage for polars_analysis / plotting / pedestal_plotting.py: 84%

432 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 13:37 -0400

1import logging 

2import pathlib 

3from itertools import product 

4from pathlib import Path 

5from typing import Any, Dict, List, Literal, Optional, Set, Union 

6 

7import matplotlib 

8from typing_extensions import deprecated 

9 

10matplotlib.use("agg") 

11import matplotlib.colors as mcolors 

12import matplotlib.pyplot as plt 

13import matplotlib.ticker as ticker 

14import numpy as np 

15import polars as pl 

16import scipy 

17from diptest import diptest # type: ignore 

18from scipy.stats import gamma 

19 

20from polars_analysis.analysis import constants 

21from polars_analysis.analysis.cut_thresholds import CutThresholds 

22from polars_analysis.plotting import helper 

23from polars_analysis.plotting.helper import Metadata 

24 

25# Instantiate logger 

26log = logging.getLogger(__name__) 

27 

28""" 

29Functions for plotting pedestal run data, which has already been processed. 

30""" 

31 

32 

33def plot_raw( 

34 info: Metadata, 

35 samples: np.ndarray, 

36 plot_dir: pathlib.Path, 

37) -> None: 

38 """ 

39 Plot the raw pedestal samples for a given channel and gain. 

40 

41 :param run_number: The run number. 

42 :type run_number: int 

43 :param channel: The channel number. 

44 :type channel: int 

45 :param gain: The gain setting. 

46 :type gain: Literal["hi", "lo"] 

47 :param samples: The raw pedestal samples. 

48 :type samples: np.ndarray 

49 :param plot_dir: The directory to save the plot. 

50 :type plot_dir: pathlib.Path 

51 """ 

52 plt.figure(figsize=(10, 10)) 

53 plt.plot( 

54 samples, 

55 "k.", 

56 label=rf"""Samples 

57$\mu$ = {np.mean(samples):.02f}, $\sigma$ = {np.std(samples):.02f}""", 

58 ) 

59 plt.legend(loc=1) 

60 plt.title( 

61 helper.plot_summary_string( 

62 name="Pedestal", 

63 board_id=info.board_id, 

64 info=info, 

65 ) 

66 ) 

67 plt.xlabel("Sample #") 

68 plt.ylabel("ADC Counts") 

69 plt.grid() 

70 plt.tight_layout() 

71 if info.channels is None: 

72 info.channels = "999" 

73 plt.savefig(Path(plot_dir) / f"rawPed_meas0_board_{info.board_id}_channel{info.channels.zfill(3)}_{info.gain}.png") 

74 plt.cla() 

75 plt.clf() 

76 plt.close() 

77 

78 

79@deprecated("outlier test is no longer used") 

80def outlier_test( 

81 run_number: int, 

82 channel: int, 

83 gain: Literal["hi", "lo"], 

84 samples: np.ndarray, 

85 plot_dir: pathlib.Path, 

86 board_id: Optional[str] = None, 

87 attenuation: Optional[str] = None, 

88 pas_mode: Optional[Any] = None, 

89 info: Optional[Metadata] = None, 

90) -> None: # pragma: no cover 

91 """ 

92 Perform an outlier test by creating a histogram of the deviation from mean, 

93 divided by standard deviation. 

94 

95 :param run_number: The run number. 

96 :type run_number: int 

97 :param channel: The channel number. 

98 :type channel: int 

99 :param gain: The gain setting. 

100 :type gain: Literal["hi", "lo"] 

101 :param samples: The raw pedestal samples. 

102 :type samples: np.ndarray 

103 :param plot_dir: The directory to save the plot. 

104 :type plot_dir: pathlib.Path 

105 """ 

106 mean = np.mean(samples) 

107 std = np.std(samples) 

108 deviations = (samples - mean) / std # Calculate the deviation from the mean in units of standard deviation 

109 

110 plt.figure(figsize=(10, 10)) 

111 plt.hist(deviations, bins=100, log=True, color="blue", alpha=0.7) # Logarithmic y-axis 

112 plt.title( 

113 helper.plot_summary_string( 

114 board_id=board_id, 

115 run_numbers=run_number, 

116 channels=helper.list_to_text_ranges(channel), 

117 gain=gain, 

118 name="Outlier Test", 

119 attenuation=attenuation, 

120 pas_mode=pas_mode, 

121 info=info, 

122 ) 

123 ) 

124 plt.xlabel("Deviation from Mean (σ)") 

125 plt.ylabel("log(N samples)") 

126 plt.grid(True) 

127 plt.tight_layout() 

128 plt.savefig(Path(plot_dir) / f"outlier_test_channel{channel:03}_{gain}.png") 

129 plt.cla() 

130 plt.clf() 

131 plt.close() 

132 

133 

134def plot_hist( 

135 info: Metadata, 

136 samples: np.ndarray, 

137 plot_dir: pathlib.Path, 

138) -> None: 

139 """ 

140 Plot the histogram of the pedestal samples for a given channel and gain. 

141 

142 :param run_number: The run number. 

143 :type run_number: int 

144 :param channel: The channel number. 

145 :type channel: int 

146 :param gain: The gain setting. 

147 :type gain: Literal["hi", "lo"] 

148 :param samples: The pedestal samples. 

149 :type samples: np.ndarray 

150 :param plot_dir: The directory to save the plot. 

151 :type plot_dir: pathlib.Path 

152 """ 

153 min_samples = min(samples) 

154 max_samples = max(samples) 

155 stdev = np.std(samples) 

156 skew = scipy.stats.skew(samples) 

157 

158 _, (ax1, ax2) = plt.subplots(2, gridspec_kw={"hspace": 0, "height_ratios": [3, 1]}, sharex=True) 

159 ax1.label_outer() 

160 ax2.label_outer() 

161 

162 bin_width = 1 

163 bins = np.arange(min_samples, max_samples + bin_width, bin_width) 

164 if len(bins) > 1: 

165 bin_content, _, h = ax1.hist(samples, bins=bins, align="mid", edgecolor="black") 

166 _, dip_pval = diptest(bin_content) # type: ignore 

167 h[0].set_label(f"RMS={stdev:.01f}, γ={skew:.02f}, dip={dip_pval:.02f}") 

168 else: 

169 bin_content, _, h = ax1.hist(samples, align="mid", edgecolor="black", label=f"RMS={stdev:.01f}") 

170 

171 fit_pars = helper.calc_gaussian(samples, bins) 

172 bin_centers = 0.5 * (bins[1:] + bins[:-1]) 

173 gauss_fit = helper.gauss(bin_centers, mu=fit_pars[0], sigma=fit_pars[2], N=fit_pars[4]) 

174 

175 ax1.plot( 

176 bin_centers, 

177 gauss_fit, 

178 color="r", 

179 label=rf"""$\mu$ = {fit_pars[0]:.03f} $\pm$ {fit_pars[1]:.03f}, 

180$\sigma$ = {fit_pars[2]:.03f} $\pm$ {fit_pars[3]:.03f}""", 

181 ) 

182 

183 ax1.legend(loc="upper right") 

184 ax1.set_xlabel("ADC Counts") 

185 left, right = ax1.get_xlim() 

186 ax1.set_xlim(left, right + (left - right) * -0.1) 

187 bottom, top = ax1.get_ylim() 

188 ax1.set_ylim(bottom, top + (bottom - top) * -0.1) 

189 ax1.set_ylabel("Entries") 

190 ax1.set_title( 

191 helper.plot_summary_string( 

192 name="Pedestal", 

193 board_id=info.board_id, 

194 info=info, 

195 ) 

196 ) 

197 ax1.grid(True) 

198 

199 dof = len(bin_centers) - 3 

200 a = 0.32 # 1 sigma 

201 err_up = gamma.ppf(1 - a / 2, gauss_fit + 1, scale=1) - gauss_fit # approx up side of Poisson interval 

202 err_dw = gauss_fit - gamma.ppf(a / 2, gauss_fit, scale=1) # approx down side of Poisson interval 

203 residuals = bin_content - gauss_fit 

204 err = np.where(residuals > 0, err_up, err_dw) 

205 residual_sigma = residuals / err 

206 chi2_dof = np.sum((bin_content - gauss_fit) ** 2 / err**2) / dof 

207 

208 ax2.plot(bin_centers, residual_sigma, color="r", label="Fit residual σ", marker=".", linestyle="None") 

209 ax2.text(0.8, 0.8, f"Χ²/dof={chi2_dof:.02f}", transform=ax2.transAxes) 

210 ax2.set_ylabel("σ") 

211 ax2.set_xlabel("ADC Counts") 

212 ax2.grid(True) 

213 

214 if info.channels is None: 

215 info.channels = "999" 

216 plt.savefig(Path(plot_dir) / f"channel{info.channels.zfill(3)}_{info.gain}_board_{info.board_id}_pedestal_hist.png") 

217 plt.cla() 

218 plt.clf() 

219 plt.close() 

220 

221 

222def plot_autocorrelation( 

223 run_number: int, 

224 channel: int, 

225 gain: Literal["hi", "lo"], 

226 autocorrelation: np.ndarray, 

227 plot_dir: pathlib.Path, 

228 board_id: Optional[str] = None, 

229 attenuation: Optional[str] = None, 

230 pas_mode: Optional[Any] = None, 

231 info: Optional[Metadata] = None, 

232) -> None: 

233 """ 

234 Plot the autocorrelation of the pedestal samples for a given channel and gain. 

235 

236 :param run_number: The run number. 

237 :type run_number: int 

238 :param channel: The channel number. 

239 :type channel: int 

240 :param gain: The gain setting. 

241 :type gain: Literal["hi", "lo"] 

242 :param autocorrelation: The autocorrelation of the pedestal samples. 

243 :type autocorrelation: np.ndarray 

244 :param plot_dir: The directory to save the plot. 

245 :type plot_dir: pathlib.Path 

246 """ 

247 plt.plot(autocorrelation[0:50], "k.-", label="Autocorrelation") 

248 plt.axhline(y=0, color="r", ls="--") 

249 plt.legend(loc="upper right") 

250 plt.title( 

251 helper.plot_summary_string( 

252 board_id=board_id, 

253 run_numbers=run_number, 

254 channels=helper.list_to_text_ranges(channel), 

255 gain=gain, 

256 name="Pedestal", 

257 attenuation=attenuation, 

258 pas_mode=pas_mode, 

259 info=info, 

260 ) 

261 ) 

262 plt.xlabel("Lag") 

263 plt.ylabel("Autocorrelation") 

264 plt.grid() 

265 plt.tight_layout() 

266 plt.savefig(Path(plot_dir) / f"autocorr_board_{board_id}_channel{channel:03}_{gain}.png") 

267 plt.cla() 

268 plt.clf() 

269 plt.close() 

270 

271 

272def plot_baseline_means_rms( 

273 derived_df: pl.DataFrame, 

274 plot_dir: pathlib.Path, 

275 skip_channels_hi: Optional[List[int]] = None, 

276 skip_channels_lo: Optional[List[int]] = None, 

277 draw_bounds: Optional[bool] = False, 

278 run_number: Optional[int] = None, 

279 board_id: Optional[str] = None, 

280 attenuation: Optional[str] = None, 

281 pas_mode: Optional[Any] = None, 

282 info: Optional[Metadata] = None, 

283) -> None: 

284 """ 

285 Plot the mean, RMS, and maxmin of the pedestal values for a given measurement and channels for high and low gain. 

286 

287 :param derived_df: DataFrame containing mean, std, and maxmin columns for hi and lo gains 

288 :type derived_df: pl.DataFrame 

289 :param plot_dir: The directory to save the plot. 

290 :type plot_dir: pathlib.Path 

291 """ 

292 n_channels = derived_df["channel"].n_unique() 

293 

294 color_dict = {"lo": "b", "hi": "r"} 

295 title_dict = {"lo": "LG", "hi": "HG"} 

296 

297 fig, ax = plt.subplots() 

298 plt.xticks(np.arange(0, n_channels, 4), rotation=70) 

299 ax.xaxis.set_tick_params(pad=0.1) 

300 fig2, ax2 = plt.subplots(1) 

301 plt.xticks(np.arange(0, n_channels, 4), rotation=70) 

302 ax2.xaxis.set_tick_params(pad=0.1) 

303 fig3, ax3 = plt.subplots(1) 

304 plt.xticks(np.arange(0, n_channels, 4), rotation=70) 

305 ax3.xaxis.set_tick_params(pad=0.1) 

306 max_means = 0 

307 max_rms = 0 

308 max_maxmins = 0 

309 gains: List[Literal["hi", "lo"]] = ["hi", "lo"] 

310 for gain in gains: 

311 gain_df = derived_df.filter(pl.col("gain") == gain) 

312 gain_df = gain_df.sort(by="measurement").unique("channel", keep="first").sort(by="channel") 

313 if gain == "hi" and skip_channels_hi is not None: 

314 gain_df = gain_df.filter(~pl.col("channel").is_in(skip_channels_hi)) 

315 elif gain == "lo" and skip_channels_lo is not None: 

316 gain_df = gain_df.filter(~pl.col("channel").is_in(skip_channels_lo)) 

317 means = gain_df["mean"].to_numpy() 

318 stds = gain_df["std"].to_numpy() 

319 maxmins = gain_df["maxmin"].to_numpy() 

320 channels = gain_df["channel"].to_numpy() 

321 color = color_dict[gain] 

322 title = title_dict[gain] 

323 

324 ax.grid(visible=True, zorder=0) 

325 mean = np.mean(means) 

326 std = np.std(means) / np.sqrt(len(means)) 

327 ax.bar(channels, means, fill=False, ec=color, label=f"{title} mean = {mean:.2f}±{std:.2f}", zorder=3) 

328 max_means = max(max_means, max(means)) 

329 ax.margins(x=0) 

330 

331 ax2.grid(visible=True, zorder=0) 

332 mean = np.mean(stds) 

333 std = np.std(stds) / np.sqrt(len(stds)) 

334 ax2.bar(channels, stds, fill=False, ec=color, label=f"{title} mean = {mean:.2f}±{std:.2f}", zorder=3) 

335 max_rms = max(max_rms, max(stds)) 

336 ax2.margins(x=0) 

337 

338 ax3.grid(visible=True, zorder=0) 

339 mean = np.mean(maxmins) 

340 std = np.std(maxmins) / np.sqrt(len(maxmins)) 

341 ax3.bar(channels, maxmins, fill=False, ec=color, label=f"{title} mean = {mean:.2f}±{std:.2f}", zorder=3) 

342 max_maxmins = max(max_maxmins, max(maxmins)) 

343 ax3.margins(x=0) 

344 # Draw acceptance cuts 

345 if draw_bounds: 

346 cuts = CutThresholds() 

347 cuts.draw_on(ax, "mean", gain=gain) 

348 cuts.draw_on(ax2, "std", gain=gain) 

349 cuts.draw_on(ax3, "maxmin", gain=gain) 

350 

351 all_channels = derived_df["channel"].unique().sort().to_list() 

352 ax.set_title( 

353 helper.plot_summary_string( 

354 board_id=board_id, 

355 run_numbers=run_number, 

356 channels=helper.list_to_text_ranges(all_channels), 

357 name="Mean Pedestal Value", 

358 attenuation=attenuation, 

359 pas_mode=pas_mode, 

360 info=info, 

361 ) 

362 ) 

363 ax.set_xlabel("Channel", loc="right") 

364 ax.set_ylabel("ADC Counts") 

365 ax.set_ylim(0, 1.33 * max_means) 

366 ax.legend() 

367 fig.tight_layout() 

368 fig.savefig(f"{plot_dir}/mu_board_{board_id}_summary.png") 

369 fig.clf() 

370 

371 ax2.set_title( 

372 helper.plot_summary_string( 

373 board_id=board_id, 

374 run_numbers=run_number, 

375 channels=helper.list_to_text_ranges(all_channels), 

376 name="Pedestal RMS", 

377 attenuation=attenuation, 

378 pas_mode=pas_mode, 

379 info=info, 

380 ) 

381 ) 

382 ax2.set_xlabel("Channel", loc="right") 

383 ax2.set_ylabel("ADC Counts") 

384 ax2.set_ylim(0, 1.33 * max_rms) 

385 ax2.legend() 

386 fig2.tight_layout() 

387 fig2.savefig(f"{plot_dir}/rms_board_{board_id}_summary.png") 

388 fig2.clf() 

389 

390 ax3.set_title( 

391 helper.plot_summary_string( 

392 board_id=board_id, 

393 run_numbers=run_number, 

394 channels=helper.list_to_text_ranges(all_channels), 

395 name="Pedestal Max-Min", 

396 attenuation=attenuation, 

397 pas_mode=pas_mode, 

398 info=info, 

399 ) 

400 ) 

401 ax3.set_xlabel("Channel", loc="right") 

402 ax3.set_ylabel("ADC Counts") 

403 ax3.set_ylim(0, 1.33 * max_maxmins) 

404 ax3.legend() 

405 fig3.tight_layout() 

406 fig3.savefig(f"{plot_dir}/maxmin_board_{board_id}_summary.png") 

407 fig3.clf() 

408 

409 plt.cla() 

410 plt.clf() 

411 plt.close() 

412 

413 

414def plot_coherent_noise( 

415 row: Dict[str, Any], 

416 plot_dir: pathlib.Path, 

417 meas_type: str = "pedestal", 

418 board_id: Optional[str] = None, 

419 attenuation: Optional[str] = None, 

420 pas_mode: Optional[Any] = None, 

421 use_log_scale: bool = False, 

422) -> Union[tuple[List[str], Any], tuple[None, None]]: 

423 """ 

424 Plot the coherent noise for a given measurement type and gain, given a DataFrame of coherent noise results. 

425 

426 :param row: A dictionary from pl.DataFrame.iter_rows(named=True) containing the coherent noise results. 

427 Must contain the following columns: 

428 

429 * min_channel: The minimum channel number. 

430 * n_channels: The number of channels. 

431 * gain: The gain setting. 

432 * data_sum_hist: Histogram of the per samples sum of the pedestal values across channels. 

433 * data_sum_bins: Bin edges for data_sum_hist 

434 * tot_noise: The total noise. 

435 * ch_noise: The channel noise. 

436 * avg_noise: The average noise. 

437 * coh_noise: The coherent noise. 

438 * pct_coh: The percentage of coherent noise. 

439 * d_tot_noise: The total noise uncertainty. 

440 * d_ch_noise: The channel noise uncertainty. 

441 * d_avg: The average noise uncertainty. 

442 * d_coh: The coherent noise uncertainty. 

443 * d_pct: The percentage of coherent noise uncertainty. 

444 :type row: dict[str, Any] 

445 :param run_number: The run number. 

446 :type run_number: int 

447 :param plot_dir: The directory to save the plot. 

448 :type plot_dir: pathlib.Path 

449 :param meas_type: The measurement type, defaults to "pedestal". 

450 :type meas_type: str, optional 

451 """ 

452 run_number: int = row["run_number"] 

453 if not board_id: 

454 board_id = row["board_id"] if "board_id" in row else None 

455 min_channel: int = row["min_channel"] 

456 n_channels: int = row["n_channels"] 

457 gain: Literal["hi", "lo"] = row["gain"] 

458 data_sum_hist: np.ndarray = np.array(row["data_sum_hist"]) 

459 bins: np.ndarray = np.array(row["data_sum_bins"]) 

460 channel_list = row["channel_list"] 

461 

462 centers = 0.5 * (bins[1:] + bins[:-1]) 

463 mu = helper.hist_mean(centers, data_sum_hist) 

464 skew = helper.hist_moment(centers, data_sum_hist, 3) 

465 

466 _, (ax1, ax2) = plt.subplots(2, gridspec_kw={"hspace": 0, "height_ratios": [3, 1]}, sharex=True) 

467 ax1.label_outer() 

468 ax2.label_outer() 

469 

470 _, dip_pval = diptest(data_sum_hist) # type: ignore 

471 _ = ax1.bar( 

472 x=bins[:-1], 

473 height=data_sum_hist, 

474 width=bins[1:] - bins[:-1], 

475 align="edge", 

476 color="steelblue", 

477 fill=True, 

478 edgecolor="black", 

479 label=rf"""$\mu$ = {mu:.02f}, $\sigma$ = {row["tot_noise"]:.02f}, 

480γ={skew:.02f}, dip={dip_pval:.02f}""", 

481 ) 

482 

483 fit_pars = helper.calc_gaussian_from_bins(data_sum_hist, bins) 

484 fit_mu, fit_sigma, fit_N = fit_pars[0], fit_pars[2], fit_pars[4] 

485 e_fit_mu, e_fit_sigma = fit_pars[1], fit_pars[3] 

486 bin_centers = 0.5 * (bins[1:] + bins[:-1]) 

487 gauss_fit = helper.gauss(bin_centers, fit_mu, fit_sigma, fit_N) 

488 ax1.plot( 

489 bin_centers, 

490 gauss_fit, 

491 color="r", 

492 label=rf"""Gaussian Fit 

493$\mu$ = {fit_mu:.02f}±{e_fit_mu:0.2f}, $\sigma$ = {fit_sigma:.02f}±{e_fit_sigma:.02f}""", 

494 ) 

495 ax1.set_ylim(0, max(data_sum_hist) * 1.4) 

496 ax1.set_xlabel("ADC Counts") 

497 ax1.set_ylabel("Entries") 

498 ax1.set_title( 

499 helper.plot_summary_string( 

500 board_id=board_id, 

501 run_numbers=run_number, 

502 channels=helper.list_to_text_ranges(channel_list), 

503 gain=gain, 

504 name=f"{meas_type} coherent noise", 

505 attenuation=attenuation, 

506 pas_mode=pas_mode, 

507 ) 

508 ) 

509 ax1.grid() 

510 ax1.text( 

511 0.995, 

512 0.84, 

513 rf"""$\sqrt{{\sigma_i^2}}$ = {row["ch_noise"]:.02f} $\pm$ {row["d_ch_noise"]:.02f} 

514 Avg. noise/ch = {row["avg_noise"]:.02f} $\pm$ {row["d_avg"]:.02f} 

515 Coh. noise/ch = {row["coh_noise"]:.02f} $\pm$ {row["d_coh"]:.02f} 

516 [%] Coh. noise = {row["pct_coh"]:.02f} $\pm$ {row["d_pct"]:.02f}""", 

517 horizontalalignment="right", 

518 verticalalignment="center", 

519 transform=ax1.transAxes, 

520 ) 

521 log_filename: Literal["", "_log"] = "" 

522 if use_log_scale: 

523 ax1.set_yscale("log") 

524 ax1.set_ylim(bottom=0.5) 

525 log_filename = "_log" 

526 ax1.legend(loc="upper left", handlelength=1.0) 

527 

528 dof = len(bin_centers) - 3 

529 a = 0.32 # 1 sigma 

530 err_up = gamma.ppf(1 - a / 2, gauss_fit + 1, scale=1) - gauss_fit # approx up side of Poisson interval 

531 err_dw = gauss_fit - gamma.ppf(a / 2, gauss_fit, scale=1) # approx down side of Poisson interval 

532 residuals = data_sum_hist - gauss_fit 

533 err = np.where(residuals > 0, err_up, err_dw) 

534 residual_sigma = residuals / err 

535 chi2_dof = np.sum((data_sum_hist - gauss_fit) ** 2 / err**2) / dof 

536 

537 ax2.plot(bin_centers, residual_sigma, color="r", label="Fit residual σ", marker=".", linestyle="None") 

538 ax2.text(0.8, 0.8, f"Χ²/dof={chi2_dof:.02f}", transform=ax2.transAxes) 

539 ax2.set_ylabel("σ") 

540 ax2.set_xlabel("ADC Counts") 

541 ax2.grid(True) 

542 

543 if n_channels % 128 == 0: 

544 plt.savefig(plot_dir / f"coherence_all_{gain}_board_{board_id}_pedestal_hist{log_filename}.png") 

545 else: 

546 plt.savefig( 

547 plot_dir / f"{gain}_board_{board_id}_cnoise_ch_{min_channel}_{min_channel + n_channels}{log_filename}.png" 

548 ) 

549 plt.cla() 

550 plt.clf() 

551 plt.close() 

552 

553 if board_id is not None: 

554 return board_id.split("_"), row["pct_coh"] 

555 else: 

556 return None, None 

557 

558 

559def plot_coherent_noise_matrix(coh_matrix: Dict[str, Dict[str, Any]], plot_dir: pathlib.Path) -> None: 

560 for gain in coh_matrix.keys(): 

561 # Parse keys to extract board combinations 

562 unsorted_individual_boards: Set[str] = set() 

563 board_combinations = {} 

564 

565 for key, value in coh_matrix[gain].items(): 

566 boards_in_key = key.split("_") 

567 board_combinations[tuple(sorted(boards_in_key))] = value 

568 unsorted_individual_boards.update(boards_in_key) 

569 

570 individual_boards: List[str] = sorted(unsorted_individual_boards) 

571 n_boards = len(individual_boards) 

572 

573 if n_boards == 0: 

574 log.error("Error: No boards found!") 

575 return 

576 

577 # Find the combined noise value 

578 all_boards_key = tuple(individual_boards) 

579 combined_noise = board_combinations.get(all_boards_key, np.nan) 

580 

581 matrix = np.zeros((n_boards, n_boards)) 

582 

583 for i, board_x in enumerate(individual_boards): 

584 for j, board_y in enumerate(individual_boards): 

585 if i == j: 

586 # Diagonal: individual board coherent noise 

587 single_board_key = (board_x,) 

588 if single_board_key in board_combinations: 

589 matrix[i, j] = board_combinations[single_board_key] 

590 else: 

591 log.warning(f"Warning: Individual board key {single_board_key} not found") 

592 matrix[i, j] = np.nan 

593 else: 

594 # Off-diagonal: pairwise board coherent noise 

595 pairwise_key = tuple(sorted([board_x, board_y])) 

596 if pairwise_key in board_combinations: 

597 matrix[i, j] = board_combinations[pairwise_key] 

598 else: 

599 log.warning(f"Warning: Pairwise key {pairwise_key} not found") 

600 matrix[i, j] = np.nan 

601 

602 vmin = 0.1 # Small positive value 

603 vmax = max(np.max(matrix).astype(float), 6) 

604 norm = mcolors.TwoSlopeNorm(vmin=vmin, vcenter=5, vmax=vmax) 

605 fig, ax = plt.subplots(figsize=(max(6, n_boards * 1.5), max(6, n_boards * 1.5))) 

606 im = ax.imshow(matrix, cmap="Reds", norm=norm) 

607 

608 ax.set_xticks(np.arange(n_boards)) 

609 ax.set_yticks(np.arange(n_boards)) 

610 ax.set_xticklabels(individual_boards, rotation=45, fontsize=8) 

611 ax.set_yticklabels(individual_boards, fontsize=8) 

612 

613 # Add text annotations 

614 for i, j in product(range(n_boards), range(n_boards)): 

615 if np.isnan(matrix[i, j]): 

616 val_to_write = "N/A" 

617 color = "gray" 

618 else: 

619 val_to_write = f"{matrix[i, j]:.1f}" 

620 color = "k" 

621 

622 ax.text(j, i, val_to_write, ha="center", fontsize=16, va="center", color=color) 

623 

624 cbar = fig.colorbar(im, ax=ax) 

625 cbar.set_label("Coherent Noise [%]", rotation=270, labelpad=15) 

626 gain_str = "HI" if gain == "hi" else "LO" 

627 if not np.isnan(combined_noise): 

628 title = f"{gain_str} Gain Coherent Noise Per Channel Matrix [%]\n \ 

629 Combined value for all boards: {combined_noise:.1f}%" 

630 else: 

631 title = f"{gain_str} Gain Coherent Noise Per Channel Matrix [%]" 

632 

633 ax.set_title(title) 

634 ax.set_xlabel("Board ID") 

635 ax.set_ylabel("Board ID") 

636 fig.tight_layout() 

637 plt.savefig(plot_dir / f"coherent_noise_matrix_{gain}.png", dpi=150, bbox_inches="tight") 

638 plt.close() 

639 

640 

641def plot_correlation_matrix( 

642 matrix: np.ndarray, 

643 gain: Literal["hi", "lo"], 

644 min_channel: int, 

645 nchannels: int, 

646 plot_dir: pathlib.Path, 

647 run_number: Optional[int] = None, 

648 board_id: Optional[str] = None, 

649 attenuation: Optional[str] = None, 

650 pas_mode: Optional[Any] = None, 

651 info: Optional[Metadata] = None, 

652 plot_numbers: bool = True, 

653) -> None: 

654 """ 

655 Plot the correlation matrix for a given gain, minimum channel, and number of channels. 

656 

657 :param matrix: The correlation matrix. 

658 :type matrix: np.ndarray 

659 :param gain: The gain setting. 

660 :type gain: Literal["hi", "lo"] 

661 :param min_channel: The minimum channel number. 

662 :type min_channel: int 

663 :param nchannels: The number of channels. 

664 :type nchannels: int 

665 :param plot_dir: The directory to save the plot. 

666 :type plot_dir: pathlib.Path 

667 """ 

668 log.debug("Plotting correlation matrix") 

669 

670 plot_all = nchannels % 128 == 0 

671 max_channel = min_channel + nchannels 

672 if min_channel > matrix.shape[0]: 

673 # The given interval does not overlap with available channels 

674 log.error(f"min_channel {min_channel} > matrix shape {matrix.shape[0]}!") 

675 return 

676 

677 submatrix = matrix[min_channel:max_channel, min_channel:max_channel] 

678 

679 fig, ax = plt.subplots(figsize=(nchannels / 4, nchannels / 4)) 

680 _ = ax.imshow(submatrix, cmap="RdBu", vmin=-0.3, vmax=0.3) 

681 ax.set_xticks(np.arange(0, nchannels, max(1, round(nchannels / 32)))) 

682 ax.set_yticks(np.arange(0, nchannels, max(1, round(nchannels / 32)))) 

683 ax.set_xticklabels( 

684 np.arange(min_channel, min_channel + nchannels, max(1, round(nchannels / 32))), 

685 rotation=90, 

686 fontsize=7, 

687 ) 

688 ax.set_yticklabels(np.arange(min_channel, min_channel + nchannels, max(1, round(nchannels / 32))), fontsize=7) 

689 

690 # Use grid lines at every 128 entries 

691 for pos in np.arange(128, nchannels, 128): 

692 ax.axvline(pos - 0.5, color="black", linewidth=3, alpha=0.7) 

693 ax.axhline(pos - 0.5, color="black", linewidth=3, alpha=0.7) 

694 

695 if plot_numbers: 

696 # for i, j in product(range(min_channel, min(128, max_channel)), range(min_channel, min(128, max_channel))): 

697 for i, j in product(range(min_channel, max_channel), range(min_channel, max_channel)): 

698 val_to_write = " " if np.isnan(matrix[i, j]) else f"{100 * matrix[i, j]:.0f}" 

699 color = "w" if ((i == j) and (val_to_write != "0")) else "k" 

700 _ = ax.text( 

701 j - min_channel, i - min_channel, val_to_write, ha="center", fontsize=6, va="center", color=color 

702 ) 

703 

704 default_fontsize = plt.rcParams["font.size"] 

705 ax.set_title( 

706 helper.plot_summary_string( 

707 board_id=board_id, 

708 run_numbers=run_number, 

709 channels=f"{min_channel}-{max_channel - 1}", 

710 gain=gain, 

711 name="pearson correlation [%]", 

712 attenuation=attenuation, 

713 pas_mode=pas_mode, 

714 info=info, 

715 ), 

716 fontsize=default_fontsize / (1.2 if nchannels < 64 else 1), 

717 ) 

718 fig.tight_layout() 

719 if plot_all: 

720 plt.savefig(plot_dir / f"{gain}_board_{board_id}_corr.png") 

721 else: 

722 plt.savefig(plot_dir / f"{gain}_board_{board_id}_corr_ch_{min_channel}_{max_channel}.png") 

723 plt.cla() 

724 plt.clf() 

725 plt.close() 

726 

727 

728def plot_fft( 

729 channel: int, 

730 gain: Literal["hi", "lo"], 

731 freq: List[float], 

732 psd: List[float], 

733 peaks: List[int], 

734 plot_dir: pathlib.Path, 

735 run_number: Optional[int] = None, 

736 board_id: Optional[str] = None, 

737 attenuation: Optional[str] = None, 

738 pas_mode: Optional[Any] = None, 

739 info: Optional[Metadata] = None, 

740) -> None: 

741 """ 

742 Plot the FFT of the pedestal samples for a given channel and gain. 

743 

744 :param channel: The channel number. 

745 :type channel: int 

746 :param gain: The gain setting. 

747 :type gain: Literal["hi", "lo"] 

748 :param freq: The frequency array. 

749 :type freq: np.ndarray 

750 :param psd: The power spectral density array. 

751 :type psd: np.ndarray 

752 :param plot_dir: The directory to save the plot. 

753 :type plot_dir: pathlib.Path 

754 """ 

755 

756 # # Convert to dBFS 

757 psd_dbfs = 10 * np.log10(np.array(psd) / (2**constants.ADC_BITS) ** 2) 

758 

759 plt.plot(freq, psd_dbfs, color="k") 

760 plt.plot([freq[peak] for peak in peaks], [psd_dbfs[peak] for peak in peaks], "xr") 

761 for peak in peaks: 

762 peak_psd_dbfs = float(psd_dbfs[peak]) 

763 plt.text(freq[peak] - 0.5, peak_psd_dbfs + 0.5, f"{freq[peak]:.02f}", rotation=45, color="k") 

764 plt.ylim(top=max(psd_dbfs) + 2) 

765 plt.xlabel("Frequency [MHz]") 

766 plt.ylabel("PSD [dB Full Scale]") 

767 plt.title( 

768 helper.plot_summary_string( 

769 board_id=board_id, 

770 run_numbers=run_number, 

771 channels=helper.list_to_text_ranges(channel), 

772 gain=gain, 

773 name="Power Spectral Density", 

774 attenuation=attenuation, 

775 pas_mode=pas_mode, 

776 info=info, 

777 ) 

778 ) 

779 plt.grid() 

780 plt.tight_layout() 

781 _ = plt.subplot(111) 

782 plt.savefig(plot_dir / f"pedestal_FFT_board_{board_id}_channel{channel:03}_{gain}.png") 

783 plt.cla() 

784 plt.clf() 

785 plt.close() 

786 

787 

788def plot_fft2d( 

789 freq: List[float], 

790 ffts: Union[pl.Series, np.ndarray], 

791 plot_dir: pathlib.Path, 

792 channels: Union[pl.Series, np.ndarray, List[int]], 

793 gain: Literal["hi", "lo"], 

794 run_number: Optional[int] = None, 

795 board_id: Optional[str] = None, 

796 pas_mode: Optional[Any] = None, 

797 unit: str = "MHz", 

798 extra_filename: str = "", 

799) -> None: 

800 """ 

801 Plot the 2D FFT of the pedestal samples for a given set of channels and gain. 

802 

803 :param freq: The frequency of the fft. 

804 :type freq: List[float] 

805 :param ffts: The 2D list of ffts. ffts[i] is the fft for channel i 

806 :type ffts: pl.Series 

807 :param plot_dir: The directory to save the plot. 

808 :type plot_dir: pathlib.Path 

809 :param channels: The channels that should be plotted 

810 :type channels: pl.Series 

811 :param gain: The gain setting. 

812 :type gain: Literal["hi", "lo"] 

813 :param run_number: The run number. 

814 :type run_number: int 

815 :param board_id: The board id. 

816 :type board_id: Optional[str] 

817 :param pas_mode: The ALFE mode or HPS mode. 

818 :type pas_mode: Optional[str] 

819 """ 

820 plt.figure(figsize=(12, 4)) 

821 

822 psd_dbfs = 10 * np.log10(np.stack(ffts) / (2**constants.ADC_BITS) ** 2) # type: ignore 

823 

824 arr = np.zeros((128, len(freq))) 

825 

826 for index, channel in enumerate(channels): 

827 arr[channel] = psd_dbfs[index] 

828 

829 plt.pcolormesh(freq, list(range(128)), arr) 

830 

831 if isinstance(channels, pl.Series): 

832 channels = channels.to_numpy() 

833 plt.title( 

834 helper.plot_summary_string( 

835 board_id=board_id, 

836 run_numbers=run_number, 

837 channels=helper.list_to_text_ranges(channels), 

838 gain=gain, 

839 name="Power Spectral Density", 

840 pas_mode=pas_mode, 

841 ) 

842 ) 

843 

844 plt.ylabel("Channel") 

845 plt.xlabel(f"Frequency [{unit}]") 

846 plt.gca().xaxis.set_major_locator(ticker.MultipleLocator(1)) 

847 

848 cbar = plt.colorbar() 

849 cbar.ax.get_yaxis().labelpad = 15 

850 cbar.set_label("PSD [dB Full Scale]") 

851 

852 plt.tight_layout() 

853 

854 plt.savefig(plot_dir / f"pedestal_FFT_2D_board_{board_id}_{gain}{extra_filename}.png") 

855 plt.cla() 

856 plt.clf() 

857 plt.close() 

858 

859 

860def plot_coherence( 

861 channel1: int, 

862 channel2: int, 

863 gain: Literal["hi", "lo"], 

864 freq: np.ndarray, 

865 coh: np.ndarray, 

866 plot_dir: pathlib.Path, 

867 run_number: Optional[int] = None, 

868 board_id: Optional[str] = None, 

869 attenuation: Optional[str] = None, 

870 pas_mode: Optional[Any] = None, 

871 info: Optional[Metadata] = None, 

872) -> None: 

873 """ 

874 Plot the coherence between two channels. 

875 

876 :param channel: The channel number. 

877 :type channel: int 

878 :param gain: The gain setting. 

879 :type gain: Literal["hi", "lo"] 

880 :param freq: The frequency array. 

881 :type freq: np.ndarray 

882 :param coh: The coherence array. 

883 :type coh: np.ndarray 

884 :param plot_dir: The directory to save the plot. 

885 :type plot_dir: pathlib.Path 

886 """ 

887 plt.semilogy( 

888 freq, 

889 coh, 

890 color="k", 

891 ) 

892 # plt.legend(loc=1) 

893 plt.xlabel("Frequency [MHz]") 

894 plt.ylabel("Coherence") 

895 plt.title( 

896 helper.plot_summary_string( 

897 board_id=board_id, 

898 run_numbers=run_number, 

899 channels=f"{channel1} & {channel2}", 

900 gain=gain, 

901 name="Pedestal Coherence", 

902 attenuation=attenuation, 

903 pas_mode=pas_mode, 

904 info=info, 

905 ) 

906 ) 

907 yLow, _ = plt.gca().get_ylim() 

908 plt.gca().set_ylim(yLow, 2) 

909 plt.grid() 

910 plt.tight_layout() 

911 _ = plt.subplot(111) 

912 plt.savefig(plot_dir / f"pedestal_coherence_channel{channel1}-{channel2}_{gain}.png") 

913 plt.cla() 

914 plt.clf() 

915 plt.close() 

916 

917 log.info(f"Wrote {plot_dir}/pedestal_coherence_channel{channel1}-{channel2}_{gain}.png") 

918 

919 

920def plot_chi2( 

921 derived_df: pl.DataFrame, 

922 gain: Literal["hi", "lo"], 

923 plot_dir: pathlib.Path, 

924 run_number: Optional[int] = None, 

925 board_id: Optional[str] = None, 

926 pas_mode: Optional[Any] = None, 

927 info: Optional[Metadata] = None, 

928) -> None: 

929 """ 

930 Plot χ²/dof vs. channel for the specified gain using the derived DataFrame. 

931 

932 :param derived_df: Polars DataFrame containing "channel", "chi2_dof", and "gain" columns. 

933 :param gain: Gain setting ("hi" or "lo"). 

934 :param plot_dir: Directory where the plot will be saved. 

935 :param run_number: Optional run number metadata. 

936 :param board_id: Optional board id metadata. 

937 :param pas_mode: Optional ALFE or HPS mode metadata. 

938 :param info: Optional additional metadata. 

939 """ 

940 # Filter for the specified gain and sort by channel 

941 df_gain = derived_df.filter(pl.col("gain") == gain).sort("channel") 

942 channels = np.array(df_gain["channel"].to_numpy()) 

943 chi2_values = np.array(df_gain["chi2_dof"].to_numpy()) 

944 

945 color = "red" if gain == "hi" else "blue" 

946 

947 fig, ax = plt.subplots(figsize=(10, 6)) 

948 ax.plot(channels, chi2_values, marker="o", linestyle="-", color=color, label=f"{gain.upper()} Gain") 

949 ax.set_xlabel("Channel") 

950 ax.set_ylabel("χ²/dof") 

951 ax.set_yscale("log") 

952 ax.set_title( 

953 helper.plot_summary_string( 

954 board_id=board_id, 

955 run_numbers=run_number, 

956 channels="All", 

957 gain=gain, 

958 name="χ²/dof vs. Channel", 

959 pas_mode=pas_mode, 

960 info=info, 

961 ) 

962 ) 

963 ax.grid(True) 

964 ax.legend() 

965 

966 fig.tight_layout() 

967 plot_file = plot_dir / f"{gain}_board_{board_id}_chi2.png" 

968 plt.savefig(plot_file) 

969 plt.cla() 

970 plt.clf() 

971 plt.close() 

972 

973 

974def calc_chi2_threshold(chi2_values, k: float = 6.0, j: float = -0.0005) -> float: 

975 ### Calculate a threshold for chi² values using median, stdev and MAD ### 

976 chi2_arr = np.asarray(chi2_values) 

977 med = np.median(chi2_arr) 

978 mad = np.median(np.abs(chi2_arr - med)) 

979 std = np.std(chi2_arr) 

980 

981 threshold = med + k * mad + j * std 

982 return threshold.astype(float) 

983 

984 

985def plot_chi2_hist( 

986 derived_df: pl.DataFrame, 

987 gain: Literal["hi", "lo"], 

988 plot_dir: pathlib.Path, 

989 chi2_threshold: Optional[float] = 0.0, 

990 run_number: Optional[int] = None, 

991 board_id: Optional[str] = None, 

992 pas_mode: Optional[Any] = None, 

993 info: Optional[Metadata] = None, 

994) -> None: 

995 """ 

996 Plot a histogram of the χ²/dof values for the specified gain. 

997 Adds a vertical line at χ²/dof = 1000 and a text box (occupying the right 25% of the plot) 

998 listing flagged channels (those with χ²/dof >= 1000) sorted in descending order. 

999 

1000 :param derived_df: Polars DataFrame containing "chi2_dof", "channel", and "gain" columns. 

1001 :param gain: The gain setting ("hi" or "lo"). 

1002 :param plot_dir: The directory where the plot will be saved. 

1003 :param run_number: Optional run number metadata. 

1004 :param board_id: Optional board id metadata. 

1005 :param pas_mode: Optional ALFE or HPS mode metadata. 

1006 :param info: Optional additional metadata. 

1007 """ 

1008 df_gain = derived_df.filter(pl.col("gain") == gain) 

1009 chi2_values = np.array(df_gain["chi2_dof"].to_numpy()) 

1010 

1011 # Define threshold and identify flagged channels sorted by highest chi² to lowest. 

1012 chi2_threshold = constants.CHI2_THRESHOLD 

1013 if chi2_threshold == 0: 

1014 chi2_threshold = calc_chi2_threshold(chi2_values) 

1015 flagged_df = df_gain.filter(pl.col("chi2_dof") >= chi2_threshold).sort("chi2_dof", descending=True) 

1016 flagged_list = flagged_df.select(["channel", "chi2_dof"]).to_dicts() 

1017 

1018 color = "red" if gain == "hi" else "blue" 

1019 

1020 fig, ax = plt.subplots(figsize=(10, 6)) 

1021 _ = ax.hist(chi2_values, bins="auto", edgecolor="black", color=color, alpha=0.7) 

1022 

1023 if flagged_list: 

1024 flagged_text = f"Flagged channels:\n(χ²/dof > {chi2_threshold:.4g})\n" 

1025 for entry in flagged_list: 

1026 flagged_text += f"Ch {entry['channel']}: {entry['chi2_dof']:.1f}\n" 

1027 

1028 ax.axvline(x=chi2_threshold, color="black", linestyle="--", linewidth=2) 

1029 ax.text( 

1030 0.84, 

1031 0.98, 

1032 flagged_text, 

1033 transform=ax.transAxes, 

1034 fontsize=10, 

1035 verticalalignment="top", 

1036 horizontalalignment="left", 

1037 bbox=dict(boxstyle="round", facecolor="white", alpha=0.8), 

1038 ) 

1039 

1040 ax.set_xscale("log") 

1041 ax.set_yscale("log") 

1042 

1043 ax.set_xlabel("χ²/dof") 

1044 ax.set_ylabel("Count") 

1045 ax.set_title( 

1046 helper.plot_summary_string( 

1047 board_id=board_id, 

1048 run_numbers=run_number, 

1049 channels="All", 

1050 gain=gain, 

1051 name="χ²/dof Histogram", 

1052 pas_mode=pas_mode, 

1053 info=info, 

1054 ) 

1055 ) 

1056 ax.grid(True) 

1057 

1058 fig.tight_layout() 

1059 plt.savefig(plot_dir / f"{gain}_board_{board_id}_chi2_hist.png") 

1060 plt.cla() 

1061 plt.clf() 

1062 plt.close()