Coverage for polars_analysis / plotting / pedestal_plotting.py: 84%
432 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 13:37 -0400
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 13:37 -0400
1import logging
2import pathlib
3from itertools import product
4from pathlib import Path
5from typing import Any, Dict, List, Literal, Optional, Set, Union
7import matplotlib
8from typing_extensions import deprecated
10matplotlib.use("agg")
11import matplotlib.colors as mcolors
12import matplotlib.pyplot as plt
13import matplotlib.ticker as ticker
14import numpy as np
15import polars as pl
16import scipy
17from diptest import diptest # type: ignore
18from scipy.stats import gamma
20from polars_analysis.analysis import constants
21from polars_analysis.analysis.cut_thresholds import CutThresholds
22from polars_analysis.plotting import helper
23from polars_analysis.plotting.helper import Metadata
25# Instantiate logger
26log = logging.getLogger(__name__)
28"""
29Functions for plotting pedestal run data, which has already been processed.
30"""
33def plot_raw(
34 info: Metadata,
35 samples: np.ndarray,
36 plot_dir: pathlib.Path,
37) -> None:
38 """
39 Plot the raw pedestal samples for a given channel and gain.
41 :param run_number: The run number.
42 :type run_number: int
43 :param channel: The channel number.
44 :type channel: int
45 :param gain: The gain setting.
46 :type gain: Literal["hi", "lo"]
47 :param samples: The raw pedestal samples.
48 :type samples: np.ndarray
49 :param plot_dir: The directory to save the plot.
50 :type plot_dir: pathlib.Path
51 """
52 plt.figure(figsize=(10, 10))
53 plt.plot(
54 samples,
55 "k.",
56 label=rf"""Samples
57$\mu$ = {np.mean(samples):.02f}, $\sigma$ = {np.std(samples):.02f}""",
58 )
59 plt.legend(loc=1)
60 plt.title(
61 helper.plot_summary_string(
62 name="Pedestal",
63 board_id=info.board_id,
64 info=info,
65 )
66 )
67 plt.xlabel("Sample #")
68 plt.ylabel("ADC Counts")
69 plt.grid()
70 plt.tight_layout()
71 if info.channels is None:
72 info.channels = "999"
73 plt.savefig(Path(plot_dir) / f"rawPed_meas0_board_{info.board_id}_channel{info.channels.zfill(3)}_{info.gain}.png")
74 plt.cla()
75 plt.clf()
76 plt.close()
79@deprecated("outlier test is no longer used")
80def outlier_test(
81 run_number: int,
82 channel: int,
83 gain: Literal["hi", "lo"],
84 samples: np.ndarray,
85 plot_dir: pathlib.Path,
86 board_id: Optional[str] = None,
87 attenuation: Optional[str] = None,
88 pas_mode: Optional[Any] = None,
89 info: Optional[Metadata] = None,
90) -> None: # pragma: no cover
91 """
92 Perform an outlier test by creating a histogram of the deviation from mean,
93 divided by standard deviation.
95 :param run_number: The run number.
96 :type run_number: int
97 :param channel: The channel number.
98 :type channel: int
99 :param gain: The gain setting.
100 :type gain: Literal["hi", "lo"]
101 :param samples: The raw pedestal samples.
102 :type samples: np.ndarray
103 :param plot_dir: The directory to save the plot.
104 :type plot_dir: pathlib.Path
105 """
106 mean = np.mean(samples)
107 std = np.std(samples)
108 deviations = (samples - mean) / std # Calculate the deviation from the mean in units of standard deviation
110 plt.figure(figsize=(10, 10))
111 plt.hist(deviations, bins=100, log=True, color="blue", alpha=0.7) # Logarithmic y-axis
112 plt.title(
113 helper.plot_summary_string(
114 board_id=board_id,
115 run_numbers=run_number,
116 channels=helper.list_to_text_ranges(channel),
117 gain=gain,
118 name="Outlier Test",
119 attenuation=attenuation,
120 pas_mode=pas_mode,
121 info=info,
122 )
123 )
124 plt.xlabel("Deviation from Mean (σ)")
125 plt.ylabel("log(N samples)")
126 plt.grid(True)
127 plt.tight_layout()
128 plt.savefig(Path(plot_dir) / f"outlier_test_channel{channel:03}_{gain}.png")
129 plt.cla()
130 plt.clf()
131 plt.close()
134def plot_hist(
135 info: Metadata,
136 samples: np.ndarray,
137 plot_dir: pathlib.Path,
138) -> None:
139 """
140 Plot the histogram of the pedestal samples for a given channel and gain.
142 :param run_number: The run number.
143 :type run_number: int
144 :param channel: The channel number.
145 :type channel: int
146 :param gain: The gain setting.
147 :type gain: Literal["hi", "lo"]
148 :param samples: The pedestal samples.
149 :type samples: np.ndarray
150 :param plot_dir: The directory to save the plot.
151 :type plot_dir: pathlib.Path
152 """
153 min_samples = min(samples)
154 max_samples = max(samples)
155 stdev = np.std(samples)
156 skew = scipy.stats.skew(samples)
158 _, (ax1, ax2) = plt.subplots(2, gridspec_kw={"hspace": 0, "height_ratios": [3, 1]}, sharex=True)
159 ax1.label_outer()
160 ax2.label_outer()
162 bin_width = 1
163 bins = np.arange(min_samples, max_samples + bin_width, bin_width)
164 if len(bins) > 1:
165 bin_content, _, h = ax1.hist(samples, bins=bins, align="mid", edgecolor="black")
166 _, dip_pval = diptest(bin_content) # type: ignore
167 h[0].set_label(f"RMS={stdev:.01f}, γ={skew:.02f}, dip={dip_pval:.02f}")
168 else:
169 bin_content, _, h = ax1.hist(samples, align="mid", edgecolor="black", label=f"RMS={stdev:.01f}")
171 fit_pars = helper.calc_gaussian(samples, bins)
172 bin_centers = 0.5 * (bins[1:] + bins[:-1])
173 gauss_fit = helper.gauss(bin_centers, mu=fit_pars[0], sigma=fit_pars[2], N=fit_pars[4])
175 ax1.plot(
176 bin_centers,
177 gauss_fit,
178 color="r",
179 label=rf"""$\mu$ = {fit_pars[0]:.03f} $\pm$ {fit_pars[1]:.03f},
180$\sigma$ = {fit_pars[2]:.03f} $\pm$ {fit_pars[3]:.03f}""",
181 )
183 ax1.legend(loc="upper right")
184 ax1.set_xlabel("ADC Counts")
185 left, right = ax1.get_xlim()
186 ax1.set_xlim(left, right + (left - right) * -0.1)
187 bottom, top = ax1.get_ylim()
188 ax1.set_ylim(bottom, top + (bottom - top) * -0.1)
189 ax1.set_ylabel("Entries")
190 ax1.set_title(
191 helper.plot_summary_string(
192 name="Pedestal",
193 board_id=info.board_id,
194 info=info,
195 )
196 )
197 ax1.grid(True)
199 dof = len(bin_centers) - 3
200 a = 0.32 # 1 sigma
201 err_up = gamma.ppf(1 - a / 2, gauss_fit + 1, scale=1) - gauss_fit # approx up side of Poisson interval
202 err_dw = gauss_fit - gamma.ppf(a / 2, gauss_fit, scale=1) # approx down side of Poisson interval
203 residuals = bin_content - gauss_fit
204 err = np.where(residuals > 0, err_up, err_dw)
205 residual_sigma = residuals / err
206 chi2_dof = np.sum((bin_content - gauss_fit) ** 2 / err**2) / dof
208 ax2.plot(bin_centers, residual_sigma, color="r", label="Fit residual σ", marker=".", linestyle="None")
209 ax2.text(0.8, 0.8, f"Χ²/dof={chi2_dof:.02f}", transform=ax2.transAxes)
210 ax2.set_ylabel("σ")
211 ax2.set_xlabel("ADC Counts")
212 ax2.grid(True)
214 if info.channels is None:
215 info.channels = "999"
216 plt.savefig(Path(plot_dir) / f"channel{info.channels.zfill(3)}_{info.gain}_board_{info.board_id}_pedestal_hist.png")
217 plt.cla()
218 plt.clf()
219 plt.close()
222def plot_autocorrelation(
223 run_number: int,
224 channel: int,
225 gain: Literal["hi", "lo"],
226 autocorrelation: np.ndarray,
227 plot_dir: pathlib.Path,
228 board_id: Optional[str] = None,
229 attenuation: Optional[str] = None,
230 pas_mode: Optional[Any] = None,
231 info: Optional[Metadata] = None,
232) -> None:
233 """
234 Plot the autocorrelation of the pedestal samples for a given channel and gain.
236 :param run_number: The run number.
237 :type run_number: int
238 :param channel: The channel number.
239 :type channel: int
240 :param gain: The gain setting.
241 :type gain: Literal["hi", "lo"]
242 :param autocorrelation: The autocorrelation of the pedestal samples.
243 :type autocorrelation: np.ndarray
244 :param plot_dir: The directory to save the plot.
245 :type plot_dir: pathlib.Path
246 """
247 plt.plot(autocorrelation[0:50], "k.-", label="Autocorrelation")
248 plt.axhline(y=0, color="r", ls="--")
249 plt.legend(loc="upper right")
250 plt.title(
251 helper.plot_summary_string(
252 board_id=board_id,
253 run_numbers=run_number,
254 channels=helper.list_to_text_ranges(channel),
255 gain=gain,
256 name="Pedestal",
257 attenuation=attenuation,
258 pas_mode=pas_mode,
259 info=info,
260 )
261 )
262 plt.xlabel("Lag")
263 plt.ylabel("Autocorrelation")
264 plt.grid()
265 plt.tight_layout()
266 plt.savefig(Path(plot_dir) / f"autocorr_board_{board_id}_channel{channel:03}_{gain}.png")
267 plt.cla()
268 plt.clf()
269 plt.close()
272def plot_baseline_means_rms(
273 derived_df: pl.DataFrame,
274 plot_dir: pathlib.Path,
275 skip_channels_hi: Optional[List[int]] = None,
276 skip_channels_lo: Optional[List[int]] = None,
277 draw_bounds: Optional[bool] = False,
278 run_number: Optional[int] = None,
279 board_id: Optional[str] = None,
280 attenuation: Optional[str] = None,
281 pas_mode: Optional[Any] = None,
282 info: Optional[Metadata] = None,
283) -> None:
284 """
285 Plot the mean, RMS, and maxmin of the pedestal values for a given measurement and channels for high and low gain.
287 :param derived_df: DataFrame containing mean, std, and maxmin columns for hi and lo gains
288 :type derived_df: pl.DataFrame
289 :param plot_dir: The directory to save the plot.
290 :type plot_dir: pathlib.Path
291 """
292 n_channels = derived_df["channel"].n_unique()
294 color_dict = {"lo": "b", "hi": "r"}
295 title_dict = {"lo": "LG", "hi": "HG"}
297 fig, ax = plt.subplots()
298 plt.xticks(np.arange(0, n_channels, 4), rotation=70)
299 ax.xaxis.set_tick_params(pad=0.1)
300 fig2, ax2 = plt.subplots(1)
301 plt.xticks(np.arange(0, n_channels, 4), rotation=70)
302 ax2.xaxis.set_tick_params(pad=0.1)
303 fig3, ax3 = plt.subplots(1)
304 plt.xticks(np.arange(0, n_channels, 4), rotation=70)
305 ax3.xaxis.set_tick_params(pad=0.1)
306 max_means = 0
307 max_rms = 0
308 max_maxmins = 0
309 gains: List[Literal["hi", "lo"]] = ["hi", "lo"]
310 for gain in gains:
311 gain_df = derived_df.filter(pl.col("gain") == gain)
312 gain_df = gain_df.sort(by="measurement").unique("channel", keep="first").sort(by="channel")
313 if gain == "hi" and skip_channels_hi is not None:
314 gain_df = gain_df.filter(~pl.col("channel").is_in(skip_channels_hi))
315 elif gain == "lo" and skip_channels_lo is not None:
316 gain_df = gain_df.filter(~pl.col("channel").is_in(skip_channels_lo))
317 means = gain_df["mean"].to_numpy()
318 stds = gain_df["std"].to_numpy()
319 maxmins = gain_df["maxmin"].to_numpy()
320 channels = gain_df["channel"].to_numpy()
321 color = color_dict[gain]
322 title = title_dict[gain]
324 ax.grid(visible=True, zorder=0)
325 mean = np.mean(means)
326 std = np.std(means) / np.sqrt(len(means))
327 ax.bar(channels, means, fill=False, ec=color, label=f"{title} mean = {mean:.2f}±{std:.2f}", zorder=3)
328 max_means = max(max_means, max(means))
329 ax.margins(x=0)
331 ax2.grid(visible=True, zorder=0)
332 mean = np.mean(stds)
333 std = np.std(stds) / np.sqrt(len(stds))
334 ax2.bar(channels, stds, fill=False, ec=color, label=f"{title} mean = {mean:.2f}±{std:.2f}", zorder=3)
335 max_rms = max(max_rms, max(stds))
336 ax2.margins(x=0)
338 ax3.grid(visible=True, zorder=0)
339 mean = np.mean(maxmins)
340 std = np.std(maxmins) / np.sqrt(len(maxmins))
341 ax3.bar(channels, maxmins, fill=False, ec=color, label=f"{title} mean = {mean:.2f}±{std:.2f}", zorder=3)
342 max_maxmins = max(max_maxmins, max(maxmins))
343 ax3.margins(x=0)
344 # Draw acceptance cuts
345 if draw_bounds:
346 cuts = CutThresholds()
347 cuts.draw_on(ax, "mean", gain=gain)
348 cuts.draw_on(ax2, "std", gain=gain)
349 cuts.draw_on(ax3, "maxmin", gain=gain)
351 all_channels = derived_df["channel"].unique().sort().to_list()
352 ax.set_title(
353 helper.plot_summary_string(
354 board_id=board_id,
355 run_numbers=run_number,
356 channels=helper.list_to_text_ranges(all_channels),
357 name="Mean Pedestal Value",
358 attenuation=attenuation,
359 pas_mode=pas_mode,
360 info=info,
361 )
362 )
363 ax.set_xlabel("Channel", loc="right")
364 ax.set_ylabel("ADC Counts")
365 ax.set_ylim(0, 1.33 * max_means)
366 ax.legend()
367 fig.tight_layout()
368 fig.savefig(f"{plot_dir}/mu_board_{board_id}_summary.png")
369 fig.clf()
371 ax2.set_title(
372 helper.plot_summary_string(
373 board_id=board_id,
374 run_numbers=run_number,
375 channels=helper.list_to_text_ranges(all_channels),
376 name="Pedestal RMS",
377 attenuation=attenuation,
378 pas_mode=pas_mode,
379 info=info,
380 )
381 )
382 ax2.set_xlabel("Channel", loc="right")
383 ax2.set_ylabel("ADC Counts")
384 ax2.set_ylim(0, 1.33 * max_rms)
385 ax2.legend()
386 fig2.tight_layout()
387 fig2.savefig(f"{plot_dir}/rms_board_{board_id}_summary.png")
388 fig2.clf()
390 ax3.set_title(
391 helper.plot_summary_string(
392 board_id=board_id,
393 run_numbers=run_number,
394 channels=helper.list_to_text_ranges(all_channels),
395 name="Pedestal Max-Min",
396 attenuation=attenuation,
397 pas_mode=pas_mode,
398 info=info,
399 )
400 )
401 ax3.set_xlabel("Channel", loc="right")
402 ax3.set_ylabel("ADC Counts")
403 ax3.set_ylim(0, 1.33 * max_maxmins)
404 ax3.legend()
405 fig3.tight_layout()
406 fig3.savefig(f"{plot_dir}/maxmin_board_{board_id}_summary.png")
407 fig3.clf()
409 plt.cla()
410 plt.clf()
411 plt.close()
414def plot_coherent_noise(
415 row: Dict[str, Any],
416 plot_dir: pathlib.Path,
417 meas_type: str = "pedestal",
418 board_id: Optional[str] = None,
419 attenuation: Optional[str] = None,
420 pas_mode: Optional[Any] = None,
421 use_log_scale: bool = False,
422) -> Union[tuple[List[str], Any], tuple[None, None]]:
423 """
424 Plot the coherent noise for a given measurement type and gain, given a DataFrame of coherent noise results.
426 :param row: A dictionary from pl.DataFrame.iter_rows(named=True) containing the coherent noise results.
427 Must contain the following columns:
429 * min_channel: The minimum channel number.
430 * n_channels: The number of channels.
431 * gain: The gain setting.
432 * data_sum_hist: Histogram of the per samples sum of the pedestal values across channels.
433 * data_sum_bins: Bin edges for data_sum_hist
434 * tot_noise: The total noise.
435 * ch_noise: The channel noise.
436 * avg_noise: The average noise.
437 * coh_noise: The coherent noise.
438 * pct_coh: The percentage of coherent noise.
439 * d_tot_noise: The total noise uncertainty.
440 * d_ch_noise: The channel noise uncertainty.
441 * d_avg: The average noise uncertainty.
442 * d_coh: The coherent noise uncertainty.
443 * d_pct: The percentage of coherent noise uncertainty.
444 :type row: dict[str, Any]
445 :param run_number: The run number.
446 :type run_number: int
447 :param plot_dir: The directory to save the plot.
448 :type plot_dir: pathlib.Path
449 :param meas_type: The measurement type, defaults to "pedestal".
450 :type meas_type: str, optional
451 """
452 run_number: int = row["run_number"]
453 if not board_id:
454 board_id = row["board_id"] if "board_id" in row else None
455 min_channel: int = row["min_channel"]
456 n_channels: int = row["n_channels"]
457 gain: Literal["hi", "lo"] = row["gain"]
458 data_sum_hist: np.ndarray = np.array(row["data_sum_hist"])
459 bins: np.ndarray = np.array(row["data_sum_bins"])
460 channel_list = row["channel_list"]
462 centers = 0.5 * (bins[1:] + bins[:-1])
463 mu = helper.hist_mean(centers, data_sum_hist)
464 skew = helper.hist_moment(centers, data_sum_hist, 3)
466 _, (ax1, ax2) = plt.subplots(2, gridspec_kw={"hspace": 0, "height_ratios": [3, 1]}, sharex=True)
467 ax1.label_outer()
468 ax2.label_outer()
470 _, dip_pval = diptest(data_sum_hist) # type: ignore
471 _ = ax1.bar(
472 x=bins[:-1],
473 height=data_sum_hist,
474 width=bins[1:] - bins[:-1],
475 align="edge",
476 color="steelblue",
477 fill=True,
478 edgecolor="black",
479 label=rf"""$\mu$ = {mu:.02f}, $\sigma$ = {row["tot_noise"]:.02f},
480γ={skew:.02f}, dip={dip_pval:.02f}""",
481 )
483 fit_pars = helper.calc_gaussian_from_bins(data_sum_hist, bins)
484 fit_mu, fit_sigma, fit_N = fit_pars[0], fit_pars[2], fit_pars[4]
485 e_fit_mu, e_fit_sigma = fit_pars[1], fit_pars[3]
486 bin_centers = 0.5 * (bins[1:] + bins[:-1])
487 gauss_fit = helper.gauss(bin_centers, fit_mu, fit_sigma, fit_N)
488 ax1.plot(
489 bin_centers,
490 gauss_fit,
491 color="r",
492 label=rf"""Gaussian Fit
493$\mu$ = {fit_mu:.02f}±{e_fit_mu:0.2f}, $\sigma$ = {fit_sigma:.02f}±{e_fit_sigma:.02f}""",
494 )
495 ax1.set_ylim(0, max(data_sum_hist) * 1.4)
496 ax1.set_xlabel("ADC Counts")
497 ax1.set_ylabel("Entries")
498 ax1.set_title(
499 helper.plot_summary_string(
500 board_id=board_id,
501 run_numbers=run_number,
502 channels=helper.list_to_text_ranges(channel_list),
503 gain=gain,
504 name=f"{meas_type} coherent noise",
505 attenuation=attenuation,
506 pas_mode=pas_mode,
507 )
508 )
509 ax1.grid()
510 ax1.text(
511 0.995,
512 0.84,
513 rf"""$\sqrt{{\sigma_i^2}}$ = {row["ch_noise"]:.02f} $\pm$ {row["d_ch_noise"]:.02f}
514 Avg. noise/ch = {row["avg_noise"]:.02f} $\pm$ {row["d_avg"]:.02f}
515 Coh. noise/ch = {row["coh_noise"]:.02f} $\pm$ {row["d_coh"]:.02f}
516 [%] Coh. noise = {row["pct_coh"]:.02f} $\pm$ {row["d_pct"]:.02f}""",
517 horizontalalignment="right",
518 verticalalignment="center",
519 transform=ax1.transAxes,
520 )
521 log_filename: Literal["", "_log"] = ""
522 if use_log_scale:
523 ax1.set_yscale("log")
524 ax1.set_ylim(bottom=0.5)
525 log_filename = "_log"
526 ax1.legend(loc="upper left", handlelength=1.0)
528 dof = len(bin_centers) - 3
529 a = 0.32 # 1 sigma
530 err_up = gamma.ppf(1 - a / 2, gauss_fit + 1, scale=1) - gauss_fit # approx up side of Poisson interval
531 err_dw = gauss_fit - gamma.ppf(a / 2, gauss_fit, scale=1) # approx down side of Poisson interval
532 residuals = data_sum_hist - gauss_fit
533 err = np.where(residuals > 0, err_up, err_dw)
534 residual_sigma = residuals / err
535 chi2_dof = np.sum((data_sum_hist - gauss_fit) ** 2 / err**2) / dof
537 ax2.plot(bin_centers, residual_sigma, color="r", label="Fit residual σ", marker=".", linestyle="None")
538 ax2.text(0.8, 0.8, f"Χ²/dof={chi2_dof:.02f}", transform=ax2.transAxes)
539 ax2.set_ylabel("σ")
540 ax2.set_xlabel("ADC Counts")
541 ax2.grid(True)
543 if n_channels % 128 == 0:
544 plt.savefig(plot_dir / f"coherence_all_{gain}_board_{board_id}_pedestal_hist{log_filename}.png")
545 else:
546 plt.savefig(
547 plot_dir / f"{gain}_board_{board_id}_cnoise_ch_{min_channel}_{min_channel + n_channels}{log_filename}.png"
548 )
549 plt.cla()
550 plt.clf()
551 plt.close()
553 if board_id is not None:
554 return board_id.split("_"), row["pct_coh"]
555 else:
556 return None, None
559def plot_coherent_noise_matrix(coh_matrix: Dict[str, Dict[str, Any]], plot_dir: pathlib.Path) -> None:
560 for gain in coh_matrix.keys():
561 # Parse keys to extract board combinations
562 unsorted_individual_boards: Set[str] = set()
563 board_combinations = {}
565 for key, value in coh_matrix[gain].items():
566 boards_in_key = key.split("_")
567 board_combinations[tuple(sorted(boards_in_key))] = value
568 unsorted_individual_boards.update(boards_in_key)
570 individual_boards: List[str] = sorted(unsorted_individual_boards)
571 n_boards = len(individual_boards)
573 if n_boards == 0:
574 log.error("Error: No boards found!")
575 return
577 # Find the combined noise value
578 all_boards_key = tuple(individual_boards)
579 combined_noise = board_combinations.get(all_boards_key, np.nan)
581 matrix = np.zeros((n_boards, n_boards))
583 for i, board_x in enumerate(individual_boards):
584 for j, board_y in enumerate(individual_boards):
585 if i == j:
586 # Diagonal: individual board coherent noise
587 single_board_key = (board_x,)
588 if single_board_key in board_combinations:
589 matrix[i, j] = board_combinations[single_board_key]
590 else:
591 log.warning(f"Warning: Individual board key {single_board_key} not found")
592 matrix[i, j] = np.nan
593 else:
594 # Off-diagonal: pairwise board coherent noise
595 pairwise_key = tuple(sorted([board_x, board_y]))
596 if pairwise_key in board_combinations:
597 matrix[i, j] = board_combinations[pairwise_key]
598 else:
599 log.warning(f"Warning: Pairwise key {pairwise_key} not found")
600 matrix[i, j] = np.nan
602 vmin = 0.1 # Small positive value
603 vmax = max(np.max(matrix).astype(float), 6)
604 norm = mcolors.TwoSlopeNorm(vmin=vmin, vcenter=5, vmax=vmax)
605 fig, ax = plt.subplots(figsize=(max(6, n_boards * 1.5), max(6, n_boards * 1.5)))
606 im = ax.imshow(matrix, cmap="Reds", norm=norm)
608 ax.set_xticks(np.arange(n_boards))
609 ax.set_yticks(np.arange(n_boards))
610 ax.set_xticklabels(individual_boards, rotation=45, fontsize=8)
611 ax.set_yticklabels(individual_boards, fontsize=8)
613 # Add text annotations
614 for i, j in product(range(n_boards), range(n_boards)):
615 if np.isnan(matrix[i, j]):
616 val_to_write = "N/A"
617 color = "gray"
618 else:
619 val_to_write = f"{matrix[i, j]:.1f}"
620 color = "k"
622 ax.text(j, i, val_to_write, ha="center", fontsize=16, va="center", color=color)
624 cbar = fig.colorbar(im, ax=ax)
625 cbar.set_label("Coherent Noise [%]", rotation=270, labelpad=15)
626 gain_str = "HI" if gain == "hi" else "LO"
627 if not np.isnan(combined_noise):
628 title = f"{gain_str} Gain Coherent Noise Per Channel Matrix [%]\n \
629 Combined value for all boards: {combined_noise:.1f}%"
630 else:
631 title = f"{gain_str} Gain Coherent Noise Per Channel Matrix [%]"
633 ax.set_title(title)
634 ax.set_xlabel("Board ID")
635 ax.set_ylabel("Board ID")
636 fig.tight_layout()
637 plt.savefig(plot_dir / f"coherent_noise_matrix_{gain}.png", dpi=150, bbox_inches="tight")
638 plt.close()
641def plot_correlation_matrix(
642 matrix: np.ndarray,
643 gain: Literal["hi", "lo"],
644 min_channel: int,
645 nchannels: int,
646 plot_dir: pathlib.Path,
647 run_number: Optional[int] = None,
648 board_id: Optional[str] = None,
649 attenuation: Optional[str] = None,
650 pas_mode: Optional[Any] = None,
651 info: Optional[Metadata] = None,
652 plot_numbers: bool = True,
653) -> None:
654 """
655 Plot the correlation matrix for a given gain, minimum channel, and number of channels.
657 :param matrix: The correlation matrix.
658 :type matrix: np.ndarray
659 :param gain: The gain setting.
660 :type gain: Literal["hi", "lo"]
661 :param min_channel: The minimum channel number.
662 :type min_channel: int
663 :param nchannels: The number of channels.
664 :type nchannels: int
665 :param plot_dir: The directory to save the plot.
666 :type plot_dir: pathlib.Path
667 """
668 log.debug("Plotting correlation matrix")
670 plot_all = nchannels % 128 == 0
671 max_channel = min_channel + nchannels
672 if min_channel > matrix.shape[0]:
673 # The given interval does not overlap with available channels
674 log.error(f"min_channel {min_channel} > matrix shape {matrix.shape[0]}!")
675 return
677 submatrix = matrix[min_channel:max_channel, min_channel:max_channel]
679 fig, ax = plt.subplots(figsize=(nchannels / 4, nchannels / 4))
680 _ = ax.imshow(submatrix, cmap="RdBu", vmin=-0.3, vmax=0.3)
681 ax.set_xticks(np.arange(0, nchannels, max(1, round(nchannels / 32))))
682 ax.set_yticks(np.arange(0, nchannels, max(1, round(nchannels / 32))))
683 ax.set_xticklabels(
684 np.arange(min_channel, min_channel + nchannels, max(1, round(nchannels / 32))),
685 rotation=90,
686 fontsize=7,
687 )
688 ax.set_yticklabels(np.arange(min_channel, min_channel + nchannels, max(1, round(nchannels / 32))), fontsize=7)
690 # Use grid lines at every 128 entries
691 for pos in np.arange(128, nchannels, 128):
692 ax.axvline(pos - 0.5, color="black", linewidth=3, alpha=0.7)
693 ax.axhline(pos - 0.5, color="black", linewidth=3, alpha=0.7)
695 if plot_numbers:
696 # for i, j in product(range(min_channel, min(128, max_channel)), range(min_channel, min(128, max_channel))):
697 for i, j in product(range(min_channel, max_channel), range(min_channel, max_channel)):
698 val_to_write = " " if np.isnan(matrix[i, j]) else f"{100 * matrix[i, j]:.0f}"
699 color = "w" if ((i == j) and (val_to_write != "0")) else "k"
700 _ = ax.text(
701 j - min_channel, i - min_channel, val_to_write, ha="center", fontsize=6, va="center", color=color
702 )
704 default_fontsize = plt.rcParams["font.size"]
705 ax.set_title(
706 helper.plot_summary_string(
707 board_id=board_id,
708 run_numbers=run_number,
709 channels=f"{min_channel}-{max_channel - 1}",
710 gain=gain,
711 name="pearson correlation [%]",
712 attenuation=attenuation,
713 pas_mode=pas_mode,
714 info=info,
715 ),
716 fontsize=default_fontsize / (1.2 if nchannels < 64 else 1),
717 )
718 fig.tight_layout()
719 if plot_all:
720 plt.savefig(plot_dir / f"{gain}_board_{board_id}_corr.png")
721 else:
722 plt.savefig(plot_dir / f"{gain}_board_{board_id}_corr_ch_{min_channel}_{max_channel}.png")
723 plt.cla()
724 plt.clf()
725 plt.close()
728def plot_fft(
729 channel: int,
730 gain: Literal["hi", "lo"],
731 freq: List[float],
732 psd: List[float],
733 peaks: List[int],
734 plot_dir: pathlib.Path,
735 run_number: Optional[int] = None,
736 board_id: Optional[str] = None,
737 attenuation: Optional[str] = None,
738 pas_mode: Optional[Any] = None,
739 info: Optional[Metadata] = None,
740) -> None:
741 """
742 Plot the FFT of the pedestal samples for a given channel and gain.
744 :param channel: The channel number.
745 :type channel: int
746 :param gain: The gain setting.
747 :type gain: Literal["hi", "lo"]
748 :param freq: The frequency array.
749 :type freq: np.ndarray
750 :param psd: The power spectral density array.
751 :type psd: np.ndarray
752 :param plot_dir: The directory to save the plot.
753 :type plot_dir: pathlib.Path
754 """
756 # # Convert to dBFS
757 psd_dbfs = 10 * np.log10(np.array(psd) / (2**constants.ADC_BITS) ** 2)
759 plt.plot(freq, psd_dbfs, color="k")
760 plt.plot([freq[peak] for peak in peaks], [psd_dbfs[peak] for peak in peaks], "xr")
761 for peak in peaks:
762 peak_psd_dbfs = float(psd_dbfs[peak])
763 plt.text(freq[peak] - 0.5, peak_psd_dbfs + 0.5, f"{freq[peak]:.02f}", rotation=45, color="k")
764 plt.ylim(top=max(psd_dbfs) + 2)
765 plt.xlabel("Frequency [MHz]")
766 plt.ylabel("PSD [dB Full Scale]")
767 plt.title(
768 helper.plot_summary_string(
769 board_id=board_id,
770 run_numbers=run_number,
771 channels=helper.list_to_text_ranges(channel),
772 gain=gain,
773 name="Power Spectral Density",
774 attenuation=attenuation,
775 pas_mode=pas_mode,
776 info=info,
777 )
778 )
779 plt.grid()
780 plt.tight_layout()
781 _ = plt.subplot(111)
782 plt.savefig(plot_dir / f"pedestal_FFT_board_{board_id}_channel{channel:03}_{gain}.png")
783 plt.cla()
784 plt.clf()
785 plt.close()
788def plot_fft2d(
789 freq: List[float],
790 ffts: Union[pl.Series, np.ndarray],
791 plot_dir: pathlib.Path,
792 channels: Union[pl.Series, np.ndarray, List[int]],
793 gain: Literal["hi", "lo"],
794 run_number: Optional[int] = None,
795 board_id: Optional[str] = None,
796 pas_mode: Optional[Any] = None,
797 unit: str = "MHz",
798 extra_filename: str = "",
799) -> None:
800 """
801 Plot the 2D FFT of the pedestal samples for a given set of channels and gain.
803 :param freq: The frequency of the fft.
804 :type freq: List[float]
805 :param ffts: The 2D list of ffts. ffts[i] is the fft for channel i
806 :type ffts: pl.Series
807 :param plot_dir: The directory to save the plot.
808 :type plot_dir: pathlib.Path
809 :param channels: The channels that should be plotted
810 :type channels: pl.Series
811 :param gain: The gain setting.
812 :type gain: Literal["hi", "lo"]
813 :param run_number: The run number.
814 :type run_number: int
815 :param board_id: The board id.
816 :type board_id: Optional[str]
817 :param pas_mode: The ALFE mode or HPS mode.
818 :type pas_mode: Optional[str]
819 """
820 plt.figure(figsize=(12, 4))
822 psd_dbfs = 10 * np.log10(np.stack(ffts) / (2**constants.ADC_BITS) ** 2) # type: ignore
824 arr = np.zeros((128, len(freq)))
826 for index, channel in enumerate(channels):
827 arr[channel] = psd_dbfs[index]
829 plt.pcolormesh(freq, list(range(128)), arr)
831 if isinstance(channels, pl.Series):
832 channels = channels.to_numpy()
833 plt.title(
834 helper.plot_summary_string(
835 board_id=board_id,
836 run_numbers=run_number,
837 channels=helper.list_to_text_ranges(channels),
838 gain=gain,
839 name="Power Spectral Density",
840 pas_mode=pas_mode,
841 )
842 )
844 plt.ylabel("Channel")
845 plt.xlabel(f"Frequency [{unit}]")
846 plt.gca().xaxis.set_major_locator(ticker.MultipleLocator(1))
848 cbar = plt.colorbar()
849 cbar.ax.get_yaxis().labelpad = 15
850 cbar.set_label("PSD [dB Full Scale]")
852 plt.tight_layout()
854 plt.savefig(plot_dir / f"pedestal_FFT_2D_board_{board_id}_{gain}{extra_filename}.png")
855 plt.cla()
856 plt.clf()
857 plt.close()
860def plot_coherence(
861 channel1: int,
862 channel2: int,
863 gain: Literal["hi", "lo"],
864 freq: np.ndarray,
865 coh: np.ndarray,
866 plot_dir: pathlib.Path,
867 run_number: Optional[int] = None,
868 board_id: Optional[str] = None,
869 attenuation: Optional[str] = None,
870 pas_mode: Optional[Any] = None,
871 info: Optional[Metadata] = None,
872) -> None:
873 """
874 Plot the coherence between two channels.
876 :param channel: The channel number.
877 :type channel: int
878 :param gain: The gain setting.
879 :type gain: Literal["hi", "lo"]
880 :param freq: The frequency array.
881 :type freq: np.ndarray
882 :param coh: The coherence array.
883 :type coh: np.ndarray
884 :param plot_dir: The directory to save the plot.
885 :type plot_dir: pathlib.Path
886 """
887 plt.semilogy(
888 freq,
889 coh,
890 color="k",
891 )
892 # plt.legend(loc=1)
893 plt.xlabel("Frequency [MHz]")
894 plt.ylabel("Coherence")
895 plt.title(
896 helper.plot_summary_string(
897 board_id=board_id,
898 run_numbers=run_number,
899 channels=f"{channel1} & {channel2}",
900 gain=gain,
901 name="Pedestal Coherence",
902 attenuation=attenuation,
903 pas_mode=pas_mode,
904 info=info,
905 )
906 )
907 yLow, _ = plt.gca().get_ylim()
908 plt.gca().set_ylim(yLow, 2)
909 plt.grid()
910 plt.tight_layout()
911 _ = plt.subplot(111)
912 plt.savefig(plot_dir / f"pedestal_coherence_channel{channel1}-{channel2}_{gain}.png")
913 plt.cla()
914 plt.clf()
915 plt.close()
917 log.info(f"Wrote {plot_dir}/pedestal_coherence_channel{channel1}-{channel2}_{gain}.png")
920def plot_chi2(
921 derived_df: pl.DataFrame,
922 gain: Literal["hi", "lo"],
923 plot_dir: pathlib.Path,
924 run_number: Optional[int] = None,
925 board_id: Optional[str] = None,
926 pas_mode: Optional[Any] = None,
927 info: Optional[Metadata] = None,
928) -> None:
929 """
930 Plot χ²/dof vs. channel for the specified gain using the derived DataFrame.
932 :param derived_df: Polars DataFrame containing "channel", "chi2_dof", and "gain" columns.
933 :param gain: Gain setting ("hi" or "lo").
934 :param plot_dir: Directory where the plot will be saved.
935 :param run_number: Optional run number metadata.
936 :param board_id: Optional board id metadata.
937 :param pas_mode: Optional ALFE or HPS mode metadata.
938 :param info: Optional additional metadata.
939 """
940 # Filter for the specified gain and sort by channel
941 df_gain = derived_df.filter(pl.col("gain") == gain).sort("channel")
942 channels = np.array(df_gain["channel"].to_numpy())
943 chi2_values = np.array(df_gain["chi2_dof"].to_numpy())
945 color = "red" if gain == "hi" else "blue"
947 fig, ax = plt.subplots(figsize=(10, 6))
948 ax.plot(channels, chi2_values, marker="o", linestyle="-", color=color, label=f"{gain.upper()} Gain")
949 ax.set_xlabel("Channel")
950 ax.set_ylabel("χ²/dof")
951 ax.set_yscale("log")
952 ax.set_title(
953 helper.plot_summary_string(
954 board_id=board_id,
955 run_numbers=run_number,
956 channels="All",
957 gain=gain,
958 name="χ²/dof vs. Channel",
959 pas_mode=pas_mode,
960 info=info,
961 )
962 )
963 ax.grid(True)
964 ax.legend()
966 fig.tight_layout()
967 plot_file = plot_dir / f"{gain}_board_{board_id}_chi2.png"
968 plt.savefig(plot_file)
969 plt.cla()
970 plt.clf()
971 plt.close()
974def calc_chi2_threshold(chi2_values, k: float = 6.0, j: float = -0.0005) -> float:
975 ### Calculate a threshold for chi² values using median, stdev and MAD ###
976 chi2_arr = np.asarray(chi2_values)
977 med = np.median(chi2_arr)
978 mad = np.median(np.abs(chi2_arr - med))
979 std = np.std(chi2_arr)
981 threshold = med + k * mad + j * std
982 return threshold.astype(float)
985def plot_chi2_hist(
986 derived_df: pl.DataFrame,
987 gain: Literal["hi", "lo"],
988 plot_dir: pathlib.Path,
989 chi2_threshold: Optional[float] = 0.0,
990 run_number: Optional[int] = None,
991 board_id: Optional[str] = None,
992 pas_mode: Optional[Any] = None,
993 info: Optional[Metadata] = None,
994) -> None:
995 """
996 Plot a histogram of the χ²/dof values for the specified gain.
997 Adds a vertical line at χ²/dof = 1000 and a text box (occupying the right 25% of the plot)
998 listing flagged channels (those with χ²/dof >= 1000) sorted in descending order.
1000 :param derived_df: Polars DataFrame containing "chi2_dof", "channel", and "gain" columns.
1001 :param gain: The gain setting ("hi" or "lo").
1002 :param plot_dir: The directory where the plot will be saved.
1003 :param run_number: Optional run number metadata.
1004 :param board_id: Optional board id metadata.
1005 :param pas_mode: Optional ALFE or HPS mode metadata.
1006 :param info: Optional additional metadata.
1007 """
1008 df_gain = derived_df.filter(pl.col("gain") == gain)
1009 chi2_values = np.array(df_gain["chi2_dof"].to_numpy())
1011 # Define threshold and identify flagged channels sorted by highest chi² to lowest.
1012 chi2_threshold = constants.CHI2_THRESHOLD
1013 if chi2_threshold == 0:
1014 chi2_threshold = calc_chi2_threshold(chi2_values)
1015 flagged_df = df_gain.filter(pl.col("chi2_dof") >= chi2_threshold).sort("chi2_dof", descending=True)
1016 flagged_list = flagged_df.select(["channel", "chi2_dof"]).to_dicts()
1018 color = "red" if gain == "hi" else "blue"
1020 fig, ax = plt.subplots(figsize=(10, 6))
1021 _ = ax.hist(chi2_values, bins="auto", edgecolor="black", color=color, alpha=0.7)
1023 if flagged_list:
1024 flagged_text = f"Flagged channels:\n(χ²/dof > {chi2_threshold:.4g})\n"
1025 for entry in flagged_list:
1026 flagged_text += f"Ch {entry['channel']}: {entry['chi2_dof']:.1f}\n"
1028 ax.axvline(x=chi2_threshold, color="black", linestyle="--", linewidth=2)
1029 ax.text(
1030 0.84,
1031 0.98,
1032 flagged_text,
1033 transform=ax.transAxes,
1034 fontsize=10,
1035 verticalalignment="top",
1036 horizontalalignment="left",
1037 bbox=dict(boxstyle="round", facecolor="white", alpha=0.8),
1038 )
1040 ax.set_xscale("log")
1041 ax.set_yscale("log")
1043 ax.set_xlabel("χ²/dof")
1044 ax.set_ylabel("Count")
1045 ax.set_title(
1046 helper.plot_summary_string(
1047 board_id=board_id,
1048 run_numbers=run_number,
1049 channels="All",
1050 gain=gain,
1051 name="χ²/dof Histogram",
1052 pas_mode=pas_mode,
1053 info=info,
1054 )
1055 )
1056 ax.grid(True)
1058 fig.tight_layout()
1059 plt.savefig(plot_dir / f"{gain}_board_{board_id}_chi2_hist.png")
1060 plt.cla()
1061 plt.clf()
1062 plt.close()