Coverage for polars_analysis / analysis / pulse_analysis.py: 91%

309 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 13:37 -0400

1import logging 

2from contextlib import suppress 

3from pathlib import Path 

4from typing import List, Optional, Tuple 

5 

6import numpy as np 

7import polars as pl 

8from scipy import linalg 

9from scipy.optimize import curve_fit 

10from scipy.signal import find_peaks 

11 

12from polars_analysis.analysis import constants 

13from polars_analysis.plotting import helper 

14 

15# Instantiate logger 

16log = logging.getLogger(__name__) 

17 

18""" 

19Functions to calculate derived values for pulse runs. 

20""" 

21 

22 

def expr_awg_amp_to_amp(col1: str = "awg_amp", col2: str = "att_val") -> pl.Expr:
    """
    Convert AWG amplitude to amplitude in mA.

    The attenuation column is treated as dB, so the linear scale factor
    is 10^(-att / 20).

    Args:
        col1: Column name of AWG amplitude.
        col2: Column name of attenuation value.

    Returns:
        Expression to calculate amplitude in mA.
    """
    attenuation_factor = 10 ** (pl.col(col2) / -20.0)
    return 4.0 * pl.col(col1) * attenuation_factor

35 

36 

def expr_max_pulse_amp(col: str = "samples") -> pl.Expr:
    """
    Get maximum pulse amplitude.

    The amplitude is measured relative to the median of the samples,
    which acts as the baseline level.

    Args:
        col: Column name of pulse samples.

    Returns:
        Expression to calculate maximum pulse amplitude.
    """
    pulse = pl.col(col)
    return pulse.list.max() - pulse.list.median()

48 

49 

def expr_max_phase_indices(mean_interleaved_pulse: str = "mean_interleaved_pulse", phase_shift: int = 0) -> pl.Expr:
    """
    Get indices of samples around the pulse peak, shifted by a phase.
    Uses the maximum amplitude pulse per channel and gain, to mirror how the OFCs are derived.

    Args:
        mean_interleaved_pulse: Column name of mean interleaved pulse.
        phase_shift: Phase shift in samples.

    Returns:
        Expression to calculate indices of samples around the maximum pulse amplitude.
    """
    # Index of the pulse peak, taken from the max-awg_amp row of each (channel, gain) group.
    peak_index = (
        pl.col(mean_interleaved_pulse)
        .list.arg_max()
        .filter(pl.col("awg_amp") == pl.col("awg_amp").max())
        .first()
        .over(["channel", "gain"])
    )
    # Wrap indices around the interleaved train length.
    train_length = constants.PULSES_PER_TRAIN * constants.SAMPLES_PER_PULSE
    offsets = [-2, -1, 0, 1, 2]
    return pl.concat_list(
        [(peak_index + phase_shift + (constants.N_PHASES * offset)) % train_length for offset in offsets]
    )

78 

79 

def per_ch_interleaving_util(
    all_pulses: np.ndarray,
    trimmed_width: int,
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
    """
    Utility function for pipe_samples_interleaved below.

    Locates the pulse peaks in the mean pulse train and derives the (start, end)
    sample indices used to trigger the raw samples for interleaving. Up to three
    detection passes are attempted: 3-sigma threshold, 2-sigma threshold, and
    finally 3-sigma with one extra pulse appended (to catch a first peak cut by
    the sampling window).

    Args:
        all_pulses: 1D array of raw samples; length must be a multiple of
            constants.PULSE_TRAIN_PERIOD.
        trimmed_width: Number of samples kept (a multiple of PULSE_TRAIN_PERIOD).

    Returns:
        (start, end) trigger indices, or (None, None) if no peaks were found.
    """
    # Require peaks be at least SAMPLES_PER_PULSE away from each other, with 5 samples of wiggle room
    # Based on some testing with large pulses 1 or 2 samples should be fine, but I wanted to be safe
    PEAK_SPACING_PADDING = 5

    def _find_peaks_above(train: np.ndarray, threshold: float) -> np.ndarray:
        # Peak detection with the shared minimum-spacing constraint.
        return find_peaks(
            train,
            height=threshold,
            distance=constants.SAMPLES_PER_PULSE - PEAK_SPACING_PADDING,
        )[0]

    def _keep_highest(train: np.ndarray, peaks: np.ndarray) -> np.ndarray:
        # Keep (up to) the PULSES_PER_TRAIN highest peaks, returned in time order.
        return np.sort(peaks[np.argsort(train[peaks])][-constants.PULSES_PER_TRAIN :])

    mean_pulse_train = all_pulses.reshape([-1, constants.PULSE_TRAIN_PERIOD]).mean(axis=0)

    # Pass 1: 3-sigma threshold.
    threshold = np.median(mean_pulse_train) + 3 * np.std(mean_pulse_train)
    peaks: np.ndarray = _find_peaks_above(mean_pulse_train, threshold)
    if len(peaks) > constants.PULSES_PER_TRAIN:
        log.warning(
            (
                f"Found more than {constants.PULSES_PER_TRAIN} possible "
                f"pulses, will use {constants.PULSES_PER_TRAIN} highest peaks"
            )
        )
        peaks = _keep_highest(mean_pulse_train, peaks)
    if len(peaks) < constants.PULSES_PER_TRAIN:
        # Pass 2: relax the threshold to 2 sigma.
        log.warning(f"Found {len(peaks)} peaks, retrying with lower threshold")
        threshold = np.median(mean_pulse_train) + 2 * np.std(mean_pulse_train)
        peaks = _keep_highest(mean_pulse_train, _find_peaks_above(mean_pulse_train, threshold))
    if len(peaks) < constants.PULSES_PER_TRAIN:
        # Sometimes we get unlucky and sample on the falling edge of the first peak in a train.
        # We can add one pulse to the end of the train to let us find the first peak
        mean_pulse_train = np.concatenate([mean_pulse_train, mean_pulse_train[: constants.SAMPLES_PER_PULSE]])
        log.warning(f"Found {len(peaks)} peaks after lowering threshold, retrying with extra first pulse")
        threshold = np.median(mean_pulse_train) + 3 * np.std(mean_pulse_train)
        peaks = _keep_highest(mean_pulse_train, _find_peaks_above(mean_pulse_train, threshold))

    start, end = None, None
    if len(peaks) > 0:
        # Wrap the first peak to the end so np.diff can see the inter-train gap.
        peaks = np.append(peaks, peaks[0] + constants.PULSE_TRAIN_PERIOD)
        # Offset by PEAK_INDEX to make sure pulse peak is in the right spot
        start = (peaks[np.diff(peaks).argmax() + 1] - constants.PEAK_INDEX) % constants.PULSE_TRAIN_PERIOD
        # # This lines up with the previous version, but I'm not sure why that timing was chosen
        # # The new timing also fixes the OFC samples plot, so maybe the old was a mistake
        # start = (peaks[np.diff(peaks).argmax() + 1] - 8) % constants.PULSE_TRAIN_PERIOD
        end = start + constants.PULSE_TRAIN_PERIOD * ((trimmed_width // constants.PULSE_TRAIN_PERIOD) - 1)
    else:
        log.error("No peaks found for interleaving!")

    return (start, end)

141 

142 

def pipe_samples_interleaved(
    df: pl.DataFrame, samples: str = "samples", is_reference_pulse: Optional[str] = None
) -> pl.DataFrame:
    """
    Trigger and interleave pulse samples, producing series of pulses that are SAMPLES_PER_PULSE * N_PHASES long.
    Adds "samples_interleaved", "mean_interleaved_pulse" and "samples_baseline"
    columns to the DataFrame.

    Args:
        df: DataFrame.
        samples: Column name of pulse samples.
        is_reference_pulse: Optional name of a boolean column marking the reference
            pulse (cross-talk runs). When given, trigger indices are derived once
            from the reference pulse and reused for every row; otherwise they are
            derived per row.

    Returns:
        DataFrame with 2D array of interleaved pulse samples in "samples_interleaved" column,
        the mean of all interleaved pulses in "mean_interleaved_pulse" column,
        and 2D array of baseline samples (in between pulse trains) in "samples_baseline" column.
    """
    # Use the shortest sample list so every row can be trimmed to the same width.
    width = df.select(pl.col(samples).list.len().min()).item()
    num_trains = width // constants.PULSE_TRAIN_PERIOD
    # num_trains - 1 because triggering discards (up to) one train's worth of samples.
    samples_interleaved = np.zeros([len(df), num_trains - 1, constants.SAMPLES_PER_PULSE * constants.PULSES_PER_TRAIN])
    samples_baseline = np.zeros([len(df), num_trains - 1, constants.TRIGGER_OFFSET])
    # If you want to save the interleaved samples without the baseline subtracted
    # samples_interleaved_raw = np.zeros(
    #     [len(df), num_trains - 1, constants.SAMPLES_PER_PULSE * constants.PULSES_PER_TRAIN]
    # )

    trimmed_width = constants.PULSE_TRAIN_PERIOD * num_trains
    samples_array = df[samples].list.slice(0, trimmed_width).list.to_array(trimmed_width).to_numpy()

    ref_start, ref_end = None, None
    if is_reference_pulse:
        # The reference pulse for cross talk runs
        reference_samples = (  # noqa F841
            df.filter(pl.col(is_reference_pulse))
            .sort(pl.col(samples).list.max())  # Need to pick high gain
            .select(pl.last(samples))[samples]
            .list.slice(0, trimmed_width)
            .list.to_array(trimmed_width)
            .to_numpy()
        )
        ref_start, ref_end = per_ch_interleaving_util(reference_samples, trimmed_width)

    for i, all_pulses in enumerate(samples_array):
        if is_reference_pulse:
            start, end = ref_start, ref_end
        else:
            start, end = per_ch_interleaving_util(all_pulses, trimmed_width)

        if start is None and end is None:
            # NOTE: with start/end both None the slice below degenerates to the
            # whole array; a shape mismatch on assignment is swallowed by the
            # suppress(ValueError) block, leaving zeros for this row.
            log.warning(f"No start and end found for index {i}")
            log.warning(f"{df['channel', 'gain', 'awg_amp'][i]}")

        # Samples triggered starts right at the peak, mainly used for autocorrelation
        triggered_pulses = all_pulses[start:end].reshape([-1, constants.PULSE_TRAIN_PERIOD])
        # Tail of each train (past the pulses) is the between-train baseline.
        baseline_samples = triggered_pulses[:, constants.SAMPLES_PER_PULSE * constants.PULSES_PER_TRAIN :]
        interleaved_pulses = triggered_pulses[:, constants.TIME_INDICES_SORTED] - baseline_samples.mean()
        with suppress(ValueError):
            samples_interleaved[i] = interleaved_pulses
            samples_baseline[i] = baseline_samples

        # samples_interleaved_raw[i] = triggered_pulses[:, constants.TIME_INDICES_SORTED]

    # Average over trains to get one mean interleaved pulse per row.
    mean_interleaved_pulse = np.mean(samples_interleaved, axis=1)

    df = df.with_columns(
        pl.Series(name="samples_interleaved", values=samples_interleaved, dtype=pl.List(pl.List(pl.Float64))),
        pl.Series(name="mean_interleaved_pulse", values=mean_interleaved_pulse, dtype=pl.List(pl.Float64)),
        pl.Series(name="samples_baseline", values=samples_baseline),
        # pl.Series(name="samples_interleaved_raw", values=samples_interleaved_raw),
    )

    # # Replace samples_interleaved with one where the baseline is determined once per channel, not per measurement
    # df = df.join(
    #     df.filter(pl.col("awg_amp") == pl.col("awg_amp").max().over("gain")).with_columns(
    #         pl.col("samples_baseline")
    #         .arr.to_list()
    #         .list.eval(pl.element().arr.to_list().list.mean())
    #         .list.mean()
    #         .alias("max_amp_baseline_mean")
    #     )["run_number", "channel", "gain", "max_amp_baseline_mean"],
    #     on=["run_number", "channel", "gain"],
    #     how="left",
    # )
    # df = df.with_columns(
    #     (pl.col("samples_interleaved_raw") - pl.col("max_amp_baseline_mean")).alias("samples_interleaved")
    # ).drop("max_amp_baseline_mean")

    return df

232 

233 

def calc_autocorr_along_axis(x: np.ndarray) -> np.ndarray:
    """
    Helper function to calculate autocorrelation along an axis.
    TODO: Incorporate this into calc_autocorr.

    Args:
        x: Array of samples.

    Returns:
        Autocorrelation of the samples (non-negative lags only).
    """
    # Full correlation is symmetric; keep the second half (lags >= 0).
    full_corr = np.correlate(x, x, mode="full")
    midpoint = full_corr.size // 2
    return full_corr[midpoint:]

247 

248 

def baseline_calc_autocorr(trains: np.ndarray) -> np.ndarray:
    """
    Helper function to calculate autocorrelation of baseline samples.
    Only uses the last 500 samples of each baseline, where the ADC response has returned to baseline.

    Args:
        trains: Array of baseline samples.

    Returns:
        Mean of the autocorrelations of the baseline samples.
    """
    # astype(float) copies, so the caller's array is never modified.
    tail = trains[:, -500:].astype(float)
    tail -= np.nanmean(tail)

    def _autocorr(row: np.ndarray) -> np.ndarray:
        # Full autocorrelation of one baseline; keep non-negative lags only.
        full = np.correlate(row, row, mode="full")
        return full[full.size // 2 :]

    # Autocorrelate the first 60 samples of each train, then average across trains.
    per_train = np.apply_along_axis(_autocorr, 1, tail[:, :60])
    return np.nanmean(per_train, axis=0)[:60]

265 

266 

def pipe_OFCs(
    df: pl.DataFrame,
    mean_interleaved_pulse: str = "mean_interleaved_pulse",
    max_phase_indices: str = "max_phase_indices",
    amp: str = "amp",
    is_crosstalk: bool = False,
    quantile: float = 1.0,
) -> pl.DataFrame:
    """
    Calculate Optimal Filter Coefficients (OFCs) for pulse samples.
    Returns DataFrame with "autocorr", "OFCs_a", "OFCs_b", and "OFC_amp" columns.

    Args:
        df: DataFrame.
        mean_interleaved_pulse: Column name of mean interleaved pulse.
        max_phase_indices: Column name of indices around the maximum pulse amplitude.
        amp: Column name of pulse amplitude.
        is_crosstalk: If True, derive OFCs from the reference pulse only and join
            the result back on gain alone (one reference per gain).
        quantile: Amplitude quantile (per channel and gain) used to pick the row
            the OFCs are derived from; 1.0 means the maximum amplitude.

    Returns:
        DataFrame with OFCs and autocorrelation.
    """
    if is_crosstalk:
        # Only one row per channel and gain with max amplitude, and must be a reference pulse
        ofc_df = df.filter(pl.col(amp) == pl.col(amp).max().over(["channel", "gain"]), pl.col("is_reference_pulse"))
        join_columns = ["gain"]
        if len(ofc_df) > 2:
            log.warning(f"Found {len(ofc_df)} references pulses, only 2 (1 per gain) are supported for crosstalk runs")
    else:
        # One row per channel and gain with max amplitude
        # Code to use Nth quantile amplitude
        ofc_df = df.filter(pl.col(amp) == pl.quantile(amp, quantile).over(["channel", "gain"]))
        log.debug(f"Amp used for OFCs = {ofc_df['channel', 'gain', 'amp', 'awg_amp']}")
        # Code to use max amplitude
        # ofc_df = df.filter(pl.col(amp) == pl.col(amp).max().over(["channel", "gain"]))

        join_columns = ["channel", "gain"]

    # Guard against quantile/filter ties producing more than one row per (channel, gain).
    ofc_df = ofc_df.unique(subset=["channel", "gain"], keep="first", maintain_order=True)
    updated_rows = []

    for channel in ofc_df.select("channel").unique().to_series():
        channel_df = ofc_df.filter(pl.col("channel") == channel)

        # One list of coefficients per phase, per gain; empty list means "not computed yet".
        a_coeffs: dict[str, List[List[float]]] = {
            "hi": [[] for _ in range(constants.N_PHASES)],
            "lo": [[] for _ in range(constants.N_PHASES)],
        }
        b_coeffs: dict[str, List[List[float]]] = {
            "hi": [[] for _ in range(constants.N_PHASES)],
            "lo": [[] for _ in range(constants.N_PHASES)],
        }
        autocorr: dict[str, List[float]] = {"hi": [], "lo": []}

        for row in channel_df.iter_rows(named=True):
            # Noise autocorrelation from the between-train baseline samples.
            autocorrelation = baseline_calc_autocorr(np.array(row["samples_baseline"]))
            autocorr[row["gain"]] = autocorrelation.tolist()
            max_peak = np.max(np.array(row[mean_interleaved_pulse])[row[max_phase_indices]])

            for i in range(constants.N_PHASES):
                phase_shift = i + constants.PHASE_SHIFT0
                indices = np.asarray(row[max_phase_indices]) + phase_shift
                # Only the first 5 autocorrelation lags are used (5 samples per OFC).
                ofcs = calc_OFCs(
                    autocorrelation[:5],
                    np.array(row[mean_interleaved_pulse])[indices],
                    np.gradient(row[mean_interleaved_pulse])[indices],
                    max_peak,
                )
                a_coeffs[row["gain"]][i] = ofcs[0].tolist()
                b_coeffs[row["gain"]][i] = ofcs[1].tolist()

            updated_rows.append(
                {
                    "channel": row["channel"],
                    "gain": row["gain"],
                    "autocorr": autocorr[row["gain"]],
                    "OFCs_a": a_coeffs[row["gain"]],
                    "OFCs_b": b_coeffs[row["gain"]],
                    "OFC_amp": row["amp"],
                }
            )

        # Sanity check: both gains should have coefficients for every phase.
        for i in range(constants.N_PHASES):
            phase_shift = i + constants.PHASE_SHIFT0
            if a_coeffs["lo"][i] == []:
                log.error(f"Failed to compute lo gain OFCs for channel {channel} phase shift {phase_shift}!")
                # Uncomment these to get around error
                # a_coeffs["lo"][i] = a_coeffs["hi"][i]
                # b_coeffs["lo"][i] = b_coeffs["hi"][i]

            if a_coeffs["hi"][i] == []:
                log.error(f"Failed to compute hi gain OFCs for channel {channel} phase shift {phase_shift}!")
                # Uncomment these to get around error
                # a_coeffs["hi"][i] = a_coeffs["lo"][i]
                # b_coeffs["hi"][i] = b_coeffs["lo"][i]

    return df.join(
        pl.DataFrame(updated_rows, schema_overrides={"OFC_amp": pl.Float32}), on=join_columns, how="left"
    ).drop("channel_right", strict=False)

365 

366 

367def calc_OFCs( 

368 autocorrelation: np.ndarray, samples: np.ndarray, gradient: np.ndarray, max_peak: float 

369) -> Tuple[np.ndarray, np.ndarray]: 

370 """ 

371 Helper function to calculate Optimal Filter Coefficients (OFCs) for pulse samples. 

372 Uses method and notation described in the Cleland & Stern paper, 

373 "Signal processing considerations for liquid ionization calorimeters in a high rate environment". 

374 

375 Args: 

376 autocorrelation: Autocorrelation of the pulse samples. 

377 samples: Pulse samples. 

378 

379 Returns: 

380 Tuple of OFCs a and b for the pulse samples. 

381 """ 

382 autocorrelation = np.ravel(autocorrelation) / autocorrelation[0] 

383 samples = np.ravel(samples) 

384 gradient = np.ravel(gradient) 

385 # scale = max(samples) 

386 samples /= max_peak 

387 gradient /= max_peak 

388 # calculate V = R^{-1}. 

389 inv_ac = linalg.inv(linalg.toeplitz(autocorrelation)) 

390 # calculate V*g and V*dg only once. 

391 vg = np.dot(inv_ac, samples) 

392 vdg = np.dot(inv_ac, gradient) 

393 # calculate helper variables 

394 q1 = np.dot(samples, vg) 

395 q2 = np.dot(gradient, vdg) 

396 q3 = np.dot(gradient, vg) 

397 delta = q1 * q2 - q3 * q3 

398 # calculate Lagrange multipliers 

399 lm_lambda = q2 / delta 

400 lm_kappa = -q3 / delta 

401 lm_mu = q3 / delta 

402 lm_rho = -q1 / delta 

403 # calculate optimal filter coefficients 

404 a_coeffs = lm_lambda * vg + lm_kappa * vdg 

405 b_coeffs = lm_mu * vg + lm_rho * vdg 

406 return a_coeffs, b_coeffs 

407 

408 

def pipe_apply_OFCs(
    df: pl.DataFrame,
    samples_interleaved: str = "samples_interleaved",
    max_phase_indices: str = "max_phase_indices",
    OFCs_a: str = "OFCs_a",
    OFCs_b: str = "OFCs_b",
    all_phases: bool = False,
) -> pl.DataFrame:
    """
    Applies Optimal Filter Coefficients (OFCs) to pulse samples.
    Returns DataFrame with "energies", "times", "energy_mean", "energy_std", "time_mean", and "time_std" columns.

    Args:
        df: DataFrame.
        samples_interleaved: Column name of interleaved pulse samples.
        max_phase_indices: Column name of indices around the maximum pulse amplitude.
        OFCs_a: Column name of a OFCs.
        OFCs_b: Column name of b OFCs.
        all_phases: If False, statistics are computed only for the peak-phase
            OFCs (constants.PHASE_SHIFT_PEAK); if True, across all phases.

    Returns:
        DataFrame with energies, times, and statistics.
    """

    log.debug("Applying OFCs")
    # Calculate energies for all 30 phases of OFCs
    # The nested explode/group_by pairs walk down (row -> train -> phase) and
    # then re-aggregate back up, so "energies" ends up nested per row/train/phase.
    energies = (
        df.lazy()
        .with_row_index("index0")  # per dataframe row
        .with_columns(pl.arange(0, constants.N_PHASES).implode().alias("phase"))
        .explode(samples_interleaved)
        .with_row_index("index1")  # per pulse train
        .explode(OFCs_a, "phase")
        .select(
            "index0",
            "index1",
            OFCs_a,
            pl.col(samples_interleaved).list.gather(
                (pl.col(max_phase_indices) + pl.col("phase") + constants.PHASE_SHIFT0)
                % (constants.SAMPLES_PER_PULSE * constants.PULSES_PER_TRAIN)
            ),
            "run_number",
            "measurement",
            "channel",
            "gain",
        )
        .with_row_index("index2")  # per phase per pulse train
        .explode(OFCs_a, samples_interleaved)
        .group_by("index2", maintain_order=True)
        .agg(
            pl.col("index0").first(),
            pl.col("index1").first(),
            # Energy = dot product of the gathered samples with the a coefficients.
            pl.col(samples_interleaved).dot(OFCs_a).alias("energies"),
            pl.col("run_number").first(),
            pl.col("measurement").first(),
            pl.col("channel").first(),
            pl.col("gain").first(),
        )
        .drop("index2")
        .group_by("index1", maintain_order=True)
        .agg(
            pl.col("index0").first(),
            pl.col("energies"),
            pl.col("run_number").first(),
            pl.col("measurement").first(),
            pl.col("channel").first(),
            pl.col("gain").first(),
        )
        .drop("index1")
        .group_by("index0")
        .agg(
            pl.col("energies"),
            pl.col("run_number").first(),
            pl.col("measurement").first(),
            pl.col("channel").first(),
            pl.col("gain").first(),
        )
        .drop("index0")
        .collect()
    )
    df = df.join(energies, on=["run_number", "measurement", "channel", "gain"])

    # Calculate times for all 30 phases of OFCs
    # Same traversal as above, with the b coefficients; time = (s·b) / energy.
    times = (
        df.lazy()
        .with_row_index("index0")  # per dataframe row
        .with_columns(pl.arange(0, constants.N_PHASES).implode().alias("phase"))
        .explode("energies", samples_interleaved)
        .with_row_index("index1")  # per pulse train
        .explode(OFCs_b, "energies", "phase")
        .select(
            "index0",
            "index1",
            "energies",
            OFCs_b,
            pl.col(samples_interleaved).list.gather(
                (pl.col(max_phase_indices) + pl.col("phase") + constants.PHASE_SHIFT0)
                % (constants.SAMPLES_PER_PULSE * constants.PULSES_PER_TRAIN)
            ),
            "run_number",
            "measurement",
            "channel",
            "gain",
        )
        .with_row_index("index2")  # per phase
        .explode(OFCs_b, samples_interleaved)
        .group_by("index2", maintain_order=True)
        .agg(
            pl.col("index0").first(),
            pl.col("index1").first(),
            pl.col(samples_interleaved).dot(OFCs_b).truediv(pl.col("energies").first()).alias("times"),
            pl.col("run_number").first(),
            pl.col("measurement").first(),
            pl.col("channel").first(),
            pl.col("gain").first(),
        )
        .drop("index2")
        .group_by("index1", maintain_order=True)
        .agg(
            pl.col("index0").first(),
            pl.col("times"),
            pl.col("run_number").first(),
            pl.col("measurement").first(),
            pl.col("channel").first(),
            pl.col("gain").first(),
        )
        .drop("index1")
        .group_by("index0")
        .agg(
            pl.col("times"),
            pl.col("run_number").first(),
            pl.col("measurement").first(),
            pl.col("channel").first(),
            pl.col("gain").first(),
        )
        .drop("index0")
        .collect()
    )
    df = df.join(times, on=["run_number", "measurement", "channel", "gain"])

    # Calculate statistics just for peak pulse
    if not all_phases:
        df = (
            df.lazy()
            .with_columns(
                # Keep only the peak-phase entry of each per-train phase list.
                pl.col("energies").list.eval(pl.element().list.get(constants.PHASE_SHIFT_PEAK)).alias("energies"),
                pl.col("times").list.eval(pl.element().list.get(constants.PHASE_SHIFT_PEAK)).alias("times"),
            )
            .with_columns(
                pl.col(OFCs_a).list.get(constants.PHASE_SHIFT_PEAK).alias(OFCs_a),
                pl.col(OFCs_b).list.get(constants.PHASE_SHIFT_PEAK).alias(OFCs_b),
                pl.col("energies").list.mean().alias("energy_mean"),
                pl.col("energies").list.std().alias("energy_std"),
                pl.col("energies").list.max().alias("energy_max"),
                pl.col("energies").list.min().alias("energy_min"),
            )
            .collect()
        )
    # Calculate statistics across all 30 pulses in a train, rather than just the peak pulse
    else:
        df = (
            df.lazy()
            .with_columns(
                pl.col("energies").list.eval(pl.element().explode()).list.mean().alias("energy_mean"),
                pl.col("energies").list.eval(pl.element().explode()).list.max().alias("energy_max"),
                pl.col("energies").list.eval(pl.element().explode()).list.min().alias("energy_min"),
                pl.col("energies").list.eval(pl.element().explode()).list.std().alias("energy_std"),
            )
            .collect()
        )

    # This paper about Run 1 topoclustering says only times from cells with energy significance > 2
    # are used to calculate cluster timing
    # Refer to Section 4.2.3, "Signal timing": https://arxiv.org/abs/1603.02934
    # NOTE(review): "TRESHOLD" is a typo for "THRESHOLD" (local name only).
    CELL_SIGMA_TRESHOLD = 2
    if not all_phases:
        df = (
            df.drop("times")
            .join(
                df.lazy()
                .select("run_number", "measurement", "channel", "gain", "energies", "times", "energy_std")
                .with_row_index()
                .explode("energies", "times")
                # Keep only times whose energy is significant (> 2 sigma).
                .filter(pl.col("energies").abs() > CELL_SIGMA_TRESHOLD * pl.col("energy_std"))
                .group_by("index")
                .agg(pl.col("run_number", "measurement", "channel", "gain").first(), "times")
                .drop("index")
                .collect(),
                on=["run_number", "measurement", "channel", "gain"],
            )
            .with_columns(
                pl.col("times").list.mean().alias("time_mean"),
                pl.col("times").list.std().alias("time_std"),
            )
        )
    else:
        df = (
            df.drop("times")
            .join(
                df.lazy()
                .select("run_number", "measurement", "channel", "gain", "energies", "times", "energy_std")
                .with_row_index("index0")
                .explode("energies", "times")
                .with_row_index("index1")
                .explode("energies", "times")
                # .filter(pl.col("energies").abs() > CELL_SIGMA_TRESHOLD * pl.col("energy_std"))
                # Null out (instead of filtering) insignificant times so list shapes survive re-aggregation.
                .with_columns(
                    pl.when(pl.col("energies").abs() > CELL_SIGMA_TRESHOLD * pl.col("energy_std"))
                    .then(pl.col("times"))
                    .otherwise(None)
                    .alias("times")
                )
                .group_by("index1", maintain_order=True)
                .agg(pl.col("run_number", "measurement", "channel", "gain", "index0").first(), "times")
                .drop("index1")
                .group_by("index0", maintain_order=True)
                .agg(pl.col("run_number", "measurement", "channel", "gain").first(), "times")
                .drop("index0")
                .collect(),
                on=["run_number", "measurement", "channel", "gain"],
            )
            .lazy()
            .with_columns(
                pl.col("times").list.eval(pl.element().explode()).list.mean().alias("time_mean"),
                pl.col("times").list.eval(pl.element().explode()).list.std().alias("time_std"),
            )
            .collect()
        )

    ### Ignoring times from low energy cells is designed to replicate what ATLAS does,
    ### but some less aggressive ways to discard outlier times are included below.
    # # Drop outlier times
    # df = df.with_columns(
    #     pl.col("times")
    #     .list.eval(
    #         pl.element().filter(
    #             pl.element()
    #             < pl.max_horizontal(
    #                 pl.element().quantile(0.75) + 1.5 * (pl.element().quantile(0.75)-pl.element().quantile(0.25)), 25
    #             ),
    #             pl.element()
    #             > pl.min_horizontal(
    #                 pl.element().quantile(0.25) - 1.5 * (pl.element().quantile(0.75)-pl.element().quantile(0.25)), -25
    #             ),
    #         )
    #     )
    #     .alias("times")
    # )
    # # Drop outlier times and their corresponding energies
    # df = df.select(pl.exclude("energies", "times")).hstack(
    #     df["measurement", "channel", "gain", "energies", "times"]
    #     .explode(["energies", "times"])
    #     .with_columns(
    #         pl.col("times").quantile(0.75).over("measurement", "channel", "gain").alias("Q3"),
    #         pl.col("times").quantile(0.25).over("measurement", "channel", "gain").alias("Q1"),
    #     )
    #     .with_columns((pl.col("Q3") - pl.col("Q1")).alias("IQR"))
    #     .filter(
    #         pl.col("times") < pl.max_horizontal(pl.col("Q3") + 1.5 * pl.col("IQR"), 25),
    #         pl.col("times") > pl.min_horizontal(pl.col("Q1") - 1.5 * pl.col("IQR"), -25),
    #     )
    #     .group_by("measurement", "channel", "gain", maintain_order=True)
    #     .agg("energies", "times")
    #     .select("energies", "times")
    # )

    return df

675 

676 

def pipe_rise_time(
    df: pl.DataFrame,
    samples_interleaved: str = "samples_interleaved",
    mean_interleaved_pulse: str = "mean_interleaved_pulse",
) -> pl.DataFrame:
    """
    Calculate both rise time (from mean pulse) and rise time error (Gaussian sigma from individual pulses).

    Args:
        df: Polars DataFrame.
        samples_interleaved: Column name with list of pulses.
        mean_interleaved_pulse: Column name with mean pulse.

    Returns:
        DataFrame with 'rise_time' and 'rise_time_error' columns.
    """
    log.debug("Calculating rise times")
    FIVE_PERCENT = 0.05
    rise_times: List[float] = []
    rise_time_errors: List[float] = []

    for row in df.iter_rows(named=True):
        # --- Rise Time from Mean Pulse ---
        # Work on the time-reversed pulse so the 5% point can be searched
        # forward from the peak index.
        flipped_pulse = np.flip(row[mean_interleaved_pulse])
        flipped_times = np.flip(constants.INTERLEAVED_TIMES)
        max_i = np.argmax(flipped_pulse)
        pulse_peak_time = flipped_times[max_i]
        flipped_pulse = flipped_pulse[max_i:]

        # Zero out everything above 5% of the peak; the first nonzero entry
        # left is the 5% crossing point.
        low_points = np.copy(flipped_pulse)
        low_points[flipped_pulse > FIVE_PERCENT * flipped_pulse[0]] = 0

        if np.nonzero(low_points)[0].shape[0] == 0:
            # Pulse never drops below 5% of its peak within the window.
            rise_times.append(0.0)
        else:
            five_percent_point = np.nonzero(low_points)[0][0]
            five_percent_time = flipped_times[max_i + five_percent_point]
            rise_times.append(pulse_peak_time - five_percent_time)

        # Rise time error from all pulses
        rise_time_vals = []
        for sample in row[samples_interleaved]:
            flipped = np.flip(sample)
            flipped_times = np.flip(constants.INTERLEAVED_TIMES)
            max_i = np.argmax(flipped)
            peak_time = flipped_times[max_i]
            flipped = flipped[max_i:]

            low_points = np.copy(flipped)
            low_points[flipped > FIVE_PERCENT * flipped[0]] = 0

            if np.nonzero(low_points)[0].shape[0] == 0:
                continue
            else:
                five_percent_point = np.nonzero(low_points)[0][0]
                five_percent_time = flipped_times[max_i + five_percent_point]
                rise_time_vals.append(peak_time - five_percent_time)

        if len(rise_time_vals) == 0:
            # Dummy value so the column stays aligned with rise_times.
            # BUGFIX: this used to do `rise_time_errors = [0 for _ in rise_times]`,
            # which replaced the whole accumulator and zeroed out errors already
            # computed for earlier rows; append one placeholder instead.
            rise_time_errors.append(0.0)
        else:
            hist_bins = np.linspace(min(rise_time_vals), max(rise_time_vals), 25)
            fit_pars = helper.calc_gaussian(rise_time_vals, hist_bins)
            # NOTE(review): assumes fit_pars[1] is the width/error parameter of
            # helper.calc_gaussian — confirm the parameter ordering.
            d_mu = fit_pars[1]
            rise_time_errors.append(d_mu)

    return df.with_columns(
        [
            pl.Series(name="rise_time", values=rise_times),
            pl.Series(name="rise_time_error", values=rise_time_errors, strict=False),
        ]
    )

750 

751 

def pipe_zero_crossing(
    df: pl.DataFrame,
    mean_interleaved_pulse: str = "mean_interleaved_pulse",
    samples_interleaved: str = "samples_interleaved",
) -> pl.DataFrame:
    """
    Calculate falling zero crossing time (after peak) and zero crossing error
    from pulse samples. Adds 'zero_crossing_time' and 'zero_crossing_error'
    columns to the DataFrame.

    Args:
        df: Polars DataFrame.
        mean_interleaved_pulse: Column with mean interleaved pulse.
        samples_interleaved: Column with list of interleaved pulses.

    Returns:
        DataFrame with 'zero_crossing_time' and 'zero_crossing_error' columns.
    """
    log.debug("Calculating zero crossing times")
    zero_crossing_times: list[float] = []
    zero_crossing_errors: list[float] = []
    times = constants.INTERLEAVED_TIMES

    for row in df.iter_rows(named=True):
        # zero crossing time (from mean pulse)
        pulse = row[mean_interleaved_pulse]
        max_i = np.argmax(pulse)
        peak_time = times[max_i]

        for i in range(max_i, len(pulse) - 1):
            if pulse[i] == 0:
                # BUGFIX: report the crossing relative to the peak time, matching
                # the sign-change branch below and the per-sample loop; previously
                # this branch appended the absolute time `round(times[i], 1)`.
                zero_crossing_times.append(round(times[i] - peak_time, 1))
                break
            elif pulse[i] > 0 and pulse[i + 1] < 0:
                # Sign change: pick whichever of the two bracketing samples is
                # closest to zero.
                y1, y2 = pulse[i], pulse[i + 1]
                closest_to_zero = min([y1, y2], key=lambda y: abs(y))
                crossing_idx = i if closest_to_zero == y1 else i + 1
                zero_crossing_time = round(times[crossing_idx] - peak_time, 1)
                zero_crossing_times.append(zero_crossing_time)
                break
        else:
            # No falling zero crossing found after the peak.
            zero_crossing_times.append(np.nan)

        # zero crossing error (from samples)
        zero_crossing_time_row = []
        for sample in row[samples_interleaved]:
            max_i = np.argmax(sample)
            peak_time = times[max_i]

            for i in range(max_i, len(sample) - 1):
                if sample[i] == 0:
                    zero_crossing_time_row.append(round(times[i] - peak_time, 1))
                    break
                elif sample[i] > 0 and sample[i + 1] < 0:
                    y1, y2 = sample[i], sample[i + 1]
                    closest_to_zero = min([y1, y2], key=lambda y: abs(y))
                    crossing_idx = i if closest_to_zero == y1 else i + 1
                    zero_crossing_time_row.append(round(times[crossing_idx] - peak_time, 1))
                    break

        if zero_crossing_time_row:
            # Standard error of the mean of the per-pulse crossing times.
            d_mu = np.std(zero_crossing_time_row) / np.sqrt(len(zero_crossing_time_row))
            zero_crossing_errors.append(d_mu)
        else:
            zero_crossing_errors.append(np.nan)

    return df.with_columns(
        [
            pl.Series(name="zero_crossing_time", values=zero_crossing_times, dtype=pl.Float64),
            pl.Series(name="zero_crossing_error", values=zero_crossing_errors, dtype=pl.Float64),
        ]
    )

824 

825 

def pipe_inl(
    df: pl.DataFrame,
    skip_last_n_hi: Optional[int] = None,
    skip_last_n_lo: Optional[int] = None,
) -> pl.DataFrame:
    """
    Compute the integral non-linearity (INL) per channel and gain.

    For each (gain, channel) group, the per-amplitude mean energies are fit
    with a straight line (weighted by the standard error of the mean), and the
    INL is the residual from that line expressed as a percentage of the
    maximum mean energy.

    Args:
        df: Dataframe. Needs columns 'energies' (list, possibly nested),
            'amp', 'gain', and 'channel'.
        skip_last_n_hi: skip last n points of the linear fit for hi gain.
        skip_last_n_lo: skip last n points of the linear fit for lo gain.
            If None, it's set to skip_last_n_hi.

    Returns:
        Updated dataframe with an 'INL' column (percent), joined back on
        ['gain', 'channel', 'amp']. If any group has fewer than two points to
        fit, the input dataframe is returned unchanged.
    """
    # Convert "skip last n" counts into negative slice end-points.
    if skip_last_n_hi is not None:
        skip_last_n_hi = -skip_last_n_hi

    if skip_last_n_lo is not None:
        skip_last_n_lo = -skip_last_n_lo
    elif skip_last_n_hi is not None:
        skip_last_n_lo = skip_last_n_hi

    INL_df: pl.DataFrame = pl.DataFrame()

    for (gain, channel), frame in df.group_by(["gain", "channel"]):
        skip_last_n = skip_last_n_hi if gain == "hi" else skip_last_n_lo

        # Flatten the (possibly doubly nested) energy lists, then aggregate
        # mean/std/count per amplitude.
        energies_lf = frame.lazy().select("energies", "amp").explode("energies")
        if frame["energies"][0].dtype.is_nested():
            energies_lf = energies_lf.explode("energies")
        energies_df = (
            energies_lf.group_by("amp")
            .agg(
                energy_mean=pl.col("energies").mean(),
                energy_std=pl.col("energies").std(),
                n_energies=pl.col("energies").len(),
            )
            .sort(by="amp")
            .collect()
        )
        amps_arr: np.ndarray = energies_df["amp"].to_numpy()
        n_energies: np.ndarray = energies_df["n_energies"].to_numpy()
        e_arr: np.ndarray = energies_df["energy_mean"].to_numpy()
        # Standard error of the mean per amplitude, used as fit weights.
        dE_arr: np.ndarray = energies_df["energy_std"].to_numpy() / np.sqrt(n_energies)

        if len(e_arr[:skip_last_n]) <= 1 or len(dE_arr[:skip_last_n]) <= 1 or len(amps_arr[:skip_last_n]) <= 1:
            # You can't fit a _unique_ line to a single point
            log.error("pipe_inl has only one energy")
            return df

        # Weighted linear fit; initial slope guessed from the second point
        # (index 1 is safe: the guard above ensures at least two points).
        popt, _ = curve_fit(
            helper.lin,
            amps_arr[:skip_last_n],
            e_arr[:skip_last_n],
            p0=[e_arr[1] / amps_arr[1], 0],
            sigma=dE_arr[:skip_last_n],
            absolute_sigma=True,
        )

        y_pred: np.ndarray = helper.lin(amps_arr, *popt)
        # Residual from the fit line as a percentage of the largest mean energy.
        INL: np.ndarray = 100 * (e_arr - y_pred) / max(e_arr)

        INL_df = pl.concat(
            [
                INL_df,
                pl.DataFrame(
                    {
                        "channel": channel,
                        "gain": gain,
                        "amp": pl.Series(amps_arr, dtype=pl.Float32),
                        "INL": pl.Series(INL, dtype=pl.Float32),
                    },
                ),
            ]
        )

    return df.join(INL_df, on=["gain", "channel", "amp"])

909 

910 

def pipe_energy_sigma(df: pl.DataFrame, all_phases: bool = False) -> pl.DataFrame:
    """
    Calculate statistical uncertainties of the energy histogram mean and std.

    Adds 'd_mu' (standard error of the mean, std / sqrt(N)) and 'd_sigma'
    (uncertainty of the std, std / sqrt(2N - 2)), where N is the number of
    energies contributing to each row.

    Args:
        df: Polars DataFrame. Needs columns 'energy_std' and 'energies'.
        all_phases: If True, 'energies' is a list of lists (one inner list
            per phase) and N is the total count across all phases; otherwise
            'energies' is a flat list and N is its length.

    Returns:
        DataFrame with 'd_mu' and 'd_sigma' columns added.
    """
    log.debug("Calculating energy sigma")
    if all_phases:
        return df.with_columns(
            d_mu=pl.col("energy_std") / pl.col("energies").list.eval(pl.element().list.len()).list.sum().sqrt(),
            d_sigma=pl.col("energy_std")
            / (2 * pl.col("energies").list.eval(pl.element().list.len()).list.sum() - 2).sqrt(),
        )
    else:
        return df.with_columns(
            d_mu=pl.col("energy_std") / pl.col("energies").list.len().sqrt(),
            d_sigma=pl.col("energy_std") / (2 * pl.col("energies").list.len() - 2).sqrt(),
        )

938 

939 

def find_max_sum_index(arr: np.ndarray, window_radius: int) -> int:
    """
    Return the center index of the sliding window with the maximal sum.

    The window has width 2 * window_radius + 1 and slides over all fully
    contained positions.

    Args:
        arr: 1-D array to scan.
        window_radius: Half-width of the sliding window.

    Returns:
        Index into ``arr`` at the center of the maximum-sum window.
    """
    # Box-filter via convolution: each 'valid' output is one window's sum.
    kernel = np.ones(2 * window_radius + 1)
    window_sums = np.convolve(arr, kernel, mode="valid")
    # argmax gives the window's start position; shift by the radius to land
    # on the window's center element.
    return int(np.argmax(window_sums) + window_radius)

948 

949 

def get_std_diff(
    df: pl.DataFrame,
    mean_interleaved_pulse: str = "mean_interleaved_pulse",
    ref_pulse_path: Path = constants.ROOTDIR / "polars_analysis/analysis/reference_pulse.txt",
) -> pl.DataFrame:
    """
    Compare each mean interleaved pulse against a reference pulse.

    The reference and every pulse are windowed to 200 samples before and 1000
    samples after their maximum-sum region, and each pulse is normalized to
    its own peak. Rows whose window cannot be aligned with the reference
    (shape mismatch raises ValueError) receive sentinel values of -100.0 for
    the scalars and all-zero arrays for the vectors.

    Args:
        df: Polars DataFrame with the pulse column.
        mean_interleaved_pulse: Column with the mean interleaved pulse.
        ref_pulse_path: Path to the reference pulse text file.

    Returns:
        DataFrame with 'pulse_std', 'pulse_highest_diff', 'pulse_ref_diff',
        and 'pulse_normalized' columns added.
    """
    ref_pulse: np.ndarray = np.loadtxt(ref_pulse_path)

    pulse_std: list[float] = []
    pulse_highest_diff: list[float] = []
    pulse_ref_diff: list[np.ndarray] = []
    pulse_normalized: list[np.ndarray] = []

    # Window the reference around its maximum-sum region.
    max_ind = find_max_sum_index(ref_pulse, 20)
    ref_pulse = ref_pulse[max_ind - 200 : max_ind + 1000]

    for row in df.select(mean_interleaved_pulse).iter_rows(named=True):
        pulse = np.array(row[mean_interleaved_pulse])
        # Normalize to the pulse's own peak.
        pulse = pulse / np.max(pulse)
        try:
            # NOTE(review): if the peak lies within 200 samples of the start,
            # `max_ind - 200` goes negative and Python's negative indexing
            # produces an unintended (usually empty) slice, which then lands
            # in the ValueError branch below — confirm this is acceptable.
            max_ind = find_max_sum_index(pulse, 20)
            pulse = pulse[max_ind - 200 : max_ind + 1000]

            pulse_std.append(np.std(pulse - ref_pulse, dtype=float))
            pulse_highest_diff.append(np.max(np.abs(pulse - ref_pulse)))
            pulse_ref_diff.append(pulse - ref_pulse)
            pulse_normalized.append(pulse)
        except ValueError:
            # Sentinels keep the output columns row-aligned with the input.
            pulse_std.append(-100.0)
            pulse_highest_diff.append(-100.0)
            pulse_ref_diff.append(ref_pulse * 0)
            pulse_normalized.append(ref_pulse * 0)

    return df.with_columns(
        pl.Series("pulse_std", pulse_std),
        pl.Series("pulse_highest_diff", pulse_highest_diff),
        pl.Series("pulse_ref_diff", values=pulse_ref_diff).cast(pl.List(pl.Float32)),
        pl.Series("pulse_normalized", values=pulse_normalized).cast(pl.List(pl.Float32)),
    )

990 

991 

def pipe_ref_pulse_correlation(
    df: pl.DataFrame,
    mean_interleaved_pulse: str = "mean_interleaved_pulse",
    ref_pulse_path: Path = constants.ROOTDIR / "polars_analysis/analysis/reference_pulse.txt",
) -> pl.DataFrame:
    """
    Add a 'ref_pulse_corr' column: the Pearson correlation between each
    group's mean interleaved pulse and the reference pulse (NaN becomes 0).

    Args:
        df: Polars DataFrame with the pulse column and the group-key columns
            'run_number', 'measurement', 'channel', 'gain'.
        mean_interleaved_pulse: Column with the mean interleaved pulse.
        ref_pulse_path: Path to the reference pulse text file.

    Returns:
        Input DataFrame joined with the per-group correlations.
    """
    keys = ["run_number", "measurement", "channel", "gain"]

    reference = np.loadtxt(ref_pulse_path)

    # Broadcast the reference pulse onto every row, unroll both pulses
    # sample-by-sample, and correlate them within each group.
    corr_df = (
        df.with_columns(ref_pulse=pl.Series([reference]).first())
        .lazy()
        .explode(mean_interleaved_pulse, "ref_pulse")
        .with_columns(ref_pulse_corr=pl.corr(mean_interleaved_pulse, "ref_pulse").over(keys))
        .group_by(keys)
        .agg(pl.col("ref_pulse_corr").first().fill_nan(0))
        .collect()
    )

    return df.join(corr_df, on=keys)

1012 

1013 

def pipe_ref_pulse_rmse(
    df: pl.DataFrame,
    mean_interleaved_pulse: str = "mean_interleaved_pulse",
    ref_pulse_path: Path = constants.ROOTDIR / "polars_analysis/analysis/reference_pulse.txt",
) -> pl.DataFrame:
    """
    Add a 'ref_pulse_rmse' column: the RMS difference between each group's
    z-score-normalized mean interleaved pulse and the z-score-normalized
    reference pulse (NaN becomes 0).

    Args:
        df: Polars DataFrame with the pulse column and the group-key columns
            'run_number', 'measurement', 'channel', 'gain'.
        mean_interleaved_pulse: Column with the mean interleaved pulse.
        ref_pulse_path: Path to the reference pulse text file.

    Returns:
        Input DataFrame joined with the per-group RMSE values.
    """
    keys = ["run_number", "measurement", "channel", "gain"]

    reference = np.loadtxt(ref_pulse_path)

    # Normalize both pulses to zero mean / unit std inside their lists so the
    # RMSE measures differences in shape rather than scale.
    zscore = (pl.element() - pl.element().mean()) / pl.element().std()

    rmse_df = (
        df.with_columns(ref_pulse=pl.Series([reference]).first())
        .lazy()
        .with_columns(
            pl.col(mean_interleaved_pulse).list.eval(zscore),
            pl.col("ref_pulse").list.eval(zscore),
        )
        .explode(mean_interleaved_pulse, "ref_pulse")
        .with_columns(
            ref_pulse_rmse=(pl.col(mean_interleaved_pulse) - pl.col("ref_pulse"))
            .pow(2)
            .mean()
            .sqrt()
            .over(keys)
        )
        .group_by(keys)
        .agg(pl.col("ref_pulse_rmse").first().fill_nan(0))
        .collect()
    )

    return df.join(rmse_df, on=keys)

1044 

1045 

def pipe_gain_ratio(df: pl.DataFrame) -> pl.DataFrame:
    """
    Add a 'gain_ratio' column: hi-gain mean energy divided by lo-gain mean
    energy at the same amplitude and channel.

    Args:
        df: Polars DataFrame with 'gain', 'amp', 'channel', 'measurement',
            and 'energy_mean' columns.

    Returns:
        Input DataFrame left-joined with the gain ratios on
        ['measurement', 'channel'].
    """
    join_columns = ["measurement", "channel"]
    # NOTE(review): the hi/lo match is keyed on ['amp', 'channel'] only — if
    # the frame holds several measurements, hi rows of one measurement can
    # pair with lo rows of another; confirm whether 'measurement' should be
    # part of the join key.
    gain_ratios = (
        df.lazy()
        .filter(pl.col("gain") == "lo")
        .join(
            # Expression-form filter for consistency with the "lo" filter
            # above (was the keyword-constraint form filter(gain="hi")).
            df.lazy().filter(pl.col("gain") == "hi").select("amp", "energy_mean", "channel"),
            on=["amp", "channel"],
            suffix="_hi",
        )
        .select(*join_columns, (pl.col("energy_mean_hi") / pl.col("energy_mean")).alias("gain_ratio"))
        .collect()
    )
    return df.join(gain_ratios, on=join_columns, how="left")