Coverage for polars_analysis / db_interface / production_test_db.py: 65%

314 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 13:37 -0400

1import json 

2import logging 

3from contextlib import contextmanager 

4from datetime import datetime 

5from typing import Any, Dict, List, Literal, NamedTuple, Optional, Tuple, Union 

6 

7import IPython 

8import polars as pl 

9import psycopg 

10from psycopg import sql 

11from psycopg.conninfo import conninfo_to_dict, make_conninfo 

12from psycopg.rows import namedtuple_row 

13from psycopg.types.json import Jsonb 

14 

15# Instantiate logger 

16if __name__ == "__main__": 

17 log = logging.getLogger(__name__) 

18else: 

19 # Logger used by uvicorn 

20 log = logging.getLogger("uvicorn.error") 

21 

22""" 

23Interface class to production testing database. 

24Stores thresholds and data per item. 

25 

26Tables: 

27thresholds -- cut value and operator to check 

28threshold_labels -- variables to test and compare to thresholds 

29boards -- known boards and types 

30derived -- calculated values per label and board 

31 

32Views: 

33latest_thresholds -- thresholds with most recent creation date 

34latest_thresholds_per_tag -- thresholds with most recent creation date per tag 

35latest_derived -- derived values with most recent creation date 

36""" 

37 

38 

39def eval_test(value: float, operator: str, threshold: float) -> Tuple[bool, Optional[str]]: 

40 """ 

41 Evaluate a single test instance against a value 

42 

43 Returns: 

44 tuple: (passed, error_message) where error_message is not None iff test evaluation errored 

45 """ 

46 try: 

47 r = bool(eval(f"{value} {operator} {threshold}")) 

48 except Exception as e: 

49 error_msg = f"Evaluation error: {e}" 

50 log.error(error_msg) 

51 return False, error_msg 

52 return r, None 

53 

54 

55def no_null_json_dumps(data: dict) -> str: 

56 """ 

57 Custom json dumps function to ignore keys with None values 

58 """ 

59 return json.dumps({k: v for k, v in data.items() if v is not None}) 

60 

61 

62class ProductionTestDB: 

63 def __init__(self, postgres_uri: str): 

64 """ 

65 Class to interface with the database storing production test data and thresholds to compare against. 

66 

67 args: 

68 postgres_uri: URI for connecting to PostgreSQL database 

69 """ 

70 

71 self._postgres_uri: str = postgres_uri 

72 

73 self._conn: Optional[psycopg.Connection] = None 

74 self._curs: Optional[psycopg.Cursor[NamedTuple]] = None 

75 

76 # Useful schema strings 

77 self.schema = { 

78 "thresholds_table_name": "thresholds", 

79 "runs_table_name": "runs", 

80 "boards_table_name": "boards", 

81 "data_table_name": "derived", 

82 "labels_table_name": "threshold_labels", 

83 "latest_thresholds_view_name": "latest_thresholds", 

84 "latest_thresholds_per_tag_view_name": "latest_thresholds_per_tag", 

85 "latest_data_view_name": "latest_derived", 

86 } 

87 

88 def __del__(self): 

89 """ 

90 Clean up connections 

91 """ 

92 if self._conn: 

93 self._conn.close() 

94 

95 def __str__(self): 

96 conninfo = conninfo_to_dict(self._postgres_uri) 

97 _ = conninfo.pop("password", None) 

98 return make_conninfo("", **conninfo) 

99 

100 @contextmanager 

101 def _connection(self): 

102 """ 

103 Open connection to postgres 

104 """ 

105 if not self._conn: 

106 self._conn = psycopg.connect(self._postgres_uri) 

107 

108 yield self._conn 

109 

110 @contextmanager 

111 def _cursor(self): # -> Union[sqlite.Cursor, postgres.?] 

112 """ 

113 Return cursor, create if needed, for database 

114 """ 

115 if not self._curs: 

116 with self._connection() as conn: 

117 self._curs = conn.cursor(row_factory=namedtuple_row) 

118 yield self._curs 

119 

120 def _execute(self, cmd: sql.SQL | sql.Composed): 

121 """ 

122 Run SQL command without returning anything 

123 """ 

124 

125 log.debug(f"Executing SQL command: {cmd.as_string()}") 

126 with psycopg.connect(self._postgres_uri) as conn: 

127 with conn.cursor() as cursor: 

128 cursor.execute(cmd) 

129 

130 def _query(self, query: sql.SQL | sql.Composed) -> List[NamedTuple]: 

131 """ 

132 Run and fetch results of an SQL query (really any sql command) 

133 """ 

134 log.debug(f"Executing SQL query: {query.as_string()}") 

135 with psycopg.connect(self._postgres_uri) as conn: 

136 with conn.cursor(row_factory=namedtuple_row) as cursor: 

137 cursor.execute(query) 

138 result = cursor.fetchall() 

139 

140 return result 

141 

142 def _insert(self, table_name: str, data: dict, on_conflict_do_nothing=False) -> bool: 

143 """ 

144 Insert one data element into a table 

145 

146 args: 

147 table_name: name of the table to insert data into 

148 data: dictionary of column_name:value 

149 """ 

150 with psycopg.connect(self._postgres_uri) as conn: 

151 with conn.cursor() as cursor: 

152 # Verify that the table exists 

153 table_exists = False 

154 

155 cmd = sql.SQL("SELECT table_name FROM information_schema.tables WHERE table_name={}").format(table_name) 

156 

157 cursor.execute(cmd) 

158 

159 table_exists = len(cursor.fetchall()) > 0 

160 

161 if not table_exists: 

162 log.error(f"Table {table_name} does not exist in database") 

163 return False 

164 

165 # Extract column names and values 

166 columns = data.keys() 

167 

168 # Create SQL query 

169 query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format( 

170 sql.Identifier(table_name), 

171 sql.SQL(", ").join(map(sql.Identifier, columns)), 

172 sql.SQL(", ").join(map(sql.Placeholder, columns)), 

173 ) 

174 

175 if on_conflict_do_nothing: 

176 query = sql.SQL(" ").join([query, sql.SQL("ON CONFLICT DO NOTHING")]) 

177 

178 # Execute the query 

179 cursor.execute(query, data) 

180 conn.commit() 

181 

182 log.debug("Insert successful") 

183 return True 

184 

185 def _insert_many(self, table_name: str, data: List[dict], on_conflict_do_nothing=False) -> bool: 

186 """ 

187 Insert many data elements into a table 

188 

189 args: 

190 table_name: name of the table to insert data into 

191 data: list of dictionaries of column_name:value 

192 """ 

193 if len(data) == 0: 

194 log.error("Received empty list of data in _insert_many.") 

195 return False 

196 with psycopg.connect(self._postgres_uri) as conn: 

197 with conn.cursor() as cursor: 

198 # Verify that the table exists 

199 table_exists = False 

200 

201 cmd = sql.SQL("SELECT table_name FROM information_schema.tables WHERE table_name={}").format(table_name) 

202 

203 cursor.execute(cmd) 

204 

205 table_exists = len(cursor.fetchall()) > 0 

206 

207 if not table_exists: 

208 log.error(f"Table {table_name} does not exist in database") 

209 return False 

210 

211 # Extract column names and values 

212 columns = data[0].keys() 

213 

214 # Create SQL query 

215 query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format( 

216 sql.Identifier(table_name), 

217 sql.SQL(", ").join(map(sql.Identifier, columns)), 

218 sql.SQL(", ").join(map(sql.Placeholder, columns)), 

219 ) 

220 

221 if on_conflict_do_nothing: 

222 query = sql.SQL(" ").join([query, sql.SQL("ON CONFLICT DO NOTHING")]) 

223 

224 # Execute the query 

225 cursor.executemany(query, data) 

226 conn.commit() 

227 

228 log.debug("Insert successful") 

229 return True 

230 

231 def _insert_threshold_or_derived(self, table_name: str, data: dict) -> bool: 

232 """ 

233 Insert one threshold or derived value into a table 

234 Handles insertion of labels and retrieval of label_id in the inset query 

235 

236 args: 

237 table_name: name of the table to insert data into 

238 data: dictionary of column_name:value 

239 """ 

240 with self._connection() as con: 

241 with con.cursor() as cursor: 

242 # Verify that the table exists 

243 table_exists = False 

244 

245 cmd = sql.SQL("SELECT table_name FROM information_schema.tables WHERE table_name={}").format(table_name) 

246 

247 cursor.execute(cmd) 

248 

249 table_exists = len(cursor.fetchall()) > 0 

250 

251 if not table_exists: 

252 log.error(f"Table {table_name} does not exist in database") 

253 return False 

254 

255 # Extract column names and values 

256 columns = [k for k in data.keys() if k != "label"] 

257 

258 # Create SQL query 

259 query = sql.SQL( 

260 """ 

261 WITH label_insert AS ( 

262 INSERT INTO {labels_table_name} (label) 

263 VALUES ({label}) 

264 ON CONFLICT (label) DO NOTHING 

265 RETURNING id 

266 ), 

267 label_select AS ( 

268 SELECT id FROM label_insert 

269 UNION ALL 

270 SELECT id FROM {labels_table_name} WHERE label = {label} 

271 ) 

272 INSERT INTO {table} (label_id, {columns}) 

273 SELECT l.id, {names} 

274 FROM label_select l 

275 LIMIT 1 

276 """ 

277 ).format( 

278 table=sql.Identifier(table_name), 

279 columns=sql.SQL(", ").join(map(sql.Identifier, columns)), 

280 names=sql.SQL(", ").join(map(sql.Placeholder, columns)), 

281 labels_table_name=sql.Identifier(self.schema["labels_table_name"]), 

282 label=sql.Placeholder("label"), 

283 ) 

284 

285 # Execute the query 

286 cursor.execute(query, data) 

287 con.commit() 

288 

289 log.debug("Insert successful") 

290 return True 

291 

292 """ 

293 Database-type agnostic functions for interface 

294 """ 

295 

296 def upload_label(self, var_label: Dict[str, Any]) -> bool: 

297 try: 

298 return self._insert( 

299 self.schema["labels_table_name"], 

300 {"label": Jsonb(var_label, dumps=no_null_json_dumps)}, 

301 on_conflict_do_nothing=True, 

302 ) 

303 except psycopg.errors.UniqueViolation: 

304 if self._conn is not None: 

305 self._conn.rollback() 

306 log.info(f"{var_label} already exists in the db, skipping") 

307 return False 

308 

309 def upload_labels(self, var_labels: List[Dict[str, Any]]) -> bool: 

310 jsonb_labels = [{"label": Jsonb(var_label, dumps=no_null_json_dumps)} for var_label in var_labels] 

311 try: 

312 return self._insert_many(self.schema["labels_table_name"], jsonb_labels, on_conflict_do_nothing=True) 

313 except psycopg.errors.UniqueViolation: 

314 if self._conn is not None: 

315 self._conn.rollback() 

316 log.info(f"{var_labels} already exists in the db, skipping") 

317 return False 

318 

319 def get_valid_var_labels(self) -> List[Dict[str, Any]]: 

320 """ 

321 Returns list of all labels in table, along with their label id 

322 """ 

323 

324 query = sql.SQL("SELECT label FROM {}").format(sql.Identifier(self.schema["labels_table_name"])) 

325 v = self._query(query) 

326 return [i[0] for i in v] 

327 

328 def get_label_id(self, label: Dict[str, Any]) -> int: 

329 query = sql.SQL("SELECT id FROM {table} WHERE label={label}").format( 

330 table=sql.Identifier(self.schema["labels_table_name"]), 

331 label=Jsonb(label, dumps=no_null_json_dumps), 

332 ) 

333 return self._query(query)[0][0] 

334 

335 def upload_board(self, board_id: str, board_type: Literal["EM", "HEC"], board_version: Optional[str]) -> bool: 

336 if board_type not in ["EM", "HEC"]: 

337 log.error(f"{board_type} is not a known type: {['EM', 'HEC']}") 

338 return False 

339 

340 return self._insert( 

341 self.schema["boards_table_name"], 

342 {"board_id": board_id, "board_type": board_type, "board_version": board_version}, 

343 ) 

344 

345 def get_board_ids(self) -> List[str]: 

346 query = sql.SQL("SELECT board_id FROM {}").format(sql.Identifier(self.schema["boards_table_name"])) 

347 b = self._query(query) 

348 return [i[0] for i in b] 

349 

350 def get_board_type(self, board_id: str) -> Union[Literal["EM", "HEC"], None]: 

351 if board_id in self.get_board_ids(): 

352 query = sql.SQL("SELECT board_type FROM {table} WHERE board_id={board_id}").format( 

353 table=sql.Identifier(self.schema["boards_table_name"]), 

354 board_id=board_id, 

355 ) 

356 q = self._query(query) 

357 return q[0][0] 

358 else: 

359 log.warning(f"Board ID {board_id} doesn't exist") 

360 return None 

361 

362 def get_board_summary(self) -> pl.DataFrame: 

363 """ 

364 Fetches QC results for every board in QC database and returns as a pl.DataFrame 

365 """ 

366 query = sql.SQL(""" 

367 SELECT 

368 board_id, 

369 board_version, 

370 board_type, 

371 qc_check_tag, 

372 qc_passed, 

373 qc_check_timestamp 

374 FROM boards 

375 ORDER BY qc_check_timestamp DESC 

376 """) 

377 result = self._query(query) 

378 return pl.DataFrame(result) 

379 

380 def get_board_qc_summary_runs(self, board_id: str) -> pl.DataFrame: 

381 """ 

382 Fetches the list of runs used for the QC testing of the given board ID 

383 """ 

384 query = sql.SQL(""" 

385 SELECT 

386 run_number, 

387 l.label->'meas_type' AS meas_type, 

388 array_agg(DISTINCT l.label->'channel') AS channels 

389 FROM latest_derived d 

390 JOIN threshold_labels l 

391 ON d.label_id = l.id 

392 WHERE board_id = {board_id} 

393 AND l.label->>'meas_type' != 'qc' 

394 AND l.label->>'channel' IS NOT NULL 

395 GROUP BY run_number, meas_type 

396 ORDER BY run_number 

397 """).format(board_id=board_id) 

398 result = self._query(query) 

399 return pl.DataFrame(result) 

400 

401 def upload_test_threshold( 

402 self, 

403 var_label: Dict[str, Any], 

404 test_op: str, 

405 threshold: float, 

406 board_type: Literal["EM", "HEC"], 

407 tag: Optional[str] = None, 

408 ) -> bool: 

409 """ 

410 Add a new test to the database 

411 

412 Args: 

413 var_label: string name for the variable 

414 test_op: string representation of test to perform, eg var "<" threshold 

415 threshold: value to test against 

416 

417 Returns: 

418 status 

419 """ 

420 

421 # Require correct board type 

422 if board_type not in ["EM", "HEC"]: 

423 log.error(f"{board_type} is not a known type: {['EM', 'HEC']}") 

424 return False 

425 

426 # Insert label (or do nothing) and retrieve label_id, then insert threshold 

427 return self._insert_threshold_or_derived( 

428 self.schema["thresholds_table_name"], 

429 { 

430 "label": Jsonb(var_label, dumps=no_null_json_dumps), 

431 "operator": test_op, 

432 "threshold": threshold, 

433 "board_type": board_type, 

434 "tag": tag, 

435 }, 

436 ) 

437 

438 def get_thresholds(self, board_type: Literal["EM", "HEC"], tag: Optional[str] = None) -> List[dict]: 

439 """ 

440 Return list of dictionaries of threshold information based on the most recent entry. 

441 """ 

442 

443 if board_type not in ["EM", "HEC"]: 

444 log.error(f"{board_type} is not a known type: {['EM', 'HEC']}") 

445 return [] 

446 

447 q = self._query( 

448 sql.SQL(""" 

449 SELECT 

450 l.label, 

451 t.operator, 

452 t.threshold, 

453 t.board_type, 

454 t.label_id, 

455 t.tag 

456 FROM {thresholds} t 

457 JOIN {labels} l ON l.id=t.label_id 

458 WHERE t.board_type={board_type} 

459 """).format( 

460 thresholds=( 

461 sql.Identifier(self.schema["latest_thresholds_view_name"]) 

462 if tag is None 

463 else sql.Identifier(self.schema["latest_thresholds_per_tag_view_name"]) 

464 ), 

465 labels=sql.Identifier(self.schema["labels_table_name"]), 

466 board_type=board_type, 

467 ) 

468 + (sql.SQL("\nAND t.tag = {tag}").format(tag=tag) if tag is not None else sql.SQL("")) 

469 ) 

470 out = [row._asdict() for row in q] 

471 

472 return out 

473 

474 def print_thresholds(self, board_type: Literal["EM", "HEC"]): 

475 print(json.dumps(self.get_thresholds(board_type), indent=4)) 

476 

477 def get_threshold_tags(self) -> List[str]: 

478 """ 

479 Return list of threshold tags, most recent first 

480 """ 

481 

482 q = self._query( 

483 sql.SQL(""" 

484 SELECT 

485 tag, 

486 MAX(created_at), 

487 MIN(created_at) 

488 FROM {thresholds} 

489 GROUP BY tag 

490 ORDER BY MAX(created_at) DESC 

491 """).format(thresholds=sql.Identifier(self.schema["latest_thresholds_per_tag_view_name"])) 

492 ) 

493 

494 df = pl.DataFrame( 

495 { 

496 "tag": [i[0] for i in q], 

497 "first_created": [i[2] for i in q], 

498 "last_edited": [i[1] for i in q], 

499 } 

500 ) 

501 

502 log.info(df) 

503 

504 return df["tag"].to_list() 

505 

506 def upload_test_data( 

507 self, 

508 board_id: str, 

509 board_type: Literal["EM", "HEC"], 

510 board_version: str, 

511 run_number: int, 

512 var_label: Dict[str, Any], 

513 value: float, 

514 githash: str, 

515 ) -> bool: 

516 """ 

517 Add new test result to database 

518 """ 

519 

520 # Check if board id exists, if not, create it 

521 if board_id not in self.get_board_ids(): 

522 if self.upload_board(board_id, board_type, board_version): 

523 log.info(f"New {board_id=} of type {board_type} inserted into database") 

524 else: 

525 log.error(f"Failed to add new board{board_id} to db") 

526 return False 

527 

528 # Check if board is right type 

529 stored_type = self.get_board_type(board_id) 

530 if stored_type != board_type: 

531 log.error(f"Board type for board {board_id} is {stored_type} not {board_type}") 

532 return False 

533 

534 # Check if there are unknown variables 

535 valid_vars = self.get_valid_var_labels() 

536 if var_label not in valid_vars: 

537 log.error(f"""Trying to insert unknown variable: {var_label}.\ 

538 Returning without inserting any data.""") 

539 return False 

540 

541 return self._insert( 

542 self.schema["data_table_name"], 

543 { 

544 "label_id": self.get_label_id(var_label), 

545 "board_id": board_id, 

546 "run_number": run_number, 

547 "value": value, 

548 "githash": githash, 

549 }, 

550 ) 

551 

552 def upload_test_data_multi( 

553 self, 

554 board_id: str, 

555 board_type: Optional[Literal["EM", "HEC"]], 

556 board_version: Optional[str], 

557 run_number: int, 

558 var_label_val: List[Tuple[Dict[str, Any], float]], 

559 githash: str, 

560 ) -> bool: 

561 """ 

562 Add new test result to database 

563 """ 

564 # Check if board id exists, if not, create it 

565 if board_id not in self.get_board_ids(): 

566 if board_type is None: 

567 log.error(f"Board {board_id} not found in DB and board type not provided") 

568 return False 

569 if self.upload_board(board_id, board_type, board_version): 

570 log.info(f"New {board_id=} of type {board_type} inserted into database") 

571 else: 

572 log.error(f"Failed to add new board{board_id} to db") 

573 return False 

574 

575 # Check if board is right type 

576 stored_type = self.get_board_type(board_id) 

577 if board_type is not None and stored_type != board_type: 

578 log.error(f"Board type for board {board_id} is {stored_type} not {board_type}") 

579 return False 

580 

581 # Loop through dictionary to insert data 

582 # _insert_threshold_or_derived inserts label (or does nothing) and retrieve label_id, then inserts threshold 

583 retval = True 

584 for label, value in var_label_val: 

585 log.debug(f"Trying to upload: {label}:{value}") 

586 retval &= self._insert_threshold_or_derived( 

587 self.schema["data_table_name"], 

588 { 

589 "label": Jsonb(label, dumps=no_null_json_dumps), 

590 "board_id": board_id, 

591 "run_number": run_number, 

592 "value": value, 

593 "githash": githash, 

594 }, 

595 ) 

596 

597 return retval 

598 

599 def get_test_data(self, board_id: int) -> list: 

600 """ 

601 Returns all test data for board 

602 

603 Args: 

604 board_id: the board 

605 

606 Returns: 

607 dict: 

608 list of all test data 

609 """ 

610 

611 # Grab all of the test data 

612 q = self._query( 

613 sql.SQL(""" 

614 SELECT 

615 d.created_at, 

616 l.label, 

617 d.board_id, 

618 b.board_type, 

619 b.board_version, 

620 d.run_number, 

621 d.value, 

622 d.githash, 

623 r.username 

624 FROM {derived} d 

625 JOIN {labels} l ON l.id=d.label_id 

626 JOIN {boards} b ON b.board_id=d.board_id 

627 JOIN {runs} r ON r.run_number=d.run_number 

628 WHERE d.board_id={board_id}""").format( 

629 derived=sql.Identifier(self.schema["data_table_name"]), 

630 labels=sql.Identifier(self.schema["labels_table_name"]), 

631 boards=sql.Identifier(self.schema["boards_table_name"]), 

632 runs=sql.Identifier(self.schema["runs_table_name"]), 

633 board_id=board_id, 

634 ) 

635 ) 

636 

637 if len(q) == 0: 

638 log.warning(f"Test data is empty for {board_id}") 

639 return [] 

640 

641 test_data = [row._asdict() for row in q] 

642 

643 return test_data 

644 

645 def get_test_results( 

646 self, board_id: str, return_only_failures: bool = False, tag: Optional[str] = None 

647 ) -> Dict[Any, Any]: 

648 """ 

649 Returns json of most recent test results. 

650 Sets QC test result for board_id. 

651 

652 Args: 

653 board_id: the board 

654 return_only_failures: only return test result info if failed 

655 tag: tag of QC thresholds to use 

656 

657 Returns: 

658 dict: 

659 json dictionary of "passed":True/False 

660 the test data var_label:value 

661 "complete":True/False if all test data is available 

662 a message 

663 """ 

664 

665 message: str = "" 

666 

667 # Grab all of the test data 

668 q = self._query( 

669 sql.SQL(""" 

670 SELECT 

671 l.label, 

672 d.board_id, 

673 b.board_type, 

674 b.board_version, 

675 d.run_number, 

676 d.value, 

677 t.operator, 

678 t.threshold 

679 FROM {derived} d 

680 JOIN {labels} l ON l.id=d.label_id 

681 JOIN {boards} b ON b.board_id=d.board_id 

682 JOIN {thresholds} t ON ( 

683 t.label_id=d.label_id 

684 AND t.board_type=b.board_type 

685 AND t.operator=operator 

686 ) 

687 WHERE d.board_id={board_id} 

688 """).format( 

689 derived=sql.Identifier(self.schema["latest_data_view_name"]), 

690 labels=sql.Identifier(self.schema["labels_table_name"]), 

691 boards=sql.Identifier(self.schema["boards_table_name"]), 

692 thresholds=( 

693 sql.Identifier(self.schema["latest_thresholds_view_name"]) 

694 if tag is None 

695 else sql.Identifier(self.schema["latest_thresholds_per_tag_view_name"]) 

696 ), 

697 board_id=board_id, 

698 ) 

699 + (sql.SQL("\nAND t.tag = {tag}").format(tag=tag) if tag is not None else sql.SQL("")) 

700 ) 

701 board_passed = True 

702 complete_test = False 

703 

704 if len(q) == 0: 

705 message = f"Test data is empty for {board_id}" 

706 log.error(message) 

707 return {"passed": False, "complete_test": False, "data": [], "message": message} 

708 

709 test_data = [row._asdict() for row in q] 

710 

711 # Append passed per test 

712 for d in test_data: 

713 passed, error_msg = eval_test(d["value"], d["operator"], d["threshold"]) 

714 d["passed"] = passed 

715 d["error_message"] = error_msg 

716 if error_msg: 

717 d["value"] = None 

718 board_passed &= passed 

719 

720 if return_only_failures: 

721 test_data_out = [] 

722 for d in test_data: 

723 if not d["passed"]: 

724 test_data_out.append(d) 

725 else: 

726 test_data_out = test_data 

727 

728 # Check if we have test_data for all tests 

729 board_type = self.get_board_type(board_id) 

730 if board_type: 

731 all_checks = {json.dumps(t["label"], sort_keys=True) for t in self.get_thresholds(board_type, tag)} 

732 test_set = {json.dumps(t["label"], sort_keys=True) for t in test_data} 

733 complete_test = all_checks == test_set 

734 missing_tests = [json.loads(t) for t in all_checks - test_set] 

735 log.info(f"Missing tests: {missing_tests}") 

736 message = f"{'Complete' if complete_test else 'Incomplete'} test results retrieved for {tag=}" 

737 else: 

738 message = f"board_type missing for board {board_id}" 

739 log.error(message) 

740 return {"passed": False, "complete_test": False, "data": test_data_out, "message": message} 

741 

742 # Update board status in QC database 

743 qc_passed = board_passed and complete_test 

744 qc_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f%z") 

745 

746 self._execute( 

747 sql.SQL(""" 

748 UPDATE {boards} 

749 SET 

750 qc_passed={qc_passed}, 

751 qc_check_timestamp={qc_timestamp}, 

752 qc_check_tag={qc_tag} 

753 WHERE board_id={board_id} 

754 """).format( 

755 boards=sql.Identifier(self.schema["boards_table_name"]), 

756 qc_passed=qc_passed, 

757 qc_timestamp=qc_timestamp, 

758 qc_tag=("latest_cuts" if tag is None else tag), 

759 board_id=board_id, 

760 ) 

761 ) 

762 

763 return { 

764 "passed": (board_passed and complete_test), 

765 "complete_test": complete_test, 

766 "data": test_data_out, 

767 "message": message, 

768 } 

769 

770 

771def fill_test_data(db: ProductionTestDB) -> bool: 

772 """ 

773 Data for testing 

774 """ 

775 try: 

776 db.upload_label({"variable": "energy_resolution", "channel": 1, "gain": "lo"}) 

777 db.upload_label({"variable": "pedestal_rms", "channel": 1, "gain": "lo"}) 

778 db.upload_label({"variable": "n_channels", "channel": 1, "gain": "lo"}) 

779 db.upload_label({"variable": "energy_resolution", "channel": 1, "gain": "hi"}) 

780 db.upload_label({"variable": "gain_ratio", "channel": 1, "awg_amp": 1.5}) 

781 db.upload_label({"variable": "gain_ratio", "channel": 2, "awg_amp": 1.5}) 

782 

783 db.upload_test_threshold({"variable": "pedestal_rms", "channel": 1, "gain": "lo"}, ">", 5, "EM") 

784 db.upload_test_threshold({"variable": "pedestal_rms", "channel": 1, "gain": "lo"}, "<", 16, "EM") 

785 db.upload_test_threshold({"variable": "n_channels", "channel": 1, "gain": "lo"}, "==", 128, "EM") 

786 db.upload_test_threshold({"variable": "n_channels", "channel": 1, "gain": "lo"}, "==", 128, "HEC") 

787 db.upload_test_threshold({"variable": "energy_resolution", "channel": 1, "gain": "lo"}, ">", 1, "EM") 

788 db.upload_test_threshold({"variable": "energy_resolution", "channel": 1, "gain": "lo"}, "<", 12, "EM") 

789 db.upload_test_threshold({"variable": "energy_resolution", "channel": 1, "gain": "lo"}, "<", 9, "EM") 

790 db.upload_test_threshold({"variable": "energy_resolution", "channel": 1, "gain": "hi"}, ">", 1, "EM") 

791 db.upload_test_threshold({"variable": "energy_resolution", "channel": 1, "gain": "hi"}, "<", 9, "EM") 

792 db.upload_test_threshold({"variable": "gain_ratio", "channel": 1, "awg_amp": 1.5}, ">", 23, "EM") 

793 db.upload_test_threshold({"variable": "gain_ratio", "channel": 1, "awg_amp": 1.5}, "<", 24, "EM") 

794 db.upload_test_threshold({"variable": "gain_ratio", "channel": 2, "awg_amp": 1.5}, ">", 23, "EM") 

795 db.upload_test_threshold({"variable": "gain_ratio", "channel": 2, "awg_amp": 1.5}, "<", 24, "EM") 

796 

797 db.upload_test_data( 

798 "E191703", "EM", "v2.5", 2002, {"variable": "pedestal_rms", "channel": 1, "gain": "lo"}, 20, "DEADBEEF" 

799 ) 

800 db.upload_test_data( 

801 "E191701", "EM", "v2.5", 2002, {"variable": "n_channels", "channel": 1, "gain": "lo"}, 128, "DEADBEEF" 

802 ) 

803 db.upload_test_data( 

804 "E191701", "EM", "v2.5", 2002, {"variable": "pedestal_rms", "channel": 1, "gain": "lo"}, 20, "DEADBEEF" 

805 ) 

806 db.upload_test_data( 

807 "E191701", "EM", "v2.5", 2002, {"variable": "energy_resolution", "channel": 1, "gain": "lo"}, 50, "DEADBEEF" 

808 ) 

809 db.upload_test_data( 

810 "E191702", "EM", "v2.5", 2002, {"variable": "energy_resolution", "channel": 1, "gain": "lo"}, 5, "DEADBEEF" 

811 ) 

812 db.upload_test_data( 

813 "E191702", "EM", "v2.5", 2002, {"variable": "pedestal_rms", "channel": 1, "gain": "lo"}, 12, "DEADBEEF" 

814 ) 

815 db.upload_test_data( 

816 "E191702", "EM", "v2.5", 2002, {"variable": "pedestal_rms", "channel": 1, "gain": "lo"}, 13, "DEADBEEF" 

817 ) 

818 db.upload_test_data( 

819 "E191702", "EM", "v2.5", 2002, {"variable": "n_channels", "channel": 1, "gain": "lo"}, 128, "DEADBEEF" 

820 ) 

821 db.upload_test_data( 

822 "E191701", "EM", "v2.5", 2002, {"variable": "pedestal_rms", "channel": 1, "gain": "hi"}, 20, "DEADBEEF" 

823 ) 

824 db.upload_test_data( 

825 "E191701", "EM", "v2.5", 2002, {"variable": "gain_ratio", "channel": 1, "awg_amp": 1.5}, 23.25, "DEADBEEF" 

826 ) 

827 db.upload_test_data( 

828 "E191701", "EM", "v2.5", 2003, {"variable": "gain_ratio", "channel": 2, "awg_amp": 1.5}, 23.75, "DEADBEEF" 

829 ) 

830 

831 return True 

832 except Exception as e: 

833 log.error("Failed to fill test db tables") 

834 log.error(e) 

835 raise e 

836 

837 

838if __name__ == "__main__": 

839 """ 

840 Start a REPL with interface to db 

841 """ 

842 

843 import sys 

844 

845 from rich.logging import RichHandler 

846 

847 from polars_analysis.db_interface import prod_db_data_uploader 

848 

849 # tell pyright we want this available in iPython 

850 prod_db_data_uploader = prod_db_data_uploader 

851 

852 log_level = "INFO" 

853 logging.basicConfig( 

854 level=logging.getLevelNamesMapping()[log_level.upper()], 

855 format="%(message)s", # "%(asctime)s:%(levelname)s:%(name)s:%(lineno)s: %(message)s" 

856 handlers=[ 

857 RichHandler( 

858 markup=True, 

859 show_time=log_level.upper() == "DEBUG", # Only show timestamp for debug level 

860 show_path=log_level.upper() == "DEBUG", # Only show filepath for debug level 

861 ) 

862 ], 

863 datefmt="[%Y-%m-%d %H:%M:%S]", 

864 ) # ,uuu for milliseconds doesn't work with richhandler... 

865 

866 if len(sys.argv) == 2: 

867 uri = sys.argv[1] 

868 proddb = ProductionTestDB(uri) 

869 else: 

870 print("Provide a URI") 

871 sys.exit() 

872 

873 IPython.embed( 

874 header=f"""The interface `proddb` will be connected to {uri} 

875""" 

876 )