Coverage for polars_analysis / db_interface / prod_db_data_uploader.py: 58%

173 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 13:37 -0400

1import logging 

2import subprocess as sp 

3from pathlib import Path 

4from typing import Any, Dict, List, Literal, Optional, Tuple 

5 

6import polars as pl 

7 

8from polars_analysis.analysis import constants, cut_thresholds 

9from polars_analysis.db_interface.production_test_db import ProductionTestDB 

10from polars_analysis.models import Gain, Label, QC_Data, QC_MeasType 

11 

12log = logging.getLogger(__name__) 

13 

14 

def prod_db_label_builder(
    variable: str,
    meas_type: QC_MeasType,
    gain: Optional[Gain] = None,
    channel: Optional[int] = None,
    amp: Optional[float] = None,
    attn: Optional[float] = None,
    link: Optional[int] = None,
    adc: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Build the dictionary label identifying a production-db variable.

    Only the fields that are set (not None) appear in the result, e.g.::

        {
            "variable": "mean",
            "meas_type": "pedestal",
            "gain": "lo",
            "channel": 114,
        }
    """
    label = Label(
        variable=variable,
        meas_type=meas_type,
        gain=gain,
        channel=channel,
        awg_amp=amp,
        att_val=attn,
        link=link,
        adc=adc,
    )
    # Drop unset (None) fields so the label carries only meaningful keys.
    return label.model_dump(exclude_none=True)

46 

47 

def upload_variables(db: ProductionTestDB) -> bool:
    """
    Register every known QC variable (label) in the production db.

    Probably not needed since upload_thresholds will upload missing variables,
    but keeping for now.

    :param db: production db client used for the label uploads
    :return: True only if every single label upload succeeded
    """

    gains: List[Gain] = ["hi", "lo"]
    amps: Dict[str, list[float]] = {"lo": [constants.LO_GAIN_AWG_AMP_VAL], "hi": [constants.HI_GAIN_AWG_AMP_VAL]}
    attns: Dict[str, list[float]] = {
        "lo": [constants.LO_GAIN_ATTN_VAL],
        "hi": [constants.HI_GAIN_ATTN_VAL],
    }  # must have same length of entries as amps
    channels = range(128)
    adcs = range(32)
    links = range(22)  # 22 data lpGBT uplinks

    pedestal_variables = [
        "std",
        "mean",
        "std_50ohm",
        "mean_50ohm",
        "std_hps2",
        "mean_hps2",
        "ber",  # BER is done via ADC patterns, so for each channel
        "ber_scan",  # pass/fail if scan around default is good
    ]

    pedestal_variables_per_gain = ["coherent_noise_0_128"]

    pulse_variables = ["risetime_mean", "simple_INL"]
    pgr = "gain_ratio"  # is special since it's at the same amp for hi and lo
    pgr_amp = constants.HI_GAIN_AWG_AMP_VAL
    pgr_attn = constants.HI_GAIN_ATTN_VAL

    pulse_variables_per_amp = [
        "energy_mean",
        "energy_std",
        "time_mean",
        "time_std",
        "zero_crossing_time",
        "ref_corr",
        "ref_rmse",
    ]

    per_link_variables = [
        "optical_power",
        "locked",  # pass/fail
    ]

    per_board_variables = [
        "power_draw_init",
        "power_draw_configured",
    ]

    per_adc_variables = ["valid_clockscan", "valid_calibration"]

    # BUG FIX: the original accumulated results with `ret |= ...` starting
    # from True, so the function always returned True and upload failures
    # were silently lost. `&=` makes any single failure flip the result.
    ret = True

    # Pedestal per gain, channel
    for var in pedestal_variables:
        for gain in gains:
            for channel in channels:
                ret &= db.upload_label(prod_db_label_builder(var, "pedestal", gain=gain, channel=int(channel)))

    # Pedestal per gain
    for var_gain in pedestal_variables_per_gain:
        for gain in gains:
            ret &= db.upload_label(prod_db_label_builder(var_gain, "pedestal", gain=gain))

    # Pulse per gain, channel
    for var in pulse_variables:
        for gain in gains:
            for channel in channels:
                ret &= db.upload_label(prod_db_label_builder(var, "pulse", gain=gain, channel=int(channel)))

    # Pulse per gain, channel, amp
    for var in pulse_variables_per_amp:
        for gain in gains:
            for channel in channels:
                for amp, attn in zip(amps[gain], attns[gain]):
                    ret &= db.upload_label(
                        prod_db_label_builder(var, "pulse", gain=gain, channel=int(channel), amp=amp, attn=attn)
                    )

    # Pulse gain ratio special case: gain-independent, single amp/attn point
    for channel in channels:
        ret &= db.upload_label(
            prod_db_label_builder(pgr, "pulse", gain=None, channel=int(channel), amp=pgr_amp, attn=pgr_attn)
        )

    # Per link
    for var in per_link_variables:
        for link in links:
            ret &= db.upload_label(prod_db_label_builder(var, "qc", link=link))

    # Per board
    for var in per_board_variables:
        ret &= db.upload_label(prod_db_label_builder(var, "qc"))

    # Per ADC
    for var in per_adc_variables:
        for adc in adcs:
            ret &= db.upload_label(prod_db_label_builder(var, "qc", adc=adc))

    return ret

152 

153 

def upload_thresholds(
    db: ProductionTestDB,
    tag: Optional[str] = None,
    thresholds_file: Path = Path("polars_analysis/analysis/qc_thresholds.csv"),
):
    """
    Upload the cuts from the csv file to the production db.

    If the variable (label) doesn't exist in the prod db yet, it will be inserted.

    :param db: production db client used for the uploads
    :param tag: optional tag attached to every uploaded threshold
    :param thresholds_file: csv file containing the cut definitions
    """

    ct = cut_thresholds.CutThresholds(thresholds_file)
    df_cuts = ct.explode_cuts()

    # Labels already registered in the db; missing ones are inserted below.
    known_vars = db.get_valid_var_labels()

    for row in df_cuts.iter_rows(named=True):
        var: Optional[str] = row.get("name")
        amp: Optional[float] = row.get("awg_amp")
        attn: Optional[float] = row.get("att_val")
        link: Optional[int] = row.get("link")
        adc: Optional[int] = row.get("adc")
        board_type: Literal["EM", "HEC"] = row["board_type"]
        meas_type: Optional[QC_MeasType] = row.get("meas_type")
        gain: Literal["lo", "hi"] = row["gain"]
        channel_temp: Optional[str] = row.get("channel")
        channel: Optional[int] = int(channel_temp) if channel_temp is not None else None

        if var is None:
            log.error("No variable name found in threshold row, continuing...")
            continue

        if meas_type is None:
            log.error("No meas_type found in threshold row, continuing...")
            continue

        # Each requirement column maps to the comparison operator the db
        # test uses: exact match, upper bound, lower bound.
        requirements = [
            ("==", "requirement", row.get("requirement")),
            ("<", "max_requirement", row.get("max_requirement")),
            (">", "min_requirement", row.get("min_requirement")),
        ]
        if all(value is None for _, _, value in requirements):
            log.warning(f"{var=} has no set requirement, skipping")
            continue

        label = prod_db_label_builder(
            var, meas_type, gain=gain, channel=channel, amp=amp, attn=attn, link=link, adc=adc
        )

        if label not in known_vars:
            log.info(f"{label} missing in db, uploading")
            if not db.upload_label(label):
                log.warning(f"{label=} failed to upload, skipping")
                continue

        # NOTE: the original carried two unreachable guards here -- a second
        # "all requirements None" error (already handled by the skip above)
        # and a trailing "test_op is None" error (at least one requirement is
        # set at this point, so an operator is always chosen). Both removed.
        for test_op, req_name, value in requirements:
            if value is None:
                continue
            if db.upload_test_threshold(label, test_op, value, board_type, tag):
                log.info(f"Uploaded {label=}, {tag=}, {test_op=}, {req_name}={value!r}, {board_type=}")
            else:
                log.warning(f"Failed to upload threshold: {label=}, {test_op=}, {req_name}={value!r}, {board_type=}")

239 

240 

def upload_derived_data(
    df: pl.DataFrame,
    db: ProductionTestDB,
    df_var_name: Dict[str, str],
    meas_type: QC_MeasType,
) -> bool:
    """
    Function to automate uploading values from derived dataframe.

    df_var_name: dictionary of variable name in dataframe to name for database

    :return: True if the bulk upload succeeded, False otherwise
    """

    # Tag the upload with the current analysis-code revision.
    githash = sp.check_output(["git", "rev-parse", "HEAD"]).decode("ascii").strip()

    run_numbers = df.select(pl.col("run_number")).unique().to_series().to_list()
    if len(run_numbers) > 1:
        log.error(f"Multiple run numbers found {run_numbers}")
        return False
    run_number = int(run_numbers[0])

    board_ids = df.select(pl.col("board_id")).unique().to_series().to_list()
    if len(board_ids) > 1:
        log.error(f"Multiple boards found in derived data {board_ids}")
        return False
    board_id = board_ids[0]

    board_type = None
    if "board_variant" in df.columns:
        board_types = df.select(pl.col("board_variant")).unique().to_series().to_list()
        if len(board_types) > 1:
            # BUG FIX: the message was missing its f-prefix, so the
            # placeholder text was logged literally instead of the values.
            log.error(f"Multiple board types found {board_types}")
        # NOTE(review): unlike run_number/board_id this does not abort on
        # duplicates; the first entry wins. Preserved as-is -- confirm intended.
        board_type = board_types[0]
    else:
        # could retrieve from some other place
        log.warning("Board type not provided in derived dataframe")
        # board_type = "EM"  # for testing
        # return False

    board_version = None
    if "board_version" in df.columns:
        board_versions = df.select(pl.col("board_version")).unique().to_series().to_list()
        if len(board_versions) > 1:
            # BUG FIX: missing f-prefix, and the message was a copy-paste of
            # the board-type one ("Multiple board types found {board_version}").
            log.error(f"Multiple board versions found {board_versions}")
        board_version = board_versions[0]
    else:
        log.warning("Board version not provided in derived dataframe")

    # Filter pulse runs to largest amp (the nominal amp/attn point per gain)
    if meas_type == "pulse":
        df = df.filter(
            (
                (pl.col("gain") == "hi")
                & (pl.col("awg_amp") == constants.HI_GAIN_AWG_AMP_VAL)
                & (pl.col("att_val") == constants.HI_GAIN_ATTN_VAL)
            )
            | (
                (pl.col("gain") == "lo")
                & (pl.col("awg_amp") == constants.LO_GAIN_AWG_AMP_VAL)
                & (pl.col("att_val") == constants.LO_GAIN_ATTN_VAL)
            )
        )

    # These db variables are registered without amp/attn (resp. gain) keys,
    # so those fields are stripped from their labels before upload.
    variables_without_amp = ["simple_INL", "risetime_mean"]
    variables_without_gain = ["gain_ratio"]

    # Data to upload: one (label, value) pair per dataframe row and variable.
    test_data: List[Tuple[Dict[str, Any], float]] = []
    for var, var_label in df_var_name.items():
        for row in df.iter_rows(named=True):
            gain = row.get("gain")
            channel = row.get("channel")
            amp = row.get("awg_amp")
            attn = row.get("att_val")
            # link = row.get("link")
            # adc = row.get("adc")
            value = row[var]
            if value is None:
                continue

            if var_label in variables_without_amp:
                amp = None
                attn = None
            if var_label in variables_without_gain:
                gain = None

            test_data.append(
                (
                    prod_db_label_builder(
                        var_label,
                        meas_type=meas_type,
                        gain=gain,
                        amp=amp,
                        attn=attn,
                        channel=channel,
                        # link=link,
                        # adc=adc
                    ),
                    value,
                )
            )

    ret = db.upload_test_data_multi(board_id, board_type, board_version, run_number, test_data, githash)

    return ret

346 

347 

def upload_json_data(
    data: QC_Data,
    db: ProductionTestDB,
) -> bool:
    """
    Function to automate uploading values from json dictionaries.

    :param data: QC_Data payload carrying the board metadata and measurements
    :param db: production db client used for the upload
    :return: True on a successful bulk upload, False otherwise
    """

    # Tag the upload with the current analysis-code revision.
    githash = sp.check_output(["git", "rev-parse", "HEAD"]).decode("ascii").strip()

    # Data to upload: one (label, value) pair per payload entry.
    test_data: List[Tuple[Dict[str, Any], float]] = [
        (
            prod_db_label_builder(
                d.variable,
                meas_type=data.meas_type,
                gain=d.gain,
                channel=d.channel,
                link=d.link,
                adc=d.adc,
            ),
            d.value,
        )
        for d in data.qc_payload
    ]

    ret = db.upload_test_data_multi(
        data.board_id,
        data.board_type,
        data.board_version,
        data.run_number,
        test_data,
        githash,
    )

    # BUG FIX: messages said "derived dataframe" (copy-paste from
    # upload_derived_data) and misspelled "succesfully"; this path
    # uploads json QC data.
    if ret:
        log.info(f"Test results in json QC data successfully uploaded to db: {db}")
        return True
    else:
        log.error(f"Test results in json QC data failed to upload to db: {db}")
        return False