Coverage for polars_analysis / frame_utils.py: 50%
264 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 17:11 -0400
1import logging
2import sys
3from pathlib import Path
4from typing import List, Optional, cast
6import numpy as np
7import polars as pl
8import polars.selectors as cs
9from numpy.lib.stride_tricks import sliding_window_view
11from polars_analysis import utils
12from polars_analysis.analysis import constants
13from polars_analysis.data_sources import DataSource, DeltaSource, SQLSource
15"""
16Utilities for handling Frame data and merging it into samples dataframes.
17Mostly written with trigger mode runs in mind so far.
18"""
20# Instantiate logger
21log = logging.getLogger(__name__)
def trim_df_to_shortest_array(df: pl.DataFrame) -> pl.DataFrame:
    """
    Trim all list-typed columns to the length of the shortest list so they all match.

    Args:
        df: dataframe whose integer/float list columns may have unequal lengths

    Returns:
        The dataframe with every non-null list column truncated to the minimum
        length found. Returned unchanged when empty or already uniform.
    """
    if df.is_empty():
        log.error("Empty dataframe passed to trimming.")
        return df

    df_temp = df.select(
        (
            cs.by_dtype(pl.List(pl.Int16))
            | cs.by_dtype(pl.List(pl.Int32))
            | cs.by_dtype(pl.List(pl.Int64))
            | cs.by_dtype(pl.List(pl.Float32))
            | cs.by_dtype(pl.List(pl.Float64))
        )
    )
    # Remove columns that are null, eg felix_evt_count, which was only added later
    list_columns = df_temp.select(c for c in df_temp if c.null_count() != c.len()).columns

    try:
        width_min = df.select(pl.col(list_columns).list.len().min()).to_numpy().min()
        width_max = df.select(pl.col(list_columns).list.len().max()).to_numpy().max()
    except ValueError:
        # e.g. no usable list columns at all: nothing sensible to trim
        log.error("Value error in trim_df_to_shortest_array")
        sys.exit(1)

    # trim
    if width_min != width_max:
        log.warning(f"Trimming arrays with max length {width_max} to min {width_min}")
        # list.head preserves each column's dtype; the previous map_elements forced
        # return_dtype=List(Int64), silently retyping Float32/Float64 list columns
        df = df.with_columns([pl.col(c).list.head(width_min) for c in list_columns])

    return df
def check_bcid_alignment(frame_data: pl.DataFrame) -> np.ndarray:
    """
    Do a faster check for frame misalignment assuming alignment stays the same over the whole run.
    Arbitrarily pick hi gain since hi and lo are always from the same chip
    and we need to avoid duplicating channel numbers for hi and lo.

    Args:
        frame_data: unpacked frame dataframe with "gain", "channel" and "adc_bcid" columns

    Returns:
        2-D array of [misaligned channel numbers, first bad sample index (always 0
        here, since this check assumes misalignment from the start), shift sizes]
    """
    frame_data_temp = frame_data.filter(pl.col("gain") == "hi")
    # save, in case these are out of order
    channel_name_array = frame_data_temp["channel"].to_numpy().astype(np.int32)

    # Check first three words to avoid case where BCID is rolling over
    adc_bcid_array = np.array([a[0:3] for a in frame_data_temp["adc_bcid"]])
    # Median of the 3 columns as a reference
    reference_bcid = np.percentile(adc_bcid_array, 50, method="higher", axis=0)
    # Now the median of the diff is the alignment offset
    alignment = np.median(reference_bcid - adc_bcid_array, axis=1).astype(np.int32)

    # Use first 32 words to check for bad bcid sequences
    # A healthy BCID sequence increments by exactly 1 per sample
    bad_sequence = np.where(np.median(np.diff([idx[:32] for idx in frame_data_temp["adc_bcid"]], axis=1), axis=1) != 1)
    if len(bad_sequence[0]) > 0:
        log.warning(f"Channels {bad_sequence} don't follow normal BCID increments of 1.")

    # Non-zero offsets mark the misaligned channels
    misaligned_channels = [channel_name_array[idx] for idx in np.where(alignment != 0)[0]]
    first_bad_idx = np.zeros(len(misaligned_channels)).astype(np.int32)
    shifts = alignment[alignment != 0]

    return np.array(
        [
            misaligned_channels,
            first_bad_idx,
            shifts,
        ]
    )
def check_bcid_alignment_full(frame_data: pl.DataFrame) -> np.ndarray:
    """
    Do a complete check over all samples for frame misalignment.
    Arbitrarily pick hi gain since hi and lo are always from the same chip
    and we need to avoid duplicating channel numbers for hi and lo.

    Args:
        frame_data: unpacked frame dataframe with "gain", "channel" and "adc_bcid" columns

    Returns:
        2-D array of [misaligned channel numbers, first bad sample index per
        channel, shift sizes (positive if the channel is behind the reference)]
    """
    frame_data_temp = frame_data.filter(pl.col("gain") == "hi")
    # save, in case these are out of order
    channel_name_array = frame_data_temp["channel"].to_numpy()

    adc_bcid_array = np.array([a for a in frame_data_temp["adc_bcid"]])

    # Per-sample median across channels is the reference BCID sequence
    reference_row = np.median(adc_bcid_array, axis=0)
    aligned = adc_bcid_array == reference_row

    # A healthy BCID sequence increments by exactly 1 per sample
    bad_sequence = np.where(np.median(np.diff(adc_bcid_array, axis=1), axis=1) != 1)
    if len(bad_sequence[0]) > 0:
        log.warning(f"Channels {bad_sequence} don't follow normal BCID increments of 1.")

    misaligned_channels = np.array([])
    first_bad_idx = []
    shifts = []

    if not np.all(aligned):
        misaligned_adc_idx = np.where(~np.all(aligned, axis=1))[0]
        misaligned_channels = channel_name_array[misaligned_adc_idx]

        for a in misaligned_adc_idx:
            # First sample where this channel deviates from the reference
            idx = np.where(~aligned[a])[0][0]
            first_bad_idx.append(idx)

            # shift is positive if the channel is behind the reference
            shift = np.int32(reference_row[idx] - adc_bcid_array[a][idx])
            shifts.append(shift)

    return np.array(
        [
            misaligned_channels,
            first_bad_idx,
            shifts,
        ]
    )
def align_row(
    input_list: pl.Series, shift: int, front: bool, cutoff: Optional[int] = None, trigger_window: Optional[int] = None
) -> pl.Series:
    """
    Helper function for alignment per row

    Args:
        input_list: The array to trim, e.g., samples
        shift: Number of samples to trim
        front: Trim the first samples, otherwise trim the last samples
        cutoff: max number of samples with full trigger windows. Not used for singleADC mode
        trigger_window: Size of trigger window. Distinguishes between trigger mode and single ADC

    Returns:
        The trimmed array
    """
    # Nothing to trim. Also guards against input_list[:-0], which would
    # return an EMPTY array rather than the full one.
    if shift == 0:
        return input_list

    # Single ADC
    # Shift the entire samples array as it is contiguous
    if trigger_window is None:
        if front:
            return input_list[np.abs(shift) :]
        else:
            return input_list[: -np.abs(shift)]
    # Trigger Mode
    # Cut into trigger_window chunks, and trim each window, then flatten back to full array
    else:
        if front:
            return pl.Series(
                (input_list.to_numpy()[:cutoff].reshape([-1, trigger_window])[:, np.abs(shift) :]).flatten()
            )
        else:
            return pl.Series(
                (input_list.to_numpy()[:cutoff].reshape([-1, trigger_window])[:, : -np.abs(shift)]).flatten()
            )
def align_adc_bcids(df: pl.DataFrame, alignment: np.ndarray, trigger_window: Optional[int] = None) -> pl.DataFrame:
    """
    Assumes high and low gain channels always have the same alignment.
    All list[Int] columns will be shifted. 'felix_bcid' is handled separately.

    Args:
        df: polars dataframe
        alignment: array of which channels to align and how many samples they're off
        trigger_window: needed for trigger mode runs to properly shift each window. Leave as None for singleADC

    Returns:
        Aligned dataframe
    """

    log.info("Aligning frames")

    (misaligned_channels, first_bad_idx, shifts) = alignment

    if len(alignment[0]) == 0:
        log.info("Nothing to align!")
        return df
    else:
        # A shift that starts partway through the run cannot be corrected here
        if np.mean(first_bad_idx) != 0:
            log.error(f"Can't handle a shift in BCID alignment mid-data-taking: {first_bad_idx}")

        unique_shifts = np.unique(shifts)
        if len(np.unique(shifts)) > 1:
            log.warning(f"More than one shift size: {unique_shifts}.")

        # Columns that need shifting, separate to preserve type later
        # N.B. We do not want to shift the felix_bcid, as this will misalign the computed full adc bcid.
        # We are shifting the adc bcid under the felix bcid, so we need to shift all rows (channels) the same,
        # but trimming the same as the originally correctly aligned rows
        list_columns16 = df.select(cs.by_dtype(pl.List(pl.Int16))).select(~cs.by_name("felix_bcid")).columns
        list_columns32 = df.select(cs.by_dtype(pl.List(pl.Int32))).select(~cs.by_name("felix_bcid")).columns
        list_columns64 = df.select(cs.by_dtype(pl.List(pl.Int64))).select(~cs.by_name("felix_bcid")).columns

        list_columns16_global = df.select(cs.by_dtype(pl.List(pl.Int16))).select(cs.by_name("felix_bcid")).columns

        # In case this is a dataframe with channels or gain as a list, we need to remove those
        for c in ["channel", "gain"]:
            for lc in [list_columns16, list_columns32, list_columns64]:
                if c in lc:
                    lc.remove(c)

        # We're going to iterate through unique shift sizes.
        # There is a more complex way to do this that would save more data by combining shifts in the same direction.
        for shift in unique_shifts:
            # Cutoff to make divisible by the trigger window. Needs to be computed here to update for multiple shifts.
            if trigger_window is not None:
                cutoff = np.floor(df["samples"][0].len() / trigger_window).astype(np.int32) * trigger_window
            else:
                cutoff = df["samples"][0].len()

            misaligned_channels_for_shift = misaligned_channels[shifts == shift]

            if shift > 0:
                # We need to remove samples from the missaligned data
                row_selection = pl.col("channel").is_in(misaligned_channels_for_shift)
            else:
                # We need to remove samples from the aligned data
                row_selection = ~pl.col("channel").is_in(misaligned_channels_for_shift)

            # Same trimming logic repeated per list dtype (Int16/Int32/Int64) so
            # map_elements can be given the matching return_dtype for each group
            df = df.with_columns(
                [  # First the special case of felix_bcid, where all channels are shifted the same since this is our
                    # global reference. Same shift as ~row_selection below.
                    pl.when(shift > 0)
                    .then(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, False, cutoff, trigger_window), return_dtype=pl.List(pl.Int16)
                        )
                    )
                    .otherwise(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, True, cutoff, trigger_window), return_dtype=pl.List(pl.Int16)
                        )
                    )
                    for c in list_columns16_global
                ]
                + [
                    pl.when(row_selection)
                    # abs() because we have changed row_selection above and need to remove entries
                    .then(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, True, cutoff, trigger_window), return_dtype=pl.List(pl.Int16)
                        )
                    )
                    .otherwise(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, False, cutoff, trigger_window), return_dtype=pl.List(pl.Int16)
                        )
                    )
                    for c in list_columns16
                ]
                + [
                    pl.when(row_selection)
                    # abs() because we have changed row_selection above and need to remove entries
                    .then(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, True, cutoff, trigger_window), return_dtype=pl.List(pl.Int32)
                        )
                    )
                    .otherwise(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, False, cutoff, trigger_window), return_dtype=pl.List(pl.Int32)
                        )
                    )
                    for c in list_columns32
                ]
                + [
                    pl.when(row_selection)
                    # abs() because we have changed row_selection above and need to remove entries
                    .then(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, True, cutoff, trigger_window), return_dtype=pl.List(pl.Int64)
                        )
                    )
                    .otherwise(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, False, cutoff, trigger_window), return_dtype=pl.List(pl.Int64)
                        )
                    )
                    for c in list_columns64
                ]
            )
            if trigger_window is not None:
                # Trimming shrinks every window, so record the new effective window size
                df = df.with_columns(pl.lit(trigger_window - np.abs(shift)).alias("trigger_window"))
                # Update trigger window for case of multiple shifts
                trigger_window = trigger_window - np.abs(shift)

    return df
def unpack_frame_data(
    frame_data: pl.DataFrame,
    swap_frame18: bool = False,
    skip_channels_lo: Optional[List[int]] = None,
    skip_channels_hi: Optional[List[int]] = None,
    drop_missing_frame_chs: bool = True,
) -> pl.DataFrame:
    """
    Unpack the frame data

    Expands each ADC row into per-channel / per-gain rows, selects which lpGBT
    frame word (frame1 or frame8) carries each channel, and decodes the ADC BCID
    and the "first sample" bit out of the merged frame word.

    Args:
        frame_data: raw frame dataframe with "adc", "frame1" and "frame8" columns
        swap_frame18: swap frame1 and frame8 (old data had them reversed)
        skip_channels_lo: lo-gain channel numbers to drop, if any
        skip_channels_hi: hi-gain channel numbers to drop, if any
        drop_missing_frame_chs: drop channels whose frame data is empty

    Returns:
        Dataframe with "channel", "gain", "merged_frame", "adc_bcid" and "adc_first" added
    """

    if swap_frame18:  # old data had a bug in it
        frame1, frame8 = "frame8", "frame1"
    else:
        frame1, frame8 = "frame1", "frame8"

    df = (
        frame_data.with_columns(
            pl.col("adc")
            # Need to subtract 1 to get 0-based ADC counting for channel
            .map_elements(lambda x: [(x - 1) * 4 + i for i in range(4)], return_dtype=pl.List(pl.Int32))
            .alias("channel"),
            pl.lit(["hi", "lo"]).alias("gain"),
        )
        # need to do this here so we can split ADCs across lpGBTs
        .explode(pl.col("channel"))
        .explode(pl.col("gain"))
        .with_columns(
            # Routing of ADC -> lpGBT frame word depends on ADC number mod 3,
            # with the pattern flipping between ADCs <= 16 and >= 17
            # ADCs read by single lpGBT
            pl.when(
                ((pl.col("adc") <= 16) & (pl.col("adc") % 3 == 1)) | ((pl.col("adc") >= 17) & (pl.col("adc") % 3 == 2))
            )
            .then(pl.col(frame8))
            # ADCs read by single lpGBT
            .when(
                ((pl.col("adc") <= 16) & (pl.col("adc") % 3 == 0)) | ((pl.col("adc") >= 17) & (pl.col("adc") % 3 == 1))
            )
            .then(pl.col(frame1))
            # ADCs readout by two lpGBTs
            .when(
                (
                    ((pl.col("adc") <= 16) & (pl.col("adc") % 3 == 2))
                    | ((pl.col("adc") >= 17) & (pl.col("adc") % 3 == 0))
                )
                & ((pl.col("adc") * 4 - pl.col("channel")) <= 2)
            )
            .then(pl.col(frame8))
            .when(
                (
                    ((pl.col("adc") <= 16) & (pl.col("adc") % 3 == 2))
                    | ((pl.col("adc") >= 17) & (pl.col("adc") % 3 == 0))
                )
                & ((pl.col("adc") * 4 - pl.col("channel")) >= 3)
            )
            .then(pl.col(frame1))
            .alias("merged_frame")
        )
    )

    # Polars lacks functionality in the bitwise world
    # BCID is a 5-bit field at bits 8-12; bit 13 flags the first sample
    adc_bcid = np.bitwise_and(np.right_shift(df["merged_frame"].to_numpy(), 8), 0x1F)
    adc_first = np.bitwise_and(df["merged_frame"].to_numpy(), 0x1 << 13)
    df = df.with_columns(
        pl.Series(name="adc_bcid", values=list(adc_bcid)).cast(pl.List(pl.Int64)),
        pl.Series(name="adc_first", values=list(adc_first)).cast(pl.List(pl.Int64)),
    )

    if skip_channels_lo:
        df = df.filter(~((pl.col("gain") == "lo") & (pl.col("channel").is_in(skip_channels_lo))))
    if skip_channels_hi:
        df = df.filter(~((pl.col("gain") == "hi") & (pl.col("channel").is_in(skip_channels_hi))))

    # Check for empty frame data
    # An all-zero or all-0x1F BCID sequence means the channel carried no real frame data
    df_temp = df.filter((pl.col("adc_bcid").list.mean() == 0) | (pl.col("adc_bcid").list.mean() == 0x1F))
    if len(df_temp) > 0:
        log.warning(f"Channels {df_temp['channel'].to_numpy()} have empty frame data")

        if drop_missing_frame_chs:
            log.warning("Dropping these channels.")
            log.warning("""It is very important that your samples data channels have the correct number corresponding\
 to each ADC. Otherwise, the wrong channels will be shifted.
Are you running on old data? It is very likely frame8 and frame1 were swapped, breaking the correspondance here.
Try using --swap-frame18""")
            df = df.filter(~((pl.col("adc_bcid").list.mean() == 0) | (pl.col("adc_bcid").list.mean() == 0x1F)))

    return df
def unwrap_frame_data(
    frame_data: pl.DataFrame,
    swap_frame18: bool = False,
    skip_channels_lo: Optional[List[int]] = None,
    skip_channels_hi: Optional[List[int]] = None,
    drop_missing_frame_chs: bool = True,
) -> pl.DataFrame:
    """
    Compute BCIDs and return a dataframe that can be
    merged with per channel data to append frame data
    """
    # Make sure all list columns have equal lengths before unpacking
    df = trim_df_to_shortest_array(frame_data)

    df = unpack_frame_data(
        df, swap_frame18, skip_channels_lo, skip_channels_hi, drop_missing_frame_chs=drop_missing_frame_chs
    )

    # Report any ADC misalignment found by the fast check
    alignment = check_bcid_alignment(df)
    if len(alignment[0]) != 0:
        log.warning(
            f"""Misaligned ADC BCIDs found on channels (zero counting): {alignment[0]}
            starting at indices: {alignment[1]} shifted by: {alignment[2]}"""
        )

    # Column names changed over time; keep whichever variants are present
    felix_names = [
        name
        for name in ("felix_evt_count", "felix_event_count", "felix_bcid", "trigger_rate")
        if name in df.columns
    ]

    # Keep only the columns needed downstream
    identifier_columns = ["measurement", "gain", "channel", "board_id", "run_number", "adc"]
    frame_columns = ["merged_frame", "adc_bcid", "adc_first"]
    df = df.select(
        [pl.col(c) for c in identifier_columns] + [pl.col(felix_names)] + [pl.col(c) for c in frame_columns]
    )

    return df
def merge_samples_frame_data(samples_data: pl.DataFrame, frame_data: pl.DataFrame) -> pl.DataFrame:
    """Left-join the frame data onto the samples rows via the shared identifier columns."""
    join_keys = ["run_number", "measurement", "channel", "gain", "board_id"]
    return samples_data.join(frame_data, on=join_keys, how="left")
def compute_adc_felix_bcid_offset(frame_data: pl.DataFrame, trigger_window: int = 128) -> int:
    """
    Compute the offset between the ADC BCID and the Felix BCID.

    Looks for a trigger window (packet) in which both counters wrap (a BCR),
    assuming everything is aligned so one example row of each is enough.
    Returns -9999 when no packet with a common BCR exists.
    """
    adc_bcid = frame_data["adc_bcid"][0].to_numpy()
    felix_bcid = frame_data["felix_bcid"][0].to_numpy()

    # Truncate to a whole number of trigger windows
    cutoff = np.floor(len(adc_bcid) / trigger_window).astype(np.int32) * trigger_window

    # One row per packet
    adc_bcid_packets = adc_bcid[:cutoff].reshape(-1, trigger_window)
    felix_bcid_packets = felix_bcid[:cutoff].reshape(-1, trigger_window)

    # A BCR shows up as a step that is neither +1 nor the ADC's -31 wrap
    adc_bcr = np.where((np.diff(adc_bcid_packets) != 1) & (np.diff(adc_bcid_packets) != -31))
    felix_bcr = np.where((np.diff(felix_bcid_packets) != 1))

    # First packet index where both counters see a BCR
    common_packet = next((idx for idx in felix_bcr[0] if idx in adc_bcr[0]), -1)
    if common_packet == -1:
        log.warning("Could not find packet with overlapping BCR!")
        return -9999

    adc_index = np.where(adc_bcr[0] == common_packet)[0][0]
    felix_index = np.where(felix_bcr[0] == common_packet)[0][0]
    adc_packet_index = adc_bcr[1][adc_index]
    felix_packet_index = felix_bcr[1][felix_index]

    return int(felix_packet_index - adc_packet_index)
def add_full_adc_bcid(df: pl.DataFrame, bcid_offset: int) -> pl.DataFrame:
    """
    Add full, unwrapped, BCID by adding the bcid_offset to the felix_bcid
    """

    # Per-row helper: shift the felix BCID sequence and wrap at N_BCID
    def _offset_bcids(row: pl.Series) -> np.ndarray:
        return (row.to_numpy() + bcid_offset) % (constants.N_BCID)

    return df.with_columns(
        pl.col("felix_bcid").map_elements(_offset_bcids, return_dtype=pl.List(pl.Int64)).alias("adc_full_bcid")
    )
def correct_baseline(
    baseline_corr_integration_period: float,
    raw_data: pl.DataFrame,
    plot_dir: Optional[Path],
    trigger_window: int = 128,
    trigger_rate: float = constants.FELIX_TRIGGER_RATE[1],  # Hz
    bnl_data: bool = False,
) -> pl.DataFrame:
    """
    Correct variations in the pedestal over an integration period vs the mean over the whole run.
    Assumed to be run after alignment.

    Args:
        baseline_corr_integration_period: time period in seconds to attempt to integrate over
        raw_data: raw dataframe to correct
        plot_dir: directory for adding run_info
        trigger_window: size of trigger window in samples, overwritten if DF has this metadata
        trigger_rate: trigger rate in Hz, overwritten if DF has this metadata
        bnl_data: if BNL data the trigger rate is different

    Returns:
        pl.DataFrame: The data frame with corrected samples replacing the raw samples
    """

    # Overwrite input arguments with values from metadata if it exists
    if "trigger_window" in raw_data.columns:
        trigger_window_meta = raw_data.select(pl.col("trigger_window").first()).item()
        if trigger_window_meta is not None and trigger_window_meta > 0 and trigger_window_meta != trigger_window:
            log.warning(f"Trigger window {trigger_window} overridden by window in metadata {trigger_window_meta}")
            # Bug fix: this used to be a self-assignment (trigger_window = trigger_window),
            # so the metadata value was logged but never actually applied
            trigger_window = trigger_window_meta

    if "trigger_rate" in raw_data.columns:
        trigger_rate_int = raw_data.select(pl.col("trigger_rate").first()).item()
        run_number = raw_data.select(pl.col("run_number").first()).item()
        if trigger_rate_int is not None and trigger_rate_int > 0:
            trigger_rate_meta = constants.felix_trigger_rate(run_number, trigger_rate_int, bnl_data)

            if trigger_rate_meta != trigger_rate:
                log.warning(f"Trigger rate {trigger_rate} overridden by rate in metadata {trigger_rate_meta}")
                trigger_rate = trigger_rate_meta

    n_trigger_windows = len(raw_data["samples"][0]) / trigger_window

    if trigger_rate is None or trigger_window is None or trigger_rate < 0 or trigger_window < 1:
        log.error(f"Need the {trigger_rate=} and {trigger_window=} to perform baseline correction.")
        sys.exit()

    trigger_window_period = trigger_window * 25e-9  # trigger window period in seconds
    trigger_period = 1 / trigger_rate

    # Find closest time period to baseline_corr_integration_period
    min_idx: int
    if np.abs(trigger_window_period - baseline_corr_integration_period) < np.abs(
        trigger_period - baseline_corr_integration_period
    ):
        log.info(
            f"Closest integration period for baseline correction to {baseline_corr_integration_period:.04f} s"
            f" is integrating a single trigger window {trigger_window_period:.04f} s"
        )
        min_idx = 1
        integration_period = trigger_window_period
    else:
        # allows up to 5s at fastest trigger rate
        # [0, 0, 1, ...] we need to integrate 2 windows to get the trigger period, so index 2
        trigger_period_multiples = trigger_period * np.concatenate([[0, 0], np.arange(1, 25000, 1)])
        min_idx = int(np.abs(baseline_corr_integration_period - trigger_period_multiples).argmin())

        if min_idx >= 25000:
            log.error(
                "The requested integration period is longer than the hardcoded max of 25000 trigger windows. "
                "No correction applied."
            )
            return raw_data

        if min_idx >= n_trigger_windows:
            log.error("The requested integration period is longer than the run. No correction applied")
            return raw_data

        integration_period = trigger_period_multiples[min_idx]
        log.info(
            f"Closest integration period for baseline correction to {baseline_corr_integration_period:.04f} s"
            f" is integrating {min_idx} trigger windows {integration_period:.04f} s and there are {n_trigger_windows=}"
        )

    # Mean per row of samples as stable baseline to correct to
    # window_averages = averages over N trigger windows * trigger window samples
    raw_data = raw_data.with_columns(pl.col("samples").list.mean().alias("mean")).with_columns(
        pl.col("samples")
        .map_elements(
            lambda x: (
                np.mean(sliding_window_view(x, min_idx * trigger_window)[:: min_idx * trigger_window], axis=1).reshape(
                    -1,
                    1,
                )
                * np.ones(min_idx * trigger_window)
            ).flatten(),
            return_dtype=pl.List(pl.Float64),
        )
        .alias("window_averages")
    )
    # sliding_window_view can trim the data, so we need to keep the other columns in sync
    raw_data = trim_df_to_shortest_array(raw_data)
    # Overwrite samples with corrected samples
    raw_data = raw_data.with_columns(
        ((pl.col("samples") - pl.col("window_averages")) + pl.col("mean")).alias("samples")
    )

    if plot_dir is not None:
        for board_id in raw_data["board_id"].unique().to_list():
            utils.add_run_info(
                "Baseline drift correction timescale", f"{integration_period:.04f} s", board_id, plot_dir, True
            )

    return raw_data
def check_and_align_frames_wrapper(
    loader: DataSource,
    run_number: int,
    swap_frame18: bool,
    require_positive: bool = False,
    require_unsaturated: bool = False,
    require_nonempty: bool = True,
    baseline_corr_integration_period: Optional[float] = None,
    plot_dir: Optional[Path] = None,
    do_alignment: bool = True,
    bnl_data: bool = False,
) -> tuple[pl.DataFrame, np.ndarray]:
    """
    Load raw samples and frame data, check (and optionally fix) ADC BCID
    alignment, and optionally apply the baseline drift correction.

    Args:
        loader: data source for raw and frame data
        run_number: run to load
        swap_frame18: swap frame1/frame8 (needed for old data)
        require_positive: loader filter, see load_raw_data
        require_unsaturated: loader filter, see load_raw_data
        require_nonempty: loader filter, see load_raw_data
        baseline_corr_integration_period: if set, run correct_baseline with this period (seconds)
        plot_dir: directory for adding run_info
        do_alignment: actually shift misaligned channels (otherwise only report)
        bnl_data: if BNL data the trigger rate is different

    Returns:
        Tuple of (raw data with frame info merged, alignment array
        [misaligned channels, first bad indices, shifts])
    """
    is_nevis_data = False
    if isinstance(loader, SQLSource):
        is_nevis_data = True
    elif isinstance(loader, DeltaSource):  # DeltaSource or ParquetSource
        is_nevis_data = "flx-srv-atlas" in str(loader.raw_dir).lower()

    if is_nevis_data and run_number < 2046 and not swap_frame18:
        log.warning(f"Run number {run_number} is older than ~2046, swap_frame18 likely needs to be set!")

    columns_to_drop = [
        "adc",
        "adc_bcid",  # keep for plotting vs ADC bcid, ie comment out
        "adc_first",
        "felix_bcid",
        "merged_frame",
        "trigger_rate_right",
    ]

    raw_data = loader.load_raw_data(
        run_number,
        require_positive=require_positive,
        require_unsaturated=require_unsaturated,
        require_nonempty=require_nonempty,
    )
    frame_data = loader.load_frame_data(run_number)

    # Flag for trigger vs single ADC mode
    is_triggerMode = raw_data["measurement"].unique().len() == 1

    if is_triggerMode:
        # Can do it all in one go
        frame_data = unwrap_frame_data(frame_data, swap_frame18, drop_missing_frame_chs=do_alignment)
        alignment = check_bcid_alignment(frame_data)

        trigger_window = -1
        if raw_data["trigger_window"][0] is not None and raw_data["trigger_window"][0] > 0:
            trigger_window = raw_data["trigger_window"][0]
        elif (fec := frame_data["felix_event_count"][0]) is not None:
            # Fall back to counting samples per event in the felix event counter
            trigger_window = len(np.where(fec == 0)[0])

        raw_data = merge_samples_frame_data(raw_data, frame_data)

        # Check if frames are already aligned
        if len(alignment[0]) == 0:
            log.info("Frames already aligned")
        elif do_alignment:
            log.info("Aligning frames")
            # prune columns to be consistent with single adc for now
            raw_data = align_adc_bcids(raw_data, alignment, trigger_window)
        else:
            log.info("Frames not aligned, but do_alignment is False.")
            alignment = np.array([[], [], []])

        # perform baseline correction
        if baseline_corr_integration_period is not None:
            # Bug fix: bnl_data was previously passed POSITIONALLY, landing in the
            # trigger_window parameter of correct_baseline (trigger_window=False),
            # which tripped the trigger_window < 1 guard and aborted the run
            raw_data = correct_baseline(baseline_corr_integration_period, raw_data, plot_dir, bnl_data=bnl_data)

        bcid_offset = compute_adc_felix_bcid_offset(frame_data, trigger_window)
        raw_data = add_full_adc_bcid(raw_data, bcid_offset).drop(columns_to_drop, strict=False)

    else:  # single ADC
        # Need to loop through measurments since they can have independent lengths
        raw_data_per_meas = []
        alignment = np.array([[], [], []])
        for meas in raw_data["measurement"].unique().to_list():
            frame_data_meas = unwrap_frame_data(frame_data.filter(pl.col("measurement") == meas), swap_frame18)
            alignment_temp = check_bcid_alignment(frame_data_meas)

            raw_data_meas = raw_data.filter(pl.col("measurement") == meas)
            raw_data_meas = merge_samples_frame_data(raw_data_meas, frame_data_meas)

            # Check if frames are already aligned
            if len(alignment_temp[0]) == 0:
                log.info(f"Measurement {meas} frames already aligned")
            elif do_alignment:
                log.info(f"Aligning frames for measurement {meas}")

                # Need to prune columns since if no alignment is needed these don't get added to that measurement
                raw_data_meas = align_adc_bcids(raw_data_meas, alignment_temp).drop(columns_to_drop, strict=False)
                alignment = np.concatenate([alignment, alignment_temp], axis=1).astype(np.int32)
            else:
                log.info(f"Measurement {meas} frames not aligned, but do_alignment is False.")

            # perform baseline correction
            if baseline_corr_integration_period is not None:
                log.warning("Running the baseline pedestal drift correction on single ADC mode runs is a little janky")

                # For purposes of chunking the data, make 'trigger windows' to match the requested integration period.
                # If that's larger than the data we have, take the length of the data.
                trigger_window_requested = np.floor(baseline_corr_integration_period / 25e-9)
                max_samples = np.floor(cast(int, raw_data_meas["samples"].list.len().min()))

                trigger_window = int(np.min([trigger_window_requested, max_samples]))
                log.info(f"Splitting the single ADC data into {trigger_window} sample chunks")
                raw_data_meas = correct_baseline(
                    baseline_corr_integration_period,
                    raw_data_meas,
                    plot_dir,
                    trigger_window=trigger_window,
                    trigger_rate=40e6,
                )

            # Doesn't work for single ADC at the moment
            # bcid_offset = compute_adc_felix_bcid_offset(frame_data, trigger_window)
            # raw_data = add_full_adc_bcid(raw_data, bcid_offset).drop(columns_to_drop, strict=False)

            raw_data_per_meas.append(raw_data_meas)

        if len(alignment[0]) != 0 or baseline_corr_integration_period is not None:
            raw_data = pl.concat(raw_data_per_meas, how="diagonal")

    return raw_data, np.array(alignment)