Coverage for polars_analysis / cli_pedestal.py: 91%
159 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 13:37 -0400
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 13:37 -0400
1import logging
2import os
3import sys
4from itertools import product
5from pathlib import Path
6from typing import Annotated, Dict, List, Literal, Optional, cast
8import numpy as np
9import polars as pl
10import typer
12from polars_analysis import data_sources
13from polars_analysis.analysis import constants
14from polars_analysis.analysis import pedestal_analysis as analysis
15from polars_analysis.data_sources import DeltaSource
16from polars_analysis.plotting import board_summary_plotting
17from polars_analysis.plotting import pedestal_plotting as plotting
18from polars_analysis.plotting.helper import Metadata
20# Instantiate logger
21log = logging.getLogger(__name__)
23app = typer.Typer(
24 no_args_is_help=True,
25 help="Remake specific pedestal plots from existing derived values",
26)
29@app.command("corr-matrix", no_args_is_help=True)
30def load_plot_correlation_matrix(
31 run_number: Annotated[int, typer.Argument(help="Run number to plot")],
32 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"),
33 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
34 "plots"
35 ),
36 measurements: Annotated[List[int], typer.Option(help="measurement to plot")] = [0],
37 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None,
38):
39 """
40 Correlation matrix for a given run number.
41 """
42 loader: data_sources.DataSource
43 if str(data_dir)[-8:] == ".parquet":
44 log.info(f"Assuming {data_dir} is a direct path to a parquet file")
45 loader = data_sources.ParquetSource(data_dir, None)
46 elif postgres_uri:
47 loader = data_sources.SQLSource(postgres_uri, None)
48 else:
49 loader = data_sources.DeltaSource(data_dir, None)
51 run_plot_dir = plot_dir / f"run{run_number}"
52 if not run_plot_dir.exists():
53 run_plot_dir.mkdir(parents=True, exist_ok=True)
54 os.chmod(run_plot_dir, 0o775)
56 df = loader.load_raw_data(run_number)
57 info = Metadata.fill_from_dataframe(df)
58 gains: List[Literal["hi", "lo"]] = ["hi", "lo"]
59 channels = set(df["channel"].unique().to_list())
60 for gain in gains:
61 matrix = analysis.calc_correlation_matrix(df, measurements, gain)
62 for min_channel, n_channels in zip(
63 constants.PED_MIN_CHAN_LIST,
64 constants.PED_N_CHAN_LIST,
65 ):
66 plot_channels = set(range(min_channel, min_channel + n_channels))
67 # Skip if there is no overlap between plot range and available channels
68 if channels.isdisjoint(plot_channels):
69 continue
70 # Skip if data is fully covered by a smaller plot
71 if analysis.next_power_of_2(len(channels)) < n_channels // 4:
72 print(len(channels), n_channels)
73 continue
74 plotting.plot_correlation_matrix(matrix, gain, min_channel, n_channels, plot_dir, board_id=info.board_id)
77@app.command("raw-hist", no_args_is_help=True)
78def load_plot_raw_histogram(
79 run_number: Annotated[int, typer.Argument(help="Run number to plot")],
80 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"),
81 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
82 "plots"
83 ),
84 measurement: Annotated[int, typer.Option(help="measurement to plot")] = 0,
85 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None,
86):
87 """
88 Raw histogram for a given run number.
89 """
90 loader: data_sources.DataSource
91 if str(data_dir)[-8:] == ".parquet":
92 log.info(f"Assuming {data_dir} is a direct path to a parquet file")
93 loader = data_sources.ParquetSource(data_dir, None)
94 elif postgres_uri:
95 loader = data_sources.SQLSource(postgres_uri, None)
96 else:
97 loader = data_sources.DeltaSource(data_dir, None)
99 run_plot_dir = plot_dir / f"run{run_number}"
100 if not run_plot_dir.exists():
101 run_plot_dir.mkdir(parents=True, exist_ok=True)
102 os.chmod(run_plot_dir, 0o775)
104 df = loader.load_raw_data(run_number)
105 df = df.filter(pl.col("measurement") == measurement)
106 for row in df.iter_slices(n_rows=1):
107 plotting.plot_hist(
108 Metadata.fill_from_dataframe(row),
109 row["samples"].to_numpy()[0],
110 run_plot_dir,
111 )
114@app.command("raw-data", no_args_is_help=True)
115def load_plot_raw_data(
116 run_number: Annotated[int, typer.Argument(help="Run number to plot")],
117 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"),
118 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
119 "plots"
120 ),
121 measurement: Annotated[int, typer.Option(help="measurement to plot")] = 0,
122 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None,
123):
124 """
125 Raw data for a given run number.
126 """
127 loader: data_sources.DataSource
128 if str(data_dir)[-8:] == ".parquet":
129 log.info(f"Assuming {data_dir} is a direct path to a parquet file")
130 loader = data_sources.ParquetSource(data_dir, None)
131 elif postgres_uri:
132 loader = data_sources.SQLSource(postgres_uri, None)
133 else:
134 loader = data_sources.DeltaSource(data_dir, None)
136 run_plot_dir = plot_dir / f"run{run_number}"
137 if not run_plot_dir.exists():
138 run_plot_dir.mkdir(parents=True, exist_ok=True)
139 os.chmod(run_plot_dir, 0o775)
141 df = loader.load_raw_data(run_number)
142 df = df.filter(pl.col("measurement") == measurement)
143 for row in df.iter_slices(n_rows=1):
144 print(row)
145 plotting.plot_raw(
146 Metadata.fill_from_dataframe(pl.DataFrame(row)),
147 row["samples"].to_numpy()[0],
148 run_plot_dir,
149 )
152@app.command("baseline", no_args_is_help=True)
153def load_plot_baseline(
154 run_number: Annotated[int, typer.Argument(help="Run number to plot")],
155 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"),
156 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
157 "plots"
158 ),
159 measurement: Annotated[int, typer.Option(help="measurement to plot")] = 0,
160):
161 """
162 Baseline mean and RMS for each channel.
163 """
164 run_plot_dir = plot_dir / f"run{run_number}"
165 if not run_plot_dir.exists():
166 run_plot_dir.mkdir(parents=True, exist_ok=True)
167 os.chmod(run_plot_dir, 0o775)
169 loader = DeltaSource(None, derived_dir)
170 df = loader.load_derived_data(run_number=run_number, meas_type="pedestal")
171 df = df.filter(pl.col("measurement") == measurement)
172 info = Metadata.fill_from_dataframe(df)
173 means: Dict[str, np.ndarray] = {}
174 stds: Dict[str, np.ndarray] = {}
175 gains: List[Literal["lo", "hi"]] = ["lo", "hi"]
176 for gain in gains:
177 info.gain = gain
178 means[gain] = df.filter(pl.col("gain") == gain).select(pl.col("mean")).to_series().to_numpy()
179 stds[gain] = df.filter(pl.col("gain") == gain).select(pl.col("std")).to_series().to_numpy()
181 board_summary_plotting.plot_pedestal_mean_hist(
182 means[gain][means[gain] != 0],
183 plot_dir,
184 info,
185 show_cuts=False,
186 )
188 board_summary_plotting.plot_pedestal_rms_hist(
189 stds[gain][stds[gain] != 0],
190 plot_dir,
191 info,
192 show_cuts=False,
193 )
195 info = Metadata.fill_from_dataframe(df)
196 plotting.plot_baseline_means_rms(df, plot_dir, board_id=info.board_id, info=info)
199@app.command("autocorr", no_args_is_help=True)
200def load_plot_autocorrelation(
201 run_number: Annotated[int, typer.Argument(help="Run number to plot")],
202 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"),
203 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
204 "plots"
205 ),
206 measurement: Annotated[int, typer.Option(help="measurement to plot")] = 0,
207):
208 """
209 Autocorrelation for a given run number.
210 """
211 run_plot_dir = plot_dir / f"run{run_number}"
212 if not run_plot_dir.exists():
213 run_plot_dir.mkdir(parents=True, exist_ok=True)
214 os.chmod(run_plot_dir, 0o775)
216 loader = DeltaSource(None, derived_dir)
217 df = loader.load_derived_data(run_number=run_number, meas_type="pedestal")
218 df = df.filter(pl.col("measurement") == measurement)
219 for row in df.iter_rows(
220 named=True,
221 ):
222 df_filtered = df.filter(
223 pl.col("run_number") == row["run_number"],
224 pl.col("channel") == row["channel"],
225 pl.col("gain") == row["gain"],
226 )
227 plotting.plot_autocorrelation(
228 row["run_number"],
229 row["channel"],
230 row["gain"],
231 row["autocorr"],
232 run_plot_dir,
233 board_id=row["board_id"],
234 info=Metadata.fill_from_dataframe(df_filtered),
235 )
238@app.command("fft", no_args_is_help=True)
239def load_plot_fft(
240 run_number: Annotated[int, typer.Argument(help="Run number to plot")],
241 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"),
242 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
243 "plots"
244 ),
245 measurement: Annotated[int, typer.Option(help="measurement to plot")] = 0,
246):
247 """
248 FFT for a given run number.
249 """
250 run_plot_dir = plot_dir / f"run{run_number}"
251 if not run_plot_dir.exists():
252 run_plot_dir.mkdir(parents=True, exist_ok=True)
253 os.chmod(run_plot_dir, 0o775)
255 loader = DeltaSource(None, derived_dir)
256 df = loader.load_derived_data(run_number=run_number, meas_type="pedestal")
257 df = df.filter(pl.col("measurement") == measurement)
258 for row in df.iter_rows(
259 named=True,
260 ):
261 df_filtered = df.filter(
262 pl.col("run_number") == row["run_number"],
263 pl.col("channel") == row["channel"],
264 pl.col("gain") == row["gain"],
265 )
266 plotting.plot_fft(
267 row["channel"],
268 row["gain"],
269 row["freq"],
270 row["psd"],
271 row["peaks"],
272 run_plot_dir,
273 run_number=run_number,
274 board_id=row["board_id"],
275 info=Metadata.fill_from_dataframe(df_filtered),
276 )
279@app.command("coherence", no_args_is_help=True)
280def load_plot_coherence(
281 run_number: Annotated[int, typer.Argument(help="Run number to plot")],
282 gain: Annotated[str, typer.Argument(help="hi or lo gain channels")],
283 channel1: Annotated[int, typer.Argument(help="Channel 1.")],
284 channels2: Annotated[List[int], typer.Argument(help="List of channels to compare with channel 1.")],
285 data_dir: Annotated[Path, typer.Option(envvar="DATA_DIR", help="path to delta table")] = Path("data/samples"),
286 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
287 "plots"
288 ),
289 postgres_uri: Annotated[Optional[str], typer.Option(envvar="POSTGRES_URI", help="PostgreSQL URI")] = None,
290):
291 """
292 Coherence between two channels for a given run number.
293 Computes all permutations of inputs.
294 """
295 loader: data_sources.DataSource
296 if not (gain == "lo" or gain == "hi"):
297 log.error("Gain must be 'lo' or 'hi'")
298 sys.exit(1)
299 if str(data_dir)[-8:] == ".parquet":
300 log.info(f"Assuming {data_dir} is a direct path to a parquet file")
301 loader = data_sources.ParquetSource(data_dir, None)
302 elif postgres_uri:
303 loader = data_sources.SQLSource(postgres_uri, None)
304 else:
305 loader = data_sources.DeltaSource(data_dir, None)
306 gain = cast(Literal["hi", "lo"], gain)
308 run_plot_dir = plot_dir / f"run{run_number}"
309 if not run_plot_dir.exists():
310 run_plot_dir.mkdir(parents=True, exist_ok=True)
311 os.chmod(run_plot_dir, 0o775)
313 raw_df = loader.load_raw_data(run_number)
314 raw_df = raw_df.filter(
315 pl.col("gain") == gain,
316 ).with_columns(pl.col("samples"), pl.col("channel"))
318 board_id = raw_df["board_id"].unique().to_numpy()[0]
319 pas_mode = raw_df["pas_mode"].unique().to_numpy()[0]
320 # HEC boards used to fill pas_mode with NaN, which get converted to floats in the DF instead of an int
321 if pas_mode != pas_mode:
322 pas_mode = -1
324 for c1, c2 in product([channel1], channels2):
325 out = analysis.calc_coherence(c1, c2, raw_df)
327 if out is not None:
328 freq, coh = out
329 plotting.plot_coherence(c1, c2, gain, freq, coh, plot_dir, run_number, board_id, None, pas_mode)
332@app.command("coherent", no_args_is_help=True)
333def load_plot_coherent_noise(
334 run_number: Annotated[int, typer.Argument(help="Run number to plot")],
335 derived_dir: Annotated[Path, typer.Option(help="path to directory with derived values")] = Path("derived"),
336 plot_dir: Annotated[Path, typer.Option(envvar="RUNS_PLOT_DIR", help="path to directory to save plots")] = Path(
337 "plots"
338 ),
339):
340 """
341 Coherent noise for a given run number.
342 """
343 run_plot_dir = plot_dir / f"run{run_number}"
344 if not run_plot_dir.exists():
345 run_plot_dir.mkdir(parents=True, exist_ok=True)
346 os.chmod(run_plot_dir, 0o775)
348 loader = DeltaSource(None, derived_dir)
349 df = loader.load_coherent_noise_data(run_number=run_number)
350 for row in df.iter_rows(named=True):
351 plotting.plot_coherent_noise(row, run_plot_dir, pas_mode=row.get("pas_mode"))