Coverage for polars_analysis / analysis / linearity_analysis.py: 92%

37 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 13:37 -0400

1import logging 

2from collections import defaultdict 

3 

4import numpy as np 

5import polars as pl 

6from scipy.optimize import curve_fit 

7 

8import polars_analysis.analysis.pulse_analysis as analysis 

9from polars_analysis.plotting import helper 

10 

11log = logging.getLogger(__name__) 

12 

13 

14def calc_linearity_slopes(df: pl.DataFrame, att_val: str = "att_val") -> pl.DataFrame: 

15 """ 

16 Calculates the slope and its error of measured energy_mean vs input current amplitude. 

17 Uses weighted least squares, assuming a y-intercept of 0. 

18 See: https://math.stackexchange.com/a/2701014 

19 Handles measurements with different attenuations, gains, and channels. 

20 

21 Adds columns `slope` and `slope_error`, both floats 

22 """ 

23 output = defaultdict(list) 

24 for gain, channel, attenuation in df.select("gain", "channel", att_val).unique().iter_rows(): 

25 filtered_df = df.filter( 

26 pl.col("gain") == gain, 

27 pl.col("channel") == channel, 

28 pl.col(att_val) == attenuation, 

29 ) 

30 amps_arr = filtered_df["amp"].to_numpy() 

31 e_arr = filtered_df["energy_mean"].to_numpy() 

32 dE_arr = filtered_df["energy_std"].to_numpy() 

33 if len(e_arr) <= 1: 

34 continue 

35 popt, pcov = curve_fit( 

36 helper.lin, 

37 amps_arr, 

38 e_arr, 

39 p0=[e_arr[1] / amps_arr[1], 0], 

40 sigma=dE_arr, 

41 absolute_sigma=True, 

42 ) 

43 slope = popt[0] 

44 slope_error = np.sqrt(pcov[0][0]) 

45 if slope == 0 or slope_error == np.inf: 

46 log.warning(f"Failed to find slope for {gain=}, {channel=}, {attenuation=}") 

47 continue 

48 output["gain"].append(gain) 

49 output["channel"].append(channel) 

50 output[att_val].append(attenuation) 

51 output["slope"].append(slope) 

52 output["slope_error"].append(slope_error) 

53 

54 slopes_df = pl.DataFrame(output) 

55 return df.join(slopes_df, on=[att_val, "gain", "channel"]) 

56 

57 

58def calibrate_scale_factors(df: pl.DataFrame, reference_att: float = 35.0) -> pl.DataFrame: 

59 """ 

60 Uses the slopes of measured energy_mean vs input current (amp) to derived scale 

61 factors between attenuations. The reference_att value is taken as "correct", and 

62 the other attenuations are scaled relative to that. 

63 

64 The input current can then be multipled by the resulting scale_factor to determine 

65 the "correct" post-attenuation input current. 

66 

67 Adds columns `scale_factor`, `calibrated_att_val`, and `calibrated_att_error` 

68 """ 

69 ref_df = ( 

70 df.select("gain", "slope", "att_val") 

71 .filter(att_val=reference_att) 

72 .select( 

73 "gain", 

74 pl.col("slope").alias("ref_slope"), 

75 ) 

76 .unique() 

77 ) 

78 calibration_df = ( 

79 df.select("gain", "slope", "slope_error", "att_val") 

80 .unique() 

81 .join(ref_df, on=["gain"]) # type: ignore 

82 .with_columns( 

83 [ 

84 # Scale factor = measured_slope / reference_slope 

85 (pl.col("slope") / pl.col("ref_slope")).alias("scale_factor"), 

86 # Relative error in scale factor (for error propagation) 

87 (pl.col("slope_error") / pl.col("slope")).alias("relative_slope_error"), 

88 ] 

89 ) 

90 .with_columns( 

91 [ 

92 # dB correction = 20 * log10(scale_factor) 

93 (20.0 * pl.col("scale_factor").log10()).alias("db_correction"), 

94 # Error in dB correction: σ_dB ≈ (20/ln(10)) * (σ_scale/scale) ≈ 8.686 * relative_error 

95 (8.686 * pl.col("relative_slope_error")).alias("db_correction_error"), 

96 ] 

97 ) 

98 .select(["gain", "att_val", "scale_factor", "db_correction", "db_correction_error"]) 

99 ) 

100 # Apply calibration 

101 result_df = ( 

102 df.join(calibration_df, on=["gain", "att_val"], how="left") # type: ignore 

103 .with_columns( 

104 [ 

105 # Calibrated attenuation = nominal + dB correction 

106 (pl.col("att_val") + pl.col("db_correction")).alias("calibrated_att_val"), 

107 # Error propagates directly in dB space 

108 pl.col("db_correction_error").alias("calibrated_att_error"), 

109 ] 

110 ) 

111 .drop(["db_correction", "db_correction_error"]) 

112 ) 

113 

114 return result_df 

115 

116 

117def calc_derived(target_df: pl.DataFrame, source_df: pl.DataFrame, all_phases: bool) -> pl.DataFrame: 

118 """ 

119 Take the OFCs from source_df and use them to calculate derived values for target_df 

120 """ 

121 return ( 

122 target_df.drop("OFCs_a", "OFCs_b", "energies", "times", "energy_mean", "energy_std", "time_mean", "time_std") 

123 .join(source_df.select("gain", "OFCs_a", "OFCs_b").unique(), on="gain") 

124 .pipe(analysis.pipe_apply_OFCs, all_phases=all_phases) 

125 .pipe(analysis.pipe_energy_sigma, all_phases=all_phases) 

126 .drop("samples", "samples_triggered", strict=False) 

127 )