Coverage for polars_analysis / frame_utils.py: 50%

264 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 17:11 -0400

1import logging 

2import sys 

3from pathlib import Path 

4from typing import List, Optional, cast 

5 

6import numpy as np 

7import polars as pl 

8import polars.selectors as cs 

9from numpy.lib.stride_tricks import sliding_window_view 

10 

11from polars_analysis import utils 

12from polars_analysis.analysis import constants 

13from polars_analysis.data_sources import DataSource, DeltaSource, SQLSource 

14 

15""" 

16Utilities for handling Frame data and merging it into samples dataframes. 

17Mostly written with trigger mode runs in mind so far. 

18""" 

19 

20# Instantiate logger 

21log = logging.getLogger(__name__) 

22 

23 

def trim_df_to_shortest_array(df: pl.DataFrame) -> pl.DataFrame:
    """
    Trim list-typed columns to the shortest list length so they are all the same.

    Args:
        df: dataframe whose integer/float list columns may have unequal lengths

    Returns:
        The dataframe with every non-all-null list column truncated to the
        minimum length found across those columns. Returned unchanged if empty.
    """
    if df.is_empty():
        log.error("Empty dataframe passed to trimming.")
        return df

    # Every integer/float list column is a candidate for trimming
    df_temp = df.select(
        (
            cs.by_dtype(pl.List(pl.Int16))
            | cs.by_dtype(pl.List(pl.Int32))
            | cs.by_dtype(pl.List(pl.Int64))
            | cs.by_dtype(pl.List(pl.Float32))
            | cs.by_dtype(pl.List(pl.Float64))
        )
    )
    # Remove columns that are null, eg felix_evt_count, which was only added later
    list_columns = df_temp.select(c for c in df_temp if c.null_count() != c.len()).columns

    try:
        width_min = df.select(pl.col(list_columns).list.len().min()).to_numpy().min()
        width_max = df.select(pl.col(list_columns).list.len().max()).to_numpy().max()
    except ValueError:
        # Raised e.g. when the column selection above came up empty
        log.error("Value error in trim_df_to_shortest_array")
        sys.exit(1)

    # trim
    if width_min != width_max:
        log.warning(f"Trimming arrays with max length {width_max} to min {width_min}")
        # list.head preserves each column's original dtype; the previous
        # map_elements(..., return_dtype=pl.List(pl.Int64)) forced every list
        # column (including the float ones) to List(Int64)
        df = df.with_columns([pl.col(c).list.head(width_min) for c in list_columns])

    return df

59 

60 

def check_bcid_alignment(frame_data: pl.DataFrame) -> np.ndarray:
    """
    Fast frame-misalignment check, assuming the alignment stays constant over
    the whole run. Only the high-gain rows are inspected, since hi and lo gain
    come from the same chip and we must not duplicate channel numbers.

    Returns:
        A 3-row array: misaligned channel numbers, first bad index (always 0
        here), and the shift size per channel.
    """
    hi_gain = frame_data.filter(pl.col("gain") == "hi")
    # Keep the channel numbers around in case the rows are out of order
    channels = hi_gain["channel"].to_numpy().astype(np.int32)

    # Inspect only the first three words, to dodge a BCID that is rolling over
    leading_bcids = np.array([row[0:3] for row in hi_gain["adc_bcid"]])
    # Per-column median (with "higher" interpolation) acts as the reference
    reference = np.percentile(leading_bcids, 50, method="higher", axis=0)
    # The per-row median of the difference to the reference is the offset
    offsets = np.median(reference - leading_bcids, axis=1).astype(np.int32)

    # Use the first 32 words to flag channels whose BCIDs do not step by one
    steps = np.diff([row[:32] for row in hi_gain["adc_bcid"]], axis=1)
    bad_sequence = np.where(np.median(steps, axis=1) != 1)
    if len(bad_sequence[0]) > 0:
        log.warning(f"Channels {bad_sequence} don't follow normal BCID increments of 1.")

    misaligned_mask = offsets != 0
    misaligned_channels = [channels[idx] for idx in np.where(misaligned_mask)[0]]
    first_bad_idx = np.zeros(len(misaligned_channels)).astype(np.int32)
    shifts = offsets[misaligned_mask]

    return np.array(
        [
            misaligned_channels,
            first_bad_idx,
            shifts,
        ]
    )

94 

95 

def check_bcid_alignment_full(frame_data: pl.DataFrame) -> np.ndarray:
    """
    Exhaustive frame-misalignment check over every sample.
    Only high-gain rows are inspected: hi and lo gain come from the same chip,
    so this avoids duplicated channel numbers.

    Returns:
        A 3-row array: misaligned channel numbers, index of the first
        misaligned word per channel, and the shift size per channel.
    """
    hi_gain = frame_data.filter(pl.col("gain") == "hi")
    # Preserve channel numbers in case the rows are not in channel order
    channels = hi_gain["channel"].to_numpy()

    bcids = np.array(list(hi_gain["adc_bcid"]))

    # Column-wise median across channels serves as the reference row
    reference = np.median(bcids, axis=0)
    matches = bcids == reference

    bad_sequence = np.where(np.median(np.diff(bcids, axis=1), axis=1) != 1)
    if len(bad_sequence[0]) > 0:
        log.warning(f"Channels {bad_sequence} don't follow normal BCID increments of 1.")

    misaligned_channels = np.array([])
    first_bad_idx = []
    shifts = []

    if not matches.all():
        bad_rows = np.where(~np.all(matches, axis=1))[0]
        misaligned_channels = channels[bad_rows]

        for row in bad_rows:
            first = np.where(~matches[row])[0][0]
            first_bad_idx.append(first)

            # shift is positive if the channel is behind the reference
            shifts.append(np.int32(reference[first] - bcids[row][first]))

    return np.array(
        [
            misaligned_channels,
            first_bad_idx,
            shifts,
        ]
    )

138 

139 

def align_row(
    input_list: pl.Series, shift: int, front: bool, cutoff: Optional[int] = None, trigger_window: Optional[int] = None
) -> pl.Series:
    """
    Per-row alignment helper.

    Args:
        input_list: The array to trim, e.g., samples
        shift: Number of samples to trim (sign is ignored; magnitude is used)
        front: Trim the first samples, otherwise trim the last samples
        cutoff: max number of samples with full trigger windows. Not used for singleADC mode
        trigger_window: Size of trigger window. Distinguishes between trigger mode and single ADC

    Returns:
        The trimmed array
    """
    n_trim = np.abs(shift)

    # Single ADC: the samples are contiguous, so slice the whole array at once
    if trigger_window is None:
        return input_list[n_trim:] if front else input_list[:-n_trim]

    # Trigger mode: cut into trigger_window chunks, trim each window,
    # then flatten back to a single full-length array
    windows = input_list.to_numpy()[:cutoff].reshape([-1, trigger_window])
    trimmed = windows[:, n_trim:] if front else windows[:, :-n_trim]
    return pl.Series(trimmed.flatten())

175 

176 

def align_adc_bcids(df: pl.DataFrame, alignment: np.ndarray, trigger_window: Optional[int] = None) -> pl.DataFrame:
    """
    Shift list columns so misaligned channels line up with the rest of the run.

    Assumes high and low gain channels always have the same alignment.
    All list[Int] columns will be shifted. 'felix_bcid' is handled separately.

    Args:
        df: polars dataframe
        alignment: array of which channels to align and how many samples they're off
            (rows: misaligned channels, first bad index, shifts — as produced by check_bcid_alignment)
        trigger_window: needed for trigger mode runs to properly shift each window. Leave as None for singleADC

    Returns:
        Aligned dataframe
    """

    log.info("Aligning frames")

    (misaligned_channels, first_bad_idx, shifts) = alignment

    if len(alignment[0]) == 0:
        log.info("Nothing to align!")
        return df
    else:
        # Only a constant offset over the whole run is supported
        if np.mean(first_bad_idx) != 0:
            log.error(f"Can't handle a shift in BCID alignment mid-data-taking: {first_bad_idx}")

        unique_shifts = np.unique(shifts)
        if len(np.unique(shifts)) > 1:
            log.warning(f"More than one shift size: {unique_shifts}.")

        # Columns that need shifting, kept separate per dtype to preserve type later.
        # N.B. We do not want to shift the felix_bcid, as this would misalign the computed full adc bcid.
        # We are shifting the adc bcid under the felix bcid, so we need to shift all rows (channels) the same,
        # but trimming the same as the originally correctly aligned rows.
        list_columns16 = df.select(cs.by_dtype(pl.List(pl.Int16))).select(~cs.by_name("felix_bcid")).columns
        list_columns32 = df.select(cs.by_dtype(pl.List(pl.Int32))).select(~cs.by_name("felix_bcid")).columns
        list_columns64 = df.select(cs.by_dtype(pl.List(pl.Int64))).select(~cs.by_name("felix_bcid")).columns

        list_columns16_global = df.select(cs.by_dtype(pl.List(pl.Int16))).select(cs.by_name("felix_bcid")).columns

        # In case this is a dataframe with channels or gain as a list, we need to remove those
        for c in ["channel", "gain"]:
            for lc in [list_columns16, list_columns32, list_columns64]:
                if c in lc:
                    lc.remove(c)

        # We're going to iterate through unique shift sizes.
        # There is a more complex way to do this that would save more data by combining shifts in the same direction.
        for shift in unique_shifts:
            # Cutoff to make divisible by the trigger window. Needs to be computed here to update for multiple shifts.
            if trigger_window is not None:
                cutoff = np.floor(df["samples"][0].len() / trigger_window).astype(np.int32) * trigger_window
            else:
                cutoff = df["samples"][0].len()

            misaligned_channels_for_shift = misaligned_channels[shifts == shift]

            if shift > 0:
                # We need to remove samples from the misaligned data
                row_selection = pl.col("channel").is_in(misaligned_channels_for_shift)
            else:
                # We need to remove samples from the aligned data
                row_selection = ~pl.col("channel").is_in(misaligned_channels_for_shift)

            # NOTE(review): the lambdas below capture `shift`, `cutoff` and
            # `trigger_window` from this loop iteration; with_columns is executed
            # eagerly each pass, so the closures are evaluated before they change.
            df = df.with_columns(
                [  # First the special case of felix_bcid, where all channels are shifted the same since this is our
                    # global reference. Same shift as ~row_selection below.
                    pl.when(shift > 0)
                    .then(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, False, cutoff, trigger_window), return_dtype=pl.List(pl.Int16)
                        )
                    )
                    .otherwise(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, True, cutoff, trigger_window), return_dtype=pl.List(pl.Int16)
                        )
                    )
                    for c in list_columns16_global
                ]
                + [
                    # Per-channel shift for Int16 list columns: trim the front of
                    # the selected rows, the back of the others, so lengths match
                    pl.when(row_selection)
                    .then(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, True, cutoff, trigger_window), return_dtype=pl.List(pl.Int16)
                        )
                    )
                    .otherwise(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, False, cutoff, trigger_window), return_dtype=pl.List(pl.Int16)
                        )
                    )
                    for c in list_columns16
                ]
                + [
                    # Same shift logic for Int32 list columns
                    pl.when(row_selection)
                    .then(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, True, cutoff, trigger_window), return_dtype=pl.List(pl.Int32)
                        )
                    )
                    .otherwise(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, False, cutoff, trigger_window), return_dtype=pl.List(pl.Int32)
                        )
                    )
                    for c in list_columns32
                ]
                + [
                    # Same shift logic for Int64 list columns
                    pl.when(row_selection)
                    .then(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, True, cutoff, trigger_window), return_dtype=pl.List(pl.Int64)
                        )
                    )
                    .otherwise(
                        pl.col(c).map_elements(
                            lambda x: align_row(x, shift, False, cutoff, trigger_window), return_dtype=pl.List(pl.Int64)
                        )
                    )
                    for c in list_columns64
                ]
            )
            if trigger_window is not None:
                df = df.with_columns(pl.lit(trigger_window - np.abs(shift)).alias("trigger_window"))
                # Update trigger window for case of multiple shifts
                trigger_window = trigger_window - np.abs(shift)

    return df

308 

309 

def unpack_frame_data(
    frame_data: pl.DataFrame,
    swap_frame18: bool = False,
    skip_channels_lo: Optional[List[int]] = None,
    skip_channels_hi: Optional[List[int]] = None,
    drop_missing_frame_chs: bool = True,
) -> pl.DataFrame:
    """
    Unpack the frame data.

    Expands each ADC row into its four channels and two gains, picks which lpGBT
    frame word ('frame1'/'frame8') carries each channel, then extracts the
    per-sample ADC BCID (bits 8-12) and the 'first' flag (bit 13) from the
    merged frame words.

    Args:
        frame_data: frame dataframe with 'adc', 'frame1' and 'frame8' columns
        swap_frame18: treat frame8 as frame1 and vice versa (old data had them swapped)
        skip_channels_lo: low-gain channel numbers to drop
        skip_channels_hi: high-gain channel numbers to drop
        drop_missing_frame_chs: drop channels whose frame data is empty

    Returns:
        Dataframe with one row per (channel, gain) and 'merged_frame',
        'adc_bcid' and 'adc_first' columns added.
    """

    if swap_frame18:  # old data had a bug in it
        frame1, frame8 = "frame8", "frame1"
    else:
        frame1, frame8 = "frame1", "frame8"

    df = (
        frame_data.with_columns(
            pl.col("adc")
            # Need to subtract 1 to get 0-based ADC counting for channel
            .map_elements(lambda x: [(x - 1) * 4 + i for i in range(4)], return_dtype=pl.List(pl.Int32))
            .alias("channel"),
            pl.lit(["hi", "lo"]).alias("gain"),
        )
        # need to do this here so we can split ADCs across lpGBTs
        .explode(pl.col("channel"))
        .explode(pl.col("gain"))
        .with_columns(
            # ADCs read by single lpGBT
            pl.when(
                ((pl.col("adc") <= 16) & (pl.col("adc") % 3 == 1)) | ((pl.col("adc") >= 17) & (pl.col("adc") % 3 == 2))
            )
            .then(pl.col(frame8))
            # ADCs read by single lpGBT
            .when(
                ((pl.col("adc") <= 16) & (pl.col("adc") % 3 == 0)) | ((pl.col("adc") >= 17) & (pl.col("adc") % 3 == 1))
            )
            .then(pl.col(frame1))
            # ADCs readout by two lpGBTs; which frame a channel lands on depends
            # on its position within the ADC (adc * 4 - channel)
            .when(
                (
                    ((pl.col("adc") <= 16) & (pl.col("adc") % 3 == 2))
                    | ((pl.col("adc") >= 17) & (pl.col("adc") % 3 == 0))
                )
                & ((pl.col("adc") * 4 - pl.col("channel")) <= 2)
            )
            .then(pl.col(frame8))
            .when(
                (
                    ((pl.col("adc") <= 16) & (pl.col("adc") % 3 == 2))
                    | ((pl.col("adc") >= 17) & (pl.col("adc") % 3 == 0))
                )
                & ((pl.col("adc") * 4 - pl.col("channel")) >= 3)
            )
            .then(pl.col(frame1))
            .alias("merged_frame")
        )
    )

    # Polars lacks functionality in the bitwise world, so drop to numpy:
    # (word >> 8) & 0x1F extracts the 5-bit BCID field (bits 8-12);
    # word & (1 << 13) is the "first" flag (nonzero when set, not normalized to 0/1)
    adc_bcid = np.bitwise_and(np.right_shift(df["merged_frame"].to_numpy(), 8), 0x1F)
    adc_first = np.bitwise_and(df["merged_frame"].to_numpy(), 0x1 << 13)
    df = df.with_columns(
        pl.Series(name="adc_bcid", values=list(adc_bcid)).cast(pl.List(pl.Int64)),
        pl.Series(name="adc_first", values=list(adc_first)).cast(pl.List(pl.Int64)),
    )

    # Optionally drop excluded channels per gain
    if skip_channels_lo:
        df = df.filter(~((pl.col("gain") == "lo") & (pl.col("channel").is_in(skip_channels_lo))))
    if skip_channels_hi:
        df = df.filter(~((pl.col("gain") == "hi") & (pl.col("channel").is_in(skip_channels_hi))))

    # Check for empty frame data: an all-0 or all-0x1F BCID mean indicates no real frames
    df_temp = df.filter((pl.col("adc_bcid").list.mean() == 0) | (pl.col("adc_bcid").list.mean() == 0x1F))
    if len(df_temp) > 0:
        log.warning(f"Channels {df_temp['channel'].to_numpy()} have empty frame data")

        if drop_missing_frame_chs:
            log.warning("Dropping these channels.")
            log.warning("""It is very important that your samples data channels have the correct number corresponding\
 to each ADC. Otherwise, the wrong channels will be shifted.
 Are you running on old data? It is very likely frame8 and frame1 were swapped, breaking the correspondance here.
Try using --swap-frame18""")
            df = df.filter(~((pl.col("adc_bcid").list.mean() == 0) | (pl.col("adc_bcid").list.mean() == 0x1F)))

    return df

396 

397 

def unwrap_frame_data(
    frame_data: pl.DataFrame,
    swap_frame18: bool = False,
    skip_channels_lo: Optional[List[int]] = None,
    skip_channels_hi: Optional[List[int]] = None,
    drop_missing_frame_chs: bool = True,
) -> pl.DataFrame:
    """
    Compute BCIDs and return a dataframe that can be merged with per-channel
    data to append frame data.
    """
    # Make all array columns the same length before unpacking
    trimmed = trim_df_to_shortest_array(frame_data)

    unpacked = unpack_frame_data(
        trimmed, swap_frame18, skip_channels_lo, skip_channels_hi, drop_missing_frame_chs=drop_missing_frame_chs
    )

    # ADC alignment check
    alignment = check_bcid_alignment(unpacked)
    if len(alignment[0]) != 0:
        log.warning(
            f"""Misaligned ADC BCIDs found on channels (zero counting): {alignment[0]}
            starting at indices: {alignment[1]} shifted by: {alignment[2]}"""
        )

    # Column names changed over time; keep whichever variants are present
    felix_names = [
        name
        for name in ("felix_evt_count", "felix_event_count", "felix_bcid", "trigger_rate")
        if name in unpacked.columns
    ]

    # Keep only the columns needed downstream
    keep = [
        pl.col("measurement"),
        pl.col("gain"),
        pl.col("channel"),
        pl.col("board_id"),
        pl.col("run_number"),
        pl.col("adc"),
        pl.col(felix_names),
        pl.col("merged_frame"),
        pl.col("adc_bcid"),
        pl.col("adc_first"),
    ]
    return unpacked.select(keep)

456 

457 

def merge_samples_frame_data(samples_data: pl.DataFrame, frame_data: pl.DataFrame) -> pl.DataFrame:
    """Left-join the frame data onto the samples data on the shared key columns."""
    join_keys = ["run_number", "measurement", "channel", "gain", "board_id"]
    return samples_data.join(frame_data, on=join_keys, how="left")

462 

463 

def compute_adc_felix_bcid_offset(frame_data: pl.DataFrame, trigger_window: int = 128) -> int:
    """
    Compute the offset between the ADC and Felix BCID counters by locating a
    trigger window (packet) in which both show a BCR. Assumes everything is
    already aligned, so one example row of each counter suffices.

    Returns:
        felix index minus adc index within the common packet, or -9999 if no
        packet with an overlapping BCR exists.
    """
    adc_bcid = frame_data["adc_bcid"][0].to_numpy()
    felix_bcid = frame_data["felix_bcid"][0].to_numpy()

    # Truncate so the arrays divide evenly into trigger windows
    cutoff = np.floor(len(adc_bcid) / trigger_window).astype(np.int32) * trigger_window

    # One row per trigger-window packet
    adc_packets = adc_bcid[:cutoff].reshape(-1, trigger_window)
    felix_packets = felix_bcid[:cutoff].reshape(-1, trigger_window)

    # A step other than +1 marks a BCR (the ADC counter also wraps by -31 normally)
    adc_bcr = np.where((np.diff(adc_packets) != 1) & (np.diff(adc_packets) != -31))
    felix_bcr = np.where(np.diff(felix_packets) != 1)

    # First packet (in felix row order) in which both counters saw a BCR
    common_packet = next((idx for idx in felix_bcr[0] if idx in adc_bcr[0]), -1)
    if common_packet == -1:
        log.warning("Could not find packet with overlapping BCR!")
        return -9999

    adc_packet_index = adc_bcr[1][np.where(adc_bcr[0] == common_packet)[0][0]]
    felix_packet_index = felix_bcr[1][np.where(felix_bcr[0] == common_packet)[0][0]]

    return int(felix_packet_index - adc_packet_index)

495 

496 

def add_full_adc_bcid(df: pl.DataFrame, bcid_offset: int) -> pl.DataFrame:
    """
    Add the full, unwrapped BCID ('adc_full_bcid') by adding bcid_offset to
    felix_bcid, modulo constants.N_BCID.
    """
    unwrapped = pl.col("felix_bcid").map_elements(
        lambda x: (x.to_numpy() + bcid_offset) % (constants.N_BCID), return_dtype=pl.List(pl.Int64)
    )
    return df.with_columns(unwrapped.alias("adc_full_bcid"))

507 

508 

def correct_baseline(
    baseline_corr_integration_period: float,
    raw_data: pl.DataFrame,
    plot_dir: Optional[Path],
    trigger_window: int = 128,
    trigger_rate: float = constants.FELIX_TRIGGER_RATE[1],  # Hz
    bnl_data: bool = False,
) -> pl.DataFrame:
    """
    Correct variations in the pedestal over an integration period vs the mean over the whole run.
    Assumed to be run after alignment.

    Args:
        baseline_corr_integration_period: time period in seconds to attempt to integrate over
        raw_data: raw dataframe to correct
        plot_dir: directory for adding run_info
        trigger_window: size of trigger window in samples, overwritten if DF has this metadata
        trigger_rate: trigger rate in Hz, overwritten if DF has this metadata
        bnl_data: if BNL data the trigger rate is different

    Returns:
        pl.DataFrame: The data frame with corrected samples replacing the raw samples
    """

    # Overwrite input arguments with values from metadata if it exists
    if "trigger_window" in raw_data.columns:
        trigger_window_meta = raw_data.select(pl.col("trigger_window").first()).item()
        if trigger_window_meta is not None and trigger_window_meta > 0 and trigger_window_meta != trigger_window:
            log.warning(f"Trigger window {trigger_window} overridden by window in metadata {trigger_window_meta}")
            # Bug fix: this previously read `trigger_window = trigger_window`,
            # so the metadata value was never actually applied despite the warning
            trigger_window = trigger_window_meta

    if "trigger_rate" in raw_data.columns:
        trigger_rate_int = raw_data.select(pl.col("trigger_rate").first()).item()
        run_number = raw_data.select(pl.col("run_number").first()).item()
        if trigger_rate_int is not None and trigger_rate_int > 0:
            trigger_rate_meta = constants.felix_trigger_rate(run_number, trigger_rate_int, bnl_data)

            if trigger_rate_meta != trigger_rate:
                log.warning(f"Trigger rate {trigger_rate} overridden by rate in metadata {trigger_rate_meta}")
                trigger_rate = trigger_rate_meta

    n_trigger_windows = len(raw_data["samples"][0]) / trigger_window

    if trigger_rate is None or trigger_window is None or trigger_rate < 0 or trigger_window < 1:
        log.error(f"Need the {trigger_rate=} and {trigger_window=} to perform baseline correction.")
        sys.exit()

    trigger_window_period = trigger_window * 25e-9  # trigger window period in seconds
    trigger_period = 1 / trigger_rate

    # Find closest time period to baseline_corr_integration_period
    min_idx: int
    if np.abs(trigger_window_period - baseline_corr_integration_period) < np.abs(
        trigger_period - baseline_corr_integration_period
    ):
        log.info(
            f"Closest integration period for baseline correction to {baseline_corr_integration_period:.04f} s"
            f" is integrating a single trigger window {trigger_window_period:.04f} s"
        )
        min_idx = 1
        integration_period = trigger_window_period
    else:
        # allows up to 5s at fastest trigger rate
        # [0, 0, 1, ...] we need to integrate 2 windows to get the trigger period, so index 2
        trigger_period_multiples = trigger_period * np.concatenate([[0, 0], np.arange(1, 25000, 1)])
        min_idx = int(np.abs(baseline_corr_integration_period - trigger_period_multiples).argmin())

        if min_idx >= 25000:
            log.error(
                "The requested integration period is longer than the hardcoded max of 25000 trigger windows. "
                "No correction applied."
            )
            return raw_data

        if min_idx >= n_trigger_windows:
            log.error("The requested integration period is longer than the run. No correction applied")
            return raw_data

        integration_period = trigger_period_multiples[min_idx]
        log.info(
            f"Closest integration period for baseline correction to {baseline_corr_integration_period:.04f} s"
            f" is integrating {min_idx} trigger windows {integration_period:.04f} s and there are {n_trigger_windows=}"
        )
    # Mean per row of samples as stable baseline to correct to
    # window_averages = averages over N trigger windows * trigger window samples
    raw_data = raw_data.with_columns(pl.col("samples").list.mean().alias("mean")).with_columns(
        pl.col("samples")
        .map_elements(
            lambda x: (
                np.mean(sliding_window_view(x, min_idx * trigger_window)[:: min_idx * trigger_window], axis=1).reshape(
                    # For picking out only some samples per trigger window
                    # np.mean(sliding_window_view(x, min_idx * trigger_window)[:: min_idx * trigger_window].reshape(-1, trigger_window)[:,0:4].reshape(-1,min_idx*4), axis=1).reshape( #noqa E501
                    -1,
                    1,
                )
                * np.ones(min_idx * trigger_window)
            ).flatten(),
            return_dtype=pl.List(pl.Float64),
        )
        .alias("window_averages")
    )
    # sliding_window_view can trim the data, so we need to keep the other columns in sync
    raw_data = trim_df_to_shortest_array(raw_data)
    # Overwrite samples with corrected samples
    raw_data = raw_data.with_columns(
        ((pl.col("samples") - pl.col("window_averages")) + pl.col("mean")).alias("samples")
    )

    if plot_dir is not None:
        for board_id in raw_data["board_id"].unique().to_list():
            utils.add_run_info(
                "Baseline drift correction timescale", f"{integration_period:.04f} s", board_id, plot_dir, True
            )

    return raw_data

626 

627 

def check_and_align_frames_wrapper(
    loader: DataSource,
    run_number: int,
    swap_frame18: bool,
    require_positive: bool = False,
    require_unsaturated: bool = False,
    require_nonempty: bool = True,
    baseline_corr_integration_period: Optional[float] = None,
    plot_dir: Optional[Path] = None,
    do_alignment: bool = True,
    bnl_data: bool = False,
) -> tuple[pl.DataFrame, np.ndarray]:
    """
    Load raw samples and frame data for a run, check (and optionally perform)
    ADC BCID alignment, and optionally apply the baseline drift correction.

    Args:
        loader: data source used to load the raw samples and frame data
        run_number: run to load
        swap_frame18: swap frame1/frame8 (needed for old data)
        require_positive: passed through to load_raw_data
        require_unsaturated: passed through to load_raw_data
        require_nonempty: passed through to load_raw_data
        baseline_corr_integration_period: if set, integration period in seconds
            for the baseline correction
        plot_dir: directory for adding run info during baseline correction
        do_alignment: actually shift the data when misalignment is found
        bnl_data: BNL data uses a different trigger rate

    Returns:
        Tuple of the (possibly aligned and corrected) dataframe and the
        alignment array (empty columns if nothing was aligned).
    """
    # Nevis-taken data before run ~2046 had frame1/frame8 swapped
    is_nevis_data = False
    if isinstance(loader, SQLSource):
        is_nevis_data = True
    elif isinstance(loader, DeltaSource):  # DeltaSource or ParquetSource
        is_nevis_data = "flx-srv-atlas" in str(loader.raw_dir).lower()

    if is_nevis_data and run_number < 2046 and not swap_frame18:
        log.warning(f"Run number {run_number} is older than ~2046, swap_frame18 likely needs to be set!")

    # Intermediate columns not needed by downstream consumers
    columns_to_drop = [
        "adc",
        "adc_bcid",  # keep for plotting vs ADC bcid, ie comment out
        "adc_first",
        "felix_bcid",
        "merged_frame",
        "trigger_rate_right",
    ]

    raw_data = loader.load_raw_data(
        run_number,
        require_positive=require_positive,
        require_unsaturated=require_unsaturated,
        require_nonempty=require_nonempty,
    )
    frame_data = loader.load_frame_data(run_number)

    # Flag for trigger vs single ADC mode
    is_triggerMode = raw_data["measurement"].unique().len() == 1

    if is_triggerMode:
        # Can do it all in one go
        frame_data = unwrap_frame_data(frame_data, swap_frame18, drop_missing_frame_chs=do_alignment)
        alignment = check_bcid_alignment(frame_data)

        # Determine trigger window: prefer metadata, else count the
        # felix_event_count == 0 entries in the first frame row
        trigger_window = -1
        if raw_data["trigger_window"][0] is not None and raw_data["trigger_window"][0] > 0:
            trigger_window = raw_data["trigger_window"][0]
        elif (fec := frame_data["felix_event_count"][0]) is not None:
            trigger_window = len(np.where(fec == 0)[0])

        raw_data = merge_samples_frame_data(raw_data, frame_data)

        # Check if frames are already aligned
        if len(alignment[0]) == 0:
            log.info("Frames already aligned")
        elif do_alignment:
            log.info("Aligning frames")
            # prune columns to be consistent with single adc for now
            raw_data = align_adc_bcids(raw_data, alignment, trigger_window)
        else:
            log.info("Frames not aligned, but do_alignment is False.")
            alignment = np.array([[], [], []])

        # perform baseline correction
        if baseline_corr_integration_period is not None:
            # Bug fix: bnl_data was previously passed positionally and landed in
            # the trigger_window parameter of correct_baseline
            raw_data = correct_baseline(baseline_corr_integration_period, raw_data, plot_dir, bnl_data=bnl_data)

        bcid_offset = compute_adc_felix_bcid_offset(frame_data, trigger_window)
        raw_data = add_full_adc_bcid(raw_data, bcid_offset).drop(columns_to_drop, strict=False)

    else:  # single ADC
        # Need to loop through measurments since they can have independent lengths

        raw_data_per_meas = []
        alignment = np.array([[], [], []])
        for meas in raw_data["measurement"].unique().to_list():
            frame_data_meas = unwrap_frame_data(frame_data.filter(pl.col("measurement") == meas), swap_frame18)
            alignment_temp = check_bcid_alignment(frame_data_meas)

            raw_data_meas = raw_data.filter(pl.col("measurement") == meas)
            raw_data_meas = merge_samples_frame_data(raw_data_meas, frame_data_meas)

            # Check if frames are already aligned
            if len(alignment_temp[0]) == 0:
                log.info(f"Measurement {meas} frames already aligned")
            elif do_alignment:
                log.info(f"Aligning frames for measurement {meas}")

                # Need to prune columns since if no alignment is needed these don't get added to that measurement
                raw_data_meas = align_adc_bcids(raw_data_meas, alignment_temp).drop(columns_to_drop, strict=False)
                alignment = np.concatenate([alignment, alignment_temp], axis=1).astype(np.int32)
            else:
                log.info(f"Measurement {meas} frames not aligned, but do_alignment is False.")

            # perform baseline correction
            if baseline_corr_integration_period is not None:
                log.warning("Running the baseline pedestal drift correction on single ADC mode runs is a little janky")

                # For purposes of chunking the data, make 'trigger windows' to match the requested integration period.
                # If that's larger than the data we have, take the length of the data.
                trigger_window_requested = np.floor(baseline_corr_integration_period / 25e-9)
                max_samples = np.floor(cast(int, raw_data_meas["samples"].list.len().min()))

                trigger_window = int(np.min([trigger_window_requested, max_samples]))
                log.info(f"Splitting the single ADC data into {trigger_window} sample chunks")
                raw_data_meas = correct_baseline(
                    baseline_corr_integration_period,
                    raw_data_meas,
                    plot_dir,
                    trigger_window=trigger_window,
                    trigger_rate=40e6,
                )

            # Doesn't work for single ADC at the moment
            # bcid_offset = compute_adc_felix_bcid_offset(frame_data, trigger_window)
            # raw_data = add_full_adc_bcid(raw_data, bcid_offset).drop(columns_to_drop, strict=False)

            raw_data_per_meas.append(raw_data_meas)

        # Only rebuild the dataframe if something was actually modified per measurement
        if len(alignment[0]) != 0 or baseline_corr_integration_period is not None:
            raw_data = pl.concat(raw_data_per_meas, how="diagonal")

    return raw_data, np.array(alignment)