Coverage for polars_analysis / frame.py: 59%

179 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 13:37 -0400

1import concurrent.futures 

2import logging 

3import multiprocessing as mp 

4import os 

5import sys 

6import traceback 

7from concurrent.futures import ProcessPoolExecutor 

8from copy import deepcopy 

9from pathlib import Path 

10from typing import List, Literal, Optional 

11 

12import matplotlib 

13import numpy as np 

14import polars as pl 

15 

16matplotlib.use("agg") 

17import scipy.signal as sps 

18from matplotlib import pyplot as plt 

19 

20import polars_analysis.plotting.frame_plotting as frame_plotting 

21import polars_analysis.plotting.pedestal_plotting as pedestal_plotting 

22from polars_analysis import frame_utils 

23from polars_analysis.analysis import constants, pedestal_analysis 

24from polars_analysis.data_sources import DataSource 

25from polars_analysis.plotting.helper import Metadata 

26 

27""" 

28High level functions for frame analysis 

29""" 

30 

31# Instantiate logger 

32log = logging.getLogger(__name__) 

33 

34 

def scan_for_misalignment(
    loader: DataSource,
    min_run_number: int,
    max_run_number: int,
    plot_dir: Path,
    reject_single_adc: bool = True,
    test_channel: int = 7,
) -> None:
    """
    Function to scan over all runs in the range of min to max run number for
    misaligned frames. Produces plots of misaligned channels per board.
    Previously scan_misalignment.py

    Parameters
    ----------
    loader:
        Data source used to list runs and load frame data.
    min_run_number, max_run_number:
        Inclusive run-number range to scan. The range must not straddle the
        run-2046 frame-data bugfix boundary; exits with status 1 if it does.
    plot_dir:
        Directory where the per-board misalignment histograms are saved.
    reject_single_adc:
        Forwarded to ``loader.load_frame_data``. When False, runs whose
        measurements all share a single ADC are filtered out afterwards.
    test_channel:
        Channel whose misaligned runs are printed to stdout.

    Side effects
    ------------
    Prints progress and summaries to stdout, logs errors, writes PNG files
    into ``plot_dir``, and calls ``sys.exit(1)`` on invalid run ranges.
    """
    # Runs before this number predate a frame-data bugfix and need separate
    # handling (their frame data is byte-swapped — see old_runs below)
    old_run_cutoff = 2046

    if min_run_number > max_run_number:
        log.error("Min run number is larger than max run number")
        sys.exit(1)

    # Refuse ranges that mix pre- and post-bugfix runs
    if min_run_number < old_run_cutoff and max_run_number > old_run_cutoff:
        log.error(
            "Run number range stradles 2046 cutoff between bugfix of frame data. These need to be run over and handled separately."  # noqa: E501
        )
        sys.exit(1)

    # old runs have swapped frame data; flag is forwarded to unpack_frame_data
    old_runs = max_run_number < old_run_cutoff

    df_run_numbers = loader.get_runs_list()

    # Pedestal runs inside the requested range, as a plain Python list
    run_numbers = (
        df_run_numbers.filter(
            pl.col("meas_type") == "pedestal",
            pl.col("run_number") <= max_run_number,
            pl.col("run_number") >= min_run_number,
        )
        .select(pl.col("run_number"))
        .to_series()
        .to_list()
    )

    df = loader.load_frame_data(*run_numbers, reject_single_adc=reject_single_adc, non_empty=True)
    # Filter away trigger runs if needed: keep only runs that carry more than
    # one distinct measurement (single-measurement runs are single-ADC)
    if not reject_single_adc:
        run_numbers = (
            df.group_by(pl.col("run_number"))
            .agg(pl.col("measurement").unique().len() > 1)
            .filter(pl.col("measurement"))["run_number"]
            .to_list()
        )
        df = df.filter(pl.col("run_number").is_in(run_numbers))

    runs_to_test = np.unique(df.select(pl.col("run_number")).to_series().to_list())

    # One (run_number, board_id, misaligned-channel array) tuple per run
    data = []

    for rn in runs_to_test:
        print(f"Processing run: {rn}")
        df_temp = df.filter(pl.col("run_number") == rn)
        # 32 rows expected per run (presumably one per ADC/gain combination —
        # TODO confirm); anything else is skipped as incomplete
        if len(df_temp) != 32:
            print(f"Skipping run {rn} as it has {len(df_temp)} != 32 rows of ADC data")
            continue

        # Need to loop over measurements to handle single ADC runs.
        # `a` accumulates check_bcid_alignment output (3 rows) across
        # measurements along axis 1; row 0 holds the misaligned channels.
        a = np.array([[], [], []], dtype=np.int32)
        for meas in df_temp["measurement"].unique().to_list():
            df_temp_meas = df_temp.filter(pl.col("measurement") == meas)
            df_temp_meas = frame_utils.trim_df_to_shortest_array(df_temp_meas)
            df_temp_meas = frame_utils.unpack_frame_data(df_temp_meas, old_runs)
            a = np.concatenate([a, frame_utils.check_bcid_alignment(df_temp_meas)], axis=1)

        data.append((int(rn), int(df_temp["board_id"][0]), a[0].astype(np.int64)))

    # Transpose the list of tuples into per-column lists for pl.from_dict
    data_dict = dict()
    data_dict["run_number"], data_dict["board_id"], data_dict["misalign_channels"] = list(zip(*data))

    df_alignment_check = pl.from_dict(
        data_dict,
        schema={"run_number": pl.Int64, "board_id": pl.Int64, "misalign_channels": pl.List(pl.Int64)},
        strict=False,
    )

    # Per-board list of all misaligned channels, only for runs that had any
    df_gb = (
        df_alignment_check.filter(pl.col("misalign_channels").list.len() > 0)
        .group_by(pl.col("board_id"))
        .agg(pl.col("misalign_channels").explode())
    )

    # print out some info for runs that have misaligned channel X
    print(f"Runs with ch {test_channel} misaligned")
    for r in (
        df_alignment_check.filter(pl.col("misalign_channels").list.contains(test_channel))
        .select(pl.col("board_id"), pl.col("run_number").cast(pl.Int32))
        .join(df, on="run_number", how="left")
        .select(pl.col(["run_number", "board_id", "timestamp"]))
        .group_by(pl.col("run_number"))
        .first()
        .sort(pl.col("run_number"))
        .iter_rows()
    ):
        # r = (run_number, board_id, timestamp)
        print(r[0], r[1], r[2].strftime("%Y-%m-%d %H:%M:%S"))

    # One histogram per board: how many runs had each channel misaligned
    for board in df_gb["board_id"].to_list():
        print(f"plotting {board}")
        # 128 channels -> 128 bins over [0, 128]
        n, _ = np.histogram(df_gb.filter(pl.col("board_id") == board)["misalign_channels"].to_list(), bins=range(129))

        n_runs = len(df_alignment_check.filter(pl.col("board_id") == board))
        # denom = np.ones(len(n)) * n_runs  # noqa: F841

        plt.figure(figsize=(10, 8))
        plt.bar(range(128), n, fill=False)  # /denom
        plt.title(
            f"Channels with misalignment\nBoard ID = {board}, Runs in {min_run_number}--{max_run_number}, N Runs Total = {n_runs}"  # noqa: E501
        )
        plt.ylabel("N Misaligned Runs")
        plt.xlabel("Channel")
        plt.xticks(np.arange(0, 128, 4), rotation=70)
        plt.tight_layout()
        plt.grid(True)
        # Old runs get a distinct filename suffix so the two eras don't clash
        if old_runs:
            plt.savefig(plot_dir / f"ch_misalign_freq_{board}_oldruns.png")
        else:
            plt.savefig(plot_dir / f"ch_misalign_freq_{board}.png")
        plt.cla()
        plt.clf()

    # todo make plots that are just empty instead
    print(f"Aligned boards: {np.setdiff1d(np.unique(data_dict['board_id']), df_gb['board_id'].to_list())}")

163 

164 

def plot_extended_readout(
    raw_data: pl.DataFrame,
    run_number: int,
    plot_dir_base: Path,
    trigger_window: int = 128,
    trigger_rate: float = constants.FELIX_TRIGGER_RATE[1],  # Hz
    skip_channels_lo: Optional[List[int]] = None,
    skip_channels_hi: Optional[List[int]] = None,
    bnl_data: bool = False,
):
    """
    Produce extended-readout diagnostic plots for a single run, per gain and
    per channel.

    For each gain ("lo", "hi") this computes per-trigger-window mean/std of
    the samples, their Welch PSD vs time, mean vs full BCID, a channel
    correlation matrix, and 2D FFT summaries. Per-channel figures are
    rendered in parallel via a spawn-context ProcessPoolExecutor.

    Parameters
    ----------
    raw_data:
        Frame data, one row per (channel, gain); must contain the columns
        selected below (board_id, gain, channel, samples, measurement,
        felix_event_count, awg_amp, att_val, pas_mode, run_number,
        adc_full_bcid, and optionally trigger_window / trigger_rate /
        adc_bcid metadata columns).
    run_number:
        Run number; used for the output directory and plot labels.
    plot_dir_base:
        Base plot directory; output goes to
        ``plot_dir_base/run{run_number}/extended_readout/``.
    trigger_window:
        Samples per trigger window. Overridden by the ``trigger_window``
        metadata column when present and positive.
    trigger_rate:
        Trigger rate in Hz. Overridden by the ``trigger_rate`` metadata
        column (resolved via ``constants.felix_trigger_rate``) when present
        and positive.
    skip_channels_lo, skip_channels_hi:
        Channels to exclude for the respective gain.
    bnl_data:
        Forwarded to ``constants.felix_trigger_rate`` when resolving the
        metadata trigger rate.

    Side effects
    ------------
    Writes PNG files under the plot directory, adjusts their permissions on
    first fill, and logs warnings/errors.
    """
    plot_adc_bcid = False  # check columns_to_drop (need to keep adc_bcid) in check_and_align_frames_wrapper
    # when setting this to True

    board_id = raw_data.select(pl.col("board_id").first()).item()
    pas_mode = raw_data.select(pl.col("pas_mode").first()).item()

    # Overwrite input arguments with values from metadata if it exists
    if "trigger_window" in raw_data.columns:
        trigger_window_meta = raw_data.select(pl.col("trigger_window").first()).item()
        if trigger_window_meta is not None and trigger_window_meta > 0 and trigger_window_meta != trigger_window:
            log.warning(f"Trigger window {trigger_window} overridden by window in metadata {trigger_window_meta}")
            # Bug fix: this was previously `trigger_window = trigger_window`
            # (a no-op), so the metadata value was logged but never applied.
            trigger_window = trigger_window_meta
    if "trigger_rate" in raw_data.columns:
        trigger_rate_int = raw_data.select(pl.col("trigger_rate").first()).item()
        if trigger_rate_int is not None and trigger_rate_int > 0:
            trigger_rate_meta = constants.felix_trigger_rate(run_number, trigger_rate_int, bnl_data)

            if trigger_rate_meta != trigger_rate:
                log.warning(f"Trigger rate {trigger_rate} overridden by rate in metadata {trigger_rate_meta}")
                trigger_rate = trigger_rate_meta

    plot_dir = Path(plot_dir_base / f"run{run_number}/extended_readout/")
    if not plot_dir.exists():
        plot_dir.mkdir(parents=True, exist_ok=True)
        os.chmod(plot_dir, 0o775)
    # Remember whether the directory already had plots: permissions are only
    # (re)set on the first fill, at the bottom of this function.
    plot_dir_filled = any(plot_dir.glob("*png"))

    # Pare down to the columns actually used below
    raw_data = raw_data.select(
        pl.col(
            [
                "board_id",
                "gain",
                "channel",
                "samples",
                "measurement",
                "felix_event_count",
                "awg_amp",
                "att_val",
                "pas_mode",
                "run_number",
                "adc_full_bcid",
            ]
            + (["adc_bcid"] if plot_adc_bcid else [])  # needed to plot vs adc_bcid below
        )
    )

    # Hacking in ADC_BCID when BCR is held high
    # if plot_adc_bcid:
    #     raw_data = raw_data.with_columns( pl.lit([i%32 for i in range(1957246)]).alias("adc_bcid"))

    if skip_channels_lo:
        raw_data = raw_data.filter(~((pl.col("gain") == "lo") & (pl.col("channel").is_in(skip_channels_lo))))
    if skip_channels_hi:
        raw_data = raw_data.filter(~((pl.col("gain") == "hi") & (pl.col("channel").is_in(skip_channels_hi))))

    # Sort by channels to preserve order.
    # Bug fix: polars DataFrame.sort returns a new frame; the result was
    # previously discarded, making this a no-op.
    raw_data = raw_data.sort(pl.col("channel"))

    gains: List[Literal["lo", "hi"]] = ["lo", "hi"]
    for gain in gains:
        raw_data_g = raw_data.filter(pl.col("gain") == gain)
        info_g = Metadata.fill_from_dataframe(raw_data_g)
        channels = raw_data_g["channel"].unique().sort().to_list()

        # Truncate each sample array to a whole number of trigger windows
        cutoff = np.floor(len(raw_data_g["felix_event_count"][0]) / trigger_window).astype(np.int32) * trigger_window

        # Per-window mean and std of the samples for every channel
        raw_data_g = raw_data_g.with_columns(
            pl.col("samples")
            .map_elements(
                lambda x: x[:cutoff].to_numpy().reshape(-1, trigger_window).mean(axis=1),
                return_dtype=pl.List(pl.Float64),
            )
            .alias("trig_window_mean"),
            pl.col("samples")
            .map_elements(
                lambda x: x[:cutoff].to_numpy().reshape(-1, trigger_window).std(axis=1),
                return_dtype=pl.List(pl.Float64),
            )
            .alias("trig_window_std"),
        )

        # Mean and std vs time (rows = channels, cols = trigger windows)
        mean_y = np.array([i for i in raw_data_g["trig_window_mean"]])
        std_y = np.array([i for i in raw_data_g["trig_window_std"]])
        time_x = np.arange(len(mean_y[0])) / trigger_rate
        std_samples = raw_data_g["samples"].list.std().to_numpy()

        # FFT of mean vs time (Welch PSD per channel along axis 1)
        freq, psd = sps.welch(mean_y, fs=trigger_rate, axis=1, nperseg=len(mean_y[0]) // 4)

        adc_full_bcid = raw_data_g["adc_full_bcid"]
        x_bcid = np.arange(constants.N_BCID)

        # Correlation Matrix
        measurements = raw_data_g["measurement"].unique().to_list()
        matrix = pedestal_analysis.calc_correlation_matrix(raw_data_g, measurements, gain, "trig_window_mean")
        min_channel: int = raw_data_g.select(pl.col("channel").min()).item()
        n_channels: int = raw_data_g.select(pl.col("channel").unique().count()).item()
        pedestal_plotting.plot_correlation_matrix(
            matrix, gain, min_channel, n_channels, plot_dir, board_id=board_id, pas_mode=pas_mode
        )

        log.info(f"Plotting {gain} gain extended readout figures")
        frame_plotting.plot_mean_v_time(time_x, mean_y, plot_dir, info=info_g)

        frame_plotting.plot_60hz_power(freq, psd, plot_dir, info=info_g, channels=channels)

        pedestal_plotting.plot_fft2d(
            freq.tolist(),
            psd,
            plot_dir,
            channels=channels,
            gain=gain,
            run_number=run_number,
            board_id=str(board_id),
            pas_mode=pas_mode,
            unit="Hz",
        )

        # only used if plot_adc_bcid == True
        x_adc_bcid = np.arange(32)
        freq_adc_all = np.array([])
        psd_adc_all = []
        h_adc_bcid_sum = np.zeros(32)
        h_adc_bcid_weighted_sum = np.zeros(32)

        # Spawn context so matplotlib state is not inherited by workers
        with ProcessPoolExecutor(mp_context=mp.get_context("spawn")) as executor:
            job_handles = dict()
            for index, channel in enumerate(channels):
                # Need to copy info for each channel or else parallel plotting might have wrong labels
                info_ch = deepcopy(info_g)
                info_ch.channels = channel

                # Occupancy and sample-weighted occupancy per full BCID
                h_bcid = np.histogram(adc_full_bcid[index], bins=np.arange(constants.N_BCID + 1))

                h_weighted = np.histogram(
                    adc_full_bcid[index],
                    bins=np.arange(constants.N_BCID + 1),
                    weights=raw_data_g["samples"][index],
                )

                freq_bcid, psd_bcid = sps.welch(h_weighted[0] / h_bcid[0], fs=1, nperseg=constants.N_BCID)

                # Internal ADC BCID
                if plot_adc_bcid:
                    adc_internal_bcid = raw_data_g["adc_bcid"]
                    h_adc_bcid = np.histogram(adc_internal_bcid[index], bins=np.arange(33))
                    h_adc_weighted = np.histogram(
                        adc_internal_bcid[index],
                        bins=np.arange(33),
                        weights=raw_data_g["samples"][index],
                    )
                    h_adc_bcid_sum += h_adc_bcid[0]
                    h_adc_bcid_weighted_sum += h_adc_weighted[0]

                    # Bug fix: the signal has 32 bins, so nperseg=33 made
                    # scipy warn and clip to 32; use 32 directly.
                    freq_adc_bcid, psd_adc_bcid = sps.welch(h_adc_weighted[0] / h_adc_bcid[0], fs=1, nperseg=32)
                    # Bug fix: was `freq_adc_bcid.astype(np.ndarray)`, which
                    # passes a class where a dtype belongs (object-dtype copy).
                    freq_adc_all = freq_adc_bcid
                    psd_adc_all.append(psd_adc_bcid)

                    job_handles[
                        executor.submit(
                            frame_plotting.plot_mean_v_bcid,
                            channel,
                            x_adc_bcid,
                            h_adc_bcid[0],
                            h_adc_weighted[0],
                            plot_dir,
                            info_ch,
                        )
                    ] = "plot_mean_v_bcid"

                    job_handles[
                        executor.submit(
                            frame_plotting.plot_fft_mean_v_bcid,
                            channel,
                            freq_adc_bcid,
                            psd_adc_bcid,
                            plot_dir,
                            info_ch,
                        )
                    ] = "plot_fft_mean_v_bcid"
                else:
                    job_handles[
                        executor.submit(
                            frame_plotting.plot_mean_v_bcid,
                            channel,
                            x_bcid,
                            h_bcid[0],
                            h_weighted[0],
                            plot_dir,
                            info_ch,
                        )
                    ] = "plot_mean_v_bcid"

                    job_handles[
                        executor.submit(
                            frame_plotting.plot_fft_mean_v_bcid,
                            channel,
                            freq_bcid,
                            psd_bcid,
                            plot_dir,
                            info_ch,
                        )
                    ] = "plot_fft_mean_v_bcid"

                # job_handles[
                #     executor.submit(
                #         frame_plotting.plot_adc_full_bcid,
                #         channel,
                #         h_bcid,
                #         plot_dir,
                #         info_ch,
                #     )
                # ] = "plot_adc_full_bcid"

                job_handles[
                    executor.submit(
                        frame_plotting.plot_mean_v_time_ch,
                        channel,
                        time_x,
                        mean_y[index],
                        plot_dir,
                        info_ch,
                    )
                ] = "plot_mean_v_time_ch"

                # job_handles[
                #     executor.submit(
                #         frame_plotting.plot_mean_v_time_ch_zoom,
                #         channel,
                #         time_x,
                #         mean_y[index],
                #         plot_dir,
                #         info_ch,
                #     )
                # ] = "plot_mean_v_time_ch_zoom"

                # job_handles[
                #     executor.submit(
                #         frame_plotting.plot_std_v_time_ch,
                #         channel,
                #         time_x,
                #         std_y[index],
                #         plot_dir,
                #         info_ch,
                #     )
                # ] = "plot_std_v_time_ch"

                # job_handles[
                #     executor.submit(
                #         frame_plotting.plot_std_v_time_ch_zoom,
                #         channel,
                #         time_x,
                #         std_y[index],
                #         plot_dir,
                #         info_ch,
                #     )
                # ] = "plot_std_v_time_ch_zoom"

                job_handles[
                    executor.submit(
                        frame_plotting.plot_fft_mean_v_time,
                        channel,
                        freq,
                        psd[index],
                        plot_dir,
                        info_ch,
                    )
                ] = "plot_fft_mean_v_time"

                # job_handles[
                #     executor.submit(
                #         frame_plotting.plot_window_stds,
                #         channel,
                #         plot_dir,
                #         std_y[index],
                #         std_samples[index],
                #         info_ch
                #     )
                # ] = "plot_window_stds"

                job_handles[
                    executor.submit(
                        frame_plotting.plot_window_means,
                        channel,
                        plot_dir,
                        mean_y[index],
                        std_y[index],
                        std_samples[index],
                        trigger_window,
                        info_ch,
                    )
                ] = "plot_window_means_sample_mean"

            if plot_adc_bcid:
                # Summed-over-channels versions, labelled as pseudo-channel 200
                job_handles[
                    executor.submit(
                        frame_plotting.plot_mean_v_bcid,
                        200,
                        x_adc_bcid,
                        h_adc_bcid_sum,
                        h_adc_bcid_weighted_sum,
                        plot_dir,
                        info_g,
                    )
                ] = "plot_mean_v_bcid_sum"

                # Bug fix: nperseg=33 > 32-bin signal; use 32 directly (see above)
                freq_adc_bcid, psd_adc_bcid = sps.welch(h_adc_bcid_weighted_sum / h_adc_bcid_sum, fs=1, nperseg=32)
                job_handles[
                    executor.submit(
                        frame_plotting.plot_fft_mean_v_bcid,
                        200,
                        freq_adc_bcid,
                        psd_adc_bcid,
                        plot_dir,
                        info_g,
                    )
                ] = "plot_fft_mean_v_bcid"

            # Check for exceptions raised in the worker processes
            for future in concurrent.futures.as_completed(job_handles):
                job = job_handles[future]
                try:
                    future.result()
                except Exception as exc:
                    log.error(f"{job} generated an exception: {exc}")
                    print(traceback.format_exc())

        if plot_adc_bcid:
            pedestal_plotting.plot_fft2d(
                freq_adc_all.tolist(),
                np.concatenate([[np.array(i)] for i in psd_adc_all]),
                plot_dir,
                channels=channels,
                gain=gain,
                run_number=run_number,
                board_id=str(board_id),
                pas_mode=pas_mode,
                unit="1/(ADC BCID)",
                extra_filename="adc_bcid",
            )

    # Make freshly-written plots group-readable (only on first fill)
    if not plot_dir_filled:
        for f in plot_dir.glob("*png"):
            os.chmod(f, 0o664)
        for f in plot_dir.glob("*json"):
            os.chmod(f, 0o664)

523 

524 

def plot_extended_readout_from_loader(
    loader: DataSource,
    run_number: int,
    plot_dir_base: Path,
    trigger_window: int = 128,
    trigger_rate: float = constants.FELIX_TRIGGER_RATE[1],  # Hz
    skip_channels_lo: Optional[List[int]] = None,
    skip_channels_hi: Optional[List[int]] = None,
    swap_frame18: bool = False,
    baseline_corr_integration_period: Optional[float] = None,
):
    """
    Convenience entry point for standalone running: load and align the frame
    data for ``run_number`` via ``frame_utils.check_and_align_frames_wrapper``,
    then hand it off to ``plot_extended_readout``.
    """
    run_plot_dir = Path(plot_dir_base / f"run{run_number}")

    # Second return value (alignment diagnostics) is not needed here
    aligned_data, _ = frame_utils.check_and_align_frames_wrapper(
        loader,
        run_number,
        swap_frame18,
        baseline_corr_integration_period=baseline_corr_integration_period,
        plot_dir=run_plot_dir,
    )

    plot_extended_readout(
        aligned_data,
        run_number,
        plot_dir_base,
        trigger_window=trigger_window,
        trigger_rate=trigger_rate,
        skip_channels_lo=skip_channels_lo,
        skip_channels_hi=skip_channels_hi,
    )