Coverage for polars_analysis / analysis / pulse_analysis.py: 91%
309 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 13:37 -0400
1import logging
2from contextlib import suppress
3from pathlib import Path
4from typing import List, Optional, Tuple
6import numpy as np
7import polars as pl
8from scipy import linalg
9from scipy.optimize import curve_fit
10from scipy.signal import find_peaks
12from polars_analysis.analysis import constants
13from polars_analysis.plotting import helper
15# Instantiate logger
16log = logging.getLogger(__name__)
18"""
19Functions to calculate derived values for pulse runs.
20"""
def expr_awg_amp_to_amp(col1: str = "awg_amp", col2: str = "att_val") -> pl.Expr:
    """
    Convert AWG amplitude to amplitude in mA.

    Args:
        col1: Column name of AWG amplitude.
        col2: Column name of attenuation value.

    Returns:
        Expression to calculate amplitude in mA.
    """
    # Attenuation is given in dB; convert to a linear scale factor.
    attenuation_factor = 10.0 ** (-pl.col(col2) / 20.0)
    return pl.col(col1) * attenuation_factor * 4.0
def expr_max_pulse_amp(col: str = "samples") -> pl.Expr:
    """
    Get maximum pulse amplitude.

    Args:
        col: Column name of pulse samples.

    Returns:
        Expression to calculate maximum pulse amplitude.
    """
    # Median of the capture serves as the baseline estimate.
    sample_list = pl.col(col)
    return sample_list.list.max() - sample_list.list.median()
def expr_max_phase_indices(mean_interleaved_pulse: str = "mean_interleaved_pulse", phase_shift: int = 0) -> pl.Expr:
    """
    Get indices of samples around the pulse peak, shifted by a phase.
    Uses the maximum amplitude pulse per channel and gain, to mirror how the OFCs are derived.

    Args:
        mean_interleaved_pulse: Column name of mean interleaved pulse.
        phase_shift: Phase shift in samples.

    Returns:
        Expression to calculate indices of samples around the maximum pulse amplitude.
    """
    # Peak position taken from the highest-amplitude row within each channel/gain group.
    peak_index = (
        pl.col(mean_interleaved_pulse)
        .list.arg_max()
        .filter(pl.col("awg_amp") == pl.col("awg_amp").max())
        .first()
        .over(["channel", "gain"])
    )
    train_length = constants.PULSES_PER_TRAIN * constants.SAMPLES_PER_PULSE
    # Two samples on either side of the peak, each offset by a whole phase step,
    # wrapped around the interleaved train.
    sample_offsets = [-2, -1, 0, 1, 2]
    return pl.concat_list(
        [
            (peak_index + phase_shift + constants.N_PHASES * offset) % train_length
            for offset in sample_offsets
        ]
    )
def per_ch_interleaving_util(
    all_pulses: np.ndarray,
    trimmed_width: int,
) -> Tuple[Optional[int], Optional[int]]:
    """
    Utility function for pipe_samples_interleaved below.

    Folds the capture into one mean pulse train, locates the pulse peaks, and uses
    the largest gap between consecutive peaks to pick the trigger point.

    Args:
        all_pulses: 1D capture whose length is a multiple of PULSE_TRAIN_PERIOD.
        trimmed_width: Capture length after trimming to whole trains.

    Returns:
        (start, end) slice bounds into the capture, or (None, None) if no peaks
        were found. (Fixed annotation: these are integer indices, not arrays.)
    """
    # Require peaks be at least SAMPLES_PER_PULSE away from each other, with 5 samples of wiggle room
    # Based on some testing with large pulses 1 or 2 samples should be fine, but I wanted to be safe
    PEAK_SPACING_PADDING = 5

    def _find_peaks_above(train: np.ndarray, n_sigma: float) -> np.ndarray:
        # Shared thresholded peak search used by the initial pass and both retries.
        threshold = np.median(train) + n_sigma * np.std(train)
        return find_peaks(
            train,
            height=threshold,
            distance=constants.SAMPLES_PER_PULSE - PEAK_SPACING_PADDING,
        )[0]

    def _keep_highest(train: np.ndarray, found: np.ndarray) -> np.ndarray:
        # Keep (up to) the PULSES_PER_TRAIN highest peaks, re-sorted by position.
        return np.sort(found[np.argsort(train[found])][-constants.PULSES_PER_TRAIN :])

    mean_pulse_train = all_pulses.reshape([-1, constants.PULSE_TRAIN_PERIOD]).mean(axis=0)
    peaks = _find_peaks_above(mean_pulse_train, 3)
    if len(peaks) > constants.PULSES_PER_TRAIN:
        log.warning(
            (
                f"Found more than {constants.PULSES_PER_TRAIN} possible "
                f"pulses, will use {constants.PULSES_PER_TRAIN} highest peaks"
            )
        )
        peaks = _keep_highest(mean_pulse_train, peaks)
    if len(peaks) < constants.PULSES_PER_TRAIN:
        log.warning(f"Found {len(peaks)} peaks, retrying with lower threshold")
        peaks = _keep_highest(mean_pulse_train, _find_peaks_above(mean_pulse_train, 2))
    if len(peaks) < constants.PULSES_PER_TRAIN:
        # Sometimes we get unlucky and sample on the falling edge of the first peak in a train.
        # We can add one pulse to the end of the train to let us find the first peak
        mean_pulse_train = np.concatenate([mean_pulse_train, mean_pulse_train[: constants.SAMPLES_PER_PULSE]])
        log.warning(f"Found {len(peaks)} peaks after lowering threshold, retrying with extra first pulse")
        peaks = _keep_highest(mean_pulse_train, _find_peaks_above(mean_pulse_train, 3))

    start, end = None, None
    if len(peaks) > 0:
        # Append the first peak wrapped by one period so the gap after the last
        # peak is included when finding the largest inter-peak gap.
        peaks = np.append(peaks, peaks[0] + constants.PULSE_TRAIN_PERIOD)
        # Offset by PEAK_INDEX to make sure pulse peak is in the right spot
        start = (peaks[np.diff(peaks).argmax() + 1] - constants.PEAK_INDEX) % constants.PULSE_TRAIN_PERIOD
        # # This lines up with the previous version, but I'm not sure why that timing was chosen
        # # The new timing also fixes the OFC samples plot, so maybe the old was a mistake
        # start = (peaks[np.diff(peaks).argmax() + 1] - 8) % constants.PULSE_TRAIN_PERIOD
        end = start + constants.PULSE_TRAIN_PERIOD * ((trimmed_width // constants.PULSE_TRAIN_PERIOD) - 1)
    else:
        log.error("No peaks found for interleaving!")

    return (start, end)
def pipe_samples_interleaved(
    df: pl.DataFrame, samples: str = "samples", is_reference_pulse: Optional[str] = None
) -> pl.DataFrame:
    """
    Trigger and interleave pulse samples, producing series of pulses that are SAMPLES_PER_PULSE * N_PHASES long.
    Adds "samples_interleaved", "mean_interleaved_pulse"
    and "samples_baseline" columns to the DataFrame.

    Args:
        df: DataFrame.
        samples: Column name of pulse samples.
        is_reference_pulse: Optional name of a boolean column; when given, the
            trigger position is derived once from the highest-amplitude
            reference-pulse row and reused for every row (cross talk runs).

    Returns:
        DataFrame with 2D array of interleaved pulse samples in "samples_interleaved" column,
        the mean of all interleaved pulses in "mean_interleaved_pulse" column,
        and 2D array of baseline samples (in between pulse trains) in "samples_baseline" column.
    """
    # Trim every capture to a whole number of pulse trains, based on the shortest row.
    width = df.select(pl.col(samples).list.len().min()).item()
    num_trains = width // constants.PULSE_TRAIN_PERIOD
    # One interleaved pulse (and one baseline window) per full train after the trigger point.
    samples_interleaved = np.zeros([len(df), num_trains - 1, constants.SAMPLES_PER_PULSE * constants.PULSES_PER_TRAIN])
    samples_baseline = np.zeros([len(df), num_trains - 1, constants.TRIGGER_OFFSET])
    # If you want to save the interleaved samples without the baseline subtracted
    # samples_interleaved_raw = np.zeros(
    #     [len(df), num_trains - 1, constants.SAMPLES_PER_PULSE * constants.PULSES_PER_TRAIN]
    # )

    trimmed_width = constants.PULSE_TRAIN_PERIOD * num_trains
    samples_array = df[samples].list.slice(0, trimmed_width).list.to_array(trimmed_width).to_numpy()

    ref_start, ref_end = None, None
    if is_reference_pulse:
        # The reference pulse for cross talk runs
        reference_samples = (  # noqa F841
            df.filter(pl.col(is_reference_pulse))
            .sort(pl.col(samples).list.max())  # Need to pick high gain
            .select(pl.last(samples))[samples]
            .list.slice(0, trimmed_width)
            .list.to_array(trimmed_width)
            .to_numpy()
        )
        ref_start, ref_end = per_ch_interleaving_util(reference_samples, trimmed_width)

    for i, all_pulses in enumerate(samples_array):
        if is_reference_pulse:
            start, end = ref_start, ref_end
        else:
            start, end = per_ch_interleaving_util(all_pulses, trimmed_width)

        if start is None and end is None:
            # NOTE(review): with both bounds None the slice below degrades to the
            # full capture; a resulting reshape/assignment mismatch is silently
            # skipped by suppress(ValueError), leaving zeros for this row.
            log.warning(f"No start and end found for index {i}")
            log.warning(f"{df['channel', 'gain', 'awg_amp'][i]}")

        # Samples triggered starts right at the peak, mainly used for autocorrelation
        triggered_pulses = all_pulses[start:end].reshape([-1, constants.PULSE_TRAIN_PERIOD])
        # Tail of each train (past the pulses) is baseline; its mean is subtracted
        # from the interleaved pulses.
        baseline_samples = triggered_pulses[:, constants.SAMPLES_PER_PULSE * constants.PULSES_PER_TRAIN :]
        interleaved_pulses = triggered_pulses[:, constants.TIME_INDICES_SORTED] - baseline_samples.mean()
        with suppress(ValueError):
            samples_interleaved[i] = interleaved_pulses
            samples_baseline[i] = baseline_samples

        # samples_interleaved_raw[i] = triggered_pulses[:, constants.TIME_INDICES_SORTED]

    mean_interleaved_pulse = np.mean(samples_interleaved, axis=1)

    df = df.with_columns(
        pl.Series(name="samples_interleaved", values=samples_interleaved, dtype=pl.List(pl.List(pl.Float64))),
        pl.Series(name="mean_interleaved_pulse", values=mean_interleaved_pulse, dtype=pl.List(pl.Float64)),
        pl.Series(name="samples_baseline", values=samples_baseline),
        # pl.Series(name="samples_interleaved_raw", values=samples_interleaved_raw),
    )

    # # Replace samples_interleaved with one where the baseline is determined once per channel, not per measurement
    # df = df.join(
    #     df.filter(pl.col("awg_amp") == pl.col("awg_amp").max().over("gain")).with_columns(
    #         pl.col("samples_baseline")
    #         .arr.to_list()
    #         .list.eval(pl.element().arr.to_list().list.mean())
    #         .list.mean()
    #         .alias("max_amp_baseline_mean")
    #     )["run_number", "channel", "gain", "max_amp_baseline_mean"],
    #     on=["run_number", "channel", "gain"],
    #     how="left",
    # )
    # df = df.with_columns(
    #     (pl.col("samples_interleaved_raw") - pl.col("max_amp_baseline_mean")).alias("samples_interleaved")
    # ).drop("max_amp_baseline_mean")

    return df
def calc_autocorr_along_axis(x: np.ndarray) -> np.ndarray:
    """
    Helper function to calculate autocorrelation along an axis.
    TODO: Incorporate this into calc_autocorr.

    Args:
        x: Array of samples.

    Returns:
        Autocorrelation of the samples (non-negative lags only).
    """
    full_correlation = np.correlate(x, x, mode="full")
    # The zero-lag term sits at the midpoint; keep it and everything after.
    midpoint = full_correlation.size // 2
    return full_correlation[midpoint:]
def baseline_calc_autocorr(trains: np.ndarray) -> np.ndarray:
    """
    Helper function to calculate autocorrelation of baseline samples.
    Only uses the last 500 samples of each baseline, where the ADC response has returned to baseline.

    Args:
        trains: Array of baseline samples.

    Returns:
        Mean of the autocorrelations of the baseline samples.
    """
    # Mean-subtract so the autocorrelation reflects fluctuations, not the offset.
    tail = trains[:, -500:].astype(float)
    tail -= np.nanmean(tail)
    # Autocorrelate the first 60 samples of each baseline row independently.
    per_row_autocorr = np.apply_along_axis(calc_autocorr_along_axis, 1, tail[:, :60])
    return np.nanmean(per_row_autocorr, axis=0)[:60]
def pipe_OFCs(
    df: pl.DataFrame,
    mean_interleaved_pulse: str = "mean_interleaved_pulse",
    max_phase_indices: str = "max_phase_indices",
    amp: str = "amp",
    is_crosstalk: bool = False,
    quantile: float = 1.0,
) -> pl.DataFrame:
    """
    Calculate Optimal Filter Coefficients (OFCs) for pulse samples.
    Returns DataFrame with "autocorr", "OFCs_a", "OFCs_b", and "OFC_amp" columns.

    Args:
        df: DataFrame.
        mean_interleaved_pulse: Column name of mean interleaved pulse.
        max_phase_indices: Column name of indices around the maximum pulse amplitude.
        amp: Column name of pulse amplitude.
        is_crosstalk: If True, derive OFCs only from reference-pulse rows and join
            them to every row by gain (crosstalk runs).
        quantile: Per channel/gain amplitude quantile selecting the row used to
            derive the OFCs; 1.0 selects the maximum amplitude.

    Returns:
        DataFrame with OFCs and autocorrelation.
    """
    if is_crosstalk:
        # Only one row per channel and gain with max amplitude, and must be a reference pulse
        ofc_df = df.filter(pl.col(amp) == pl.col(amp).max().over(["channel", "gain"]), pl.col("is_reference_pulse"))
        join_columns = ["gain"]
        if len(ofc_df) > 2:
            log.warning(f"Found {len(ofc_df)} references pulses, only 2 (1 per gain) are supported for crosstalk runs")
    else:
        # One row per channel and gain with max amplitude
        # Code to use Nth quantile amplitude
        ofc_df = df.filter(pl.col(amp) == pl.quantile(amp, quantile).over(["channel", "gain"]))
        log.debug(f"Amp used for OFCs = {ofc_df['channel', 'gain', 'amp', 'awg_amp']}")
        # Code to use max amplitude
        # ofc_df = df.filter(pl.col(amp) == pl.col(amp).max().over(["channel", "gain"]))

        join_columns = ["channel", "gain"]

    ofc_df = ofc_df.unique(subset=["channel", "gain"], keep="first", maintain_order=True)
    updated_rows = []

    for channel in ofc_df.select("channel").unique().to_series():
        channel_df = ofc_df.filter(pl.col("channel") == channel)

        # Per-phase coefficient lists for each gain; a still-empty sublist marks a
        # phase whose OFCs were never computed (checked after the row loop).
        a_coeffs: dict[str, List[List[float]]] = {
            "hi": [[] for _ in range(constants.N_PHASES)],
            "lo": [[] for _ in range(constants.N_PHASES)],
        }
        b_coeffs: dict[str, List[List[float]]] = {
            "hi": [[] for _ in range(constants.N_PHASES)],
            "lo": [[] for _ in range(constants.N_PHASES)],
        }
        autocorr: dict[str, List[float]] = {"hi": [], "lo": []}

        for row in channel_df.iter_rows(named=True):
            # Noise autocorrelation estimated from this row's baseline windows.
            autocorrelation = baseline_calc_autocorr(np.array(row["samples_baseline"]))
            autocorr[row["gain"]] = autocorrelation.tolist()
            max_peak = np.max(np.array(row[mean_interleaved_pulse])[row[max_phase_indices]])

            for i in range(constants.N_PHASES):
                phase_shift = i + constants.PHASE_SHIFT0
                indices = np.asarray(row[max_phase_indices]) + phase_shift
                # Only the first 5 autocorrelation lags are used (one per OFC sample).
                ofcs = calc_OFCs(
                    autocorrelation[:5],
                    np.array(row[mean_interleaved_pulse])[indices],
                    np.gradient(row[mean_interleaved_pulse])[indices],
                    max_peak,
                )
                a_coeffs[row["gain"]][i] = ofcs[0].tolist()
                b_coeffs[row["gain"]][i] = ofcs[1].tolist()

            updated_rows.append(
                {
                    "channel": row["channel"],
                    "gain": row["gain"],
                    "autocorr": autocorr[row["gain"]],
                    "OFCs_a": a_coeffs[row["gain"]],
                    "OFCs_b": b_coeffs[row["gain"]],
                    "OFC_amp": row["amp"],
                }
            )

        # Sanity check: every phase of both gains should have coefficients by now.
        for i in range(constants.N_PHASES):
            phase_shift = i + constants.PHASE_SHIFT0
            if a_coeffs["lo"][i] == []:
                log.error(f"Failed to compute lo gain OFCs for channel {channel} phase shift {phase_shift}!")
                # Uncomment these to get around error
                # a_coeffs["lo"][i] = a_coeffs["hi"][i]
                # b_coeffs["lo"][i] = b_coeffs["hi"][i]

            if a_coeffs["hi"][i] == []:
                log.error(f"Failed to compute hi gain OFCs for channel {channel} phase shift {phase_shift}!")
                # Uncomment these to get around error
                # a_coeffs["hi"][i] = a_coeffs["lo"][i]
                # b_coeffs["hi"][i] = b_coeffs["lo"][i]

    return df.join(
        pl.DataFrame(updated_rows, schema_overrides={"OFC_amp": pl.Float32}), on=join_columns, how="left"
    ).drop("channel_right", strict=False)
def calc_OFCs(
    autocorrelation: np.ndarray, samples: np.ndarray, gradient: np.ndarray, max_peak: float
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Helper function to calculate Optimal Filter Coefficients (OFCs) for pulse samples.
    Uses method and notation described in the Cleland & Stern paper,
    "Signal processing considerations for liquid ionization calorimeters in a high rate environment".

    Args:
        autocorrelation: Autocorrelation of the baseline noise; element 0 is lag 0
            and is used to normalize the rest.
        samples: Pulse samples around the peak (g in the paper).
        gradient: Pulse derivative at the same samples (g' in the paper).
        max_peak: Normalization applied to samples and gradient.

    Returns:
        Tuple of OFCs a and b for the pulse samples (a reconstructs amplitude;
        b is used together with the amplitude to reconstruct time).
    """
    # Bug fix: the previous in-place `/=` could mutate the caller's arrays,
    # because np.ravel returns a *view* whenever the input is already 1D.
    # Normalize via out-of-place division instead.
    flat_ac = np.ravel(autocorrelation)
    autocorrelation = flat_ac / flat_ac[0]
    samples = np.ravel(samples) / max_peak
    gradient = np.ravel(gradient) / max_peak
    # calculate V = R^{-1}.
    inv_ac = linalg.inv(linalg.toeplitz(autocorrelation))
    # calculate V*g and V*dg only once.
    vg = np.dot(inv_ac, samples)
    vdg = np.dot(inv_ac, gradient)
    # calculate helper variables
    q1 = np.dot(samples, vg)
    q2 = np.dot(gradient, vdg)
    q3 = np.dot(gradient, vg)
    delta = q1 * q2 - q3 * q3
    # calculate Lagrange multipliers
    lm_lambda = q2 / delta
    lm_kappa = -q3 / delta
    lm_mu = q3 / delta
    lm_rho = -q1 / delta
    # calculate optimal filter coefficients
    a_coeffs = lm_lambda * vg + lm_kappa * vdg
    b_coeffs = lm_mu * vg + lm_rho * vdg
    return a_coeffs, b_coeffs
def pipe_apply_OFCs(
    df: pl.DataFrame,
    samples_interleaved: str = "samples_interleaved",
    max_phase_indices: str = "max_phase_indices",
    OFCs_a: str = "OFCs_a",
    OFCs_b: str = "OFCs_b",
    all_phases: bool = False,
) -> pl.DataFrame:
    """
    Applies Optimal Filter Coefficients (OFCs) to pulse samples.
    Returns DataFrame with "energies", "times", "energy_mean", "energy_std", "time_mean", and "time_std" columns.

    Args:
        df: DataFrame.
        samples_interleaved: Column name of interleaved pulse samples.
        max_phase_indices: Column name of indices around the maximum pulse amplitude.
        OFCs_a: Column name of a OFCs.
        OFCs_b: Column name of b OFCs.
        all_phases: If False, statistics come from the peak phase only; if True,
            statistics are taken across all phases of every pulse train.

    Returns:
        DataFrame with energies, times, and statistics.
    """

    log.debug("Applying OFCs")
    # Calculate energies for all 30 phases of OFCs
    # The nested row indices track the explode levels so results can be
    # re-aggregated back to one list-of-lists per original row.
    energies = (
        df.lazy()
        .with_row_index("index0")  # per dataframe row
        .with_columns(pl.arange(0, constants.N_PHASES).implode().alias("phase"))
        .explode(samples_interleaved)
        .with_row_index("index1")  # per pulse train
        .explode(OFCs_a, "phase")
        .select(
            "index0",
            "index1",
            OFCs_a,
            # Pick the 5 OFC samples for this phase, wrapped around the train.
            pl.col(samples_interleaved).list.gather(
                (pl.col(max_phase_indices) + pl.col("phase") + constants.PHASE_SHIFT0)
                % (constants.SAMPLES_PER_PULSE * constants.PULSES_PER_TRAIN)
            ),
            "run_number",
            "measurement",
            "channel",
            "gain",
        )
        .with_row_index("index2")  # per phase per pulse train
        .explode(OFCs_a, samples_interleaved)
        .group_by("index2", maintain_order=True)
        .agg(
            pl.col("index0").first(),
            pl.col("index1").first(),
            # Energy is the dot product of the OFC a-coefficients with the samples.
            pl.col(samples_interleaved).dot(OFCs_a).alias("energies"),
            pl.col("run_number").first(),
            pl.col("measurement").first(),
            pl.col("channel").first(),
            pl.col("gain").first(),
        )
        .drop("index2")
        .group_by("index1", maintain_order=True)
        .agg(
            pl.col("index0").first(),
            pl.col("energies"),
            pl.col("run_number").first(),
            pl.col("measurement").first(),
            pl.col("channel").first(),
            pl.col("gain").first(),
        )
        .drop("index1")
        .group_by("index0")
        .agg(
            pl.col("energies"),
            pl.col("run_number").first(),
            pl.col("measurement").first(),
            pl.col("channel").first(),
            pl.col("gain").first(),
        )
        .drop("index0")
        .collect()
    )
    df = df.join(energies, on=["run_number", "measurement", "channel", "gain"])

    # Calculate times for all 30 phases of OFCs
    times = (
        df.lazy()
        .with_row_index("index0")  # per dataframe row
        .with_columns(pl.arange(0, constants.N_PHASES).implode().alias("phase"))
        .explode("energies", samples_interleaved)
        .with_row_index("index1")  # per pulse train
        .explode(OFCs_b, "energies", "phase")
        .select(
            "index0",
            "index1",
            "energies",
            OFCs_b,
            pl.col(samples_interleaved).list.gather(
                (pl.col(max_phase_indices) + pl.col("phase") + constants.PHASE_SHIFT0)
                % (constants.SAMPLES_PER_PULSE * constants.PULSES_PER_TRAIN)
            ),
            "run_number",
            "measurement",
            "channel",
            "gain",
        )
        .with_row_index("index2")  # per phase
        .explode(OFCs_b, samples_interleaved)
        .group_by("index2", maintain_order=True)
        .agg(
            pl.col("index0").first(),
            pl.col("index1").first(),
            # Time is (b-coefficients . samples) / energy for the same phase.
            pl.col(samples_interleaved).dot(OFCs_b).truediv(pl.col("energies").first()).alias("times"),
            pl.col("run_number").first(),
            pl.col("measurement").first(),
            pl.col("channel").first(),
            pl.col("gain").first(),
        )
        .drop("index2")
        .group_by("index1", maintain_order=True)
        .agg(
            pl.col("index0").first(),
            pl.col("times"),
            pl.col("run_number").first(),
            pl.col("measurement").first(),
            pl.col("channel").first(),
            pl.col("gain").first(),
        )
        .drop("index1")
        .group_by("index0")
        .agg(
            pl.col("times"),
            pl.col("run_number").first(),
            pl.col("measurement").first(),
            pl.col("channel").first(),
            pl.col("gain").first(),
        )
        .drop("index0")
        .collect()
    )
    df = df.join(times, on=["run_number", "measurement", "channel", "gain"])

    # Calculate statistics just for peak pulse
    if not all_phases:
        df = (
            df.lazy()
            .with_columns(
                # Keep only the peak-phase entry of each per-train energy/time list.
                pl.col("energies").list.eval(pl.element().list.get(constants.PHASE_SHIFT_PEAK)).alias("energies"),
                pl.col("times").list.eval(pl.element().list.get(constants.PHASE_SHIFT_PEAK)).alias("times"),
            )
            .with_columns(
                pl.col(OFCs_a).list.get(constants.PHASE_SHIFT_PEAK).alias(OFCs_a),
                pl.col(OFCs_b).list.get(constants.PHASE_SHIFT_PEAK).alias(OFCs_b),
                pl.col("energies").list.mean().alias("energy_mean"),
                pl.col("energies").list.std().alias("energy_std"),
                pl.col("energies").list.max().alias("energy_max"),
                pl.col("energies").list.min().alias("energy_min"),
            )
            .collect()
        )
    # Calculate statistics across all 30 pulses in a train, rather than just the peak pulse
    else:
        df = (
            df.lazy()
            .with_columns(
                pl.col("energies").list.eval(pl.element().explode()).list.mean().alias("energy_mean"),
                pl.col("energies").list.eval(pl.element().explode()).list.max().alias("energy_max"),
                pl.col("energies").list.eval(pl.element().explode()).list.min().alias("energy_min"),
                pl.col("energies").list.eval(pl.element().explode()).list.std().alias("energy_std"),
            )
            .collect()
        )

    # This paper about Run 1 topoclustering says only times from cells with energy significance > 2
    # are used to calculate cluster timing
    # Refer to Section 4.2.3, "Signal timing": https://arxiv.org/abs/1603.02934
    CELL_SIGMA_TRESHOLD = 2
    if not all_phases:
        df = (
            df.drop("times")
            .join(
                # Rebuild "times" keeping only entries whose energy is significant.
                df.lazy()
                .select("run_number", "measurement", "channel", "gain", "energies", "times", "energy_std")
                .with_row_index()
                .explode("energies", "times")
                .filter(pl.col("energies").abs() > CELL_SIGMA_TRESHOLD * pl.col("energy_std"))
                .group_by("index")
                .agg(pl.col("run_number", "measurement", "channel", "gain").first(), "times")
                .drop("index")
                .collect(),
                on=["run_number", "measurement", "channel", "gain"],
            )
            .with_columns(
                pl.col("times").list.mean().alias("time_mean"),
                pl.col("times").list.std().alias("time_std"),
            )
        )
    else:
        df = (
            df.drop("times")
            .join(
                df.lazy()
                .select("run_number", "measurement", "channel", "gain", "energies", "times", "energy_std")
                .with_row_index("index0")
                .explode("energies", "times")
                .with_row_index("index1")
                .explode("energies", "times")
                # .filter(pl.col("energies").abs() > CELL_SIGMA_TRESHOLD * pl.col("energy_std"))
                .with_columns(
                    # Null out (rather than drop) insignificant times so list
                    # lengths stay aligned across phases.
                    pl.when(pl.col("energies").abs() > CELL_SIGMA_TRESHOLD * pl.col("energy_std"))
                    .then(pl.col("times"))
                    .otherwise(None)
                    .alias("times")
                )
                .group_by("index1", maintain_order=True)
                .agg(pl.col("run_number", "measurement", "channel", "gain", "index0").first(), "times")
                .drop("index1")
                .group_by("index0", maintain_order=True)
                .agg(pl.col("run_number", "measurement", "channel", "gain").first(), "times")
                .drop("index0")
                .collect(),
                on=["run_number", "measurement", "channel", "gain"],
            )
            .lazy()
            .with_columns(
                pl.col("times").list.eval(pl.element().explode()).list.mean().alias("time_mean"),
                pl.col("times").list.eval(pl.element().explode()).list.std().alias("time_std"),
            )
            .collect()
        )

    ### Ignoring times from low energy cells is designed to replicate what ATLAS does,
    ### but some less aggressive ways to discard outlier times are included below.
    # # Drop outlier times
    # df = df.with_columns(
    #     pl.col("times")
    #     .list.eval(
    #         pl.element().filter(
    #             pl.element()
    #             < pl.max_horizontal(
    #                 pl.element().quantile(0.75) + 1.5 * (pl.element().quantile(0.75)-pl.element().quantile(0.25)), 25
    #             ),
    #             pl.element()
    #             > pl.min_horizontal(
    #                 pl.element().quantile(0.25) - 1.5 * (pl.element().quantile(0.75)-pl.element().quantile(0.25)), -25
    #             ),
    #         )
    #     )
    #     .alias("times")
    # )
    # # Drop outlier times and their corresponding energies
    # df = df.select(pl.exclude("energies", "times")).hstack(
    #     df["measurement", "channel", "gain", "energies", "times"]
    #     .explode(["energies", "times"])
    #     .with_columns(
    #         pl.col("times").quantile(0.75).over("measurement", "channel", "gain").alias("Q3"),
    #         pl.col("times").quantile(0.25).over("measurement", "channel", "gain").alias("Q1"),
    #     )
    #     .with_columns((pl.col("Q3") - pl.col("Q1")).alias("IQR"))
    #     .filter(
    #         pl.col("times") < pl.max_horizontal(pl.col("Q3") + 1.5 * pl.col("IQR"), 25),
    #         pl.col("times") > pl.min_horizontal(pl.col("Q1") - 1.5 * pl.col("IQR"), -25),
    #     )
    #     .group_by("measurement", "channel", "gain", maintain_order=True)
    #     .agg("energies", "times")
    #     .select("energies", "times")
    # )

    return df
def _rise_time_from_pulse(pulse: np.ndarray) -> Optional[float]:
    """Return the peak-to-5%-crossing time for one pulse, or None if the pulse never drops below 5% of peak."""
    FIVE_PERCENT = 0.05
    # Work on the time-reversed trace so the rising edge becomes a falling edge after the peak.
    flipped_pulse = np.flip(pulse)
    flipped_times = np.flip(constants.INTERLEAVED_TIMES)
    max_i = np.argmax(flipped_pulse)
    peak_time = flipped_times[max_i]
    tail = flipped_pulse[max_i:]

    # Zero out everything above 5% of the peak; the first nonzero entry left is the crossing.
    low_points = np.copy(tail)
    low_points[tail > FIVE_PERCENT * tail[0]] = 0
    below_threshold = np.nonzero(low_points)[0]
    if below_threshold.shape[0] == 0:
        return None
    return peak_time - flipped_times[max_i + below_threshold[0]]


def pipe_rise_time(
    df: pl.DataFrame,
    samples_interleaved: str = "samples_interleaved",
    mean_interleaved_pulse: str = "mean_interleaved_pulse",
) -> pl.DataFrame:
    """
    Calculate both rise time (from mean pulse) and rise time error (Gaussian sigma from individual pulses).

    Args:
        df: Polars DataFrame.
        samples_interleaved: Column name with list of pulses.
        mean_interleaved_pulse: Column name with mean pulse.

    Returns:
        DataFrame with 'rise_time' and 'rise_time_error' columns.
    """
    log.debug("Calculating rise times")
    rise_times: List[float] = []
    rise_time_errors: List[float] = []

    for row in df.iter_rows(named=True):
        # --- Rise Time from Mean Pulse --- (0.0 when no 5% crossing exists)
        mean_rise_time = _rise_time_from_pulse(np.asarray(row[mean_interleaved_pulse]))
        rise_times.append(0.0 if mean_rise_time is None else mean_rise_time)

        # Rise time error from the spread of per-pulse rise times
        rise_time_vals = []
        for sample in row[samples_interleaved]:
            sample_rise_time = _rise_time_from_pulse(np.asarray(sample))
            if sample_rise_time is not None:
                rise_time_vals.append(sample_rise_time)

        if len(rise_time_vals) == 0:
            # Bug fix: previously this assigned `rise_time_errors = [0 for _ in rise_times]`,
            # overwriting ALL accumulated errors and desynchronizing the two output
            # columns. Append a dummy 0.0 for this row only.
            rise_time_errors.append(0.0)
        else:
            hist_bins = np.linspace(min(rise_time_vals), max(rise_time_vals), 25)
            fit_pars = helper.calc_gaussian(rise_time_vals, hist_bins)
            # fit_pars[1] is the second Gaussian fit parameter — presumably the
            # width/uncertainty; TODO confirm against helper.calc_gaussian.
            d_mu = fit_pars[1]
            rise_time_errors.append(d_mu)

    return df.with_columns(
        [
            pl.Series(name="rise_time", values=rise_times),
            pl.Series(name="rise_time_error", values=rise_time_errors, strict=False),
        ]
    )
def _falling_zero_crossing(trace, times) -> Optional[float]:
    """Return the time (relative to the peak, rounded to 0.1) of the first zero crossing after the peak, or None."""
    max_i = np.argmax(trace)
    peak_time = times[max_i]
    for i in range(max_i, len(trace) - 1):
        if trace[i] == 0:
            # Landed exactly on zero.
            # Bug fix: this case previously returned the absolute time without
            # subtracting peak_time, unlike the sign-change case below.
            return round(times[i] - peak_time, 1)
        if trace[i] > 0 and trace[i + 1] < 0:
            # Sign change: pick whichever neighbor is closer to zero (ties -> left).
            y1, y2 = trace[i], trace[i + 1]
            crossing_idx = i if abs(y1) <= abs(y2) else i + 1
            return round(times[crossing_idx] - peak_time, 1)
    return None


def pipe_zero_crossing(
    df: pl.DataFrame,
    mean_interleaved_pulse: str = "mean_interleaved_pulse",
    samples_interleaved: str = "samples_interleaved",
) -> pl.DataFrame:
    """
    Calculate falling zero crossing time (after peak) and zero crossing error
    from pulse samples. Adds 'zero_crossing_time' and 'zero_crossing_error'
    columns to the DataFrame.

    Args:
        df: Polars DataFrame.
        mean_interleaved_pulse: Column with mean interleaved pulse.
        samples_interleaved: Column with list of interleaved pulses.

    Returns:
        DataFrame with 'zero_crossing_time' and 'zero_crossing_error' columns.
    """
    log.debug("Calculating zero crossing times")
    zero_crossing_times: list[float] = []
    zero_crossing_errors: list[float] = []
    times = constants.INTERLEAVED_TIMES

    for row in df.iter_rows(named=True):
        # zero crossing time (from mean pulse); NaN when no crossing found
        crossing = _falling_zero_crossing(row[mean_interleaved_pulse], times)
        zero_crossing_times.append(np.nan if crossing is None else crossing)

        # zero crossing error: standard error of the per-pulse crossing times
        zero_crossing_time_row = []
        for sample in row[samples_interleaved]:
            sample_crossing = _falling_zero_crossing(sample, times)
            if sample_crossing is not None:
                zero_crossing_time_row.append(sample_crossing)

        if zero_crossing_time_row:
            d_mu = np.std(zero_crossing_time_row) / np.sqrt(len(zero_crossing_time_row))
            zero_crossing_errors.append(d_mu)
        else:
            zero_crossing_errors.append(np.nan)

    return df.with_columns(
        [
            pl.Series(name="zero_crossing_time", values=zero_crossing_times, dtype=pl.Float64),
            pl.Series(name="zero_crossing_error", values=zero_crossing_errors, dtype=pl.Float64),
        ]
    )
def pipe_inl(
    df: pl.DataFrame,
    skip_last_n_hi: Optional[int] = None,
    skip_last_n_lo: Optional[int] = None,
) -> pl.DataFrame:
    """
    Calculate integral non-linearity (INL) per channel/gain by fitting a weighted
    line to mean energy vs amplitude and expressing residuals as a percentage of
    the maximum mean energy.

    Args:
        df: Dataframe. Needs columns 'energy_mean',
            'energy_std', 'amp', 'gain', 'board_id',
            'run_number', 'att_val', and 'channel'.
        skip_last_n_hi: skip last n points for hi gain
        skip_last_n_lo: skip last n points for lo gain. If None, it's set to hi

    Returns:
        Updated dataframe with INL
    """

    # Convert "skip last n" counts into negative slice ends; lo falls back to hi.
    if skip_last_n_hi is not None:
        skip_last_n_hi = -skip_last_n_hi

    if skip_last_n_lo is not None:
        skip_last_n_lo = -skip_last_n_lo
    elif skip_last_n_hi is not None:
        skip_last_n_lo = skip_last_n_hi

    INL_df: pl.DataFrame = pl.DataFrame()

    for (gain, channel), frame in df.group_by(["gain", "channel"]):
        if gain == "hi":
            skip_last_n = skip_last_n_hi
        else:
            skip_last_n = skip_last_n_lo

        # Flatten energies (one extra explode when they are nested per-phase lists)
        # and aggregate mean/std/count per amplitude point.
        energies_lf = frame.lazy().select("energies", "amp").explode("energies")
        if frame["energies"][0].dtype.is_nested():
            energies_lf = energies_lf.explode("energies")
        energies_df = (
            energies_lf.group_by("amp")
            .agg(
                energy_mean=pl.col("energies").mean(),
                energy_std=pl.col("energies").std(),
                n_energies=pl.col("energies").len(),
            )
            .sort(by="amp")
            .collect()
        )
        amps_arr: np.ndarray = energies_df["amp"].to_numpy()
        n_energies: np.ndarray = energies_df["n_energies"].to_numpy()
        e_arr: np.ndarray = energies_df["energy_mean"].to_numpy()
        # Standard error of the mean energy, used as fit weights.
        dE_arr: np.ndarray = energies_df["energy_std"].to_numpy() / np.sqrt(n_energies)

        if len(e_arr[:skip_last_n]) <= 1 or len(dE_arr[:skip_last_n]) <= 1 or len(amps_arr[:skip_last_n]) <= 1:
            # You can't fit a _unique_ line to a single point
            # NOTE(review): this bails out of the whole pipe, returning df without
            # INL for any group processed so far.
            log.error("pipe_inl has only one energy")
            return df

        # Weighted linear fit; initial slope guess from the second point.
        popt, _ = curve_fit(
            helper.lin,
            amps_arr[:skip_last_n],
            e_arr[:skip_last_n],
            p0=[e_arr[1] / amps_arr[1], 0],
            sigma=dE_arr[:skip_last_n],
            absolute_sigma=True,
        )

        y_pred: np.ndarray = helper.lin(amps_arr, *popt)
        # Residuals as a percentage of the largest mean energy (all points, even skipped ones).
        INL: np.ndarray = 100 * (e_arr - y_pred) / max(e_arr)

        INL_df = pl.concat(
            [
                INL_df,
                pl.DataFrame(
                    {
                        "channel": channel,
                        "gain": gain,
                        "amp": pl.Series(amps_arr, dtype=pl.Float32),
                        "INL": pl.Series(INL, dtype=pl.Float32),
                    },
                ),
            ]
        )

    return df.join(INL_df, on=["gain", "channel", "amp"])
def pipe_energy_sigma(df: pl.DataFrame, all_phases: bool = False) -> pl.DataFrame:
    """
    Calculate statistical uncertainties on the energy mean and width.
    Adds 'd_mu' and 'd_sigma' columns to the DataFrame.

    d_mu is the standard error of the mean (std / sqrt(N));
    d_sigma is the uncertainty on the standard deviation (std / sqrt(2N - 2)).

    Args:
        df: Polars DataFrame with 'energy_std' and 'energies' columns.
        all_phases: If True, 'energies' holds nested per-phase lists and N is the
            total count across all phases; otherwise N is the flat list length.

    Returns:
        DataFrame with 'd_mu' and 'd_sigma' columns.
    """
    log.debug("Calculating energy sigma")
    if all_phases:
        return df.with_columns(
            d_mu=pl.col("energy_std") / pl.col("energies").list.eval(pl.element().list.len()).list.sum().sqrt(),
            d_sigma=pl.col("energy_std")
            / (2 * pl.col("energies").list.eval(pl.element().list.len()).list.sum() - 2).sqrt(),
        )
    else:
        return df.with_columns(
            d_mu=pl.col("energy_std") / pl.col("energies").list.len().sqrt(),
            d_sigma=pl.col("energy_std") / (2 * pl.col("energies").list.len() - 2).sqrt(),
        )
def find_max_sum_index(arr: np.ndarray, window_radius: int) -> int:
    """
    Return the center index of the length-(2 * window_radius + 1) window of
    `arr` with the largest element sum (the first such window on ties).
    """
    width = 2 * window_radius + 1
    # Sliding-window sums via convolution with a flat kernel; 'valid' keeps
    # only windows fully contained in the array.
    window_sums = np.convolve(arr, np.ones(width), mode="valid")
    # argmax yields the window's start index; shift by the radius to its center.
    return int(np.argmax(window_sums) + window_radius)
def get_std_diff(
    df: pl.DataFrame,
    mean_interleaved_pulse: str = "mean_interleaved_pulse",
    ref_pulse_path: Path = constants.ROOTDIR / "polars_analysis/analysis/reference_pulse.txt",
) -> pl.DataFrame:
    """
    Compare each mean interleaved pulse against a stored reference pulse.

    Each pulse is normalised to unit peak height, aligned on the centre of
    its highest 41-sample window, and a 1200-sample window (200 samples
    before the peak, 1000 after) is compared sample-by-sample to the same
    window of the reference pulse.

    Adds columns:
        pulse_std: std of (pulse - reference); -100.0 on alignment failure.
        pulse_highest_diff: max absolute deviation; -100.0 on failure.
        pulse_ref_diff: per-sample difference; all zeros on failure.
        pulse_normalized: aligned, normalised pulse; all zeros on failure.

    Args:
        df: Polars DataFrame containing the pulse column.
        mean_interleaved_pulse: Name of the column holding pulse samples.
        ref_pulse_path: Path to the reference pulse text file.

    Returns:
        df with the four comparison columns appended.
    """
    ref_pulse: np.ndarray = np.loadtxt(ref_pulse_path)

    pulse_std: list[float] = []
    pulse_highest_diff: list[float] = []
    pulse_ref_diff: list[np.ndarray] = []
    pulse_normalized: list[np.ndarray] = []

    # Align the reference on its own peak window. Assumes the reference file
    # has at least 200 samples before and 1000 after its peak -- TODO confirm.
    max_ind = find_max_sum_index(ref_pulse, 20)
    ref_pulse = ref_pulse[max_ind - 200 : max_ind + 1000]

    for row in df.select(mean_interleaved_pulse).iter_rows(named=True):
        pulse = np.array(row[mean_interleaved_pulse])
        # NOTE(review): a zero/negative peak would make this divide by zero or
        # flip the pulse -- presumably upstream guarantees a positive peak.
        pulse = pulse / np.max(pulse)
        try:
            max_ind = find_max_sum_index(pulse, 20)
            if max_ind < 200:
                # A negative slice start would silently wrap around to the end
                # of the array; treat it as an alignment failure instead.
                raise ValueError("pulse peak too close to the start of the trace")
            pulse = pulse[max_ind - 200 : max_ind + 1000]
            # Raises ValueError on a length mismatch (pulse too short at the
            # end), which is handled below with sentinel values.
            diff = pulse - ref_pulse
            pulse_std.append(np.std(diff, dtype=float))
            pulse_highest_diff.append(np.max(np.abs(diff)))
            pulse_ref_diff.append(diff)
            pulse_normalized.append(pulse)
        except ValueError:
            # Sentinel values keep the output columns aligned with df rows.
            pulse_std.append(-100.0)
            pulse_highest_diff.append(-100.0)
            pulse_ref_diff.append(ref_pulse * 0)
            pulse_normalized.append(ref_pulse * 0)

    return df.with_columns(
        pl.Series("pulse_std", pulse_std),
        pl.Series("pulse_highest_diff", pulse_highest_diff),
        pl.Series("pulse_ref_diff", values=pulse_ref_diff).cast(pl.List(pl.Float32)),
        pl.Series("pulse_normalized", values=pulse_normalized).cast(pl.List(pl.Float32)),
    )
def pipe_ref_pulse_correlation(
    df: pl.DataFrame,
    mean_interleaved_pulse: str = "mean_interleaved_pulse",
    ref_pulse_path: Path = constants.ROOTDIR / "polars_analysis/analysis/reference_pulse.txt",
) -> pl.DataFrame:
    """
    Add a 'ref_pulse_corr' column: the Pearson correlation between each
    group's mean interleaved pulse and a stored reference pulse. NaN
    correlations are replaced with 0.
    """
    reference = np.loadtxt(ref_pulse_path)
    keys = ["run_number", "measurement", "channel", "gain"]

    # Broadcast the reference pulse onto every row, then unnest both pulse
    # columns so the correlation is computed sample-by-sample per group.
    correlations = (
        df.with_columns(ref_pulse=pl.Series([reference]).first())
        .lazy()
        .explode(mean_interleaved_pulse, "ref_pulse")
        .with_columns(ref_pulse_corr=pl.corr(mean_interleaved_pulse, "ref_pulse").over(keys))
        .group_by(keys)
        .agg(pl.col("ref_pulse_corr").first().fill_nan(0))
        .collect()
    )

    return df.join(correlations, on=keys)
def pipe_ref_pulse_rmse(
    df: pl.DataFrame,
    mean_interleaved_pulse: str = "mean_interleaved_pulse",
    ref_pulse_path: Path = constants.ROOTDIR / "polars_analysis/analysis/reference_pulse.txt",
) -> pl.DataFrame:
    """
    Add a 'ref_pulse_rmse' column: the root-mean-square error between each
    group's z-scored mean interleaved pulse and the z-scored reference pulse.
    NaN results are replaced with 0.
    """
    reference = np.loadtxt(ref_pulse_path)
    keys = ["run_number", "measurement", "channel", "gain"]

    # z-score both pulses so the RMSE measures shape, not scale or offset.
    standardize = (pl.element() - pl.element().mean()) / pl.element().std()

    rmse_df = (
        df.with_columns(ref_pulse=pl.Series([reference]).first())
        .lazy()
        .with_columns(
            pl.col(mean_interleaved_pulse).list.eval(standardize),
            pl.col("ref_pulse").list.eval(standardize),
        )
        .explode(mean_interleaved_pulse, "ref_pulse")
        .with_columns(
            ref_pulse_rmse=(pl.col(mean_interleaved_pulse) - pl.col("ref_pulse"))
            .pow(2)
            .mean()
            .sqrt()
            .over(keys)
        )
        .group_by(keys)
        .agg(pl.col("ref_pulse_rmse").first().fill_nan(0))
        .collect()
    )

    return df.join(rmse_df, on=keys)
def pipe_gain_ratio(df: pl.DataFrame) -> pl.DataFrame:
    """
    Add a 'gain_ratio' column: per amplitude, the ratio of the hi-gain mean
    energy to the lo-gain mean energy for the same channel.

    Args:
        df: Polars DataFrame with 'gain', 'amp', 'channel', 'measurement'
            and 'energy_mean' columns.

    Returns:
        df left-joined with the computed gain ratios on
        ['measurement', 'channel'].
    """
    join_columns = ["measurement", "channel"]
    # NOTE(review): the hi/lo rows are matched on ['amp', 'channel'] but not
    # 'measurement', and the final join yields one ratio row per amp for each
    # (measurement, channel) -- confirm this multiplicity is intended.
    hi_gain = df.lazy().filter(pl.col("gain") == "hi").select("amp", "energy_mean", "channel")
    gain_ratios = (
        df.lazy()
        # Expression-style filter, consistent with the hi-gain filter above
        # (original mixed this with the keyword-constraint form filter(gain="hi")).
        .filter(pl.col("gain") == "lo")
        .join(hi_gain, on=["amp", "channel"], suffix="_hi")
        .select(*join_columns, (pl.col("energy_mean_hi") / pl.col("energy_mean")).alias("gain_ratio"))
        .collect()
    )
    return df.join(gain_ratios, on=join_columns, how="left")