Coverage for polars_analysis / frame.py: 59%
179 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 13:37 -0400
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 13:37 -0400
1import concurrent.futures
2import logging
3import multiprocessing as mp
4import os
5import sys
6import traceback
7from concurrent.futures import ProcessPoolExecutor
8from copy import deepcopy
9from pathlib import Path
10from typing import List, Literal, Optional
12import matplotlib
13import numpy as np
14import polars as pl
16matplotlib.use("agg")
17import scipy.signal as sps
18from matplotlib import pyplot as plt
20import polars_analysis.plotting.frame_plotting as frame_plotting
21import polars_analysis.plotting.pedestal_plotting as pedestal_plotting
22from polars_analysis import frame_utils
23from polars_analysis.analysis import constants, pedestal_analysis
24from polars_analysis.data_sources import DataSource
25from polars_analysis.plotting.helper import Metadata
27"""
28High level functions for frame analysis
29"""
31# Instantiate logger
32log = logging.getLogger(__name__)
35def scan_for_misalignment(
36 loader: DataSource,
37 min_run_number: int,
38 max_run_number: int,
39 plot_dir: Path,
40 reject_single_adc: bool = True,
41 test_channel: int = 7,
42) -> None:
43 """
44 Function to scan over all runs in the range of min to max run number for
45 misaligned frames. Produces plots of misaligned channels per board.
46 Previously scan_misalignment.py
47 """
48 old_run_cutoff = 2046
50 if min_run_number > max_run_number:
51 log.error("Min run number is larger than max run number")
52 sys.exit(1)
54 if min_run_number < old_run_cutoff and max_run_number > old_run_cutoff:
55 log.error(
56 "Run number range stradles 2046 cutoff between bugfix of frame data. These need to be run over and handled separately." # noqa: E501
57 )
58 sys.exit(1)
60 # old runs have swapped frame data
61 old_runs = max_run_number < old_run_cutoff
63 df_run_numbers = loader.get_runs_list()
65 run_numbers = (
66 df_run_numbers.filter(
67 pl.col("meas_type") == "pedestal",
68 pl.col("run_number") <= max_run_number,
69 pl.col("run_number") >= min_run_number,
70 )
71 .select(pl.col("run_number"))
72 .to_series()
73 .to_list()
74 )
76 df = loader.load_frame_data(*run_numbers, reject_single_adc=reject_single_adc, non_empty=True)
77 # Filter away trigger runs if needed
78 if not reject_single_adc:
79 run_numbers = (
80 df.group_by(pl.col("run_number"))
81 .agg(pl.col("measurement").unique().len() > 1)
82 .filter(pl.col("measurement"))["run_number"]
83 .to_list()
84 )
85 df = df.filter(pl.col("run_number").is_in(run_numbers))
87 runs_to_test = np.unique(df.select(pl.col("run_number")).to_series().to_list())
89 data = []
91 for rn in runs_to_test:
92 print(f"Processing run: {rn}")
93 df_temp = df.filter(pl.col("run_number") == rn)
94 if len(df_temp) != 32:
95 print(f"Skipping run {rn} as it has {len(df_temp)} != 32 rows of ADC data")
96 continue
98 # Need to loop over measurements to handle single ADC runs
99 a = np.array([[], [], []], dtype=np.int32)
100 for meas in df_temp["measurement"].unique().to_list():
101 df_temp_meas = df_temp.filter(pl.col("measurement") == meas)
102 df_temp_meas = frame_utils.trim_df_to_shortest_array(df_temp_meas)
103 df_temp_meas = frame_utils.unpack_frame_data(df_temp_meas, old_runs)
104 a = np.concatenate([a, frame_utils.check_bcid_alignment(df_temp_meas)], axis=1)
106 data.append((int(rn), int(df_temp["board_id"][0]), a[0].astype(np.int64)))
108 data_dict = dict()
109 data_dict["run_number"], data_dict["board_id"], data_dict["misalign_channels"] = list(zip(*data))
111 df_alignment_check = pl.from_dict(
112 data_dict,
113 schema={"run_number": pl.Int64, "board_id": pl.Int64, "misalign_channels": pl.List(pl.Int64)},
114 strict=False,
115 )
117 df_gb = (
118 df_alignment_check.filter(pl.col("misalign_channels").list.len() > 0)
119 .group_by(pl.col("board_id"))
120 .agg(pl.col("misalign_channels").explode())
121 )
123 # print out some info for runs that have misaligned channel X
124 print(f"Runs with ch {test_channel} misaligned")
125 for r in (
126 df_alignment_check.filter(pl.col("misalign_channels").list.contains(test_channel))
127 .select(pl.col("board_id"), pl.col("run_number").cast(pl.Int32))
128 .join(df, on="run_number", how="left")
129 .select(pl.col(["run_number", "board_id", "timestamp"]))
130 .group_by(pl.col("run_number"))
131 .first()
132 .sort(pl.col("run_number"))
133 .iter_rows()
134 ):
135 print(r[0], r[1], r[2].strftime("%Y-%m-%d %H:%M:%S"))
137 for board in df_gb["board_id"].to_list():
138 print(f"plotting {board}")
139 n, _ = np.histogram(df_gb.filter(pl.col("board_id") == board)["misalign_channels"].to_list(), bins=range(129))
141 n_runs = len(df_alignment_check.filter(pl.col("board_id") == board))
142 # denom = np.ones(len(n)) * n_runs # noqa: F841
144 plt.figure(figsize=(10, 8))
145 plt.bar(range(128), n, fill=False) # /denom
146 plt.title(
147 f"Channels with misalignment\nBoard ID = {board}, Runs in {min_run_number}--{max_run_number}, N Runs Total = {n_runs}" # noqa: E501
148 )
149 plt.ylabel("N Misaligned Runs")
150 plt.xlabel("Channel")
151 plt.xticks(np.arange(0, 128, 4), rotation=70)
152 plt.tight_layout()
153 plt.grid(True)
154 if old_runs:
155 plt.savefig(plot_dir / f"ch_misalign_freq_{board}_oldruns.png")
156 else:
157 plt.savefig(plot_dir / f"ch_misalign_freq_{board}.png")
158 plt.cla()
159 plt.clf()
161 # todo make plots that are just empty instead
162 print(f"Aligned boards: {np.setdiff1d(np.unique(data_dict['board_id']), df_gb['board_id'].to_list())}")
165def plot_extended_readout(
166 raw_data: pl.DataFrame,
167 run_number: int,
168 plot_dir_base: Path,
169 trigger_window: int = 128,
170 trigger_rate: float = constants.FELIX_TRIGGER_RATE[1], # Hz
171 skip_channels_lo: Optional[List[int]] = None,
172 skip_channels_hi: Optional[List[int]] = None,
173 bnl_data: bool = False,
174):
175 plot_adc_bcid = False # check columns_to_drop (need to keep adc_bcid) in check_and_align_frames_wrapper
176 # when setting this to True
178 board_id = raw_data.select(pl.col("board_id").first()).item()
179 pas_mode = raw_data.select(pl.col("pas_mode").first()).item()
181 # Overwrite input arguments with values from metadata if it exists
182 if "trigger_window" in raw_data.columns:
183 trigger_window_meta = raw_data.select(pl.col("trigger_window").first()).item()
184 if trigger_window_meta is not None and trigger_window_meta > 0 and trigger_window_meta != trigger_window:
185 log.warning(f"Trigger window {trigger_window} overridden by window in metadata {trigger_window_meta}")
186 trigger_window = trigger_window
187 if "trigger_rate" in raw_data.columns:
188 trigger_rate_int = raw_data.select(pl.col("trigger_rate").first()).item()
189 if trigger_rate_int is not None and trigger_rate_int > 0:
190 trigger_rate_meta = constants.felix_trigger_rate(run_number, trigger_rate_int, bnl_data)
192 if trigger_rate_meta != trigger_rate:
193 log.warning(f"Trigger rate {trigger_rate} overridden by rate in metadata {trigger_rate_meta}")
194 trigger_rate = trigger_rate_meta
196 plot_dir = Path(plot_dir_base / f"run{run_number}/extended_readout/")
197 if not plot_dir.exists():
198 plot_dir.mkdir(parents=True, exist_ok=True)
199 os.chmod(plot_dir, 0o775)
200 plot_dir_filled = len([p for p in plot_dir.glob("*png")]) > 0
202 # Pair down a few columns
203 raw_data = raw_data.select(
204 pl.col(
205 [
206 "board_id",
207 "gain",
208 "channel",
209 "samples",
210 "measurement",
211 "felix_event_count",
212 "awg_amp",
213 "att_val",
214 "pas_mode",
215 "run_number",
216 "adc_full_bcid",
217 ]
218 + (["adc_bcid"] if plot_adc_bcid else []) # needed to plot vs adc_bcid below
219 )
220 )
222 # Hacking in ADC_BCID when BCR is held high
223 # if plot_adc_bcid:
224 # raw_data = raw_data.with_columns( pl.lit([i%32 for i in range(1957246)]).alias("adc_bcid"))
226 if skip_channels_lo:
227 raw_data = raw_data.filter(~((pl.col("gain") == "lo") & (pl.col("channel").is_in(skip_channels_lo))))
228 if skip_channels_hi:
229 raw_data = raw_data.filter(~((pl.col("gain") == "hi") & (pl.col("channel").is_in(skip_channels_hi))))
231 # Sort by channels to preserve order
232 raw_data.sort(pl.col("channel"))
234 gains: List[Literal["lo", "hi"]] = ["lo", "hi"]
235 for gain in gains:
236 raw_data_g = raw_data.filter(pl.col("gain") == gain)
237 info_g = Metadata.fill_from_dataframe(raw_data_g)
238 channels = raw_data_g["channel"].unique().sort().to_list()
240 cutoff = np.floor(len(raw_data_g["felix_event_count"][0]) / trigger_window).astype(np.int32) * trigger_window
242 raw_data_g = raw_data_g.with_columns(
243 pl.col("samples")
244 .map_elements(
245 lambda x: x[:cutoff].to_numpy().reshape(-1, trigger_window).mean(axis=1),
246 return_dtype=pl.List(pl.Float64),
247 )
248 .alias("trig_window_mean"),
249 pl.col("samples")
250 .map_elements(
251 lambda x: x[:cutoff].to_numpy().reshape(-1, trigger_window).std(axis=1),
252 return_dtype=pl.List(pl.Float64),
253 )
254 .alias("trig_window_std"),
255 )
257 # Mean and std vs time
258 mean_y = np.array([i for i in raw_data_g["trig_window_mean"]])
259 std_y = np.array([i for i in raw_data_g["trig_window_std"]])
260 time_x = np.arange(len(mean_y[0])) / trigger_rate
261 std_samples = raw_data_g["samples"].list.std().to_numpy()
263 # FFT of mean vs time
264 freq, psd = sps.welch(mean_y, fs=trigger_rate, axis=1, nperseg=len(mean_y[0]) // 4)
266 adc_full_bcid = raw_data_g["adc_full_bcid"]
267 x_bcid = np.arange(constants.N_BCID)
269 # Correlation Matrix
270 measurements = raw_data_g["measurement"].unique().to_list()
271 matrix = pedestal_analysis.calc_correlation_matrix(raw_data_g, measurements, gain, "trig_window_mean")
272 min_channel: int = raw_data_g.select(pl.col("channel").min()).item()
273 n_channels: int = raw_data_g.select(pl.col("channel").unique().count()).item()
274 pedestal_plotting.plot_correlation_matrix(
275 matrix, gain, min_channel, n_channels, plot_dir, board_id=board_id, pas_mode=pas_mode
276 )
278 log.info(f"Plotting {gain} gain extended readout figures")
279 frame_plotting.plot_mean_v_time(time_x, mean_y, plot_dir, info=info_g)
281 frame_plotting.plot_60hz_power(freq, psd, plot_dir, info=info_g, channels=channels)
283 pedestal_plotting.plot_fft2d(
284 freq.tolist(),
285 psd,
286 plot_dir,
287 channels=channels,
288 gain=gain,
289 run_number=run_number,
290 board_id=str(board_id),
291 pas_mode=pas_mode,
292 unit="Hz",
293 )
295 # only used if plot_adc_bcid == True
296 x_adc_bcid = np.arange(32)
297 freq_adc_all = np.array([])
298 psd_adc_all = []
299 h_adc_bcid_sum = np.zeros(32)
300 h_adc_bcid_weighted_sum = np.zeros(32)
302 with ProcessPoolExecutor(mp_context=mp.get_context("spawn")) as executor:
303 job_handles = dict()
304 for index, channel in enumerate(channels):
305 # Need to copy info for each channel or else parallel plotting might have wrong labels
306 info_ch = deepcopy(info_g)
307 info_ch.channels = channel
309 h_bcid = np.histogram(adc_full_bcid[index], bins=np.arange(constants.N_BCID + 1))
311 h_weighted = np.histogram(
312 adc_full_bcid[index],
313 bins=np.arange(constants.N_BCID + 1),
314 weights=raw_data_g["samples"][index],
315 )
317 freq_bcid, psd_bcid = sps.welch(h_weighted[0] / h_bcid[0], fs=1, nperseg=constants.N_BCID)
319 # Internal ADC BCID
320 if plot_adc_bcid:
321 adc_internal_bcid = raw_data_g["adc_bcid"]
322 h_adc_bcid = np.histogram(adc_internal_bcid[index], bins=np.arange(33))
323 h_adc_weighted = np.histogram(
324 adc_internal_bcid[index],
325 bins=np.arange(33),
326 weights=raw_data_g["samples"][index],
327 )
328 h_adc_bcid_sum += h_adc_bcid[0]
329 h_adc_bcid_weighted_sum += h_adc_weighted[0]
331 freq_adc_bcid, psd_adc_bcid = sps.welch(h_adc_weighted[0] / h_adc_bcid[0], fs=1, nperseg=33)
332 freq_adc_all = freq_adc_bcid.astype(np.ndarray)
333 psd_adc_all.append(psd_adc_bcid)
335 job_handles[
336 executor.submit(
337 frame_plotting.plot_mean_v_bcid,
338 channel,
339 x_adc_bcid,
340 h_adc_bcid[0],
341 h_adc_weighted[0],
342 plot_dir,
343 info_ch,
344 )
345 ] = "plot_mean_v_bcid"
347 job_handles[
348 executor.submit(
349 frame_plotting.plot_fft_mean_v_bcid,
350 channel,
351 freq_adc_bcid,
352 psd_adc_bcid,
353 plot_dir,
354 info_ch,
355 )
356 ] = "plot_fft_mean_v_bcid"
357 else:
358 job_handles[
359 executor.submit(
360 frame_plotting.plot_mean_v_bcid,
361 channel,
362 x_bcid,
363 h_bcid[0],
364 h_weighted[0],
365 plot_dir,
366 info_ch,
367 )
368 ] = "plot_mean_v_bcid"
370 job_handles[
371 executor.submit(
372 frame_plotting.plot_fft_mean_v_bcid,
373 channel,
374 freq_bcid,
375 psd_bcid,
376 plot_dir,
377 info_ch,
378 )
379 ] = "plot_fft_mean_v_bcid"
381 # job_handles[
382 # executor.submit(
383 # frame_plotting.plot_adc_full_bcid,
384 # channel,
385 # h_bcid,
386 # plot_dir,
387 # info_ch,
388 # )
389 # ] = "plot_adc_full_bcid"
391 job_handles[
392 executor.submit(
393 frame_plotting.plot_mean_v_time_ch,
394 channel,
395 time_x,
396 mean_y[index],
397 plot_dir,
398 info_ch,
399 )
400 ] = "plot_mean_v_time_ch"
402 # job_handles[
403 # executor.submit(
404 # frame_plotting.plot_mean_v_time_ch_zoom,
405 # channel,
406 # time_x,
407 # mean_y[index],
408 # plot_dir,
409 # info_ch,
410 # )
411 # ] = "plot_mean_v_time_ch_zoom"
413 # job_handles[
414 # executor.submit(
415 # frame_plotting.plot_std_v_time_ch,
416 # channel,
417 # time_x,
418 # std_y[index],
419 # plot_dir,
420 # info_ch,
421 # )
422 # ] = "plot_std_v_time_ch"
424 # job_handles[
425 # executor.submit(
426 # frame_plotting.plot_std_v_time_ch_zoom,
427 # channel,
428 # time_x,
429 # std_y[index],
430 # plot_dir,
431 # info_ch,
432 # )
433 # ] = "plot_std_v_time_ch_zoom"
435 job_handles[
436 executor.submit(
437 frame_plotting.plot_fft_mean_v_time,
438 channel,
439 freq,
440 psd[index],
441 plot_dir,
442 info_ch,
443 )
444 ] = "plot_fft_mean_v_time"
446 # job_handles[
447 # executor.submit(
448 # frame_plotting.plot_window_stds,
449 # channel,
450 # plot_dir,
451 # std_y[index],
452 # std_samples[index],
453 # info_ch
454 # )
455 # ] = "plot_window_stds"
457 job_handles[
458 executor.submit(
459 frame_plotting.plot_window_means,
460 channel,
461 plot_dir,
462 mean_y[index],
463 std_y[index],
464 std_samples[index],
465 trigger_window,
466 info_ch,
467 )
468 ] = "plot_window_means_sample_mean"
470 if plot_adc_bcid:
471 job_handles[
472 executor.submit(
473 frame_plotting.plot_mean_v_bcid,
474 200,
475 x_adc_bcid,
476 h_adc_bcid_sum,
477 h_adc_bcid_weighted_sum,
478 plot_dir,
479 info_g,
480 )
481 ] = "plot_mean_v_bcid_sum"
483 freq_adc_bcid, psd_adc_bcid = sps.welch(h_adc_bcid_weighted_sum / h_adc_bcid_sum, fs=1, nperseg=33)
484 job_handles[
485 executor.submit(
486 frame_plotting.plot_fft_mean_v_bcid,
487 200,
488 freq_adc_bcid,
489 psd_adc_bcid,
490 plot_dir,
491 info_g,
492 )
493 ] = "plot_fft_mean_v_bcid"
495 # Check for exceptions
496 for future in concurrent.futures.as_completed(job_handles):
497 job = job_handles[future]
498 try:
499 future.result()
500 except Exception as exc:
501 log.error(f"{job} generated an exception: {exc}")
502 print(traceback.format_exc())
504 if plot_adc_bcid:
505 pedestal_plotting.plot_fft2d(
506 freq_adc_all.tolist(),
507 np.concatenate([[np.array(i)] for i in psd_adc_all]),
508 plot_dir,
509 channels=channels,
510 gain=gain,
511 run_number=run_number,
512 board_id=str(board_id),
513 pas_mode=pas_mode,
514 unit="1/(ADC BCID)",
515 extra_filename="adc_bcid",
516 )
518 if not plot_dir_filled:
519 for f in plot_dir.glob("*png"):
520 os.chmod(f, 0o664)
521 for f in plot_dir.glob("*json"):
522 os.chmod(f, 0o664)
525def plot_extended_readout_from_loader(
526 loader: DataSource,
527 run_number: int,
528 plot_dir_base: Path,
529 trigger_window: int = 128,
530 trigger_rate: float = constants.FELIX_TRIGGER_RATE[1], # Hz
531 skip_channels_lo: Optional[List[int]] = None,
532 skip_channels_hi: Optional[List[int]] = None,
533 swap_frame18: bool = False,
534 baseline_corr_integration_period: Optional[float] = None,
535):
536 """
537 Load a dataframe for standalone running
538 """
539 raw_data, _ = frame_utils.check_and_align_frames_wrapper(
540 loader,
541 run_number,
542 swap_frame18,
543 baseline_corr_integration_period=baseline_corr_integration_period,
544 plot_dir=Path(plot_dir_base / f"run{run_number}"),
545 )
547 plot_extended_readout(
548 raw_data,
549 run_number,
550 plot_dir_base,
551 trigger_window,
552 trigger_rate,
553 skip_channels_lo,
554 skip_channels_hi,
555 )