Coverage for polars_analysis / db_interface / production_test_db.py: 65%
314 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 13:37 -0400
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 13:37 -0400
1import json
2import logging
3from contextlib import contextmanager
4from datetime import datetime
5from typing import Any, Dict, List, Literal, NamedTuple, Optional, Tuple, Union
7import IPython
8import polars as pl
9import psycopg
10from psycopg import sql
11from psycopg.conninfo import conninfo_to_dict, make_conninfo
12from psycopg.rows import namedtuple_row
13from psycopg.types.json import Jsonb
# Instantiate logger
if __name__ == "__main__":
    # Running as a script: use this module's own logger
    log = logging.getLogger(__name__)
else:
    # Logger used by uvicorn
    # (imported as part of a service: route records through uvicorn's error logger)
    log = logging.getLogger("uvicorn.error")
22"""
23Interface class to production testing database.
24Stores thresholds and data per item.
26Tables:
27thresholds -- cut value and operator to check
28threshold_labels -- variables to test and compare to thresholds
29boards -- known boards and types
30derived -- calculated values per label and board
32Views:
33latest_thresholds -- thresholds with most recent creation date
34latest_thresholds_per_tag -- thresholds with most recent creation date per tag
35latest_derived -- derived values with most recent creation date
36"""
def eval_test(value: float, operator: str, threshold: float) -> Tuple[bool, Optional[str]]:
    """
    Evaluate a single test instance against a value.

    Args:
        value: measured value to check
        operator: comparison operator as a string, one of <, <=, >, >=, ==, !=
        threshold: cut value to compare against

    Returns:
        tuple: (passed, error_message) where error_message is not None iff test evaluation errored
    """
    # Explicit dispatch table instead of eval(): eval on interpolated strings
    # is unsafe (operator strings come from the database) and produces opaque
    # SyntaxErrors for malformed operators.
    comparators = {
        "<": lambda a, b: a < b,
        "<=": lambda a, b: a <= b,
        ">": lambda a, b: a > b,
        ">=": lambda a, b: a >= b,
        "==": lambda a, b: a == b,
        "!=": lambda a, b: a != b,
    }
    try:
        r = bool(comparators[operator](value, threshold))
    except KeyError:
        error_msg = f"Evaluation error: unknown operator {operator!r}"
        log.error(error_msg)
        return False, error_msg
    except Exception as e:
        # e.g. comparing incompatible types
        error_msg = f"Evaluation error: {e}"
        log.error(error_msg)
        return False, error_msg
    return r, None
def no_null_json_dumps(data: dict) -> str:
    """
    Serialize ``data`` to a JSON string, dropping every key whose value is None.
    """
    kept = {}
    for key, value in data.items():
        if value is not None:
            kept[key] = value
    return json.dumps(kept)
class ProductionTestDB:
    def __init__(self, postgres_uri: str):
        """
        Class to interface with the database storing production test data and thresholds to compare against.

        args:
            postgres_uri: URI for connecting to PostgreSQL database
        """
        self._postgres_uri: str = postgres_uri

        # Shared connection/cursor, created lazily by _connection()/_cursor()
        self._conn: Optional[psycopg.Connection] = None
        self._curs: Optional[psycopg.Cursor[NamedTuple]] = None

        # Useful schema strings
        self.schema = {
            "thresholds_table_name": "thresholds",
            "runs_table_name": "runs",
            "boards_table_name": "boards",
            "data_table_name": "derived",
            "labels_table_name": "threshold_labels",
            "latest_thresholds_view_name": "latest_thresholds",
            "latest_thresholds_per_tag_view_name": "latest_thresholds_per_tag",
            "latest_data_view_name": "latest_derived",
        }

    def __del__(self):
        """
        Clean up connections
        """
        # getattr guard: __del__ may run even if __init__ raised before
        # _conn was ever assigned
        if getattr(self, "_conn", None):
            self._conn.close()

    def __str__(self):
        # Connection info with the password stripped, safe to log/print
        conninfo = conninfo_to_dict(self._postgres_uri)
        _ = conninfo.pop("password", None)
        return make_conninfo("", **conninfo)

    @contextmanager
    def _connection(self):
        """
        Yield the shared connection to postgres, opening it on first use
        """
        if not self._conn:
            self._conn = psycopg.connect(self._postgres_uri)

        yield self._conn

    @contextmanager
    def _cursor(self):
        """
        Yield the shared cursor (namedtuple rows), creating it if needed
        """
        if not self._curs:
            with self._connection() as conn:
                self._curs = conn.cursor(row_factory=namedtuple_row)
        yield self._curs

    def _execute(self, cmd: sql.SQL | sql.Composed):
        """
        Run SQL command without returning anything
        """
        log.debug(f"Executing SQL command: {cmd.as_string()}")
        with psycopg.connect(self._postgres_uri) as conn:
            with conn.cursor() as cursor:
                cursor.execute(cmd)

    def _query(self, query: sql.SQL | sql.Composed) -> List[NamedTuple]:
        """
        Run and fetch results of an SQL query (really any sql command)
        """
        log.debug(f"Executing SQL query: {query.as_string()}")
        with psycopg.connect(self._postgres_uri) as conn:
            with conn.cursor(row_factory=namedtuple_row) as cursor:
                cursor.execute(query)
                result = cursor.fetchall()

        return result

    @staticmethod
    def _table_exists(cursor: psycopg.Cursor, table_name: str) -> bool:
        """
        Return True iff table_name exists according to information_schema.

        Uses a bound query parameter: the previous inline
        sql.SQL("...{}").format(table_name) raised TypeError, because
        Composable.format() only accepts Composable arguments, not str.
        """
        cursor.execute(
            "SELECT table_name FROM information_schema.tables WHERE table_name = %s",
            (table_name,),
        )
        return len(cursor.fetchall()) > 0

    def _insert(self, table_name: str, data: dict, on_conflict_do_nothing=False) -> bool:
        """
        Insert one data element into a table

        args:
            table_name: name of the table to insert data into
            data: dictionary of column_name:value
            on_conflict_do_nothing: append ON CONFLICT DO NOTHING to the insert

        Returns:
            True on success, False if the table does not exist
        """
        with psycopg.connect(self._postgres_uri) as conn:
            with conn.cursor() as cursor:
                # Verify that the table exists
                if not self._table_exists(cursor, table_name):
                    log.error(f"Table {table_name} does not exist in database")
                    return False

                # Column names double as named placeholders bound from `data`
                columns = data.keys()

                query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
                    sql.Identifier(table_name),
                    sql.SQL(", ").join(map(sql.Identifier, columns)),
                    sql.SQL(", ").join(map(sql.Placeholder, columns)),
                )

                if on_conflict_do_nothing:
                    query = sql.SQL(" ").join([query, sql.SQL("ON CONFLICT DO NOTHING")])

                # Execute the query
                cursor.execute(query, data)
                conn.commit()

        log.debug("Insert successful")
        return True

    def _insert_many(self, table_name: str, data: List[dict], on_conflict_do_nothing=False) -> bool:
        """
        Insert many data elements into a table

        args:
            table_name: name of the table to insert data into
            data: list of dictionaries of column_name:value; every dict must
                share the keys of data[0] (the column set is taken from it)
            on_conflict_do_nothing: append ON CONFLICT DO NOTHING to the insert

        Returns:
            True on success, False on empty input or missing table
        """
        if len(data) == 0:
            log.error("Received empty list of data in _insert_many.")
            return False
        with psycopg.connect(self._postgres_uri) as conn:
            with conn.cursor() as cursor:
                # Verify that the table exists
                if not self._table_exists(cursor, table_name):
                    log.error(f"Table {table_name} does not exist in database")
                    return False

                # Column set taken from the first row
                columns = data[0].keys()

                query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
                    sql.Identifier(table_name),
                    sql.SQL(", ").join(map(sql.Identifier, columns)),
                    sql.SQL(", ").join(map(sql.Placeholder, columns)),
                )

                if on_conflict_do_nothing:
                    query = sql.SQL(" ").join([query, sql.SQL("ON CONFLICT DO NOTHING")])

                # Execute the query
                cursor.executemany(query, data)
                conn.commit()

        log.debug("Insert successful")
        return True

    def _insert_threshold_or_derived(self, table_name: str, data: dict) -> bool:
        """
        Insert one threshold or derived value into a table
        Handles insertion of labels and retrieval of label_id in the insert query

        args:
            table_name: name of the table to insert data into
            data: dictionary of column_name:value; must contain a "label" key
        """
        with self._connection() as con:
            with con.cursor() as cursor:
                # Verify that the table exists
                if not self._table_exists(cursor, table_name):
                    log.error(f"Table {table_name} does not exist in database")
                    return False

                # All columns except "label", which is resolved to label_id in the CTE
                columns = [k for k in data.keys() if k != "label"]

                # Insert the label if new (or do nothing), then reuse its id
                # for the data row in a single statement
                query = sql.SQL(
                    """
                    WITH label_insert AS (
                        INSERT INTO {labels_table_name} (label)
                        VALUES ({label})
                        ON CONFLICT (label) DO NOTHING
                        RETURNING id
                    ),
                    label_select AS (
                        SELECT id FROM label_insert
                        UNION ALL
                        SELECT id FROM {labels_table_name} WHERE label = {label}
                    )
                    INSERT INTO {table} (label_id, {columns})
                    SELECT l.id, {names}
                    FROM label_select l
                    LIMIT 1
                    """
                ).format(
                    table=sql.Identifier(table_name),
                    columns=sql.SQL(", ").join(map(sql.Identifier, columns)),
                    names=sql.SQL(", ").join(map(sql.Placeholder, columns)),
                    labels_table_name=sql.Identifier(self.schema["labels_table_name"]),
                    label=sql.Placeholder("label"),
                )

                # Execute the query
                cursor.execute(query, data)
                con.commit()

        log.debug("Insert successful")
        return True

    """
    Database-type agnostic functions for interface
    """

    def upload_label(self, var_label: Dict[str, Any]) -> bool:
        """
        Insert one label dict into the labels table (keys with None values are dropped).
        """
        try:
            return self._insert(
                self.schema["labels_table_name"],
                {"label": Jsonb(var_label, dumps=no_null_json_dumps)},
                on_conflict_do_nothing=True,
            )
        except psycopg.errors.UniqueViolation:
            if self._conn is not None:
                self._conn.rollback()
            log.info(f"{var_label} already exists in the db, skipping")
            return False

    def upload_labels(self, var_labels: List[Dict[str, Any]]) -> bool:
        """
        Insert many label dicts into the labels table (keys with None values are dropped).
        """
        jsonb_labels = [{"label": Jsonb(var_label, dumps=no_null_json_dumps)} for var_label in var_labels]
        try:
            return self._insert_many(self.schema["labels_table_name"], jsonb_labels, on_conflict_do_nothing=True)
        except psycopg.errors.UniqueViolation:
            if self._conn is not None:
                self._conn.rollback()
            log.info(f"{var_labels} already exists in the db, skipping")
            return False

    def get_valid_var_labels(self) -> List[Dict[str, Any]]:
        """
        Returns list of all labels in the labels table
        """
        query = sql.SQL("SELECT label FROM {}").format(sql.Identifier(self.schema["labels_table_name"]))
        v = self._query(query)
        return [i[0] for i in v]

    def get_label_id(self, label: Dict[str, Any]) -> int:
        """
        Return the id of a label dict; raises IndexError if the label is unknown.
        """
        # sql.Literal adapts the Jsonb value for inlining; passing Jsonb
        # directly to format() is a TypeError (format only takes Composables)
        query = sql.SQL("SELECT id FROM {table} WHERE label={label}").format(
            table=sql.Identifier(self.schema["labels_table_name"]),
            label=sql.Literal(Jsonb(label, dumps=no_null_json_dumps)),
        )
        return self._query(query)[0][0]

    def upload_board(self, board_id: str, board_type: Literal["EM", "HEC"], board_version: Optional[str]) -> bool:
        """
        Insert a new board row; board_type must be "EM" or "HEC".
        """
        if board_type not in ["EM", "HEC"]:
            log.error(f"{board_type} is not a known type: {['EM', 'HEC']}")
            return False

        return self._insert(
            self.schema["boards_table_name"],
            {"board_id": board_id, "board_type": board_type, "board_version": board_version},
        )

    def get_board_ids(self) -> List[str]:
        """
        Return all known board ids
        """
        query = sql.SQL("SELECT board_id FROM {}").format(sql.Identifier(self.schema["boards_table_name"]))
        b = self._query(query)
        return [i[0] for i in b]

    def get_board_type(self, board_id: str) -> Union[Literal["EM", "HEC"], None]:
        """
        Return the stored type of a board, or None if the board is unknown
        """
        if board_id in self.get_board_ids():
            query = sql.SQL("SELECT board_type FROM {table} WHERE board_id={board_id}").format(
                table=sql.Identifier(self.schema["boards_table_name"]),
                board_id=sql.Literal(board_id),
            )
            q = self._query(query)
            return q[0][0]
        else:
            log.warning(f"Board ID {board_id} doesn't exist")
            return None

    def get_board_summary(self) -> pl.DataFrame:
        """
        Fetches QC results for every board in QC database and returns as a pl.DataFrame
        """
        query = sql.SQL("""
            SELECT
                board_id,
                board_version,
                board_type,
                qc_check_tag,
                qc_passed,
                qc_check_timestamp
            FROM boards
            ORDER BY qc_check_timestamp DESC
        """)
        result = self._query(query)
        return pl.DataFrame(result)

    def get_board_qc_summary_runs(self, board_id: str) -> pl.DataFrame:
        """
        Fetches the list of runs used for the QC testing of the given board ID
        """
        query = sql.SQL("""
            SELECT
                run_number,
                l.label->'meas_type' AS meas_type,
                array_agg(DISTINCT l.label->'channel') AS channels
            FROM latest_derived d
            JOIN threshold_labels l
                ON d.label_id = l.id
            WHERE board_id = {board_id}
                AND l.label->>'meas_type' != 'qc'
                AND l.label->>'channel' IS NOT NULL
            GROUP BY run_number, meas_type
            ORDER BY run_number
        """).format(board_id=sql.Literal(board_id))
        result = self._query(query)
        return pl.DataFrame(result)

    def upload_test_threshold(
        self,
        var_label: Dict[str, Any],
        test_op: str,
        threshold: float,
        board_type: Literal["EM", "HEC"],
        tag: Optional[str] = None,
    ) -> bool:
        """
        Add a new test to the database

        Args:
            var_label: label dict identifying the variable
            test_op: string representation of test to perform, eg var "<" threshold
            threshold: value to test against
            board_type: "EM" or "HEC"
            tag: optional tag grouping a set of thresholds

        Returns:
            status
        """
        # Require correct board type
        if board_type not in ["EM", "HEC"]:
            log.error(f"{board_type} is not a known type: {['EM', 'HEC']}")
            return False

        # Insert label (or do nothing) and retrieve label_id, then insert threshold
        return self._insert_threshold_or_derived(
            self.schema["thresholds_table_name"],
            {
                "label": Jsonb(var_label, dumps=no_null_json_dumps),
                "operator": test_op,
                "threshold": threshold,
                "board_type": board_type,
                "tag": tag,
            },
        )

    def get_thresholds(self, board_type: Literal["EM", "HEC"], tag: Optional[str] = None) -> List[dict]:
        """
        Return list of dictionaries of threshold information based on the most recent entry.
        """
        if board_type not in ["EM", "HEC"]:
            log.error(f"{board_type} is not a known type: {['EM', 'HEC']}")
            return []

        q = self._query(
            sql.SQL("""
                SELECT
                    l.label,
                    t.operator,
                    t.threshold,
                    t.board_type,
                    t.label_id,
                    t.tag
                FROM {thresholds} t
                JOIN {labels} l ON l.id=t.label_id
                WHERE t.board_type={board_type}
            """).format(
                # With a tag, read from the per-tag view instead of the global latest view
                thresholds=(
                    sql.Identifier(self.schema["latest_thresholds_view_name"])
                    if tag is None
                    else sql.Identifier(self.schema["latest_thresholds_per_tag_view_name"])
                ),
                labels=sql.Identifier(self.schema["labels_table_name"]),
                board_type=sql.Literal(board_type),
            )
            + (sql.SQL("\nAND t.tag = {tag}").format(tag=sql.Literal(tag)) if tag is not None else sql.SQL(""))
        )
        out = [row._asdict() for row in q]

        return out

    def print_thresholds(self, board_type: Literal["EM", "HEC"]):
        """
        Pretty-print the current thresholds for a board type
        """
        print(json.dumps(self.get_thresholds(board_type), indent=4))

    def get_threshold_tags(self) -> List[str]:
        """
        Return list of threshold tags, most recent first
        """
        q = self._query(
            sql.SQL("""
                SELECT
                    tag,
                    MAX(created_at),
                    MIN(created_at)
                FROM {thresholds}
                GROUP BY tag
                ORDER BY MAX(created_at) DESC
            """).format(thresholds=sql.Identifier(self.schema["latest_thresholds_per_tag_view_name"]))
        )

        df = pl.DataFrame(
            {
                "tag": [i[0] for i in q],
                "first_created": [i[2] for i in q],
                "last_edited": [i[1] for i in q],
            }
        )

        log.info(df)

        return df["tag"].to_list()

    def upload_test_data(
        self,
        board_id: str,
        board_type: Literal["EM", "HEC"],
        board_version: str,
        run_number: int,
        var_label: Dict[str, Any],
        value: float,
        githash: str,
    ) -> bool:
        """
        Add new test result to database; only known labels are accepted
        """
        # Check if board id exists, if not, create it
        if board_id not in self.get_board_ids():
            if self.upload_board(board_id, board_type, board_version):
                log.info(f"New {board_id=} of type {board_type} inserted into database")
            else:
                log.error(f"Failed to add new board {board_id} to db")
                return False

        # Check if board is right type
        stored_type = self.get_board_type(board_id)
        if stored_type != board_type:
            log.error(f"Board type for board {board_id} is {stored_type} not {board_type}")
            return False

        # Check if there are unknown variables
        valid_vars = self.get_valid_var_labels()
        if var_label not in valid_vars:
            log.error(f"""Trying to insert unknown variable: {var_label}.\
 Returning without inserting any data.""")
            return False

        return self._insert(
            self.schema["data_table_name"],
            {
                "label_id": self.get_label_id(var_label),
                "board_id": board_id,
                "run_number": run_number,
                "value": value,
                "githash": githash,
            },
        )

    def upload_test_data_multi(
        self,
        board_id: str,
        board_type: Optional[Literal["EM", "HEC"]],
        board_version: Optional[str],
        run_number: int,
        var_label_val: List[Tuple[Dict[str, Any], float]],
        githash: str,
    ) -> bool:
        """
        Add many new test results to database; labels are auto-created as needed
        """
        # Check if board id exists, if not, create it
        if board_id not in self.get_board_ids():
            if board_type is None:
                log.error(f"Board {board_id} not found in DB and board type not provided")
                return False
            if self.upload_board(board_id, board_type, board_version):
                log.info(f"New {board_id=} of type {board_type} inserted into database")
            else:
                log.error(f"Failed to add new board {board_id} to db")
                return False

        # Check if board is right type
        stored_type = self.get_board_type(board_id)
        if board_type is not None and stored_type != board_type:
            log.error(f"Board type for board {board_id} is {stored_type} not {board_type}")
            return False

        # Loop through dictionary to insert data
        # _insert_threshold_or_derived inserts label (or does nothing) and retrieves label_id, then inserts the value
        retval = True
        for label, value in var_label_val:
            log.debug(f"Trying to upload: {label}:{value}")
            retval &= self._insert_threshold_or_derived(
                self.schema["data_table_name"],
                {
                    "label": Jsonb(label, dumps=no_null_json_dumps),
                    "board_id": board_id,
                    "run_number": run_number,
                    "value": value,
                    "githash": githash,
                },
            )

        return retval

    def get_test_data(self, board_id: str) -> list:
        """
        Returns all test data for board

        Args:
            board_id: the board

        Returns:
            list of dicts, one per stored test value (empty if none)
        """
        # Grab all of the test data
        q = self._query(
            sql.SQL("""
                SELECT
                    d.created_at,
                    l.label,
                    d.board_id,
                    b.board_type,
                    b.board_version,
                    d.run_number,
                    d.value,
                    d.githash,
                    r.username
                FROM {derived} d
                JOIN {labels} l ON l.id=d.label_id
                JOIN {boards} b ON b.board_id=d.board_id
                JOIN {runs} r ON r.run_number=d.run_number
                WHERE d.board_id={board_id}""").format(
                derived=sql.Identifier(self.schema["data_table_name"]),
                labels=sql.Identifier(self.schema["labels_table_name"]),
                boards=sql.Identifier(self.schema["boards_table_name"]),
                runs=sql.Identifier(self.schema["runs_table_name"]),
                board_id=sql.Literal(board_id),
            )
        )

        if len(q) == 0:
            log.warning(f"Test data is empty for {board_id}")
            return []

        test_data = [row._asdict() for row in q]

        return test_data

    def get_test_results(
        self, board_id: str, return_only_failures: bool = False, tag: Optional[str] = None
    ) -> Dict[Any, Any]:
        """
        Returns json of most recent test results.
        Sets QC test result for board_id.

        Args:
            board_id: the board
            return_only_failures: only return test result info if failed
            tag: tag of QC thresholds to use

        Returns:
            dict:
                "passed": True/False overall QC result
                "complete_test": True/False if all test data is available
                "data": the test data with per-test pass/fail appended
                "message": a human-readable status message
        """
        message: str = ""

        # Grab all of the test data joined against the matching thresholds
        # NOTE(review): "t.operator=operator" compares the column with itself
        # (true for any non-NULL operator) — presumably meant to constrain the
        # join further; confirm intent before changing.
        q = self._query(
            sql.SQL("""
                SELECT
                    l.label,
                    d.board_id,
                    b.board_type,
                    b.board_version,
                    d.run_number,
                    d.value,
                    t.operator,
                    t.threshold
                FROM {derived} d
                JOIN {labels} l ON l.id=d.label_id
                JOIN {boards} b ON b.board_id=d.board_id
                JOIN {thresholds} t ON (
                    t.label_id=d.label_id
                    AND t.board_type=b.board_type
                    AND t.operator=operator
                )
                WHERE d.board_id={board_id}
            """).format(
                derived=sql.Identifier(self.schema["latest_data_view_name"]),
                labels=sql.Identifier(self.schema["labels_table_name"]),
                boards=sql.Identifier(self.schema["boards_table_name"]),
                thresholds=(
                    sql.Identifier(self.schema["latest_thresholds_view_name"])
                    if tag is None
                    else sql.Identifier(self.schema["latest_thresholds_per_tag_view_name"])
                ),
                board_id=sql.Literal(board_id),
            )
            + (sql.SQL("\nAND t.tag = {tag}").format(tag=sql.Literal(tag)) if tag is not None else sql.SQL(""))
        )
        board_passed = True
        complete_test = False

        if len(q) == 0:
            message = f"Test data is empty for {board_id}"
            log.error(message)
            return {"passed": False, "complete_test": False, "data": [], "message": message}

        test_data = [row._asdict() for row in q]

        # Append passed per test
        for d in test_data:
            passed, error_msg = eval_test(d["value"], d["operator"], d["threshold"])
            d["passed"] = passed
            d["error_message"] = error_msg
            if error_msg:
                d["value"] = None
            board_passed &= passed

        if return_only_failures:
            test_data_out = []
            for d in test_data:
                if not d["passed"]:
                    test_data_out.append(d)
        else:
            test_data_out = test_data

        # Check if we have test_data for all tests
        board_type = self.get_board_type(board_id)
        if board_type:
            # Compare canonical JSON forms so dict key order doesn't matter
            all_checks = {json.dumps(t["label"], sort_keys=True) for t in self.get_thresholds(board_type, tag)}
            test_set = {json.dumps(t["label"], sort_keys=True) for t in test_data}
            complete_test = all_checks == test_set
            missing_tests = [json.loads(t) for t in all_checks - test_set]
            log.info(f"Missing tests: {missing_tests}")
            message = f"{'Complete' if complete_test else 'Incomplete'} test results retrieved for {tag=}"
        else:
            message = f"board_type missing for board {board_id}"
            log.error(message)
            return {"passed": False, "complete_test": False, "data": test_data_out, "message": message}

        # Update board status in QC database
        qc_passed = board_passed and complete_test
        # NOTE(review): datetime.now() is naive, so %z renders empty — confirm
        # whether a timezone-aware timestamp is wanted here
        qc_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f%z")

        self._execute(
            sql.SQL("""
                UPDATE {boards}
                SET
                    qc_passed={qc_passed},
                    qc_check_timestamp={qc_timestamp},
                    qc_check_tag={qc_tag}
                WHERE board_id={board_id}
            """).format(
                boards=sql.Identifier(self.schema["boards_table_name"]),
                qc_passed=sql.Literal(qc_passed),
                qc_timestamp=sql.Literal(qc_timestamp),
                qc_tag=sql.Literal("latest_cuts" if tag is None else tag),
                board_id=sql.Literal(board_id),
            )
        )

        return {
            "passed": (board_passed and complete_test),
            "complete_test": complete_test,
            "data": test_data_out,
            "message": message,
        }
def fill_test_data(db: ProductionTestDB) -> bool:
    """
    Populate the database with a fixed set of example labels, thresholds,
    and measurements for development/testing.
    """
    label_rows = [
        {"variable": "energy_resolution", "channel": 1, "gain": "lo"},
        {"variable": "pedestal_rms", "channel": 1, "gain": "lo"},
        {"variable": "n_channels", "channel": 1, "gain": "lo"},
        {"variable": "energy_resolution", "channel": 1, "gain": "hi"},
        {"variable": "gain_ratio", "channel": 1, "awg_amp": 1.5},
        {"variable": "gain_ratio", "channel": 2, "awg_amp": 1.5},
    ]

    # (label, operator, cut value, board type)
    threshold_rows = [
        ({"variable": "pedestal_rms", "channel": 1, "gain": "lo"}, ">", 5, "EM"),
        ({"variable": "pedestal_rms", "channel": 1, "gain": "lo"}, "<", 16, "EM"),
        ({"variable": "n_channels", "channel": 1, "gain": "lo"}, "==", 128, "EM"),
        ({"variable": "n_channels", "channel": 1, "gain": "lo"}, "==", 128, "HEC"),
        ({"variable": "energy_resolution", "channel": 1, "gain": "lo"}, ">", 1, "EM"),
        ({"variable": "energy_resolution", "channel": 1, "gain": "lo"}, "<", 12, "EM"),
        ({"variable": "energy_resolution", "channel": 1, "gain": "lo"}, "<", 9, "EM"),
        ({"variable": "energy_resolution", "channel": 1, "gain": "hi"}, ">", 1, "EM"),
        ({"variable": "energy_resolution", "channel": 1, "gain": "hi"}, "<", 9, "EM"),
        ({"variable": "gain_ratio", "channel": 1, "awg_amp": 1.5}, ">", 23, "EM"),
        ({"variable": "gain_ratio", "channel": 1, "awg_amp": 1.5}, "<", 24, "EM"),
        ({"variable": "gain_ratio", "channel": 2, "awg_amp": 1.5}, ">", 23, "EM"),
        ({"variable": "gain_ratio", "channel": 2, "awg_amp": 1.5}, "<", 24, "EM"),
    ]

    # (board id, board type, board version, run number, label, value, githash)
    data_rows = [
        ("E191703", "EM", "v2.5", 2002, {"variable": "pedestal_rms", "channel": 1, "gain": "lo"}, 20, "DEADBEEF"),
        ("E191701", "EM", "v2.5", 2002, {"variable": "n_channels", "channel": 1, "gain": "lo"}, 128, "DEADBEEF"),
        ("E191701", "EM", "v2.5", 2002, {"variable": "pedestal_rms", "channel": 1, "gain": "lo"}, 20, "DEADBEEF"),
        ("E191701", "EM", "v2.5", 2002, {"variable": "energy_resolution", "channel": 1, "gain": "lo"}, 50, "DEADBEEF"),
        ("E191702", "EM", "v2.5", 2002, {"variable": "energy_resolution", "channel": 1, "gain": "lo"}, 5, "DEADBEEF"),
        ("E191702", "EM", "v2.5", 2002, {"variable": "pedestal_rms", "channel": 1, "gain": "lo"}, 12, "DEADBEEF"),
        ("E191702", "EM", "v2.5", 2002, {"variable": "pedestal_rms", "channel": 1, "gain": "lo"}, 13, "DEADBEEF"),
        ("E191702", "EM", "v2.5", 2002, {"variable": "n_channels", "channel": 1, "gain": "lo"}, 128, "DEADBEEF"),
        ("E191701", "EM", "v2.5", 2002, {"variable": "pedestal_rms", "channel": 1, "gain": "hi"}, 20, "DEADBEEF"),
        ("E191701", "EM", "v2.5", 2002, {"variable": "gain_ratio", "channel": 1, "awg_amp": 1.5}, 23.25, "DEADBEEF"),
        ("E191701", "EM", "v2.5", 2003, {"variable": "gain_ratio", "channel": 2, "awg_amp": 1.5}, 23.75, "DEADBEEF"),
    ]

    try:
        for label in label_rows:
            db.upload_label(label)

        for label, op, cut, btype in threshold_rows:
            db.upload_test_threshold(label, op, cut, btype)

        for board, btype, version, run, label, value, githash in data_rows:
            db.upload_test_data(board, btype, version, run, label, value, githash)

        return True
    except Exception as e:
        log.error("Failed to fill test db tables")
        log.error(e)
        raise e
if __name__ == "__main__":
    """
    Start a REPL with interface to db
    """

    import sys

    from rich.logging import RichHandler

    from polars_analysis.db_interface import prod_db_data_uploader

    # tell pyright we want this available in iPython
    prod_db_data_uploader = prod_db_data_uploader

    # Configure root logging to render through rich
    log_level = "INFO"
    logging.basicConfig(
        level=logging.getLevelNamesMapping()[log_level.upper()],
        format="%(message)s",  # "%(asctime)s:%(levelname)s:%(name)s:%(lineno)s: %(message)s"
        handlers=[
            RichHandler(
                markup=True,
                show_time=log_level.upper() == "DEBUG",  # Only show timestamp for debug level
                show_path=log_level.upper() == "DEBUG",  # Only show filepath for debug level
            )
        ],
        datefmt="[%Y-%m-%d %H:%M:%S]",
    )  # ,uuu for milliseconds doesn't work with richhandler...

    # The database URI is the single required CLI argument
    if len(sys.argv) == 2:
        uri = sys.argv[1]
        proddb = ProductionTestDB(uri)
    else:
        print("Provide a URI")
        sys.exit()

    # Drop into an interactive shell with `proddb` bound to the DB interface
    IPython.embed(
        header=f"""The interface `proddb` will be connected to {uri}
"""
    )