Coverage for polars_analysis / cli_pulse.py: 78%

192 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 13:37 -0400

1import logging 

2from itertools import product 

3from pathlib import Path 

4from typing import Annotated, Optional 

5 

6import polars as pl 

7import typer 

8 

9from polars_analysis import data_sources 

10from polars_analysis.plotting import pulse_plotting as plotting 

11 

12# Instantiate logger 

13log = logging.getLogger(__name__) 

14 

15app = typer.Typer( 

16 no_args_is_help=True, 

17 help="Remake specific pulse plots from existing derived values", 

18) 

19 

20 

21@app.command("pulse-mean-rms", no_args_is_help=True) 

22def load_plot_pulse_mean_rms( 

23 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

24 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

25 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

26 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

27 "plots" 

28 ), 

29 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

30): 

31 """ 

32 pulse mean and RMS for each channel. 

33 """ 

34 loader: data_sources.DataSource 

35 if str(data_dir)[-8:] == ".parquet": 

36 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

37 loader = data_sources.ParquetSource(data_dir, derived_dir) 

38 elif postgres_uri: 

39 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

40 else: 

41 loader = data_sources.DeltaSource(data_dir, derived_dir) 

42 raw_df = loader.load_raw_data(run_number) 

43 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

44 

45 plotting.plot_pulse_means_rms(derived_df, plot_dir, raw_df["channel"].unique().to_list()) 

46 

47 

48@app.command("pulse-overlay-all", no_args_is_help=True) 

49def load_plot_pulse_overlay_all( 

50 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

51 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

52 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

53 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

54 "plots" 

55 ), 

56 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

57): 

58 """ 

59 pulse overlay for all channels. 

60 """ 

61 loader: data_sources.DataSource 

62 if str(data_dir)[-8:] == ".parquet": 

63 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

64 loader = data_sources.ParquetSource(data_dir, derived_dir) 

65 elif postgres_uri: 

66 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

67 else: 

68 loader = data_sources.DeltaSource(data_dir, derived_dir) 

69 raw_df = loader.load_raw_data(run_number) 

70 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

71 

72 for channel in raw_df["channel"].unique(): 

73 filtered_df = derived_df.filter(pl.col("channel") == channel).with_columns( 

74 board_id=pl.lit(raw_df["board_id"][0]) 

75 ) 

76 plotting.plot_pulse_overlay_all(filtered_df, channel, plot_dir) 

77 

78 

79@app.command("gain-ratios", no_args_is_help=True) 

80def load_plot_gain_ratios( 

81 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

82 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

83 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

84 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

85 "plots" 

86 ), 

87 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

88): 

89 """ 

90 gain ratios for each channel. 

91 """ 

92 loader: data_sources.DataSource 

93 if str(data_dir)[-8:] == ".parquet": 

94 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

95 loader = data_sources.ParquetSource(data_dir, derived_dir) 

96 elif postgres_uri: 

97 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

98 else: 

99 loader = data_sources.DeltaSource(data_dir, derived_dir) 

100 raw_df = loader.load_raw_data(run_number) 

101 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

102 

103 for channel in raw_df["channel"].unique(): 

104 filtered_df = derived_df.filter(pl.col("channel") == channel).with_columns( 

105 board_id=pl.lit(raw_df["board_id"][0]) 

106 ) 

107 plotting.plot_gain_ratios(filtered_df, channel, plot_dir) 

108 

109 

110@app.command("pulse-gain-overlay", no_args_is_help=True) 

111def load_plot_pulse_gain_overlay( 

112 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

113 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

114 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

115 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

116 "plots" 

117 ), 

118 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

119): 

120 """ 

121 pulse gain overlay for each channel and amplitude. 

122 """ 

123 loader: data_sources.DataSource 

124 if str(data_dir)[-8:] == ".parquet": 

125 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

126 loader = data_sources.ParquetSource(data_dir, derived_dir) 

127 elif postgres_uri: 

128 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

129 else: 

130 loader = data_sources.DeltaSource(data_dir, derived_dir) 

131 raw_df = loader.load_raw_data(run_number) 

132 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

133 

134 for channel, amp in product( 

135 raw_df["channel"].unique(), 

136 derived_df["amp"].unique(), 

137 ): 

138 filtered_df = derived_df.filter(pl.col("channel") == channel).with_columns( 

139 board_id=pl.lit(raw_df["board_id"][0]) 

140 ) 

141 plotting.plot_pulse_gain_overlay(filtered_df.filter(amp=amp), channel, plot_dir) 

142 

143 

144@app.command("energy-resolution", no_args_is_help=True) 

145def load_plot_energy_resolution( 

146 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

147 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

148 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

149 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

150 "plots" 

151 ), 

152 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

153): 

154 """ 

155 energy resolution for each channel and gain. 

156 """ 

157 loader: data_sources.DataSource 

158 if str(data_dir)[-8:] == ".parquet": 

159 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

160 loader = data_sources.ParquetSource(data_dir, derived_dir) 

161 elif postgres_uri: 

162 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

163 else: 

164 loader = data_sources.DeltaSource(data_dir, derived_dir) 

165 raw_df = loader.load_raw_data(run_number) 

166 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

167 

168 for channel, gain in product( 

169 raw_df["channel"].unique(), 

170 ["lo", "hi"], 

171 ): 

172 filtered_df: pl.DataFrame = derived_df.filter(channel=channel, gain=gain).with_columns( 

173 board_id=pl.lit(raw_df["board_id"][0]) 

174 ) 

175 plotting.plot_energy_resolution(filtered_df, channel, plot_dir) 

176 plotting.plot_energy_resolution(filtered_df, channel, plot_dir, plot_log_scale=True) 

177 

178 

179@app.command("sigma-e", no_args_is_help=True) 

180def load_plot_sigma_e( 

181 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

182 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

183 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

184 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

185 "plots" 

186 ), 

187 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

188): 

189 """ 

190 sigma-E for each channel and gain. 

191 """ 

192 loader: data_sources.DataSource 

193 if str(data_dir)[-8:] == ".parquet": 

194 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

195 loader = data_sources.ParquetSource(data_dir, derived_dir) 

196 elif postgres_uri: 

197 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

198 else: 

199 loader = data_sources.DeltaSource(data_dir, derived_dir) 

200 raw_df = loader.load_raw_data(run_number) 

201 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

202 

203 for channel, gain in product( 

204 raw_df["channel"].unique(), 

205 ["lo", "hi"], 

206 ): 

207 filtered_df: pl.DataFrame = derived_df.filter(channel=channel, gain=gain).with_columns( 

208 board_id=pl.lit(raw_df["board_id"][0]) 

209 ) 

210 plotting.plot_sigma_e(filtered_df, channel, plot_dir) 

211 

212 

213@app.command("timing-resolution", no_args_is_help=True) 

214def load_plot_sigma_T( 

215 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

216 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

217 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

218 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

219 "plots" 

220 ), 

221 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

222): 

223 """ 

224 timing resolution for each channel and gain. 

225 """ 

226 loader: data_sources.DataSource 

227 if str(data_dir)[-8:] == ".parquet": 

228 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

229 loader = data_sources.ParquetSource(data_dir, derived_dir) 

230 elif postgres_uri: 

231 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

232 else: 

233 loader = data_sources.DeltaSource(data_dir, derived_dir) 

234 raw_df = loader.load_raw_data(run_number) 

235 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

236 

237 for channel, gain in product( 

238 raw_df["channel"].unique(), 

239 ["lo", "hi"], 

240 ): 

241 filtered_df: pl.DataFrame = derived_df.filter(channel=channel, gain=gain).with_columns( 

242 board_id=pl.lit(raw_df["board_id"][0]) 

243 ) 

244 plotting.plot_sigma_T(filtered_df, channel, plot_dir) 

245 plotting.plot_sigma_T(filtered_df, channel, plot_dir, plot_log_scale=True) 

246 

247 

248@app.command("timing-mean", no_args_is_help=True) 

249def load_plot_timing_mean( 

250 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

251 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

252 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

253 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

254 "plots" 

255 ), 

256 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

257): 

258 """ 

259 timing mean for each channel and gain. 

260 """ 

261 loader: data_sources.DataSource 

262 if str(data_dir)[-8:] == ".parquet": 

263 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

264 loader = data_sources.ParquetSource(data_dir, derived_dir) 

265 elif postgres_uri: 

266 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

267 else: 

268 loader = data_sources.DeltaSource(data_dir, derived_dir) 

269 raw_df = loader.load_raw_data(run_number) 

270 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

271 

272 for channel, gain in product( 

273 raw_df["channel"].unique(), 

274 ["lo", "hi"], 

275 ): 

276 filtered_df: pl.DataFrame = derived_df.filter(channel=channel, gain=gain).with_columns( 

277 board_id=pl.lit(raw_df["board_id"][0]) 

278 ) 

279 plotting.plot_timing_mean(filtered_df, channel, plot_dir) 

280 

281 

282@app.command("risetime", no_args_is_help=True) 

283def load_plot_risetime( 

284 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

285 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

286 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

287 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

288 "plots" 

289 ), 

290 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

291): 

292 """ 

293 risetime for each channel and gain. 

294 """ 

295 loader: data_sources.DataSource 

296 if str(data_dir)[-8:] == ".parquet": 

297 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

298 loader = data_sources.ParquetSource(data_dir, derived_dir) 

299 elif postgres_uri: 

300 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

301 else: 

302 loader = data_sources.DeltaSource(data_dir, derived_dir) 

303 raw_df = loader.load_raw_data(run_number) 

304 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

305 

306 for channel, gain in product( 

307 raw_df["channel"].unique(), 

308 ["lo", "hi"], 

309 ): 

310 filtered_df: pl.DataFrame = derived_df.filter(channel=channel, gain=gain).with_columns( 

311 board_id=pl.lit(raw_df["board_id"][0]) 

312 ) 

313 plotting.plot_risetime(filtered_df, channel, plot_dir) 

314 

315 

316@app.command("autocorr", no_args_is_help=True) 

317def load_plot_autocorrelation( 

318 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

319 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

320 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

321 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

322 "plots" 

323 ), 

324 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

325): 

326 """ 

327 autocorrelation for each channel and gain. 

328 """ 

329 loader: data_sources.DataSource 

330 if str(data_dir)[-8:] == ".parquet": 

331 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

332 loader = data_sources.ParquetSource(data_dir, derived_dir) 

333 elif postgres_uri: 

334 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

335 else: 

336 loader = data_sources.DeltaSource(data_dir, derived_dir) 

337 raw_df = loader.load_raw_data(run_number) 

338 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

339 

340 for channel, gain in product( 

341 raw_df["channel"].unique(), 

342 ["lo", "hi"], 

343 ): 

344 filtered_df: pl.DataFrame = derived_df.filter(channel=channel, gain=gain).with_columns( 

345 board_id=pl.lit(raw_df["board_id"][0]) 

346 ) 

347 plotting.plot_autocorrelation(filtered_df, channel, plot_dir) 

348 

349 

350@app.command("inl", no_args_is_help=True) 

351def load_plot_INL( 

352 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

353 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

354 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

355 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

356 "plots" 

357 ), 

358 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

359): 

360 """ 

361 integral non-linearity for each channel and gain. 

362 """ 

363 loader: data_sources.DataSource 

364 if str(data_dir)[-8:] == ".parquet": 

365 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

366 loader = data_sources.ParquetSource(data_dir, derived_dir) 

367 elif postgres_uri: 

368 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

369 else: 

370 loader = data_sources.DeltaSource(data_dir, derived_dir) 

371 raw_df = loader.load_raw_data(run_number) 

372 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

373 

374 for channel, gain in product( 

375 raw_df["channel"].unique(), 

376 ["lo", "hi"], 

377 ): 

378 filtered_df: pl.DataFrame = derived_df.filter(channel=channel, gain=gain).with_columns( 

379 board_id=pl.lit(raw_df["board_id"][0]) 

380 ) 

381 plotting.plot_INL(filtered_df, channel, plot_dir) 

382 

383 

384@app.command("ofc-samples", no_args_is_help=True) 

385def load_plot_ofc_samples( 

386 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

387 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

388 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

389 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

390 "plots" 

391 ), 

392 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

393): 

394 """ 

395 OFC samples for each channel, gain, and amplitude. 

396 """ 

397 loader: data_sources.DataSource 

398 if str(data_dir)[-8:] == ".parquet": 

399 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

400 loader = data_sources.ParquetSource(data_dir, derived_dir) 

401 elif postgres_uri: 

402 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

403 else: 

404 loader = data_sources.DeltaSource(data_dir, derived_dir) 

405 raw_df = loader.load_raw_data(run_number) 

406 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

407 

408 for channel, gain, amp in product( 

409 raw_df["channel"].unique(), 

410 ["lo", "hi"], 

411 derived_df["amp"].unique(), 

412 ): 

413 filtered_df = derived_df.filter(channel=channel, gain=gain, amp=amp).with_columns( 

414 board_id=pl.lit(raw_df["board_id"][0]) 

415 ) 

416 plotting.plot_ofc_samples(filtered_df, channel, plot_dir) 

417 

418 

419@app.command("timing-hist", no_args_is_help=True) 

420def load_plot_timing_hist( 

421 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

422 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

423 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

424 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

425 "plots" 

426 ), 

427 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

428): 

429 """ 

430 timing histogram for each channel, gain, and amplitude. 

431 """ 

432 loader: data_sources.DataSource 

433 if str(data_dir)[-8:] == ".parquet": 

434 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

435 loader = data_sources.ParquetSource(data_dir, derived_dir) 

436 elif postgres_uri: 

437 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

438 else: 

439 loader = data_sources.DeltaSource(data_dir, derived_dir) 

440 raw_df = loader.load_raw_data(run_number) 

441 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

442 

443 for channel, gain, amp in product( 

444 raw_df["channel"].unique(), 

445 ["lo", "hi"], 

446 derived_df["amp"].unique(), 

447 ): 

448 filtered_df = derived_df.filter(channel=channel, gain=gain, amp=amp).with_columns( 

449 board_id=pl.lit(raw_df["board_id"][0]) 

450 ) 

451 plotting.plot_timing_hist(filtered_df, channel, plot_dir) 

452 

453 

454@app.command("energy-hist", no_args_is_help=True) 

455def load_plot_energy_hist( 

456 run_number: Annotated[int, typer.Argument(help="Run number to plot")], 

457 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"), 

458 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"), 

459 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path( 

460 "plots" 

461 ), 

462 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None, 

463): 

464 """ 

465 energy histogram for each channel, gain, and amplitude. 

466 """ 

467 loader: data_sources.DataSource 

468 if str(data_dir)[-8:] == ".parquet": 

469 log.info(f"Assuming {data_dir} is a direct path to a parquet file") 

470 loader = data_sources.ParquetSource(data_dir, derived_dir) 

471 elif postgres_uri: 

472 loader = data_sources.SQLSource(postgres_uri, derived_dir) 

473 else: 

474 loader = data_sources.DeltaSource(data_dir, derived_dir) 

475 raw_df = loader.load_raw_data(run_number) 

476 derived_df = loader.load_derived_data(run_number=run_number, meas_type="pulse") 

477 

478 for channel, gain, amp in product( 

479 raw_df["channel"].unique(), 

480 ["lo", "hi"], 

481 derived_df["amp"].unique(), 

482 ): 

483 filtered_df = derived_df.filter(channel=channel, gain=gain, amp=amp).with_columns( 

484 board_id=pl.lit(raw_df["board_id"][0]) 

485 ) 

486 plotting.plot_energy_hist(filtered_df, channel, plot_dir)