Coverage for polars_analysis / plotting / linearity_plotting.py: 98%
202 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 13:37 -0400
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 13:37 -0400
1import logging
2from pathlib import Path
4import matplotlib.pyplot as plt
5import numpy as np
6import polars as pl
7import seaborn as sns
8import seaborn.objects as so
9from scipy import stats
10from scipy.optimize import curve_fit
12from polars_analysis.plotting import helper
14log = logging.getLogger(__name__)
16# Otherwise it doesn't like using numbers as categories
17plt.set_loglevel("WARNING")
20def linearity_plot(
21 df: pl.DataFrame, plot_dir: Path, full_range: bool = True, plot_log_scale: bool = True, use_scaled_amp: bool = True
22):
23 """
24 Produces two plots on one axis showing the linearity of pulse height vs input current.
25 The left plot is a simple plot with a line of best fit, and the right plot is the integral
26 non-linearity (INL) of measured values vs values predicted by the fit
27 """
28 if use_scaled_amp:
29 df = df.with_columns(amp=pl.col("amp") * pl.col("scale_factor"))
31 num_colors = len(df["run_number"].unique())
32 cmap = plt.get_cmap("gist_rainbow")
33 colors = [cmap(i / num_colors) for i in range(num_colors)]
35 for gain in ["lo", "hi"]:
36 filtered_df = df.filter(gain=gain)
37 amp = filtered_df["amp"].to_numpy()
38 energy_mean = filtered_df["energy_mean"].to_numpy()
39 d_mu = filtered_df["d_mu"].to_numpy()
40 popt, pcov = curve_fit(
41 helper.lin,
42 amp,
43 energy_mean,
44 p0=[energy_mean[1] / amp[1], 0],
45 sigma=d_mu,
46 absolute_sigma=True,
47 )
48 m, b, dm, db = popt[0], popt[1], np.sqrt(pcov[0][0]), np.sqrt(pcov[1][1])
50 _, ax = plt.subplots(1, 2, figsize=(15, 5))
51 ax1, ax2 = ax[0], ax[1]
53 xspace: np.ndarray = np.linspace(0, max(amp) + 0.5 * max(amp), 500)
54 ax1.plot(xspace, m * xspace + b, "r-", label="Fit")
56 ax2.axvline(np.amax(amp))
57 ax2.annotate(
58 f"Max DR Injector Current: {round(max(amp), 2)}",
59 (0.6, 0.8),
60 xycoords="axes fraction",
61 color="b",
62 )
64 text = f"m = {m:1f} $\\pm$ {dm:.4f}\nb = {b:.1f} $\\pm$ {db:.4f}"
65 ax1.text(
66 0.1,
67 0.9,
68 text,
69 horizontalalignment="left",
70 verticalalignment="top",
71 transform=ax1.transAxes,
72 )
74 for i, run_number in enumerate(df["run_number"].unique().sort()):
75 if filtered_df.filter(run_number=run_number).is_empty():
76 continue
77 run_amps = filtered_df.filter(run_number=run_number)["amp"].to_numpy()
78 run_energy_means = filtered_df.filter(run_number=run_number)["energy_mean"].to_numpy()
79 run_d_mu = filtered_df.filter(run_number=run_number)["d_mu"].to_numpy()
80 run_y_pred: np.ndarray = helper.lin(run_amps, *popt)
82 INL: np.ndarray = 100 * (run_energy_means - run_y_pred) / 2**15
83 error: np.ndarray = 100 * np.sqrt((run_d_mu) ** 2 + (run_amps * dm) ** 2 + (db) ** 2) / 2**15
85 ax1.errorbar(
86 run_amps,
87 run_energy_means,
88 yerr=run_d_mu,
89 fmt="o",
90 color=colors[i],
91 )
92 ax2.errorbar(
93 run_amps,
94 INL,
95 yerr=error,
96 fmt="o",
97 color=colors[i],
98 label=(f"{gain} {filtered_df.filter(run_number=run_number)['att_val'][0]}dB"),
99 )
100 board_id = filtered_df["board_id"][0]
101 channel = filtered_df["channel"][0]
102 run_numbers = filtered_df["run_number"].unique().to_list()
103 summary_plot_string = f"board_id: {board_id}, channel: {channel}, runs: {run_numbers}, gain: {gain}"
105 ax1.set(ylabel="Pulse Height [ADC Counts]", xlabel="Input Current [mA]")
106 ax1.set_xlim(0, max(amp) * 1.3)
107 ax1.set_ylim(0, max(energy_mean) * 1.3)
109 ax2.set(ylabel="INL %", xlabel="Input Current [mA]")
110 if full_range:
111 if gain == "hi":
112 ax2.axhline(y=0.2, color="grey", label="0.2% limit for HG Full Range INL")
113 ax2.axhline(y=-0.2, color="grey")
114 else:
115 ax2.axhline(y=5, color="grey", label="5% limit for LG Full Range INL")
116 ax2.axhline(y=-5, color="grey")
117 else:
118 ax2.axhline(y=0.5, color="grey", label="0.5% limit for LG 80% INL")
119 ax2.axhline(y=-0.5, color="grey")
121 if plot_log_scale:
122 ax1.set_xlim(0.9 * min(amp), max(amp) * 1.3)
123 ax1.set_ylim(0.01, max(energy_mean) * 10)
124 ax1.set_xscale("log")
125 ax1.set_yscale("log")
126 ax2.set_xscale("log")
128 ax2.axhline(y=0, color="r", linestyle="-")
130 ax1.grid()
131 ax2.grid()
132 plt.legend(loc="upper left")
133 plt.tight_layout(pad=5.0)
135 pas_mode = filtered_df["pas_mode"][0]
136 pas_mode_string = f"{pas_mode}Ω" if pas_mode in [25, 50] else f"PS gain {pas_mode}"
137 if full_range:
138 plt.suptitle(f" Multi Run Linearity (Fit to Full DR) for {pas_mode_string} \n {summary_plot_string}")
139 plot_filename = f"Full_DR_Linearity_plot_{gain}.png"
140 if plot_log_scale:
141 plot_filename = plot_filename.replace(".png", "_log.png")
142 log.debug(f"Saving {plot_dir / plot_filename}")
143 plt.savefig(plot_dir / plot_filename)
145 elif not full_range and gain == "lo":
146 plt.suptitle(f"Multi Run Linearity (Fit to 80 % DR) for {pas_mode_string} \n {summary_plot_string}")
147 plot_filename = f"80_DR_Linearity_plot_{gain}.png"
148 if plot_log_scale:
149 plot_filename = plot_filename.replace(".png", "_log.png")
150 log.debug(f"Saving {plot_dir / plot_filename}")
151 plt.savefig(plot_dir / plot_filename)
153 ax1.cla()
154 ax2.cla()
156 plt.close()
157 plt.cla()
158 plt.clf()
161def energy_resolution(
162 df: pl.DataFrame,
163 plot_dir: Path,
164 awg_noise_subtracted: bool = False,
165 plot_log_scale: bool = True,
166):
167 """
168 Plots sigma_E / E vs input current across gains and attenuations.
169 Can subtract the noise from the AWG calculated based on attenuation, in which case
170 it is a plot of (sigma_E - AWG_noise) / E, with the appropriate uncertainties.
171 """
172 e_arr = pl.col("energy_mean")
173 dE_arr = pl.col("energy_std")
174 d_sigma = pl.col("d_sigma")
175 d_mu = pl.col("d_mu")
176 N = pl.col("awg_noise")
177 dN = pl.col("awg_noise_error")
179 if awg_noise_subtracted:
180 f = (dE_arr**2 - N**2).sqrt()
181 y_err = ((dE_arr * d_sigma / f) ** 2 + (f * d_mu / e_arr) ** 2 + (N * dN / f) ** 2).sqrt() / e_arr
182 y_val = f / e_arr
183 y_label = r"($\sigma_{E} - \sigma_{AWG})/ E$"
184 else:
185 y_val = dE_arr / e_arr
186 y_err = (d_sigma.pow(2) + (dE_arr * d_mu / e_arr).pow(2)).sqrt() / e_arr
187 y_label = r"$\sigma_E / E$"
189 df = df.with_columns(
190 amp=pl.col("amp") * pl.col("scale_factor"),
191 att_val_cat=pl.col("att_val").cast(pl.String),
192 y=y_val,
193 y_err=y_err.replace(np.inf, 0),
194 ).with_columns(
195 ymax=pl.col("y") + pl.col("y_err"),
196 ymin=pl.col("y") - pl.col("y_err"),
197 )
199 num_colors = df["att_val_cat"].unique().len()
200 cmap = plt.get_cmap("gist_rainbow")
201 colors = [cmap(i / num_colors) for i in range(num_colors)]
203 theme_dict = {**sns.axes_style("whitegrid"), "grid.linestyle": ":"}
204 p = (
205 so.Plot(
206 data=df.to_pandas(),
207 x="amp",
208 y="y",
209 color="att_val_cat",
210 ymin="ymin",
211 ymax="ymax",
212 marker="gain",
213 )
214 .add(so.Dots(fillalpha=1))
215 .scale(
216 color=colors, # type: ignore
217 marker=["o", "s"], # type: ignore
218 )
219 .theme(theme_dict)
220 )
222 if awg_noise_subtracted:
223 filename = "energy_res_subtracted"
224 else:
225 filename = "energy_res"
227 if plot_log_scale:
228 p = p.scale(x="log", y="log") # type: ignore
229 filename += "_log"
231 board_id = df["board_id"][0]
232 channel = df["channel"][0]
233 att_val_string = ", ".join(df["att_val"].unique().cast(pl.String).sort().to_list())
234 run_numbers = df["run_number"].unique().sort().cast(pl.String).to_list()
236 title = f"Energy Resolution for Board ID {board_id}, Channel {channel}\nAttenuation Values {att_val_string} dB"
237 title += f"\nRuns {', '.join(str(r) for r in run_numbers)}"
238 p = p.label(
239 x="Input Current [mA]",
240 y=y_label,
241 color="Att. [dB]",
242 title=title,
243 )
245 file = f"{filename}.png"
247 log.debug(f"Saving {plot_dir / file}")
248 p.save(plot_dir / file, bbox_inches="tight")
251def energy_range_resolution(
252 df: pl.DataFrame,
253 plot_dir: Path,
254 plot_log_scale: bool = True,
255):
256 """
257 Plots max(E) - min(E) / E vs input current across gains and attenuations.
258 """
259 e_arr = pl.col("energy_mean")
260 dE_arr = pl.col("energy_max") - pl.col("energy_min")
261 d_mu = pl.col("d_mu")
263 y_val = dE_arr / e_arr
264 y_err = (dE_arr / e_arr) * (d_mu / e_arr)
265 y_label = r"(E_max - E_min) / E"
267 df = df.with_columns(
268 amp=pl.col("amp") * pl.col("scale_factor"),
269 att_val_cat=pl.col("att_val").cast(pl.String),
270 y=y_val,
271 y_err=y_err.replace(np.inf, 0),
272 ).with_columns(
273 ymax=pl.col("y") + pl.col("y_err"),
274 ymin=pl.col("y") - pl.col("y_err"),
275 )
277 num_colors = df["att_val_cat"].unique().len()
278 cmap = plt.get_cmap("gist_rainbow")
279 colors = [cmap(i / num_colors) for i in range(num_colors)]
281 theme_dict = {**sns.axes_style("whitegrid"), "grid.linestyle": ":"}
282 p = (
283 so.Plot(
284 data=df.to_pandas(),
285 x="amp",
286 y="y",
287 color="att_val_cat",
288 ymin="ymin",
289 ymax="ymax",
290 marker="gain",
291 )
292 .add(so.Dots(fillalpha=1))
293 .scale(color=colors, marker=["o", "s"]) # type: ignore
294 .theme(theme_dict)
295 )
297 filename = "energy_range_res"
299 if plot_log_scale:
300 p = p.scale(x="log", y="log") # type: ignore
301 filename += "_log"
303 board_id = df["board_id"][0]
304 channel = df["channel"][0]
305 att_val_string = ", ".join(df["att_val"].unique().cast(pl.String).sort().to_list())
306 run_numbers = df["run_number"].unique().sort().cast(pl.String).to_list()
308 title = (
309 f"Energy Range Resolution for Board ID {board_id}, Channel {channel}\nAttenuation Values {att_val_string} dB"
310 )
311 title += f"\nRuns {', '.join(str(r) for r in run_numbers)}"
312 p = p.label(
313 x="Input Current [mA]",
314 y=y_label,
315 color="Att. [dB]",
316 title=title,
317 )
319 file = f"{filename}.png"
321 log.debug(f"Saving {plot_dir / file}")
322 p.save(plot_dir / file, bbox_inches="tight")
325#####AWG Noise fitting #######
326def quad_sum_noise(x, N, B):
327 """
328 Function of measured baseline noise vs attenuation (x).
329 N is the noise from the AWG.
330 B is the noise from the ADC/FEB2 itself.
331 """
332 return ((N * 10 ** (-x / 20)) ** 2 + B**2) ** (1 / 2)
335def fit_noise(x, y, gain):
336 """
337 Helper function for fitting AWG noise with reasonable initial parameters
338 """
339 if gain == "hi":
340 p0 = [24, 10]
341 else:
342 p0 = [24, 3]
343 pars, cov = curve_fit(quad_sum_noise, x, y, p0, bounds=(-np.inf, np.inf))
344 N = pars[0]
345 B = pars[1]
346 [dN, dB] = np.sqrt(np.diag(cov))
347 return (N, dN, B, dB)
350def awg_noise_fit_plot(df: pl.DataFrame, plot_dir: Path, att_val_name: str = "att_val") -> pl.DataFrame:
351 """
352 Fit baseline RMS vs attenuation to determine the portion of the noise due to the AWG.
353 Also produces a plot of the fitted curve compared to data.
355 Adds columns `awg_noise` and `awg_noise_error`, both floats
356 """
357 noise_dict = {}
358 dnoise_dict = {}
359 df = (
360 df.lazy()
361 .with_columns(pl.col("samples_interleaved").list.first().list.tail(500).alias("baseline"))
362 .with_columns(
363 pl.col("baseline").first().over(att_val_name, "gain").list.std().alias("baseline_std"),
364 pl.col("baseline").first().over(att_val_name, "gain").list.len().alias("baseline_len"),
365 )
366 .collect()
367 )
368 board_id = df["board_id"][0]
369 channel = df["channel"][0]
370 for gain in ["lo", "hi"]:
371 filtered_df = df.filter(gain=gain)
372 if filtered_df.is_empty():
373 noise_dict[gain] = 0
374 dnoise_dict[gain] = 0
375 continue
376 att_val = filtered_df[att_val_name].to_numpy()
377 baseline_std = filtered_df["baseline_std"].to_numpy()
378 baseline_len = filtered_df["baseline_len"].to_numpy()
380 _, ax = plt.subplots()
381 ax.errorbar(
382 x=att_val,
383 y=baseline_std,
384 yerr=baseline_std / np.sqrt(baseline_len),
385 marker="o",
386 color="black",
387 ls="none",
388 )
390 N, dN, B, dB = fit_noise(att_val, baseline_std, gain)
391 noise_dict[gain] = N
392 dnoise_dict[gain] = dN
394 y_exp = quad_sum_noise(att_val, N, B)
395 # To make chisquare frequencies sum to same thing
396 y_exp = y_exp / sum(y_exp) * sum(baseline_std)
397 chisq, _ = stats.chisquare(baseline_std, y_exp)
399 x = np.linspace(0, att_val.max(), 100)
400 ax.plot(
401 x,
402 quad_sum_noise(x, N, B),
403 linestyle="--",
404 linewidth=2,
405 color="red",
406 label=(
407 r"$y=\sqrt{(N \times 10^{-x/20})^2 + B^2}$"
408 + f"\nN = {N:.02g} ± {dN:.02g}"
409 + f"\nB = {B:.02g} ± {dB:.02g}"
410 + ("\n" + r"$\chi^2$" + f" = {chisq:.03g}")
411 ),
412 )
413 ax.legend(loc="upper right")
414 ax.set(xlabel="Attenuation [dB]", ylabel="Baseline RMS [ADC counts]")
415 ax.set_title(f"Fit of AWG Noise -- Board ID {board_id}, Channel {channel}, {gain} gain")
417 ax.grid()
418 plt.tight_layout()
419 plt.savefig(plot_dir / f"awg_noise_fitted_{gain}_gain.png")
420 plt.cla()
421 plt.clf()
422 plt.close()
424 return df.drop(["baseline", "baseline_std", "baseline_len"]).with_columns(
425 awg_noise=pl.when(pl.col("gain") == "hi")
426 .then(noise_dict["hi"] * 10 ** (-pl.col(att_val_name) / 20))
427 .otherwise(noise_dict["lo"] * 10 ** (-pl.col(att_val_name) / 20)),
428 awg_noise_error=pl.when(pl.col("gain") == "hi")
429 .then(dnoise_dict["hi"] * 10 ** (-pl.col(att_val_name) / 20))
430 .otherwise(dnoise_dict["lo"] * 10 ** (-pl.col(att_val_name) / 20)),
431 )