Coverage for polars_analysis / cli_pedestal.py: 91%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 13:37 -0400

1import logging 

2import os 

3import sys 

4from itertools import product 

5from pathlib import Path 

6from typing import Annotated, Dict, List, Literal, Optional, cast 

7 

8import numpy as np 

9import polars as pl 

10import typer 

11 

12from polars_analysis import data_sources 

13from polars_analysis.analysis import constants 

14from polars_analysis.analysis import pedestal_analysis as analysis 

15from polars_analysis.data_sources import DeltaSource 

16from polars_analysis.plotting import board_summary_plotting 

17from polars_analysis.plotting import pedestal_plotting as plotting 

18from polars_analysis.plotting.helper import Metadata 

19 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Typer application that the pedestal re-plotting sub-commands below register on.
app = typer.Typer(
    help="Remake specific pedestal plots from existing derived values",
    no_args_is_help=True,
)

27 

28 

# CLI command "corr-matrix": load the raw data of one run and draw a correlation
# matrix per gain for every channel window listed in constants.PED_MIN_CHAN_LIST /
# constants.PED_N_CHAN_LIST that overlaps the channels present in the data.
#
# The docstring is kept verbatim because Typer uses it as the command help text.
@app.command("corr-matrix", no_args_is_help=True)
def load_plot_correlation_matrix(
    run_number: Annotated[int, typer.Argument(help="Run number to plot")],
    data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"),
    plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
        "plots"
    ),
    measurements: Annotated[List[int], typer.Option(help="measurement to plot")] = [0],
    postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None,
):
    """
    Correlation matrix for a given run number.
    """
    # Pick the data source: a direct parquet file wins, then PostgreSQL, then a delta table.
    loader: data_sources.DataSource
    if str(data_dir).endswith(".parquet"):
        log.info(f"Assuming {data_dir} is a direct path to a parquet file")
        loader = data_sources.ParquetSource(data_dir, None)
    elif postgres_uri:
        loader = data_sources.SQLSource(postgres_uri, None)
    else:
        loader = data_sources.DeltaSource(data_dir, None)

    # Make sure the per-run output directory exists and is group-writable.
    # NOTE(review): the plotting call below receives plot_dir, not run_plot_dir --
    # confirm that plot_correlation_matrix adds the run sub-directory itself.
    run_plot_dir = plot_dir / f"run{run_number}"
    if not run_plot_dir.exists():
        run_plot_dir.mkdir(parents=True, exist_ok=True)
        os.chmod(run_plot_dir, 0o775)

    df = loader.load_raw_data(run_number)
    info = Metadata.fill_from_dataframe(df)
    gains: List[Literal["hi", "lo"]] = ["hi", "lo"]
    channels = set(df["channel"].unique().to_list())
    for gain in gains:
        matrix = analysis.calc_correlation_matrix(df, measurements, gain)
        for min_channel, n_channels in zip(
            constants.PED_MIN_CHAN_LIST,
            constants.PED_N_CHAN_LIST,
        ):
            plot_channels = set(range(min_channel, min_channel + n_channels))
            # Skip if there is no overlap between plot range and available channels
            if channels.isdisjoint(plot_channels):
                continue
            # Skip if data is fully covered by a smaller plot
            if analysis.next_power_of_2(len(channels)) < n_channels // 4:
                # Diagnostic only: goes through the logger so it does not pollute stdout.
                log.debug(
                    "Skipping %d-channel correlation plot: only %d channels in data",
                    n_channels,
                    len(channels),
                )
                continue
            plotting.plot_correlation_matrix(matrix, gain, min_channel, n_channels, plot_dir, board_id=info.board_id)

75 

76 

# CLI command "raw-hist": load the raw data of one run, keep the rows of the
# requested measurement and call plotting.plot_hist once per row.
#
# The docstring is kept verbatim because Typer uses it as the command help text.
@app.command("raw-hist", no_args_is_help=True)
def load_plot_raw_histogram(
    run_number: Annotated[int, typer.Argument(help="Run number to plot")],
    data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"),
    plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
        "plots"
    ),
    measurement: Annotated[int, typer.Option(help="measurement to plot")] = 0,
    postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None,
):
    """
    Raw histogram for a given run number.
    """
    # Source priority: direct parquet file, then PostgreSQL, then delta table.
    loader: data_sources.DataSource
    points_at_parquet_file = str(data_dir).endswith(".parquet")
    if points_at_parquet_file:
        log.info(f"Assuming {data_dir} is a direct path to a parquet file")
        loader = data_sources.ParquetSource(data_dir, None)
    elif postgres_uri:
        loader = data_sources.SQLSource(postgres_uri, None)
    else:
        loader = data_sources.DeltaSource(data_dir, None)

    # Create the per-run output directory (group-writable) on first use.
    run_plot_dir = plot_dir / f"run{run_number}"
    if not run_plot_dir.exists():
        run_plot_dir.mkdir(parents=True, exist_ok=True)
        os.chmod(run_plot_dir, 0o775)

    selected = loader.load_raw_data(run_number).filter(pl.col("measurement") == measurement)

    # iter_slices(n_rows=1) hands back one single-row DataFrame at a time.
    for single_row in selected.iter_slices(n_rows=1):
        row_info = Metadata.fill_from_dataframe(single_row)
        row_samples = single_row["samples"].to_numpy()[0]
        plotting.plot_hist(row_info, row_samples, run_plot_dir)

112 

113 

# CLI command "raw-data": load the raw data of one run, keep the rows of the
# requested measurement and call plotting.plot_raw once per row.
#
# The docstring is kept verbatim because Typer uses it as the command help text.
@app.command("raw-data", no_args_is_help=True)
def load_plot_raw_data(
    run_number: Annotated[int, typer.Argument(help="Run number to plot")],
    data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"),
    plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
        "plots"
    ),
    measurement: Annotated[int, typer.Option(help="measurement to plot")] = 0,
    postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None,
):
    """
    Raw data for a given run number.
    """
    # Pick the data source: a direct parquet file wins, then PostgreSQL, then a delta table.
    loader: data_sources.DataSource
    if str(data_dir).endswith(".parquet"):
        log.info(f"Assuming {data_dir} is a direct path to a parquet file")
        loader = data_sources.ParquetSource(data_dir, None)
    elif postgres_uri:
        loader = data_sources.SQLSource(postgres_uri, None)
    else:
        loader = data_sources.DeltaSource(data_dir, None)

    # Make sure the per-run output directory exists and is group-writable.
    run_plot_dir = plot_dir / f"run{run_number}"
    if not run_plot_dir.exists():
        run_plot_dir.mkdir(parents=True, exist_ok=True)
        os.chmod(run_plot_dir, 0o775)

    df = loader.load_raw_data(run_number)
    df = df.filter(pl.col("measurement") == measurement)
    # iter_slices(n_rows=1) already yields single-row DataFrames, so each slice is
    # passed to Metadata directly (same as the raw-hist command does).
    for row in df.iter_slices(n_rows=1):
        # Diagnostic only: goes through the logger so it does not pollute stdout.
        log.debug("Plotting raw data for row:\n%s", row)
        plotting.plot_raw(
            Metadata.fill_from_dataframe(row),
            row["samples"].to_numpy()[0],
            run_plot_dir,
        )

150 

151 

# CLI command "baseline": load the derived pedestal values of one run, keep the
# requested measurement, draw mean and RMS histograms per gain (zero entries are
# dropped) and finally the per-channel baseline mean/RMS plot.
#
# The docstring is kept verbatim because Typer uses it as the command help text.
@app.command("baseline", no_args_is_help=True)
def load_plot_baseline(
    run_number: Annotated[int, typer.Argument(help="Run number to plot")],
    derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"),
    plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
        "plots"
    ),
    measurement: Annotated[int, typer.Option(help="measurement to plot")] = 0,
):
    """
    Baseline mean and RMS for each channel.
    """
    # Create the per-run output directory (group-writable) on first use.
    # NOTE(review): every plotting call below receives plot_dir, not run_plot_dir --
    # confirm that the plotting helpers add the run sub-directory themselves.
    run_plot_dir = plot_dir / f"run{run_number}"
    if not run_plot_dir.exists():
        run_plot_dir.mkdir(parents=True, exist_ok=True)
        os.chmod(run_plot_dir, 0o775)

    derived = DeltaSource(None, derived_dir).load_derived_data(run_number=run_number, meas_type="pedestal")
    derived = derived.filter(pl.col("measurement") == measurement)

    hist_info = Metadata.fill_from_dataframe(derived)
    gains: List[Literal["lo", "hi"]] = ["lo", "hi"]
    for gain in gains:
        # The same Metadata object is reused for both gains; only its gain field changes.
        hist_info.gain = gain
        gain_rows = derived.filter(pl.col("gain") == gain)
        mean_values = gain_rows.select(pl.col("mean")).to_series().to_numpy()
        rms_values = gain_rows.select(pl.col("std")).to_series().to_numpy()

        board_summary_plotting.plot_pedestal_mean_hist(
            mean_values[mean_values != 0],
            plot_dir,
            hist_info,
            show_cuts=False,
        )

        board_summary_plotting.plot_pedestal_rms_hist(
            rms_values[rms_values != 0],
            plot_dir,
            hist_info,
            show_cuts=False,
        )

    # Fresh Metadata for the summary plot, so the gain set in the loop above does not carry over.
    summary_info = Metadata.fill_from_dataframe(derived)
    plotting.plot_baseline_means_rms(derived, plot_dir, board_id=summary_info.board_id, info=summary_info)

197 

198 

# CLI command "autocorr": load the derived pedestal values of one run, keep the
# requested measurement and call plotting.plot_autocorrelation once per row.
#
# The docstring is kept verbatim because Typer uses it as the command help text.
@app.command("autocorr", no_args_is_help=True)
def load_plot_autocorrelation(
    run_number: Annotated[int, typer.Argument(help="Run number to plot")],
    derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"),
    plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
        "plots"
    ),
    measurement: Annotated[int, typer.Option(help="measurement to plot")] = 0,
):
    """
    Autocorrelation for a given run number.
    """
    # Create the per-run output directory (group-writable) on first use.
    run_plot_dir = plot_dir / f"run{run_number}"
    if not run_plot_dir.exists():
        run_plot_dir.mkdir(parents=True, exist_ok=True)
        os.chmod(run_plot_dir, 0o775)

    derived = DeltaSource(None, derived_dir).load_derived_data(run_number=run_number, meas_type="pedestal")
    derived = derived.filter(pl.col("measurement") == measurement)

    for record in derived.iter_rows(named=True):
        # Metadata is built from the rows sharing this record's run, channel and gain.
        matching_rows = derived.filter(
            pl.col("run_number") == record["run_number"],
            pl.col("channel") == record["channel"],
            pl.col("gain") == record["gain"],
        )
        record_info = Metadata.fill_from_dataframe(matching_rows)
        plotting.plot_autocorrelation(
            record["run_number"],
            record["channel"],
            record["gain"],
            record["autocorr"],
            run_plot_dir,
            board_id=record["board_id"],
            info=record_info,
        )

236 

237 

# CLI command "fft": load the derived pedestal values of one run, keep the
# requested measurement and call plotting.plot_fft once per row.
#
# The docstring is kept verbatim because Typer uses it as the command help text.
@app.command("fft", no_args_is_help=True)
def load_plot_fft(
    run_number: Annotated[int, typer.Argument(help="Run number to plot")],
    derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"),
    plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
        "plots"
    ),
    measurement: Annotated[int, typer.Option(help="measurement to plot")] = 0,
):
    """
    FFT for a given run number.
    """
    # Create the per-run output directory (group-writable) on first use.
    run_plot_dir = plot_dir / f"run{run_number}"
    if not run_plot_dir.exists():
        run_plot_dir.mkdir(parents=True, exist_ok=True)
        os.chmod(run_plot_dir, 0o775)

    derived = DeltaSource(None, derived_dir).load_derived_data(run_number=run_number, meas_type="pedestal")
    derived = derived.filter(pl.col("measurement") == measurement)

    for record in derived.iter_rows(named=True):
        # Metadata is built from the rows sharing this record's run, channel and gain.
        matching_rows = derived.filter(
            pl.col("run_number") == record["run_number"],
            pl.col("channel") == record["channel"],
            pl.col("gain") == record["gain"],
        )
        record_info = Metadata.fill_from_dataframe(matching_rows)
        plotting.plot_fft(
            record["channel"],
            record["gain"],
            record["freq"],
            record["psd"],
            record["peaks"],
            run_plot_dir,
            run_number=run_number,
            board_id=record["board_id"],
            info=record_info,
        )

277 

278 

# CLI command "coherence": load the raw data of one run for a single gain and, for
# channel1 paired with each entry of channels2, compute the coherence and plot it
# whenever analysis.calc_coherence returns a result. Exits with status 1 when gain
# is neither "lo" nor "hi".
#
# The docstring is kept verbatim because Typer uses it as the command help text.
@app.command("coherence", no_args_is_help=True)
def load_plot_coherence(
    run_number: Annotated[int, typer.Argument(help="Run number to plot")],
    gain: Annotated[str, typer.Argument(help="hi or lo gain channels")],
    channel1: Annotated[int, typer.Argument(help="Channel 1.")],
    channels2: Annotated[List[int], typer.Argument(help="List of channels to compare with channel 1.")],
    data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"),
    plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
        "plots"
    ),
    postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None,
):
    """
    Coherence between two channels for a given run number.
    Computes all permutations of inputs.
    """
    loader: data_sources.DataSource
    # Reject anything but the two known gain labels before touching any data.
    if gain not in ("lo", "hi"):
        log.error("Gain must be 'lo' or 'hi'")
        sys.exit(1)

    # Source priority: direct parquet file, then PostgreSQL, then delta table.
    points_at_parquet_file = str(data_dir).endswith(".parquet")
    if points_at_parquet_file:
        log.info(f"Assuming {data_dir} is a direct path to a parquet file")
        loader = data_sources.ParquetSource(data_dir, None)
    elif postgres_uri:
        loader = data_sources.SQLSource(postgres_uri, None)
    else:
        loader = data_sources.DeltaSource(data_dir, None)
    # Narrow the validated string for the type checker.
    gain = cast(Literal["hi", "lo"], gain)

    # Create the per-run output directory (group-writable) on first use.
    # NOTE(review): plot_coherence below receives plot_dir plus run_number, not
    # run_plot_dir -- confirm that it adds the run sub-directory itself.
    run_plot_dir = plot_dir / f"run{run_number}"
    if not run_plot_dir.exists():
        run_plot_dir.mkdir(parents=True, exist_ok=True)
        os.chmod(run_plot_dir, 0o775)

    gain_rows = loader.load_raw_data(run_number).filter(
        pl.col("gain") == gain,
    )
    gain_rows = gain_rows.with_columns(pl.col("samples"), pl.col("channel"))

    # First distinct value of each column is used for the whole run.
    board_ids = gain_rows["board_id"].unique().to_numpy()
    board_id = board_ids[0]
    pas_modes = gain_rows["pas_mode"].unique().to_numpy()
    pas_mode = pas_modes[0]
    # HEC boards used to fill pas_mode with NaN, which get converted to floats in the DF instead of an int
    # (NaN is the only value that compares unequal to itself.)
    if pas_mode != pas_mode:
        pas_mode = -1

    for other_channel in channels2:
        result = analysis.calc_coherence(channel1, other_channel, gain_rows)
        if result is None:
            continue
        freq, coh = result
        plotting.plot_coherence(
            channel1, other_channel, gain, freq, coh, plot_dir, run_number, board_id, None, pas_mode
        )

330 

331 

# CLI command "coherent": load the coherent-noise data of one run from the derived
# directory and call plotting.plot_coherent_noise once per row.
#
# The docstring is kept verbatim because Typer uses it as the command help text.
@app.command("coherent", no_args_is_help=True)
def load_plot_coherent_noise(
    run_number: Annotated[int, typer.Argument(help="Run number to plot")],
    derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"),
    plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
        "plots"
    ),
):
    """
    Coherent noise for a given run number.
    """
    # Create the per-run output directory (group-writable) on first use.
    run_plot_dir = plot_dir / f"run{run_number}"
    if not run_plot_dir.exists():
        run_plot_dir.mkdir(parents=True, exist_ok=True)
        os.chmod(run_plot_dir, 0o775)

    noise_rows = DeltaSource(None, derived_dir).load_coherent_noise_data(run_number=run_number)
    for record in noise_rows.iter_rows(named=True):
        # record.get yields None when the row has no "pas_mode" key.
        record_pas_mode = record.get("pas_mode")
        plotting.plot_coherent_noise(record, run_plot_dir, pas_mode=record_pas_mode)