Coverage for polars_analysis / db_interface / prod_db_data_uploader.py: 58%
173 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-03-13 13:37 -0400
1import logging
2import subprocess as sp
3from pathlib import Path
4from typing import Any, Dict, List, Literal, Optional, Tuple
6import polars as pl
8from polars_analysis.analysis import constants, cut_thresholds
9from polars_analysis.db_interface.production_test_db import ProductionTestDB
10from polars_analysis.models import Gain, Label, QC_Data, QC_MeasType
12log = logging.getLogger(__name__)
def prod_db_label_builder(
    variable: str,
    meas_type: QC_MeasType,
    gain: Optional[Gain] = None,
    channel: Optional[int] = None,
    amp: Optional[float] = None,
    attn: Optional[float] = None,
    link: Optional[int] = None,
    adc: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Build the dictionary label that identifies a production-db variable.

    Any field left as ``None`` is omitted from the returned dict, e.g.:
        {
            variable: mean,
            meas_type: pedestal,
            gain: lo,
            channel: 114
        }
    """
    label = Label(
        variable=variable,
        meas_type=meas_type,
        gain=gain,
        channel=channel,
        awg_amp=amp,
        att_val=attn,
        link=link,
        adc=adc,
    )
    # Pydantic serialization; exclude_none drops every unset field.
    return label.model_dump(exclude_none=True)
def upload_variables(db: ProductionTestDB) -> bool:
    """
    Register every known QC variable (label) in the production db.

    Probably not needed since upload_thresholds will upload missing variables,
    but keeping for now.

    Returns True only if *every* label upload succeeded.
    """
    gains: List[Gain] = ["hi", "lo"]
    amps: Dict[str, list[float]] = {"lo": [constants.LO_GAIN_AWG_AMP_VAL], "hi": [constants.HI_GAIN_AWG_AMP_VAL]}
    attns: Dict[str, list[float]] = {
        "lo": [constants.LO_GAIN_ATTN_VAL],
        "hi": [constants.HI_GAIN_ATTN_VAL],
    }  # must have same length of entries as amps
    channels = range(128)
    adcs = range(32)
    links = range(22)  # 22 data lpGBT uplinks

    pedestal_variables = [
        "std",
        "mean",
        "std_50ohm",
        "mean_50ohm",
        "std_hps2",
        "mean_hps2",
        "ber",  # BER is done via ADC patterns, so for each channel
        "ber_scan",  # pass/fail if scan around default is good
    ]

    pedestal_variables_per_gain = ["coherent_noise_0_128"]

    pulse_variables = ["risetime_mean", "simple_INL"]
    pgr = "gain_ratio"  # is special since it's at the same amp for hi and lo
    pgr_amp = constants.HI_GAIN_AWG_AMP_VAL
    pgr_attn = constants.HI_GAIN_ATTN_VAL

    pulse_variables_per_amp = [
        "energy_mean",
        "energy_std",
        "time_mean",
        "time_std",
        "zero_crossing_time",
        "ref_corr",
        "ref_rmse",
    ]

    per_link_variables = [
        "optical_power",
        "locked",  # pass/fail
    ]

    per_board_variables = [
        "power_draw_init",
        "power_draw_configured",
    ]

    per_adc_variables = ["valid_clockscan", "valid_calibration"]

    # BUG FIX: this previously used `ret |= ...` with ret starting at True,
    # so the function always returned True even when uploads failed.
    # `&=` makes any single failure flip the overall result to False
    # while still attempting every upload (no short-circuit).
    ret = True

    # Pedestal per gain, channel
    for var in pedestal_variables:
        for gain in gains:
            for channel in channels:
                ret &= db.upload_label(prod_db_label_builder(var, "pedestal", gain=gain, channel=int(channel)))

    # Pedestal per gain
    for var_gain in pedestal_variables_per_gain:
        for gain in gains:
            ret &= db.upload_label(prod_db_label_builder(var_gain, "pedestal", gain=gain))

    # Pulse per gain, channel
    for var in pulse_variables:
        for gain in gains:
            for channel in channels:
                ret &= db.upload_label(prod_db_label_builder(var, "pulse", gain=gain, channel=int(channel)))

    # Pulse per gain, channel, amp
    for var in pulse_variables_per_amp:
        for gain in gains:
            for channel in channels:
                for amp, attn in zip(amps[gain], attns[gain]):
                    ret &= db.upload_label(
                        prod_db_label_builder(var, "pulse", gain=gain, channel=int(channel), amp=amp, attn=attn)
                    )

    # Pulse gain ratio special case: same amp/attn for both gains, no gain field
    for channel in channels:
        ret &= db.upload_label(
            prod_db_label_builder(pgr, "pulse", gain=None, channel=int(channel), amp=pgr_amp, attn=pgr_attn)
        )

    # Per link
    for var in per_link_variables:
        for link in links:
            ret &= db.upload_label(prod_db_label_builder(var, "qc", link=link))

    # Per board
    for var in per_board_variables:
        ret &= db.upload_label(prod_db_label_builder(var, "qc"))

    # Per ADC
    for var in per_adc_variables:
        for adc in adcs:
            ret &= db.upload_label(prod_db_label_builder(var, "qc", adc=adc))

    return ret
def upload_thresholds(
    db: ProductionTestDB,
    tag: Optional[str] = None,
    thresholds_file: Path = Path("polars_analysis/analysis/qc_thresholds.csv"),
):
    """
    Upload the cuts from the csv file to the production db.

    If the variable (label) doesn't exist in the prod db yet, it will be
    inserted. Rows with no name, no meas_type, or no requirement at all are
    skipped with a log message.

    A row may define any combination of an exact requirement (op "=="), a
    max_requirement (op "<") and a min_requirement (op ">"); one threshold
    is uploaded per defined requirement.
    """
    ct = cut_thresholds.CutThresholds(thresholds_file)
    df_cuts = ct.explode_cuts()

    known_vars = db.get_valid_var_labels()

    for row in df_cuts.iter_rows(named=True):
        var: Optional[str] = row.get("name")
        amp: Optional[float] = row.get("awg_amp")
        attn: Optional[float] = row.get("att_val")
        link: Optional[int] = row.get("link")
        adc: Optional[int] = row.get("adc")
        board_type: Literal["EM", "HEC"] = row["board_type"]
        meas_type: Optional[QC_MeasType] = row.get("meas_type")
        gain: Literal["lo", "hi"] = row["gain"]
        channel_temp: Optional[str] = row.get("channel")
        channel: Optional[int] = None
        if channel_temp is not None:
            channel = int(channel_temp)

        requirement = row.get("requirement")
        max_requirement = row.get("max_requirement")
        min_requirement = row.get("min_requirement")

        if var is None:
            log.error("No variable name found in threshold row, continuing...")
            continue

        if meas_type is None:
            log.error("No meas_type found in threshold row, continuing...")
            continue

        n_reqs = int(requirement is not None) + int(max_requirement is not None) + int(min_requirement is not None)
        if n_reqs == 0:
            log.warning(f"{var=} has no set requirement, skipping")
            continue

        label = prod_db_label_builder(
            var, meas_type, gain=gain, channel=channel, amp=amp, attn=attn, link=link, adc=adc
        )

        # Auto-insert labels that the db doesn't know about yet.
        if label not in known_vars:
            log.info(f"{label} missing in db, uploading")
            if not db.upload_label(label):
                log.warning(f"{label=} failed to upload, skipping")
                continue

        # NOTE(review): the previous version re-checked "all requirements are
        # None" here and "test_op is None" at the end of the loop; both were
        # unreachable after the n_reqs == 0 guard above, so they were removed.
        if requirement is not None:
            test_op = "=="
            if db.upload_test_threshold(label, test_op, requirement, board_type, tag):
                log.info(f"Uploaded {label=}, {tag=}, {test_op=}, {requirement=}, {board_type=}")
            else:
                log.warning(f"Failed to upload threshold: {label=}, {test_op=}, {requirement=}, {board_type=}")
        if max_requirement is not None:
            test_op = "<"
            if db.upload_test_threshold(label, test_op, max_requirement, board_type, tag):
                log.info(f"Uploaded {label=}, {tag=}, {test_op=}, {max_requirement=}, {board_type=}")
            else:
                log.warning(f"Failed to upload threshold: {label=}, {test_op=}, {max_requirement=}, {board_type=}")
        if min_requirement is not None:
            test_op = ">"
            if db.upload_test_threshold(label, test_op, min_requirement, board_type, tag):
                log.info(f"Uploaded {label=}, {tag=}, {test_op=}, {min_requirement=}, {board_type=}")
            else:
                log.warning(f"Failed to upload threshold: {label=}, {test_op=}, {min_requirement=}, {board_type=}")
def upload_derived_data(
    df: pl.DataFrame,
    db: ProductionTestDB,
    df_var_name: Dict[str, str],
    meas_type: QC_MeasType,
) -> bool:
    """
    Function to automate uploading values from derived dataframe.

    df_var_name: dictionary of variable name in dataframe to name for database

    Returns False if the dataframe mixes multiple run numbers or board ids;
    otherwise returns the result of the bulk upload.
    """
    # retrieve githash of the current checkout for provenance
    githash = sp.check_output(["git", "rev-parse", "HEAD"]).decode("ascii").strip()

    run_numbers = df.select(pl.col("run_number")).unique().to_series().to_list()
    if len(run_numbers) > 1:
        log.error(f"Multiple run numbers found {run_numbers}")
        return False
    run_number = int(run_numbers[0])

    board_ids = df.select(pl.col("board_id")).unique().to_series().to_list()
    if len(board_ids) > 1:
        log.error(f"Multiple boards found in derived data {board_ids}")
        return False
    board_id = board_ids[0]

    board_type = None
    if "board_variant" in df.columns:
        board_types = df.select(pl.col("board_variant")).unique().to_series().to_list()
        if len(board_types) > 1:
            # BUG FIX: this message was missing its f-string prefix and logged
            # the literal "{board_types}" placeholder.
            log.error(f"Multiple board types found {board_types}")
        board_type = board_types[0]
    else:
        # could retrieve from some other place
        log.warning("Board type not provided in derived dataframe")
        # board_type = "EM"  # for testing
        # return False

    board_version = None
    if "board_version" in df.columns:
        board_versions = df.select(pl.col("board_version")).unique().to_series().to_list()
        if len(board_versions) > 1:
            # BUG FIX: previous message lacked the f prefix, said "board types",
            # and referenced the undefined name board_version.
            log.error(f"Multiple board versions found {board_versions}")
        board_version = board_versions[0]
    else:
        log.warning("Board version not provided in derived dataframe")

    # Filter pulse runs to largest amp (the nominal amp/attn pair per gain)
    if meas_type == "pulse":
        df = df.filter(
            (
                (pl.col("gain") == "hi")
                & (pl.col("awg_amp") == constants.HI_GAIN_AWG_AMP_VAL)
                & (pl.col("att_val") == constants.HI_GAIN_ATTN_VAL)
            )
            | (
                (pl.col("gain") == "lo")
                & (pl.col("awg_amp") == constants.LO_GAIN_AWG_AMP_VAL)
                & (pl.col("att_val") == constants.LO_GAIN_ATTN_VAL)
            )
        )

    # These variables are keyed without amp/attn (resp. gain) in the db.
    variables_without_amp = ["simple_INL", "risetime_mean"]
    variables_without_gain = ["gain_ratio"]

    # Data to upload: (label dict, value) pairs
    test_data: List[Tuple[Dict[str, Any], float]] = []
    for var, var_label in df_var_name.items():
        for row in df.iter_rows(named=True):
            gain = row.get("gain")
            channel = row.get("channel")
            amp = row.get("awg_amp")
            attn = row.get("att_val")
            # link = row.get("link")
            # adc = row.get("adc")
            value = row[var]
            if value is None:
                # missing measurement for this row; nothing to upload
                continue

            if var_label in variables_without_amp:
                amp = None
                attn = None
            if var_label in variables_without_gain:
                gain = None

            test_data.append(
                (
                    prod_db_label_builder(
                        var_label,
                        meas_type=meas_type,
                        gain=gain,
                        amp=amp,
                        attn=attn,
                        channel=channel,
                        # link=link,
                        # adc=adc
                    ),
                    value,
                )
            )

    ret = db.upload_test_data_multi(board_id, board_type, board_version, run_number, test_data, githash)

    return ret
def upload_json_data(
    data: QC_Data,
    db: ProductionTestDB,
) -> bool:
    """
    Function to automate uploading values from json dictionaries.

    Builds one (label, value) pair per entry of data.qc_payload and uploads
    them in a single bulk call. Returns True on success, False otherwise.
    """
    # retrieve githash of the current checkout for provenance
    githash = sp.check_output(["git", "rev-parse", "HEAD"]).decode("ascii").strip()

    # Data to upload: (label dict, value) pairs
    test_data: List[Tuple[Dict[str, Any], float]] = []
    for d in data.qc_payload:
        test_data.append(
            (
                prod_db_label_builder(
                    d.variable,
                    meas_type=data.meas_type,
                    gain=d.gain,
                    channel=d.channel,
                    link=d.link,
                    adc=d.adc,
                ),
                d.value,
            )
        )

    ret = db.upload_test_data_multi(
        data.board_id,
        data.board_type,
        data.board_version,
        data.run_number,
        test_data,
        githash,
    )

    # BUG FIX: messages previously said "derived dataframe" (copy-paste from
    # upload_derived_data) and misspelled "succesfully".
    if ret:
        log.info(f"Test results in QC json data successfully uploaded to db: {db}")
        return True
    else:
        log.error(f"Test results in QC json data failed to upload to db: {db}")
        return False