Coverage for polars_analysis / plotting / linearity_plotting.py: 98%

202 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 13:37 -0400

1import logging 

2from pathlib import Path 

3 

4import matplotlib.pyplot as plt 

5import numpy as np 

6import polars as pl 

7import seaborn as sns 

8import seaborn.objects as so 

9from scipy import stats 

10from scipy.optimize import curve_fit 

11 

12from polars_analysis.plotting import helper 

13 

14log = logging.getLogger(__name__) 

15 

16# Otherwise it doesn't like using numbers as categories 

17plt.set_loglevel("WARNING") 

18 

19 

20def linearity_plot( 

21 df: pl.DataFrame, plot_dir: Path, full_range: bool = True, plot_log_scale: bool = True, use_scaled_amp: bool = True 

22): 

23 """ 

24 Produces two plots on one axis showing the linearity of pulse height vs input current. 

25 The left plot is a simple plot with a line of best fit, and the right plot is the integral 

26 non-linearity (INL) of measured values vs values predicted by the fit 

27 """ 

28 if use_scaled_amp: 

29 df = df.with_columns(amp=pl.col("amp") * pl.col("scale_factor")) 

30 

31 num_colors = len(df["run_number"].unique()) 

32 cmap = plt.get_cmap("gist_rainbow") 

33 colors = [cmap(i / num_colors) for i in range(num_colors)] 

34 

35 for gain in ["lo", "hi"]: 

36 filtered_df = df.filter(gain=gain) 

37 amp = filtered_df["amp"].to_numpy() 

38 energy_mean = filtered_df["energy_mean"].to_numpy() 

39 d_mu = filtered_df["d_mu"].to_numpy() 

40 popt, pcov = curve_fit( 

41 helper.lin, 

42 amp, 

43 energy_mean, 

44 p0=[energy_mean[1] / amp[1], 0], 

45 sigma=d_mu, 

46 absolute_sigma=True, 

47 ) 

48 m, b, dm, db = popt[0], popt[1], np.sqrt(pcov[0][0]), np.sqrt(pcov[1][1]) 

49 

50 _, ax = plt.subplots(1, 2, figsize=(15, 5)) 

51 ax1, ax2 = ax[0], ax[1] 

52 

53 xspace: np.ndarray = np.linspace(0, max(amp) + 0.5 * max(amp), 500) 

54 ax1.plot(xspace, m * xspace + b, "r-", label="Fit") 

55 

56 ax2.axvline(np.amax(amp)) 

57 ax2.annotate( 

58 f"Max DR Injector Current: {round(max(amp), 2)}", 

59 (0.6, 0.8), 

60 xycoords="axes fraction", 

61 color="b", 

62 ) 

63 

64 text = f"m = {m:1f} $\\pm$ {dm:.4f}\nb = {b:.1f} $\\pm$ {db:.4f}" 

65 ax1.text( 

66 0.1, 

67 0.9, 

68 text, 

69 horizontalalignment="left", 

70 verticalalignment="top", 

71 transform=ax1.transAxes, 

72 ) 

73 

74 for i, run_number in enumerate(df["run_number"].unique().sort()): 

75 if filtered_df.filter(run_number=run_number).is_empty(): 

76 continue 

77 run_amps = filtered_df.filter(run_number=run_number)["amp"].to_numpy() 

78 run_energy_means = filtered_df.filter(run_number=run_number)["energy_mean"].to_numpy() 

79 run_d_mu = filtered_df.filter(run_number=run_number)["d_mu"].to_numpy() 

80 run_y_pred: np.ndarray = helper.lin(run_amps, *popt) 

81 

82 INL: np.ndarray = 100 * (run_energy_means - run_y_pred) / 2**15 

83 error: np.ndarray = 100 * np.sqrt((run_d_mu) ** 2 + (run_amps * dm) ** 2 + (db) ** 2) / 2**15 

84 

85 ax1.errorbar( 

86 run_amps, 

87 run_energy_means, 

88 yerr=run_d_mu, 

89 fmt="o", 

90 color=colors[i], 

91 ) 

92 ax2.errorbar( 

93 run_amps, 

94 INL, 

95 yerr=error, 

96 fmt="o", 

97 color=colors[i], 

98 label=(f"{gain} {filtered_df.filter(run_number=run_number)['att_val'][0]}dB"), 

99 ) 

100 board_id = filtered_df["board_id"][0] 

101 channel = filtered_df["channel"][0] 

102 run_numbers = filtered_df["run_number"].unique().to_list() 

103 summary_plot_string = f"board_id: {board_id}, channel: {channel}, runs: {run_numbers}, gain: {gain}" 

104 

105 ax1.set(ylabel="Pulse Height [ADC Counts]", xlabel="Input Current [mA]") 

106 ax1.set_xlim(0, max(amp) * 1.3) 

107 ax1.set_ylim(0, max(energy_mean) * 1.3) 

108 

109 ax2.set(ylabel="INL %", xlabel="Input Current [mA]") 

110 if full_range: 

111 if gain == "hi": 

112 ax2.axhline(y=0.2, color="grey", label="0.2% limit for HG Full Range INL") 

113 ax2.axhline(y=-0.2, color="grey") 

114 else: 

115 ax2.axhline(y=5, color="grey", label="5% limit for LG Full Range INL") 

116 ax2.axhline(y=-5, color="grey") 

117 else: 

118 ax2.axhline(y=0.5, color="grey", label="0.5% limit for LG 80% INL") 

119 ax2.axhline(y=-0.5, color="grey") 

120 

121 if plot_log_scale: 

122 ax1.set_xlim(0.9 * min(amp), max(amp) * 1.3) 

123 ax1.set_ylim(0.01, max(energy_mean) * 10) 

124 ax1.set_xscale("log") 

125 ax1.set_yscale("log") 

126 ax2.set_xscale("log") 

127 

128 ax2.axhline(y=0, color="r", linestyle="-") 

129 

130 ax1.grid() 

131 ax2.grid() 

132 plt.legend(loc="upper left") 

133 plt.tight_layout(pad=5.0) 

134 

135 pas_mode = filtered_df["pas_mode"][0] 

136 pas_mode_string = f"{pas_mode}Ω" if pas_mode in [25, 50] else f"PS gain {pas_mode}" 

137 if full_range: 

138 plt.suptitle(f" Multi Run Linearity (Fit to Full DR) for {pas_mode_string} \n {summary_plot_string}") 

139 plot_filename = f"Full_DR_Linearity_plot_{gain}.png" 

140 if plot_log_scale: 

141 plot_filename = plot_filename.replace(".png", "_log.png") 

142 log.debug(f"Saving {plot_dir / plot_filename}") 

143 plt.savefig(plot_dir / plot_filename) 

144 

145 elif not full_range and gain == "lo": 

146 plt.suptitle(f"Multi Run Linearity (Fit to 80 % DR) for {pas_mode_string} \n {summary_plot_string}") 

147 plot_filename = f"80_DR_Linearity_plot_{gain}.png" 

148 if plot_log_scale: 

149 plot_filename = plot_filename.replace(".png", "_log.png") 

150 log.debug(f"Saving {plot_dir / plot_filename}") 

151 plt.savefig(plot_dir / plot_filename) 

152 

153 ax1.cla() 

154 ax2.cla() 

155 

156 plt.close() 

157 plt.cla() 

158 plt.clf() 

159 

160 

161def energy_resolution( 

162 df: pl.DataFrame, 

163 plot_dir: Path, 

164 awg_noise_subtracted: bool = False, 

165 plot_log_scale: bool = True, 

166): 

167 """ 

168 Plots sigma_E / E vs input current across gains and attenuations. 

169 Can subtract the noise from the AWG calculated based on attenuation, in which case 

170 it is a plot of (sigma_E - AWG_noise) / E, with the appropriate uncertainties. 

171 """ 

172 e_arr = pl.col("energy_mean") 

173 dE_arr = pl.col("energy_std") 

174 d_sigma = pl.col("d_sigma") 

175 d_mu = pl.col("d_mu") 

176 N = pl.col("awg_noise") 

177 dN = pl.col("awg_noise_error") 

178 

179 if awg_noise_subtracted: 

180 f = (dE_arr**2 - N**2).sqrt() 

181 y_err = ((dE_arr * d_sigma / f) ** 2 + (f * d_mu / e_arr) ** 2 + (N * dN / f) ** 2).sqrt() / e_arr 

182 y_val = f / e_arr 

183 y_label = r"($\sigma_{E} - \sigma_{AWG})/ E$" 

184 else: 

185 y_val = dE_arr / e_arr 

186 y_err = (d_sigma.pow(2) + (dE_arr * d_mu / e_arr).pow(2)).sqrt() / e_arr 

187 y_label = r"$\sigma_E / E$" 

188 

189 df = df.with_columns( 

190 amp=pl.col("amp") * pl.col("scale_factor"), 

191 att_val_cat=pl.col("att_val").cast(pl.String), 

192 y=y_val, 

193 y_err=y_err.replace(np.inf, 0), 

194 ).with_columns( 

195 ymax=pl.col("y") + pl.col("y_err"), 

196 ymin=pl.col("y") - pl.col("y_err"), 

197 ) 

198 

199 num_colors = df["att_val_cat"].unique().len() 

200 cmap = plt.get_cmap("gist_rainbow") 

201 colors = [cmap(i / num_colors) for i in range(num_colors)] 

202 

203 theme_dict = {**sns.axes_style("whitegrid"), "grid.linestyle": ":"} 

204 p = ( 

205 so.Plot( 

206 data=df.to_pandas(), 

207 x="amp", 

208 y="y", 

209 color="att_val_cat", 

210 ymin="ymin", 

211 ymax="ymax", 

212 marker="gain", 

213 ) 

214 .add(so.Dots(fillalpha=1)) 

215 .scale( 

216 color=colors, # type: ignore 

217 marker=["o", "s"], # type: ignore 

218 ) 

219 .theme(theme_dict) 

220 ) 

221 

222 if awg_noise_subtracted: 

223 filename = "energy_res_subtracted" 

224 else: 

225 filename = "energy_res" 

226 

227 if plot_log_scale: 

228 p = p.scale(x="log", y="log") # type: ignore 

229 filename += "_log" 

230 

231 board_id = df["board_id"][0] 

232 channel = df["channel"][0] 

233 att_val_string = ", ".join(df["att_val"].unique().cast(pl.String).sort().to_list()) 

234 run_numbers = df["run_number"].unique().sort().cast(pl.String).to_list() 

235 

236 title = f"Energy Resolution for Board ID {board_id}, Channel {channel}\nAttenuation Values {att_val_string} dB" 

237 title += f"\nRuns {', '.join(str(r) for r in run_numbers)}" 

238 p = p.label( 

239 x="Input Current [mA]", 

240 y=y_label, 

241 color="Att. [dB]", 

242 title=title, 

243 ) 

244 

245 file = f"{filename}.png" 

246 

247 log.debug(f"Saving {plot_dir / file}") 

248 p.save(plot_dir / file, bbox_inches="tight") 

249 

250 

251def energy_range_resolution( 

252 df: pl.DataFrame, 

253 plot_dir: Path, 

254 plot_log_scale: bool = True, 

255): 

256 """ 

257 Plots max(E) - min(E) / E vs input current across gains and attenuations. 

258 """ 

259 e_arr = pl.col("energy_mean") 

260 dE_arr = pl.col("energy_max") - pl.col("energy_min") 

261 d_mu = pl.col("d_mu") 

262 

263 y_val = dE_arr / e_arr 

264 y_err = (dE_arr / e_arr) * (d_mu / e_arr) 

265 y_label = r"(E_max - E_min) / E" 

266 

267 df = df.with_columns( 

268 amp=pl.col("amp") * pl.col("scale_factor"), 

269 att_val_cat=pl.col("att_val").cast(pl.String), 

270 y=y_val, 

271 y_err=y_err.replace(np.inf, 0), 

272 ).with_columns( 

273 ymax=pl.col("y") + pl.col("y_err"), 

274 ymin=pl.col("y") - pl.col("y_err"), 

275 ) 

276 

277 num_colors = df["att_val_cat"].unique().len() 

278 cmap = plt.get_cmap("gist_rainbow") 

279 colors = [cmap(i / num_colors) for i in range(num_colors)] 

280 

281 theme_dict = {**sns.axes_style("whitegrid"), "grid.linestyle": ":"} 

282 p = ( 

283 so.Plot( 

284 data=df.to_pandas(), 

285 x="amp", 

286 y="y", 

287 color="att_val_cat", 

288 ymin="ymin", 

289 ymax="ymax", 

290 marker="gain", 

291 ) 

292 .add(so.Dots(fillalpha=1)) 

293 .scale(color=colors, marker=["o", "s"]) # type: ignore 

294 .theme(theme_dict) 

295 ) 

296 

297 filename = "energy_range_res" 

298 

299 if plot_log_scale: 

300 p = p.scale(x="log", y="log") # type: ignore 

301 filename += "_log" 

302 

303 board_id = df["board_id"][0] 

304 channel = df["channel"][0] 

305 att_val_string = ", ".join(df["att_val"].unique().cast(pl.String).sort().to_list()) 

306 run_numbers = df["run_number"].unique().sort().cast(pl.String).to_list() 

307 

308 title = ( 

309 f"Energy Range Resolution for Board ID {board_id}, Channel {channel}\nAttenuation Values {att_val_string} dB" 

310 ) 

311 title += f"\nRuns {', '.join(str(r) for r in run_numbers)}" 

312 p = p.label( 

313 x="Input Current [mA]", 

314 y=y_label, 

315 color="Att. [dB]", 

316 title=title, 

317 ) 

318 

319 file = f"{filename}.png" 

320 

321 log.debug(f"Saving {plot_dir / file}") 

322 p.save(plot_dir / file, bbox_inches="tight") 

323 

324 

325#####AWG Noise fitting ####### 

326def quad_sum_noise(x, N, B): 

327 """ 

328 Function of measured baseline noise vs attenuation (x). 

329 N is the noise from the AWG. 

330 B is the noise from the ADC/FEB2 itself. 

331 """ 

332 return ((N * 10 ** (-x / 20)) ** 2 + B**2) ** (1 / 2) 

333 

334 

335def fit_noise(x, y, gain): 

336 """ 

337 Helper function for fitting AWG noise with reasonable initial parameters 

338 """ 

339 if gain == "hi": 

340 p0 = [24, 10] 

341 else: 

342 p0 = [24, 3] 

343 pars, cov = curve_fit(quad_sum_noise, x, y, p0, bounds=(-np.inf, np.inf)) 

344 N = pars[0] 

345 B = pars[1] 

346 [dN, dB] = np.sqrt(np.diag(cov)) 

347 return (N, dN, B, dB) 

348 

349 

350def awg_noise_fit_plot(df: pl.DataFrame, plot_dir: Path, att_val_name: str = "att_val") -> pl.DataFrame: 

351 """ 

352 Fit baseline RMS vs attenuation to determine the portion of the noise due to the AWG. 

353 Also produces a plot of the fitted curve compared to data. 

354 

355 Adds columns `awg_noise` and `awg_noise_error`, both floats 

356 """ 

357 noise_dict = {} 

358 dnoise_dict = {} 

359 df = ( 

360 df.lazy() 

361 .with_columns(pl.col("samples_interleaved").list.first().list.tail(500).alias("baseline")) 

362 .with_columns( 

363 pl.col("baseline").first().over(att_val_name, "gain").list.std().alias("baseline_std"), 

364 pl.col("baseline").first().over(att_val_name, "gain").list.len().alias("baseline_len"), 

365 ) 

366 .collect() 

367 ) 

368 board_id = df["board_id"][0] 

369 channel = df["channel"][0] 

370 for gain in ["lo", "hi"]: 

371 filtered_df = df.filter(gain=gain) 

372 if filtered_df.is_empty(): 

373 noise_dict[gain] = 0 

374 dnoise_dict[gain] = 0 

375 continue 

376 att_val = filtered_df[att_val_name].to_numpy() 

377 baseline_std = filtered_df["baseline_std"].to_numpy() 

378 baseline_len = filtered_df["baseline_len"].to_numpy() 

379 

380 _, ax = plt.subplots() 

381 ax.errorbar( 

382 x=att_val, 

383 y=baseline_std, 

384 yerr=baseline_std / np.sqrt(baseline_len), 

385 marker="o", 

386 color="black", 

387 ls="none", 

388 ) 

389 

390 N, dN, B, dB = fit_noise(att_val, baseline_std, gain) 

391 noise_dict[gain] = N 

392 dnoise_dict[gain] = dN 

393 

394 y_exp = quad_sum_noise(att_val, N, B) 

395 # To make chisquare frequencies sum to same thing 

396 y_exp = y_exp / sum(y_exp) * sum(baseline_std) 

397 chisq, _ = stats.chisquare(baseline_std, y_exp) 

398 

399 x = np.linspace(0, att_val.max(), 100) 

400 ax.plot( 

401 x, 

402 quad_sum_noise(x, N, B), 

403 linestyle="--", 

404 linewidth=2, 

405 color="red", 

406 label=( 

407 r"$y=\sqrt{(N \times 10^{-x/20})^2 + B^2}$" 

408 + f"\nN = {N:.02g} ± {dN:.02g}" 

409 + f"\nB = {B:.02g} ± {dB:.02g}" 

410 + ("\n" + r"$\chi^2$" + f" = {chisq:.03g}") 

411 ), 

412 ) 

413 ax.legend(loc="upper right") 

414 ax.set(xlabel="Attenuation [dB]", ylabel="Baseline RMS [ADC counts]") 

415 ax.set_title(f"Fit of AWG Noise -- Board ID {board_id}, Channel {channel}, {gain} gain") 

416 

417 ax.grid() 

418 plt.tight_layout() 

419 plt.savefig(plot_dir / f"awg_noise_fitted_{gain}_gain.png") 

420 plt.cla() 

421 plt.clf() 

422 plt.close() 

423 

424 return df.drop(["baseline", "baseline_std", "baseline_len"]).with_columns( 

425 awg_noise=pl.when(pl.col("gain") == "hi") 

426 .then(noise_dict["hi"] * 10 ** (-pl.col(att_val_name) / 20)) 

427 .otherwise(noise_dict["lo"] * 10 ** (-pl.col(att_val_name) / 20)), 

428 awg_noise_error=pl.when(pl.col("gain") == "hi") 

429 .then(dnoise_dict["hi"] * 10 ** (-pl.col(att_val_name) / 20)) 

430 .otherwise(dnoise_dict["lo"] * 10 ** (-pl.col(att_val_name) / 20)), 

431 )