Coverage for polars_analysis / pedestal.py: 73%

306 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 14:47 -0400

1import concurrent.futures 

2import logging 

3import multiprocessing as mp 

4import os 

5import sys 

6import traceback 

7from concurrent.futures import ProcessPoolExecutor 

8from copy import deepcopy 

9from itertools import product 

10from pathlib import Path 

11from typing import Any, Dict, List, Literal, Optional, Tuple, Union 

12 

13import numpy as np 

14import polars as pl 

15 

16import polars_analysis.analysis.pedestal_analysis as analysis 

17import polars_analysis.plotting.pedestal_plotting as plotting 

18from polars_analysis import frame, frame_utils, utils 

19from polars_analysis.analysis import constants 

20from polars_analysis.data_sources import DataSource 

21from polars_analysis.db_interface import prod_db_data_uploader 

22from polars_analysis.db_interface.production_test_db import ProductionTestDB 

23from polars_analysis.plotting import board_summary_plotting 

24from polars_analysis.plotting.helper import Metadata 

25from polars_analysis.utils import get_columns_or_exit 

26 

27# Instantiate logger 

28log = logging.getLogger(__name__) 

29 

30""" 

31High level commands to run pedestal loading, calculations, and plotting 

32""" 

33 

34 

def calc_derived(df: pl.DataFrame) -> pl.DataFrame:
    """Compute the per-row derived pedestal quantities for ``df``.

    The identifying columns and ``samples`` are selected, passed through the
    ``pipe_psd``, ``pipe_autocorr`` and ``pipe_chi2`` steps of
    ``pedestal_analysis`` (in that order), and extended with ``mean``, ``std``
    and ``maxmin`` columns. The ``samples`` column is removed from the result.

    Args:
        df: Pedestal data containing every column listed in ``kept_columns``.

    Returns:
        A new dataframe with the derived columns and without ``samples``.
    """
    kept_columns = [
        "run_number",
        "measurement",
        "channel",
        "gain",
        "samples",
        "board_id",
        "board_variant",
        "board_version",
        "pas_mode",
    ]
    selected = df.select([pl.col(name) for name in kept_columns])

    # Each pipe step receives the frame produced by the previous one.
    with_psd = selected.pipe(analysis.pipe_psd)
    with_autocorr = with_psd.pipe(analysis.pipe_autocorr)
    with_chi2 = with_autocorr.pipe(analysis.pipe_chi2)

    summarised = with_chi2.with_columns(
        mean=analysis.expr_mean(),
        std=analysis.expr_rms(),
        maxmin=analysis.expr_max_min(),
    )
    return summarised.select(pl.exclude("samples"))

58 

59 

def calc_all(
    raw_data: pl.DataFrame,
    skip_channels_lo: Optional[List[int]] = None,
    skip_channels_hi: Optional[List[int]] = None,
    multiple_boards: Optional[List[str]] = None,
) -> Union[Tuple[pl.DataFrame, pl.DataFrame], Tuple[pl.DataFrame, None]]:
    """Calculate coherent noise and, for a single board, the derived values.

    Args:
        raw_data: Raw data; it must have a ``meas_type`` column with at least
            one ``"pedestal"`` row, plus the columns requested below.
        skip_channels_lo: Channels handed to the coherent noise calculation as
            the skip list for the ``lo`` gain.
        skip_channels_hi: Skip list used for every gain other than ``lo``.
        multiple_boards: When given, a single coherent noise window covering
            ``128 * len(multiple_boards)`` channels is computed and no derived
            data is produced.

    Returns:
        ``(coherent_noise_data, derived_data)``; ``derived_data`` is ``None``
        when ``multiple_boards`` is given.

    Raises:
        Exception: If ``raw_data`` holds no pedestal rows.
    """
    pedestal_rows = raw_data.filter(pl.col("meas_type") == "pedestal")
    if pedestal_rows.height == 0:
        log.critical("No rows in the dataframe correspond to a pedestal run. Aborting.")
        raise Exception("Empty dataframe")

    raw_data = get_columns_or_exit(
        raw_data,
        [
            "run_number",
            "measurement",
            "channel",
            "gain",
            "samples",
            "board_id",
            "board_variant",
            "board_version",
            "pas_mode",
            "trigger_window",
        ],
    )

    run_number = raw_data.select(pl.col("run_number").first()).item()
    board_id = raw_data.select(pl.col("board_id").first()).item()
    pas_mode = raw_data.select(pl.col("pas_mode").first()).item()

    # HEC boards used to fill pas_mode with NaN, which get converted to floats in the DF instead of an int.
    # NaN is the only value that compares unequal to itself.
    if pas_mode != pas_mode:
        pas_mode = -1

    # Each window is a (first channel, number of channels) pair.
    if multiple_boards is None:
        log.info(f"Calculating coherent noise values for board {board_id}")
        # One 128-channel window, then two of 64, eight of 16 and thirty-two of 4.
        channel_windows = [
            (group_size * index, group_size)
            for group_size, n_groups in ((128, 1), (64, 2), (16, 8), (4, 32))
            for index in range(n_groups)
        ]
    else:
        log.info(f"Calculating coherent noise values across {multiple_boards}")
        channel_windows = [(0, 128 * len(multiple_boards))]

    measurements = raw_data["measurement"].unique().to_list()
    gains = raw_data["gain"].unique().to_list()

    # Coherent noise: measurements vary slowest, channel windows fastest.
    noise_frames = []
    for measurement in measurements:
        for gain in gains:
            skipped = skip_channels_lo if gain.lower() == "lo" else skip_channels_hi
            for min_channel, n_channel in channel_windows:
                noise_frames.append(
                    analysis.calc_coherent_noise(
                        raw_data,
                        min_channel,
                        n_channel,
                        run_number,
                        board_id,
                        measurement,
                        pas_mode,
                        gain,
                        skipped,
                    )
                )
    coherent_noise_data = pl.concat(noise_frames, how="diagonal")

    if multiple_boards is not None:
        return coherent_noise_data, None

    # All per sample derived values
    log.info("Calculating derived values")
    derived_data: pl.DataFrame = calc_derived(raw_data)
    return coherent_noise_data, derived_data

131 

132 

def plot_all(
    raw_data: pl.DataFrame,
    coherent_noise_data: pl.DataFrame,
    derived_data: pl.DataFrame,
    plot_dir: Path,
    skip_channels_lo: Optional[List[int]] = None,
    skip_channels_hi: Optional[List[int]] = None,
):
    """Make every pedestal plot for one board serially, in this process.

    This is the single-process counterpart of ``parallel_plot_all``.

    Args:
        raw_data: Raw pedestal rows; must contain the columns requested below.
        coherent_noise_data: One row per coherent noise result to plot.
        derived_data: Derived values; the ``gain``, ``channel``, ``run_number``,
            ``mean``, ``std``, ``maxmin``, ``freq``, ``psd``, ``peaks`` and
            ``autocorr`` columns are read here.
        plot_dir: Directory the plots are written to.
        skip_channels_lo: ``lo`` gain channels to leave out of the plots.
        skip_channels_hi: ``hi`` gain channels to leave out of the plots.
    """
    # File permissions are only adjusted at the end when the directory held no PNGs beforehand.
    plot_dir_filled = any(plot_dir.glob("*png"))
    skip_by_gain = {"lo": skip_channels_lo, "hi": skip_channels_hi}

    ### Raw Samples Plots ###
    columns_to_get = [
        "run_number",
        "measurement",
        "channel",
        "gain",
        "samples",
        "board_id",
        "pas_mode",
        "trigger_window",
    ]
    raw_data = get_columns_or_exit(raw_data, columns_to_get)

    run_number = raw_data["run_number"].unique().to_list()[0]
    board_id = raw_data["board_id"].unique().to_list()[0]
    pas_mode = raw_data["pas_mode"].unique().to_list()[0]

    # HEC boards used to fill pas_mode with NaN, which get converted to floats in the DF instead of an int.
    # NaN is the only value that compares unequal to itself.
    if pas_mode != pas_mode:
        pas_mode = -1
        raw_data.drop_in_place("pas_mode")

    for skip_gain, skipped in skip_by_gain.items():
        if skipped:
            raw_data = raw_data.filter(~((pl.col("gain") == skip_gain) & (pl.col("channel").is_in(skipped))))

    for gain in raw_data["gain"].unique():
        gain_rows = raw_data.filter(gain=gain)
        measurements = gain_rows["measurement"].unique().to_list()
        # Use the channels present for this gain (as parallel_plot_all does), since skip lists differ per gain.
        channels = set(gain_rows["channel"].unique().to_list())
        matrix = analysis.calc_correlation_matrix(raw_data, measurements, gain)
        for min_channel, n_channels in zip(constants.PED_MIN_CHAN_LIST, constants.PED_N_CHAN_LIST):
            plot_channels = set(range(min_channel, min_channel + n_channels))
            if channels.isdisjoint(plot_channels):
                continue
            if analysis.next_power_of_2(len(channels)) < n_channels // 4:
                continue
            # run_number is forwarded so this plot receives the same arguments as in parallel_plot_all.
            plotting.plot_correlation_matrix(
                matrix,
                gain,
                min_channel,
                n_channels,
                plot_dir,
                run_number=run_number,
                board_id=board_id,
                pas_mode=pas_mode,
            )

    for row_df in raw_data.iter_slices(n_rows=1):
        info = Metadata.fill_from_dataframe(row_df)
        plotting.plot_raw(info, row_df["samples"].to_numpy()[0], plot_dir)
        plotting.plot_hist(info, row_df["samples"].to_numpy()[0], plot_dir)

    ### Derived Values Plots ###
    # If a channel is skipped, set its mean and RMS to 0 so it is skipped in the baseline summary plot
    for skip_gain, skipped in skip_by_gain.items():
        if not skipped:
            continue
        is_skipped = (pl.col("gain") == skip_gain) & (pl.col("channel").is_in(skipped))
        derived_data = derived_data.with_columns(
            mean=pl.when(is_skipped).then(0).otherwise(pl.col("mean")),
            std=pl.when(is_skipped).then(0).otherwise(pl.col("std")),
        )

    gains: List[Literal["lo", "hi"]] = ["lo", "hi"]
    info = Metadata.fill_from_dataframe(raw_data)
    for gain in gains:
        gain_df = derived_data.filter(gain=gain)
        skipped = skip_by_gain[gain]
        if skipped is not None:
            gain_df = gain_df.filter(~pl.col("channel").is_in(skipped))

        # A gain may be absent from the data or fully skipped; indexing the
        # empty frame below (gain_df["freq"][0]) would otherwise raise.
        if gain_df.is_empty():
            log.warning(f"No {gain} gain rows left in derived data for board {board_id}, skipping its summary plots")
            continue

        info.gain = gain
        gain_run_number = gain_df["run_number"].unique()[0]

        plotting.plot_fft2d(
            gain_df["freq"][0],
            gain_df["psd"],
            plot_dir,
            gain_df["channel"],
            gain,
            gain_run_number,
            board_id,
            pas_mode,
        )

        plotting.plot_chi2(
            gain_df,
            gain,
            plot_dir,
            gain_run_number,
            board_id,
            pas_mode,
        )

        plotting.plot_chi2_hist(
            gain_df,
            gain,
            plot_dir,
            constants.CHI2_THRESHOLD,
            gain_run_number,
            board_id,
            pas_mode,
        )

        means = gain_df["mean"].to_numpy()
        stds = gain_df["std"].to_numpy()
        maxmins = gain_df["maxmin"].to_numpy()

        board_summary_plotting.plot_pedestal_mean_hist(
            means,
            plot_dir,
            info,
            show_cuts=False,
        )

        board_summary_plotting.plot_pedestal_rms_hist(
            stds,
            plot_dir,
            info,
            show_cuts=False,
        )

        board_summary_plotting.plot_pedestal_maxmin_hist(
            maxmins,
            plot_dir,
            info,
            show_cuts=False,
        )

    plotting.plot_baseline_means_rms(
        derived_data,
        plot_dir,
        skip_channels_hi=skip_channels_hi,
        skip_channels_lo=skip_channels_lo,
        run_number=run_number,
        board_id=board_id,
        pas_mode=pas_mode,
    )

    for row in derived_data.iter_rows(named=True):
        if skip_channels_hi is not None and row["gain"] == "hi" and row["channel"] in skip_channels_hi:
            continue
        if skip_channels_lo is not None and row["gain"] == "lo" and row["channel"] in skip_channels_lo:
            continue
        plotting.plot_autocorrelation(
            row["run_number"],
            row["channel"],
            row["gain"],
            row["autocorr"],
            plot_dir,
            pas_mode=pas_mode,
            board_id=board_id,
        )
        plotting.plot_fft(
            row["channel"],
            row["gain"],
            row["freq"],
            row["psd"],
            row["peaks"],
            plot_dir,
            run_number=run_number,
            board_id=board_id,
            pas_mode=pas_mode,
        )

    ### Coherent Noise Results Plots ###
    for row in coherent_noise_data.iter_rows(named=True):
        plotting.plot_coherent_noise(row, plot_dir, pas_mode=pas_mode)
        # The 128 and 64 channel windows are additionally drawn with a log scale.
        if row["n_channels"] == 128 or row["n_channels"] == 64:
            plotting.plot_coherent_noise(row, plot_dir, pas_mode=pas_mode, use_log_scale=True)

    if not plot_dir_filled:
        for pattern in ("*png", "*json"):
            for f in plot_dir.glob(pattern):
                os.chmod(f, 0o664)

326 

327 

def parallel_plot_all(
    raw_data: pl.DataFrame,
    coherent_noise_data: pl.DataFrame,
    derived_data: pl.DataFrame,
    plot_dir: Path,
    skip_channels_lo: Optional[List[int]] = None,
    skip_channels_hi: Optional[List[int]] = None,
):
    """Make every pedestal plot for one board using a pool of worker processes.

    Each plot is submitted as its own job to a ``ProcessPoolExecutor`` (spawn
    context). A job that raises is logged with its label and traceback; the
    remaining jobs are still collected.

    Args:
        raw_data: Raw pedestal rows; must contain the columns requested below.
        coherent_noise_data: One row per coherent noise result to plot.
        derived_data: Derived values; the ``gain``, ``channel``, ``run_number``,
            ``mean``, ``std``, ``maxmin``, ``freq``, ``psd``, ``peaks`` and
            ``autocorr`` columns are read here.
        plot_dir: Directory the plots are written to.
        skip_channels_lo: ``lo`` gain channels removed from raw and derived data.
        skip_channels_hi: ``hi`` gain channels removed from raw and derived data.
    """
    # File permissions are only adjusted at the end when the directory held no PNGs beforehand.
    plot_dir_filled = any(plot_dir.glob("*png"))

    with ProcessPoolExecutor(mp_context=mp.get_context("spawn")) as executor:
        job_handles: Dict[Any, Any] = dict()

        def submit(label: str, func: Any, *args: Any, **kwargs: Any) -> None:
            # Map each future to a readable label so failures can be attributed to a plot.
            job_handles[executor.submit(func, *args, **kwargs)] = label

        ### Raw Samples Plots ###
        columns_to_get = [
            "run_number",
            "measurement",
            "channel",
            "gain",
            "samples",
            "board_id",
            "pas_mode",
            "trigger_window",
        ]
        raw_data = get_columns_or_exit(raw_data, columns_to_get)

        for skip_gain, skipped in (("lo", skip_channels_lo), ("hi", skip_channels_hi)):
            if not skipped:
                continue
            keep = ~((pl.col("gain") == skip_gain) & (pl.col("channel").is_in(skipped)))
            raw_data = raw_data.filter(keep)
            derived_data = derived_data.filter(keep)

        run_number = raw_data["run_number"].unique().to_list()[0]
        board_id = raw_data["board_id"].unique().to_list()[0]
        pas_mode = raw_data["pas_mode"].unique().to_list()[0]

        # HEC boards used to fill pas_mode with NaN, which get converted to floats in the DF instead of an int.
        # NaN is the only value that compares unequal to itself.
        if pas_mode != pas_mode:
            pas_mode = -1

        for gain in raw_data["gain"].unique():
            gain_rows = raw_data.filter(gain=gain)
            measurements = gain_rows["measurement"].unique().to_list()
            channels = set(gain_rows["channel"].unique().to_list())
            corr_matrix = analysis.calc_correlation_matrix(raw_data, measurements, gain)
            for min_channel, n_channels in zip(constants.PED_MIN_CHAN_LIST, constants.PED_N_CHAN_LIST):
                plot_channels = set(range(min_channel, min_channel + n_channels))
                if channels.isdisjoint(plot_channels):
                    continue
                if analysis.next_power_of_2(len(channels)) < n_channels // 4:
                    continue
                submit(
                    f"plot_correlation_matrix_{gain}",
                    plotting.plot_correlation_matrix,
                    corr_matrix,
                    gain,
                    min_channel,
                    n_channels,
                    plot_dir,
                    run_number=run_number,
                    board_id=board_id,
                    pas_mode=pas_mode,
                )

        for row_df in raw_data.iter_slices(n_rows=1):
            # Built once per row and shared by both jobs; neither object is modified afterwards.
            row_info = Metadata.fill_from_dataframe(row_df)
            row_samples = row_df["samples"].to_numpy()[0]
            submit("plot_raw", plotting.plot_raw, row_info, row_samples, plot_dir)
            submit("plot_hist", plotting.plot_hist, row_info, row_samples, plot_dir)

        ### Derived Values Plots ###
        # We're lying to the type checking, we only run over the gains that exist in the dataframe
        gains: List[Literal["lo", "hi"]] = raw_data["gain"].unique().to_list()
        info = Metadata.fill_from_dataframe(raw_data)
        for gain in gains:
            gain_df = derived_data.filter(gain=gain)
            # Indexing an empty frame below (gain_df["freq"][0]) would raise, so skip such gains.
            if gain_df.is_empty():
                log.warning(f"No {gain} gain rows in derived data for board {board_id}, skipping its summary plots")
                continue

            # Need to copy info for each gain or else parallel plotting might have wrong labels
            info_g = deepcopy(info)
            info_g.gain = gain

            gain_run_number = gain_df["run_number"].unique()[0]
            means = gain_df["mean"].to_numpy()
            stds = gain_df["std"].to_numpy()
            maxmins = gain_df["maxmin"].to_numpy()

            submit(
                f"plot_pedestal_fft_2D_{gain}",
                plotting.plot_fft2d,
                gain_df["freq"][0],
                gain_df["psd"],
                plot_dir,
                gain_df["channel"],
                gain,
                gain_run_number,
                board_id,
                pas_mode,
            )
            submit(
                f"plot_chi2_{gain}",
                plotting.plot_chi2,
                gain_df,
                gain,
                plot_dir,
                gain_run_number,
                board_id,
                pas_mode,
            )
            submit(
                f"plot_chi2_hist_{gain}",
                plotting.plot_chi2_hist,
                gain_df,
                gain,
                plot_dir,
                constants.CHI2_THRESHOLD,
                gain_run_number,
                board_id,
                pas_mode,
            )
            submit(
                f"plot_pedestal_mean_hist_{gain}",
                board_summary_plotting.plot_pedestal_mean_hist,
                means,
                plot_dir,
                info_g,
                show_cuts=False,
            )
            submit(
                f"plot_pedestal_rms_hist_{gain}",
                board_summary_plotting.plot_pedestal_rms_hist,
                stds,
                plot_dir,
                info_g,
                show_cuts=False,
            )
            submit(
                f"plot_pedestal_maxmin_hist_{gain}",
                board_summary_plotting.plot_pedestal_maxmin_hist,
                maxmins,
                plot_dir,
                info_g,
                show_cuts=False,
            )

        submit(
            "plot_baseline_means_rms",
            plotting.plot_baseline_means_rms,
            derived_data,
            plot_dir,
            skip_channels_hi=skip_channels_hi,
            skip_channels_lo=skip_channels_lo,
            run_number=run_number,
            board_id=board_id,
            pas_mode=pas_mode,
        )

        for row in derived_data.iter_rows(named=True):
            # The label names the plotting function so error logs point at the right job.
            submit(
                "plot_autocorrelation",
                plotting.plot_autocorrelation,
                row["run_number"],
                row["channel"],
                row["gain"],
                row["autocorr"],
                plot_dir,
                board_id=board_id,
                pas_mode=pas_mode,
            )
            submit(
                "plot_fft",
                plotting.plot_fft,
                row["channel"],
                row["gain"],
                row["freq"],
                row["psd"],
                row["peaks"],
                plot_dir,
                run_number=run_number,
                board_id=board_id,
                pas_mode=pas_mode,
            )

        ### Coherent Noise Results Plots ###
        for row in coherent_noise_data.iter_rows(named=True):
            submit(
                "plot_coherent_noise",
                plotting.plot_coherent_noise,
                row,
                plot_dir,
                pas_mode=pas_mode,
                use_log_scale=False,
            )

            # The 128 and 64 channel windows are additionally drawn with a log scale.
            if row["n_channels"] == 128 or row["n_channels"] == 64:
                submit(
                    "plot_coherent_noise",
                    plotting.plot_coherent_noise,
                    row,
                    plot_dir,
                    pas_mode=pas_mode,
                    use_log_scale=True,
                )

        # Check for exceptions
        for future in concurrent.futures.as_completed(job_handles):
            job = job_handles[future]
            try:
                future.result()
            except Exception as exc:
                log.error(f"{job} generated an exception: {exc}")
                print(traceback.format_exc())

    if not plot_dir_filled:
        for pattern in ("*png", "*json"):
            for f in plot_dir.glob(pattern):
                os.chmod(f, 0o664)

579 

580 

def upload_coherent_noise_data(coherent_noise_data: pl.DataFrame, uri: str):
    """Upload the ``n_channels == 128`` coherent noise rows to the production test DB.

    The ``pct_coh`` column is uploaded under the name ``coherent_noise_0_128``
    for the ``pedestal`` measurement type. Nothing is uploaded (and a warning
    is logged) when there are no 128-channel rows.

    Args:
        coherent_noise_data: Coherent noise results with ``run_number`` and
            ``n_channels`` columns.
        uri: Connection string handed to ``ProductionTestDB``.
    """
    prod_db = ProductionTestDB(uri)
    # Kept as a list: it is only used in the log messages below.
    run_number = coherent_noise_data["run_number"].unique().to_list()

    full_board_rows = coherent_noise_data.filter(n_channels=128)
    if full_board_rows.is_empty():
        log.warning("No n_channels=128 entries found in coherent noise data")
        return

    column_map = {
        "pct_coh": "coherent_noise_0_128",
    }
    uploaded = prod_db_data_uploader.upload_derived_data(full_board_rows, prod_db, column_map, "pedestal")
    if not uploaded:
        log.error(f"Failed to upload run {run_number} to production coherent noise data to db: {prod_db}")
        return
    log.info(f"Uploaded run {run_number} production coherent noise data to db: {prod_db}")

600 

601 

def upload_derived_data(derived_data: pl.DataFrame, uri: str):
    """Upload the ``std`` and ``mean`` derived columns to the production test DB.

    The uploaded names get a suffix depending on the PAS mode of the run:
    ``_50ohm`` for mode 50, ``_hps2`` for mode 2, and no suffix otherwise
    (including when the mode is missing, ambiguous, null or NaN).

    Args:
        derived_data: Derived pedestal values with a ``run_number`` column and,
            optionally, a ``pas_mode`` column.
        uri: Connection string handed to ``ProductionTestDB``.
    """
    prod_db = ProductionTestDB(uri)
    # Kept as a list: it is only used in the log messages below.
    run_number = derived_data["run_number"].unique().to_list()

    # Get PAS mode to distinguish ALFE 50 ohm or HPS gain 2 runs
    pas_mode: Optional[Union[int, float, str]] = None
    if "pas_mode" in derived_data.columns:
        pas_modes = derived_data.select(pl.col("pas_mode")).unique().to_series().to_list()
        if len(pas_modes) != 1:
            log.error(f"Invalid number of PAS modes for QC db: {pas_modes}")
        else:
            pas_mode = pas_modes[0]
    else:
        log.warning("pas_mode not available in derived dataframe.")

    # HEC boards used to fill pas_mode with NaN; int(NaN) raises ValueError, so
    # treat NaN (the only value unequal to itself) like a missing PAS mode.
    if pas_mode is not None and pas_mode != pas_mode:
        pas_mode = None

    qc_alt_hps_alfe_mode = ""
    if pas_mode is not None:
        suffix_by_mode = {50: "_50ohm", 2: "_hps2"}
        qc_alt_hps_alfe_mode = suffix_by_mode.get(int(pas_mode), "")

    success = prod_db_data_uploader.upload_derived_data(
        derived_data,
        prod_db,
        {
            "std": "std" + qc_alt_hps_alfe_mode,
            "mean": "mean" + qc_alt_hps_alfe_mode,
        },
        "pedestal",
    )
    if success:
        log.info(f"Uploaded run {run_number} production derived data to db: {prod_db}")
    else:
        log.error(f"Failed to upload run {run_number} to production derived data to db: {prod_db}")

636 

637 

def calc_plot_all(
    loader: DataSource,
    run_number: int,
    plot_dir: Path,
    skip_channels_lo: Optional[List[int]] = None,
    skip_channels_hi: Optional[List[int]] = None,
    load_frames: bool = False,
    align_frames: bool = False,
    swap_frame18: bool = False,
    uri: Optional[str] = None,
    baseline_corr_integration_period: Optional[float] = None,
    bnl_data: bool = False,
):
    """Load a pedestal run, calculate everything, plot it and save the results.

    Steps, in order: load the raw data (optionally through the frame
    check/alignment wrapper), record run information per board, run
    ``calc_all`` and the plotting for each board, produce the multi-board
    coherent noise and correlation plots when more than one board is present,
    save the coherent noise and derived data through ``loader``, and finally
    make the extended readout plots when applicable.

    Args:
        loader: Data source used to load the run and save the results.
        run_number: Run to process.
        plot_dir: Output directory; created with mode ``0o775`` if missing.
        skip_channels_lo: ``lo`` gain channels to skip.
        skip_channels_hi: ``hi`` gain channels to skip.
        load_frames: Load the data through
            ``frame_utils.check_and_align_frames_wrapper`` instead of
            ``loader.load_raw_data``.
        align_frames: Passed to the wrapper as ``do_alignment``.
        swap_frame18: Passed through to the wrapper.
        uri: When given and the run has exactly one board, the derived and
            coherent noise data are uploaded to this database.
        baseline_corr_integration_period: Passed through to the wrapper.
        bnl_data: Passed through to the wrapper, the trigger rate conversion
            and the extended readout plotting.
    """
    if not plot_dir.exists():
        plot_dir.mkdir(parents=True, exist_ok=True)
        os.chmod(plot_dir, 0o775)

    alignment_info = "Alignment not checked"

    board_ids = loader.get_boards_list(run_number)["board_id"].to_list()

    if not load_frames:
        raw_data = loader.load_raw_data(run_number)
    else:
        raw_data, alignment = frame_utils.check_and_align_frames_wrapper(
            loader,
            run_number,
            swap_frame18,
            baseline_corr_integration_period=baseline_corr_integration_period,
            plot_dir=plot_dir,
            do_alignment=align_frames,
            bnl_data=bnl_data,
        )

        if len(alignment[0]) == 0:
            alignment_info = "Frames already aligned"
        else:
            channels = [int(i) for i in alignment[0]]
            alignment_info = "Alignment performed on ch " + str(channels)
            for board_id in board_ids:
                utils.add_run_info("channels", channels, board_id, plot_dir, print_to_website=False)
                utils.add_run_info(
                    "first_sample", [int(i) for i in alignment[1]], board_id, plot_dir, print_to_website=False
                )
                utils.add_run_info("offset", [int(i) for i in alignment[2]], board_id, plot_dir, print_to_website=False)

    for board_id in board_ids:
        utils.add_run_info("alignment_info", alignment_info, board_id, plot_dir)

    # Check if we're in an extended readout
    extended_readout = False
    n_trigger_windows: int = -1
    trigger_window: int = -1
    first_trigger_window = raw_data["trigger_window"][0]
    if first_trigger_window is not None and first_trigger_window > 0:
        trigger_window = first_trigger_window
        n_trigger_windows = int(len(raw_data["samples"][0]) / trigger_window)
    elif load_frames and (fec := loader.load_frame_data(run_number)["felix_event_count"][0]) is not None:
        # Fall back to the frame data: count the distinct event counts and the entries equal to 0.
        n_trigger_windows = len(fec.unique())
        trigger_window = len(np.where(fec == 0)[0])
    extended_readout = n_trigger_windows > constants.EXTENDED_PEDESTAL_TRIGGER_WINDOW_N

    has_trigger_rate = (
        "trigger_rate" in raw_data.columns
        and raw_data["trigger_rate"][0] is not None
        and raw_data["trigger_rate"][0] >= 0
    )
    if has_trigger_rate:
        for board_id in board_ids:
            rate_hz = constants.felix_trigger_rate(run_number, raw_data["trigger_rate"][0], bnl_data)
            utils.add_run_info("Trigger rate", f"{rate_hz:0.2f} Hz", board_id, plot_dir, True)
            duration_s = n_trigger_windows / constants.felix_trigger_rate(
                run_number, raw_data["trigger_rate"][0], bnl_data
            )
            utils.add_run_info("Total duration", f"{duration_s:0.2f} s", board_id, plot_dir, True)
    for board_id in board_ids:
        utils.add_run_info("Trigger window size", trigger_window, board_id, plot_dir, True)

    all_coherent_noise_data = []
    all_derived_data = []
    boards = raw_data["board_id"].unique().sort().to_list()
    # Percent coherent noise per gain, keyed by board id (or joined board ids for combinations).
    coh_matrix: Dict[str, Dict[str, Any]] = {"lo": {}, "hi": {}}
    for board in boards:
        raw_data_board = raw_data.filter(pl.col("board_id") == board)
        coherent_noise_data_board, derived_data_board = calc_all(raw_data_board, skip_channels_lo, skip_channels_hi)
        if derived_data_board is None:
            log.error(f"Failed to calculate derived data for board {board}")
            sys.exit(1)

        # Only the full 128-channel result enters the coherent noise matrix.
        for row in coherent_noise_data_board.iter_rows(named=True):
            if row["n_channels"] != 128:
                continue
            coh_matrix[row["gain"]][board] = row["pct_coh"]
        all_coherent_noise_data.append(coherent_noise_data_board)
        all_derived_data.append(derived_data_board)

        log.info("Making pedestal plots for board %s", board)
        # Plot serially when the logger is exactly at DEBUG level, otherwise in parallel.
        plot_board = plot_all if log.getEffectiveLevel() == logging.DEBUG else parallel_plot_all
        plot_board(
            raw_data_board,
            coherent_noise_data_board,
            derived_data_board,
            plot_dir,
            skip_channels_lo,
            skip_channels_hi,
        )

        if len(boards) == 1 and uri is not None:
            upload_derived_data(derived_data_board, uri)
            upload_coherent_noise_data(coherent_noise_data_board, uri)

    # For multiboard runs
    if len(boards) > 1:
        for multiple_boards in utils.get_board_combinations(boards):
            log.debug(f"Creating multiboard plots for boards {multiple_boards}")
            board_id_str: str = "_".join(multiple_boards)
            n_combined_channels = 128 * len(multiple_boards)

            # Shift each board's channels by 128 per position so they do not overlap.
            raw_data_offset: List[pl.DataFrame] = [
                raw_data.filter(pl.col("board_id") == member).with_columns(channel=pl.col("channel") + 128 * position)
                for position, member in enumerate(multiple_boards)
            ]
            combined_raw_data = pl.concat(raw_data_offset)

            log.debug("Calculating coherent noise")
            coherent_noise_data, _ = calc_all(
                combined_raw_data, skip_channels_lo, skip_channels_hi, multiple_boards=multiple_boards
            )
            all_coherent_noise_data.append(coherent_noise_data)

            log.debug("Plotting coherent noise")
            for row in coherent_noise_data.iter_rows(named=True):
                _, pct_coh = plotting.plot_coherent_noise(row, plot_dir, use_log_scale=True, board_id=board_id_str)
                # Store with board_id_str as key
                coh_matrix[row["gain"]][board_id_str] = pct_coh

            for gain in combined_raw_data["gain"].unique():
                measurements = combined_raw_data.filter(gain=gain)["measurement"].unique().to_list()
                log.debug(f"Calculating {gain} correlation matrix")
                matrix = analysis.calc_correlation_matrix(
                    combined_raw_data, measurements, gain, multiple_boards=multiple_boards
                )
                log.debug(f"Plotting {gain} correlation matrix")
                plotting.plot_correlation_matrix(
                    matrix, gain, 0, n_combined_channels, plot_dir, board_id=board_id_str, plot_numbers=False
                )

        log.debug("Plotting coherent noise matrix")
        plotting.plot_coherent_noise_matrix(coh_matrix, plot_dir)

    loader.save_coherent_noise_data(pl.concat(all_coherent_noise_data), run_number=run_number)
    # Don't save autocorr, a column with type List[Float64] and length n_samples
    derived_to_save = pl.concat(all_derived_data).select(pl.exclude("autocorr"))
    loader.save_derived_data(
        derived_to_save,
        run_number=run_number,
        meas_type="pedestal",
    )

    if extended_readout and load_frames:
        log.info("Making extended readout plots")
        frame.plot_extended_readout(
            raw_data,
            run_number,
            plot_dir.parent,  # need to strip off runWXYZ
            skip_channels_lo=skip_channels_lo,
            skip_channels_hi=skip_channels_hi,
            bnl_data=bnl_data,
        )

818 )