Coverage for polars_analysis / data_sources.py: 69%

542 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 17:11 -0400

1import logging 

2import os 

3import sys 

4from collections import defaultdict 

5from datetime import datetime 

6from glob import glob 

7from pathlib import Path 

8from typing import Any, Dict, List, Optional, Protocol, cast 

9 

10import numpy as np 

11import polars as pl 

12import polars.selectors as cs 

13 

14from polars_analysis.analysis.constants import MIDDLE_ATTENUATIONS 

15 

# Module-level logger shared by every data source defined below
log: logging.Logger = logging.getLogger(__name__)

18 

19 

class DataSource(Protocol):
    """
    Interface implemented by every backend that loads and saves run data.

    Implementations in this module include DeltaSource (Delta Lake tables),
    ParquetSource (individual parquet files) and HDF5Source (per-run HDF5 files).
    """

    def load_raw_data(
        self,
        *run_numbers: int,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
        # NOTE: stub only; the default list is never mutated here
        ignore_boards: List[str] = [],
    ) -> pl.DataFrame:
        """Load raw sample rows for ``run_numbers``, optionally dropping rows by sample content or board_id."""
        ...

    def load_derived_data(self, run_number: int, meas_type: str) -> pl.DataFrame:
        """Load previously saved derived values of ``meas_type`` for one run."""
        ...

    def load_coherent_noise_data(self, run_number: int) -> pl.DataFrame:
        """Load previously saved coherent-noise results for one run."""
        ...

    def load_frame_data(
        self, *run_numbers: int, reject_single_adc: bool = False, non_empty: bool = True
    ) -> pl.DataFrame:
        """Load frame data for ``run_numbers``."""
        ...

    def load_monitoring_data(self, *run_numbers: int) -> pl.DataFrame:
        """Load monitoring data for ``run_numbers``."""
        ...

    def load_lab_environment_data(self) -> pl.DataFrame:
        """Load lab environment readings."""
        ...

    def save_derived_data(self, derived_data: pl.DataFrame, run_number: int, meas_type: str) -> None:
        """Persist derived values of ``meas_type`` for one run."""
        ...

    def save_coherent_noise_data(self, coherent_noise_data: pl.DataFrame, run_number: int) -> None:
        """Persist coherent-noise results for one run."""
        ...

    def check_run_exists(self, run_number: int) -> bool:
        """Return True if raw data for ``run_number`` is available."""
        ...

    def check_measurement_exists(self, run_number: int, measurement: int) -> bool:
        """Return True if ``measurement`` exists within ``run_number``."""
        ...

    def get_runs_summary(self) -> pl.DataFrame:
        """Summarise the available runs."""
        ...

    def get_boards_summary(self) -> pl.DataFrame:
        """Summarise the available boards."""
        ...

    def get_boards_list(self, run_number: Optional[int] = None, meas_type: Optional[str] = None) -> pl.DataFrame:
        """List boards, optionally restricted to one run and/or one measurement type."""
        ...

    def get_runs_list(self) -> pl.DataFrame:
        """List the available runs together with their measurement types."""
        ...

    def get_channels_list(
        self,
        run_number: int,
        meas_type: str,
        require_pulsed: bool = False,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
    ) -> pl.DataFrame:
        """List the channels of a run / measurement type, optionally requiring pulsed or well-formed samples."""
        ...

    def get_amplitudes_list(self, run_number: int, meas_type: str) -> pl.DataFrame:
        """List the distinct amplitudes present in a run's derived data."""
        ...

    def get_measurements_list(self, run_number: int) -> List[int]:
        """List the measurement numbers present in a run."""
        ...

    def get_measurement_types(self, run_number: int, measurement: Optional[int] = None) -> List[str]:
        """List the measurement types in a run, optionally restricted to one measurement."""
        ...

    def get_bad_samples_check(self, run_number: int) -> pl.DataFrame:
        """
        Returns a Polars DataFrame with columns
        [channel, gain, board_id, samples_min, samples_max, samples_len].
        For use when checking if any samples arrays contain values outside the expected range
        """
        ...

    def get_middle_attenuation_run(self, run_numbers: List[int]) -> int:
        """Return a run from ``run_numbers`` whose attenuation is one of MIDDLE_ATTENUATIONS."""
        ...

82 

83 

class DeltaSource(DataSource):
    """
    Data source backed by Delta Lake tables.

    Raw samples, frame data and monitoring data are read from Delta tables with
    ``pl.scan_delta``. Derived values and coherent-noise results are read from and
    written to parquet files in ``derived_dir``. Lab-environment data comes from two CSVs.

    Most methods log an error and terminate the process with ``sys.exit(1)`` when a
    required directory is unset or no matching data is found.
    """

    # Collecting many runs with scan_delta (and maybe pyarrow) uses a lot of memory, so
    # frame data for this many runs or more goes straight to the per-file parquet path.
    # 50 is ad-hoc from testing.
    MAX_RUNS_FOR_DELTA_SCAN = 50

    def __init__(
        self,
        raw_data_dir: Optional[Path],
        derived_dir: Optional[Path],
        frame_dir: Optional[Path] = None,
        monitoring_dir: Optional[Path] = None,
        crate_lab_path: Path = Path("/data/feb2/lab-env/env_monitoring_crate_lab.csv"),
        test_stand_path: Path = Path("/data/feb2/lab-env/env_monitoring_test_stand.csv"),
    ) -> None:
        """
        Args:
            raw_data_dir: Delta table with raw samples.
            derived_dir: directory for derived-value and coherent-noise parquet files.
                Created with mode 0o775 if it does not exist.
            frame_dir: Delta table with frame data.
            monitoring_dir: Delta table with monitoring data.
            crate_lab_path: CSV of crate-lab environment readings.
            test_stand_path: CSV of test-stand environment readings.
        """
        self.raw_dir = raw_data_dir
        self.derived_dir = derived_dir
        self.frame_dir = frame_dir
        self.monitoring_dir = monitoring_dir
        self.crate_lab_path = crate_lab_path
        self.test_stand_path = test_stand_path

        if derived_dir is not None and not derived_dir.exists():
            derived_dir.mkdir(exist_ok=True)
            os.chmod(derived_dir, 0o775)

    def _require_raw_dir(self, message: str = "Raw data directory must be set") -> str:
        """Return ``raw_dir`` as a string, or log ``message`` and exit if it is unset."""
        if self.raw_dir is None:
            log.error(message)
            sys.exit(1)
        return str(self.raw_dir)

    def load_raw_data(
        self,
        *run_numbers: int,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
        ignore_boards: Optional[List[str]] = None,
    ) -> pl.DataFrame:
        """
        Load raw samples for ``run_numbers`` from the raw Delta table.

        Args:
            run_numbers: runs to load.
            require_positive: drop rows whose smallest sample is not > 0.
            require_unsaturated: drop rows whose largest sample is not < 2**15.
            require_nonempty: drop rows with an empty ``samples`` list.
            ignore_boards: ``board_id`` values to drop; ``None`` keeps every board.

        Returns:
            The raw rows, with ``timestamp`` converted from America/New_York to UTC and a
            ``pas_mode`` column (``alfe_mode`` when set, otherwise ``hps_ps_gain``).
            ``board_version`` / ``hps_ps_gain`` are added when the table lacks them.
            Exits the process if no rows remain after the sample filters.
        """
        log.info(f"Loading raw data from DeltaSource {self.raw_dir}")
        raw_dir = self._require_raw_dir("Raw data directory must be set to load raw data")
        raw_data = pl.scan_delta(raw_dir).filter(pl.col("run_number").is_in(run_numbers)).collect()
        if require_positive:
            raw_data = raw_data.filter(pl.col("samples").list.min() > 0)
        if require_unsaturated:
            raw_data = raw_data.filter(pl.col("samples").list.max() < 2**15)
        if require_nonempty:
            raw_data = raw_data.filter(pl.col("samples").list.len() != 0)

        if "board_version" not in raw_data.columns:
            raw_data = raw_data.with_columns(pl.lit("").alias("board_version"))

        if raw_data.is_empty():
            log.error(f"No raw data for {run_numbers=} found in {self.raw_dir}, exiting...")
            sys.exit(1)

        # Add Null hps_ps_gain to handle old data without this column
        if "hps_ps_gain" not in raw_data.columns:
            raw_data = raw_data.with_columns(pl.lit(None).alias("hps_ps_gain"))

        # Merge alfe_mode and hps gain into pas_mode, preferring alfe_mode
        raw_data = raw_data.with_columns(
            pl.col("timestamp").dt.replace_time_zone("America/New_York").dt.convert_time_zone("UTC"),
            pl.when(pl.col("alfe_mode").is_not_null())
            .then(pl.col("alfe_mode"))
            .otherwise(pl.col("hps_ps_gain"))
            .alias("pas_mode"),
        )

        if ignore_boards:
            log.warning(f"Will skip loading boards {ignore_boards}")
            return raw_data.filter(~pl.col("board_id").is_in(ignore_boards))
        return raw_data

    def load_derived_data(
        self,
        run_number: int,
        meas_type: str,
    ) -> pl.DataFrame:
        """
        Load derived values of ``meas_type`` for ``run_number`` from ``derived_dir``.

        Reads every ``{meas_type}_derived_values*{run_number}.parquet`` file and keeps rows
        whose ``run_number`` matches. The glob can also match other runs whose number ends
        in the same digits. Exits the process if no matching rows are found.
        """
        log.debug(f"Loading derived data from {self.derived_dir}")
        if self.derived_dir is None:
            log.error("Derived directory must be set to load derived data")
            sys.exit(1)
        derived_dir = Path(self.derived_dir)
        pattern = f"{meas_type}_derived_values*{run_number}.parquet"

        derived_df = pl.DataFrame()
        # scan_parquet fails on a glob that matches nothing, which would bypass the
        # helpful error below, so only scan when at least one file exists.
        if any(derived_dir.glob(pattern)):
            derived_df = pl.scan_parquet(derived_dir / pattern).filter(pl.col("run_number") == run_number).collect()

        if derived_df.is_empty():
            log.error(f"No derived {meas_type} data for run {run_number} in {self.derived_dir}, exiting...")
            log.error("Have you run [yellow]calc-save-runs[/yellow]?")
            sys.exit(1)

        return derived_df

    def load_coherent_noise_data(self, run_number: int) -> pl.DataFrame:
        """
        Load coherent-noise results for ``run_number`` from ``coherent_noise_*{run_number}.parquet``
        files in ``derived_dir``. Exits the process if no matching rows are found.
        """
        log.debug(f"Loading coherent noise data from {self.derived_dir}")
        if self.derived_dir is None:
            log.error("Derived directory must be set to load coherent noise data")
            sys.exit(1)
        frames = [
            pl.scan_parquet(p).filter(pl.col("run_number") == run_number).collect()
            for p in Path(self.derived_dir).glob(f"coherent_noise_*{run_number}.parquet")
        ]
        # pl.concat rejects an empty list; fall through to the helpful error instead
        coherent_noise_df = pl.concat(frames, how="diagonal") if frames else pl.DataFrame()

        if coherent_noise_df.is_empty():
            log.error(f"No coherent noise data for run {run_number} found in {self.derived_dir}, exiting...")
            log.error("Have you run [yellow]calc-save-runs[/yellow]?")
            log.error(f"Is {run_number} a pedestal run?")
            sys.exit(1)

        return coherent_noise_df

    def load_frame_data(
        self,
        *run_numbers: int,
        reject_single_adc: bool = False,
        non_empty: bool = True,
    ) -> pl.DataFrame:
        """
        Load frame data for ``run_numbers``.

        Tries the Delta table first (for fewer than MAX_RUNS_FOR_DELTA_SCAN runs). If that
        yields nothing, falls back to reading one parquet file per ``run_number=N`` directory.

        Args:
            reject_single_adc: keep only rows whose ``frame8`` list has fewer than 1e5 entries.
            non_empty: keep only rows whose ``frame8`` list is non-empty.

        Returns:
            The frame rows, with ``adc`` converted to an integer from its string form
            (characters from index 3 onward). Exits the process if nothing is found.
        """
        log.debug(f"Loading frame data from {self.frame_dir} with {reject_single_adc=} and {non_empty=}")
        if self.frame_dir is None:
            log.error("Frame directory must be set to load frame data")
            sys.exit(1)
        frame_dir = self.frame_dir

        # Collect Filters
        filters: List[pl.Expr] = []
        if reject_single_adc:
            filters.append(pl.col("frame8").list.len() < 1e5)
        if non_empty:
            filters.append(pl.col("frame8").list.len() != 0)

        frame_data = pl.DataFrame()
        if len(run_numbers) < self.MAX_RUNS_FOR_DELTA_SCAN:
            frame_data = self._load_frame_delta(frame_dir, filters, *run_numbers)

        if frame_data.is_empty():  # pragma: no cover
            log.debug("Empty frame, trying scan_parquet")
            frame_data = self._load_frame_parquet(frame_dir, filters, *run_numbers)

        if frame_data.is_empty():
            log.error(
                f"No frame data for run {run_numbers} in {self.frame_dir} with {reject_single_adc=} and {non_empty=}, exiting..."  # noqa: E501
            )
            sys.exit(1)

        return frame_data.with_columns(pl.col("adc").str.slice(3).str.to_integer().alias("adc"))

    @staticmethod
    def _load_frame_delta(frame_dir: Path, filters: List[pl.Expr], *run_numbers: int) -> pl.DataFrame:
        """Read frame data through the Delta table; returns an empty frame on a schema error."""
        try:
            log.debug("Scan_delta")
            # use_pyarrow=True avoids schema errors
            df_lazy = pl.scan_delta(str(frame_dir), use_pyarrow=True)
            if filters:
                df_lazy = df_lazy.filter(filters)
            return df_lazy.filter(pl.col("run_number").is_in(run_numbers)).collect()
        except pl.exceptions.SchemaError:
            log.warning(
                f"Runs {run_numbers} caused a schema error with scan_delta. "
                "Trying scan_parquet and picking largest file"
            )
            return pl.DataFrame()

    @staticmethod
    def _load_frame_parquet(  # pragma: no cover
        frame_dir: Path, filters: List[pl.Expr], *run_numbers: int
    ) -> pl.DataFrame:
        """Read frame data file-by-file from the ``run_number=N`` directories under ``frame_dir``."""
        wanted = set(run_numbers)
        dfs_to_concat = []
        for run_dir in Path(str(frame_dir)).glob("*run*"):
            _, sep, run_str = run_dir.name.partition("run_number=")
            if not sep or not run_str.isdigit():
                log.debug(f"Skipping {run_dir}: not a run_number=N directory")
                continue
            run_number = int(run_str)
            if run_number not in wanted:
                continue
            log.debug(f"Scanning: {run_dir}")

            files = glob(f"{run_dir}/*")
            if not files:
                log.warning(f"No files in {run_dir}, skipping run: {run_number}")
                continue
            if len(files) == 1:
                file_path = files[0]
            else:
                # single ADC data is merged into one file
                log.info(f"Multiple files in {run_dir}, picking the largest for frame data.")
                file_path = max(files, key=os.path.getsize)

            try:
                df_lazy = pl.scan_parquet(file_path)
                if filters:
                    # LazyFrame.filter rejects an empty predicate list
                    df_lazy = df_lazy.filter(filters)
                dfs_to_concat.append(
                    df_lazy.with_columns(
                        # felix_bcid and felix_evt_count are UInt in a few runs around 2170 ish
                        cs.by_dtype(pl.List(pl.UInt16())).cast(pl.List(pl.Int16())),
                        pl.lit(run_number).alias("run_number"),
                    ).collect()
                )
            except pl.exceptions.SchemaError as e:
                # This run can't be loaded due to the datatype in the felix_evt_count column (eg 2204)
                log.warning(rf"{e}")
                log.warning(f"Schema error when scanning {file_path}")
                log.warning(f"Skipping run: {run_number}")

        return pl.concat(dfs_to_concat, how="diagonal") if dfs_to_concat else pl.DataFrame()

    def load_monitoring_data(
        self,
        *run_numbers: int,
    ) -> pl.DataFrame:
        """
        Load monitoring rows for ``run_numbers``, with ``timestamp`` converted from
        America/New_York to UTC. Exits the process if nothing is found.
        """
        log.debug(f"Loading monitoring data from {self.monitoring_dir}")
        if self.monitoring_dir is None:
            log.error("Monitoring data directory must be set to load monitoring data")
            sys.exit(1)
        monitoring_data = (
            pl.scan_delta(str(self.monitoring_dir)).filter(pl.col("run_number").is_in(run_numbers)).collect()
        )
        if monitoring_data.is_empty():
            log.error(f"No monitoring data for {run_numbers=} found in {self.monitoring_dir}, exiting...")
            sys.exit(1)
        return monitoring_data.with_columns(
            pl.col("timestamp").dt.replace_time_zone("America/New_York").dt.convert_time_zone("UTC")
        )

    def save_derived_data(self, derived_data: pl.DataFrame, run_number: int, meas_type: str) -> None:
        """Write ``derived_data`` to ``{meas_type}_derived_values_run{NNNN}.parquet`` (zstd) in ``derived_dir``."""
        if self.derived_dir is None:
            log.error("Derived directory must be set to save derived data")
            sys.exit(1)
        derived_data.write_parquet(
            self.derived_dir / f"{meas_type}_derived_values_run{run_number:04}.parquet", compression="zstd"
        )

    def load_lab_environment_data(self) -> pl.DataFrame:
        """
        Concatenate the crate-lab and test-stand CSVs and parse their ``datetime_utc`` column
        into a UTC ``timestamp`` column (``datetime_utc`` is dropped).
        """
        lab_env_data_UTC = pl.concat([pl.read_csv(self.crate_lab_path), pl.read_csv(self.test_stand_path)])
        return lab_env_data_UTC.with_columns(
            pl.col("datetime_utc").str.to_datetime(time_zone="UTC").alias("timestamp")
        ).drop("datetime_utc")

    def save_coherent_noise_data(self, coherent_noise_data: pl.DataFrame, run_number: int) -> None:
        """Write ``coherent_noise_data`` (minus ``data_sum``) to ``coherent_noise_run{NNNN}.parquet``."""
        if self.derived_dir is None:
            log.error("Derived directory must be set to save coherent noise data")
            sys.exit(1)

        # TODO data_sum could be added to a pytest and this moved into a less obscure place
        # Don't save data_sum, a column with type List[Float64] and length n_samples
        coherent_noise_data.select(pl.exclude("data_sum")).write_parquet(
            self.derived_dir / f"coherent_noise_run{run_number:04}.parquet", compression="zstd"
        )

    def check_run_exists(self, run_number: int) -> bool:
        """
        Check if a run exists in raw_dir.

        Args:
            run_number: the run number, eg 770

        Returns:
            True if the run exists, False otherwise (including when the scan itself fails)
        """
        raw_dir = self._require_raw_dir()
        try:
            # Only existence matters: read one column and stop after the first row
            return not (
                pl.scan_delta(raw_dir)
                .filter(pl.col("run_number") == run_number)
                .select("run_number")
                .limit(1)
                .collect()
                .is_empty()
            )
        except Exception as e:
            log.error(f"check_run_exists error for {run_number=} and raw_dir={self.raw_dir}")
            log.error("You might have an incomplete data dir, eg missing the _delta_log directory")
            log.error(type(e))
            log.error(e)
            return False

    def check_measurement_exists(self, run_number: int, measurement: int) -> bool:
        """
        Check if a measurement exists in a run in raw_dir.

        Args:
            run_number: the run number, eg 770
            measurement: the measurement number, eg 1

        Returns:
            True if the measurement exists, False otherwise
        """
        raw_dir = self._require_raw_dir()
        return not (
            pl.scan_delta(raw_dir)
            .filter(pl.col("run_number") == run_number, pl.col("measurement") == measurement)
            .select("run_number")
            .limit(1)
            .collect()
            .is_empty()
        )

    def get_runs_list(self) -> pl.DataFrame:
        """Return the distinct (run_number, meas_type) pairs, sorted by run_number."""
        raw_dir = self._require_raw_dir()
        return (
            pl.scan_delta(raw_dir)
            .select([pl.col("run_number"), pl.col("meas_type")])
            .unique()
            .sort("run_number")
            .collect()
        )

    def get_runs_summary(self) -> pl.DataFrame:
        """
        Return one row per (run_number, meas_type, pas_mode) listing the sorted unique
        att_val, board_id and channel values plus a timestamp, newest run first.
        """
        raw_dir = self._require_raw_dir()

        df = (
            pl.scan_delta(raw_dir)
            .sort("measurement")
            .unique(["run_number", "meas_type", "att_val", "board_id", "channel"], keep="first")
        )

        # Add Null hps_ps_gain to handle old data without this column
        if "hps_ps_gain" not in df.collect_schema().names():
            df = df.with_columns(pl.lit(None).alias("hps_ps_gain"))

        # Merge alfe_mode and hps gain into pas_mode, preferring alfe_mode
        return (
            df.with_columns(
                pl.when(pl.col("alfe_mode").is_not_null())
                .then(pl.col("alfe_mode"))
                .otherwise(pl.col("hps_ps_gain"))
                .alias("pas_mode")
            )
            .group_by(["run_number", "meas_type", "pas_mode"])
            .agg(
                pl.col("att_val").unique().sort(),
                pl.col("board_id").unique().sort(),
                pl.col("timestamp").first(),
                pl.col("channel").unique().sort(),
            )
            .select(["run_number", "board_id", "meas_type", "att_val", "pas_mode", "timestamp", "channel"])
            .sort("run_number", descending=True)
            .collect()
        )

    def get_channels_list(
        self,
        run_number: int,
        meas_type: str,
        require_pulsed: bool = False,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
    ) -> pl.DataFrame:
        """
        Return the unique ``channel`` values for one run and measurement type.

        The optional flags keep only rows that are pulsed, have an empty-free ``samples``
        list, all-positive samples, or samples below 2**15 respectively.
        """
        raw_dir = self._require_raw_dir()
        predicates = [
            pl.col("run_number") == run_number,
            pl.col("meas_type") == meas_type,
        ]
        if require_nonempty:
            predicates.append(pl.col("samples").list.len() != 0)
        if require_positive:
            predicates.append(pl.col("samples").list.min() > 0)
        if require_unsaturated:
            predicates.append(pl.col("samples").list.max() < 2**15)
        if require_pulsed:
            predicates.append(pl.col("is_pulsed"))
        return pl.scan_delta(raw_dir).filter(predicates).select(pl.col("channel").unique()).collect()

    def get_amplitudes_list(self, run_number: int, meas_type: str) -> pl.DataFrame:
        """Return the unique ``amp`` values in the run's derived data."""
        return self.load_derived_data(run_number, meas_type).select(pl.col("amp").unique())

    def get_boards_summary(self) -> pl.DataFrame:
        """Return each board_id with the timestamp of a row from the highest run it appears in."""
        raw_dir = self._require_raw_dir()
        return (
            pl.scan_delta(raw_dir)
            .sort("run_number", descending=True)
            .unique(["board_id"], keep="first")
            .select(["board_id", "timestamp"])
            .collect()
        )

    def get_boards_list(self, run_number: Optional[int] = None, meas_type: Optional[str] = None) -> pl.DataFrame:
        """
        Return the unique boards, optionally restricted to one run and/or measurement type.

        Includes ``board_version`` and ``board_type`` (from ``board_variant``) when the table
        has a ``board_version`` column; otherwise only ``board_id``.
        """
        raw_dir = self._require_raw_dir()

        filters = []
        if run_number is not None:
            filters.append(pl.col("run_number") == run_number)
        if meas_type is not None:
            filters.append(pl.col("meas_type") == meas_type)

        df_lazy = pl.scan_delta(raw_dir)
        if filters:
            df_lazy = df_lazy.filter(filters)

        if "board_version" in df_lazy.collect_schema().names():
            return (
                df_lazy.select("board_id", "board_version", pl.col("board_variant").alias("board_type"))
                .unique()
                .collect()
            )
        # BNL data doesn't have board_version
        return df_lazy.select("board_id").unique().collect()

    def get_measurements_list(self, run_number: int) -> List[int]:
        """Return the distinct measurement numbers in ``run_number``."""
        raw_dir = self._require_raw_dir()
        return (
            pl.scan_delta(raw_dir)
            .filter(pl.col("run_number") == run_number)
            .select(pl.col("measurement").unique())
            .collect()
            .to_series()
            .to_list()
        )

    def get_measurement_types(self, run_number: int, measurement: Optional[int] = None) -> List[str]:
        """Return the distinct meas_type values in ``run_number``, optionally for one measurement."""
        raw_dir = self._require_raw_dir()
        df_lazy = pl.scan_delta(raw_dir).filter(pl.col("run_number") == run_number)
        if measurement is not None:
            df_lazy = df_lazy.filter(pl.col("measurement") == measurement)
        return df_lazy.select(pl.col("meas_type").unique()).collect().to_series().to_list()

    def get_middle_attenuation_run(self, run_numbers: List[int]) -> int:
        """
        Return the first run in ``run_numbers`` (in the given order) that has a row with
        ``att_val`` in MIDDLE_ATTENUATIONS. Exits the process if there is none.
        """
        raw_dir = self._require_raw_dir()
        # One scan for all runs instead of one full scan per run
        matching = set(
            pl.scan_delta(raw_dir)
            .filter(
                pl.col("run_number").is_in(run_numbers),
                pl.col("att_val").is_in(MIDDLE_ATTENUATIONS),
            )
            .select("run_number")
            .unique()
            .collect()["run_number"]
            .to_list()
        )
        for run_number in run_numbers:
            if run_number in matching:
                return run_number

        log.error(f"Did not find a run with att_val in {MIDDLE_ATTENUATIONS} in runs {run_numbers}")
        sys.exit(1)

    def get_bad_samples_check(self, run_number: int) -> pl.DataFrame:
        """
        Return [channel, gain, board_id, samples_min, samples_max, samples_len] for every row
        of ``run_number``, for checking samples arrays for out-of-range values.
        """
        raw_dir = self._require_raw_dir()
        return (
            pl.scan_delta(raw_dir)
            .filter(pl.col("run_number") == run_number)
            .select(
                "channel",
                "gain",
                "board_id",
                pl.col("samples").list.min().alias("samples_min"),
                pl.col("samples").list.max().alias("samples_max"),
                pl.col("samples").list.len().alias("samples_len"),
            )
            .collect()
        )

538 

539 

class ParquetSource(DeltaSource):
    """
    For loading raw data from specific parquet files.

    ``raw_data_path`` points at a parquet file (not a Delta table) holding a single run.
    ``derived_path`` points at one derived-values parquet file whose parent directory is
    used as ``derived_dir``. Lab-environment and monitoring data may also be given as
    parquet files. Methods not overridden here are inherited from DeltaSource and still
    read ``raw_dir`` with ``pl.scan_delta``.
    """

    def __init__(
        self,
        raw_data_path: Optional[Path],
        derived_path: Optional[Path],
        frame_path: Optional[Path] = None,
        monitoring_dir: Optional[Path] = None,
        crate_lab_path: Path = Path("/data/feb2/lab-env/env_monitoring_crate_lab.csv"),
        test_stand_path: Path = Path("/data/feb2/lab-env/env_monitoring_test_stand.csv"),
    ) -> None:
        self.raw_dir = raw_data_path
        self.derived_path = derived_path
        # Derived outputs go next to the given derived file, or into ./derived/ by default
        if self.derived_path is not None:
            self.derived_dir = self.derived_path.parent
        else:
            self.derived_dir = Path("derived/")
        self.frame_dir = frame_path
        self.monitoring_dir = monitoring_dir
        self.crate_lab_path = crate_lab_path
        self.test_stand_path = test_stand_path

        # Delegate for reads that are not parquet-specific (CSV lab data, Delta monitoring data)
        self.delta_source = DeltaSource(
            raw_data_path,
            self.derived_dir,
            frame_path,
            monitoring_dir,
            crate_lab_path,
            test_stand_path,
        )

        if self.derived_dir is not None and not self.derived_dir.exists():
            self.derived_dir.mkdir(exist_ok=True)
            os.chmod(self.derived_dir, 0o775)

    def load_raw_data(
        self,
        *run_numbers: int,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
        ignore_boards: Optional[List[str]] = None,
    ) -> pl.DataFrame:
        """
        Load raw samples from the single parquet file at ``raw_dir``.

        Exactly one run number must be given. Its value overwrites the ``run_number``
        column. Rows with an empty ``samples`` list are always dropped, so
        ``require_nonempty`` has no additional effect here.

        Args:
            require_positive: drop rows whose smallest sample is not > 0.
            require_unsaturated: drop rows whose largest sample is not < 2**15.
            ignore_boards: ``board_id`` values to drop; ``None`` keeps every board.

        Returns:
            The rows with a ``pas_mode`` column (``alfe_mode`` when set, otherwise
            ``hps_ps_gain``) and ``timestamp`` converted from America/New_York to UTC.
            Exits the process on a directory path, several run numbers or no data.
        """
        log.info(f"Loading raw data from ParquetSource {self.raw_dir}")
        if self.raw_dir is None:
            log.error("Raw data directory must be set to load raw data")
            sys.exit(1)

        if Path(self.raw_dir).is_dir():
            log.error(f"{self.raw_dir=} must be a parquet filepath")
            sys.exit(1)

        if len(run_numbers) != 1:
            log.error("Only one run number supported for direct parquet files.")
            sys.exit(1)

        run_number = run_numbers[0]
        raw_data = pl.scan_parquet(self.raw_dir).collect().filter(pl.col("samples").list.len() != 0)

        # Add Null hps_ps_gain to handle old data without this column
        if "hps_ps_gain" not in raw_data.columns:
            raw_data = raw_data.with_columns(pl.lit(None).alias("hps_ps_gain"))

        # Merge alfe_mode and hps gain into pas_mode, preferring alfe_mode
        raw_data = raw_data.with_columns(
            pl.lit(run_number).alias("run_number"),
            pl.when(pl.col("alfe_mode").is_not_null())
            .then(pl.col("alfe_mode"))
            .otherwise(pl.col("hps_ps_gain"))
            .alias("pas_mode"),
        )

        if require_positive:
            raw_data = raw_data.filter(pl.col("samples").list.min() > 0)
        if require_unsaturated:
            raw_data = raw_data.filter(pl.col("samples").list.max() < 2**15)
        if require_nonempty:
            raw_data = raw_data.filter(pl.col("samples").list.len() != 0)

        if "board_version" not in raw_data.columns:
            raw_data = raw_data.with_columns(pl.lit("").alias("board_version"))

        if raw_data.is_empty():
            log.error(f"No raw data for {run_numbers=} found in {self.raw_dir}, exiting...")
            sys.exit(1)

        raw_data = raw_data.with_columns(
            pl.col("timestamp").dt.replace_time_zone("America/New_York").dt.convert_time_zone("UTC")
        )
        if ignore_boards:
            log.warning(f"Will skip loading boards {ignore_boards}")
            return raw_data.filter(~pl.col("board_id").is_in(ignore_boards))
        return raw_data

    def load_derived_data(
        self,
        run_number: int,
        meas_type: str,
    ) -> pl.DataFrame:
        """
        Load rows for ``run_number`` from the derived parquet file at ``derived_path``.

        ``meas_type`` does not select a file here; it is only used in the error message.
        Exits the process if no rows match.
        """
        log.debug(f"Loading derived data from {self.derived_path}")
        if self.derived_path is None:
            log.error("Derived data path must be set to load derived data")
            sys.exit(1)
        derived_df = pl.read_parquet(self.derived_path).filter(pl.col("run_number") == run_number)

        if derived_df.is_empty():
            log.error(f"No derived {meas_type} data for run {run_number} in {self.derived_path}, exiting...")
            log.error("Have you run [yellow]calc-save-runs[/yellow]?")
            sys.exit(1)

        return derived_df

    def get_boards_list(self, run_number: Optional[int] = None, meas_type: Optional[str] = None) -> pl.DataFrame:
        """
        Return the unique boards in the raw parquet file, optionally restricted to one run
        and/or measurement type. Includes ``board_version`` and ``board_type`` (from
        ``board_variant``) when a ``board_version`` column exists; otherwise only ``board_id``.
        """
        if self.raw_dir is None:
            log.error("Raw data directory must be set")
            sys.exit(1)

        filters = []
        if run_number is not None:
            filters.append(pl.col("run_number") == run_number)
        if meas_type is not None:
            filters.append(pl.col("meas_type") == meas_type)

        df_lazy = pl.scan_parquet(str(self.raw_dir))
        if filters:
            df_lazy = df_lazy.filter(filters)

        if "board_version" in df_lazy.collect_schema().names():
            return (
                df_lazy.select("board_id", "board_version", pl.col("board_variant").alias("board_type"))
                .unique()
                .collect()
            )
        # BNL data doesn't have board_version
        return df_lazy.select("board_id").unique().collect()

    def check_run_exists(self, run_number: int) -> bool:
        """
        Does nothing for direct parquet files; always returns True.
        """
        _ = run_number
        return True

    def load_lab_environment_data(self) -> pl.DataFrame:
        """
        Load lab environment data from ``crate_lab_path`` directly when it is a parquet file
        (helps with tests), otherwise from the two CSVs via DeltaSource. Exits if empty.
        """
        if str(self.crate_lab_path).endswith(".parquet"):
            lab_env_data = pl.read_parquet(self.crate_lab_path)
        else:
            lab_env_data = self.delta_source.load_lab_environment_data()

        if lab_env_data.is_empty():
            log.error(f"No lab environment data in {self.crate_lab_path} or {self.test_stand_path}, exiting...")
            sys.exit(1)

        return lab_env_data

    def load_monitoring_data(self, *run_numbers: int) -> pl.DataFrame:
        """
        Load monitoring rows for ``run_numbers`` from a parquet file when ``monitoring_dir``
        ends in ``.parquet`` (no timestamp conversion on this path), otherwise from the Delta
        table via DeltaSource. Exits the process if nothing is found.
        """
        if self.monitoring_dir is None:
            log.error("monitoring_dir must be set to load monitoring data")
            sys.exit(1)

        if str(self.monitoring_dir).endswith(".parquet"):
            monitoring_df = pl.read_parquet(self.monitoring_dir).filter(pl.col("run_number").is_in(run_numbers))
        else:
            monitoring_df = self.delta_source.load_monitoring_data(*run_numbers)

        if monitoring_df.is_empty():
            log.error(f"No monitoring data for runs {run_numbers} in {self.monitoring_dir}, exiting...")
            sys.exit(1)

        return monitoring_df

727 

728 

729class HDF5Source(DataSource): 

730 def __init__( 

731 self, 

732 raw_data_dir: Path, 

733 derived_dir: Optional[Path], 

734 crate_lab_path: Path = Path("/data/feb2/lab-env/env_monitoring_crate_lab.csv"), 

735 test_stand_path: Path = Path("/data/feb2/lab-env/env_monitoring_test_stand.csv"), 

736 ) -> None: 

737 self.raw_dir: Path = raw_data_dir 

738 self.derived_dir = derived_dir 

739 self.crate_lab_path = crate_lab_path 

740 self.test_stand_path = test_stand_path 

741 self.delta_source = DeltaSource( 

742 raw_data_dir / "samples", 

743 derived_dir, 

744 crate_lab_path=crate_lab_path, 

745 test_stand_path=test_stand_path, 

746 ) 

747 

748 if self.derived_dir is not None and not self.derived_dir.exists(): 

749 self.derived_dir.mkdir(exist_ok=True) 

750 os.chmod(self.derived_dir, 0o775) 

751 

752 @staticmethod 

753 def get_dataset_keys(f): 

754 import h5py # type: ignore 

755 

756 keys = [] 

757 f.visit(lambda key: keys.append(key) if isinstance(f[key], h5py.Dataset) else None) 

758 return keys 

759 

760 def load_raw_data( 

761 self, 

762 *run_numbers: int, 

763 require_positive: bool = False, 

764 require_unsaturated: bool = False, 

765 require_nonempty: bool = True, 

766 ignore_boards: List[str] = [], 

767 ) -> pl.DataFrame: 

768 import h5py 

769 

770 log.info("Loading raw data from HDF5Source") 

771 

772 if len(run_numbers) != 1: 

773 log.critical("Only single run numbers implemented for HDF5, exiting...") 

774 sys.exit(1) 

775 run_number = run_numbers[0] 

776 filename = str(self.raw_dir / f"run{run_number:04d}.hdf5") 

777 try: 

778 _ = h5py.File(filename) 

779 except FileNotFoundError: 

780 log.critical(f"Could not find input file {filename}") 

781 sys.exit(1) 

782 

783 samples_dict: Dict[str, List[Any]] = defaultdict(list) 

784 with h5py.File(filename) as f: 

785 dataset_keys = self.get_dataset_keys(f) 

786 for dataset_key in dataset_keys: 

787 key_list = dataset_key.split("/") 

788 if len(key_list) != 4: 

789 continue 

790 measurement, channel, gain, samples = key_list 

791 for key, value in f[measurement].attrs.items(): 

792 if value == "": 

793 samples_dict[key].append(None) 

794 elif key == "alfe_mode" and value == -99: 

795 samples_dict[key].append(None) 

796 elif key == "awg_amp": 

797 samples_dict[key].append(float(value)) 

798 elif key == "board_id": 

799 samples_dict[key].append(str(value)) 

800 elif key == "timestamp": 

801 samples_dict[key].append( 

802 datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f").replace(microsecond=0) 

803 ) 

804 elif key == "meas_chan": 

805 samples_dict[key].append(int(value[-3:])) 

806 samples_dict["is_pulsed"].append(int(channel[-3:]) == int(value[-3:])) 

807 else: 

808 samples_dict[key].append(value) 

809 meas_group = cast(h5py.Group, f[measurement]) 

810 for key, value in meas_group[channel].attrs.items(): 

811 # is_pulsed is filled based on meas_chan 

812 if key == "is_pulsed": 

813 continue 

814 samples_dict[key].append(value) 

815 samples_dict["measurement"].append(int(measurement.split("_")[1])) 

816 samples_dict["channel"].append(int(channel[-3:])) 

817 samples_dict["gain"].append(gain) 

818 meas_chan_group = cast(h5py.Group, meas_group[channel]) 

819 meas_chan_gain_group = cast(h5py.Group, meas_chan_group[gain]) 

820 samples_dataset = cast(h5py.Dataset, meas_chan_gain_group[samples]) 

821 samples_dict["samples"].append(samples_dataset[()]) 

822 

823 raw_data = pl.DataFrame(samples_dict).filter(pl.col("samples").list.len() != 0) 

824 if require_positive: 

825 raw_data = raw_data.filter(pl.col("samples").list.min() > 0) 

826 if require_unsaturated: 

827 raw_data = raw_data.filter(pl.col("samples").list.max() < 2**15) 

828 if require_nonempty: 

829 raw_data = raw_data.filter(pl.col("samples").list.len() != 0) 

830 raw_data = raw_data.with_columns( 

831 pl.col("timestamp").dt.replace_time_zone("America/New_York").dt.convert_time_zone("UTC") 

832 ) 

833 print(f"HERE!! raw_data timestamp: {raw_data.select('timestamp').head(1)}") 

834 if len(ignore_boards) > 0: 

835 log.warning(f"Will skip loading boards {ignore_boards}") 

836 return raw_data.filter(~pl.col("board_id").is_in(ignore_boards)) 

837 else: 

838 return raw_data 

839 

840 def load_derived_data(self, run_number: int, meas_type: str) -> pl.DataFrame: 

841 return self.delta_source.load_derived_data(run_number, meas_type) 

842 

843 def load_coherent_noise_data(self, run_number: int) -> pl.DataFrame: 

844 return self.delta_source.load_coherent_noise_data(run_number) 

845 

846 def load_frame_data( 

847 self, *run_numbers: int, reject_single_adc: bool = False, non_empty: bool = True 

848 ) -> pl.DataFrame: 

849 return self.delta_source.load_frame_data(*run_numbers, reject_single_adc=reject_single_adc, non_empty=non_empty) 

850 

851 def save_derived_data(self, derived_data: pl.DataFrame, run_number: int, meas_type: str) -> None: 

852 self.delta_source.save_derived_data(derived_data, run_number, meas_type) 

853 

854 def save_coherent_noise_data(self, coherent_noise_data: pl.DataFrame, run_number: int) -> None: 

855 self.delta_source.save_coherent_noise_data(coherent_noise_data, run_number) 

856 

857 def check_run_exists(self, run_number: int) -> bool: 

858 import h5py 

859 

860 filename = str(self.raw_dir / f"run{run_number:04d}.hdf5") 

861 try: 

862 _ = h5py.File(filename) 

863 except FileNotFoundError: 

864 log.error(f"Could not find HDF5 input file {filename} in {self.raw_dir}") 

865 return False 

866 return True 

867 

def check_measurement_exists(self, run_number: int, measurement: int) -> bool:
    """Return whether run ``run_number`` contains measurement ``measurement``.

    A measurement is present when the run's HDF5 file has a top-level key
    ``Measurement_NNN`` (measurement number zero-padded to three digits).
    Returns ``False`` when ``check_run_exists`` reports the run file missing.
    """
    import h5py

    filename = str(self.raw_dir / f"run{run_number:04d}.hdf5")
    if not self.check_run_exists(run_number):
        return False
    # Raw files are only read here; open read-only explicitly rather than
    # relying on h5py's default mode (read/write-or-create before h5py 3.0).
    with h5py.File(filename, "r") as f:
        return f"Measurement_{measurement:03d}" in f

880 

def get_runs_summary(self) -> pl.DataFrame:
    """Not supported by this source; always raises ``NotImplementedError``."""
    raise NotImplementedError()

883 

def get_boards_summary(self) -> pl.DataFrame:
    """Not supported by this source; always raises ``NotImplementedError``."""
    raise NotImplementedError()

886 

def get_boards_list(self, run_number: Optional[int] = None, meas_type: Optional[str] = None) -> pl.DataFrame:
    """Not supported by this source; always raises ``NotImplementedError``.

    Arguments are accepted only to match the ``DataSource`` protocol signature.
    """
    raise NotImplementedError()

889 

def get_runs_list(self) -> pl.DataFrame:
    """Not supported by this source; always raises ``NotImplementedError``."""
    raise NotImplementedError()

892 

def get_channels_list(
    self,
    run_number: int,
    meas_type: str,
    require_pulsed: bool = False,
    require_positive: bool = False,
    require_unsaturated: bool = False,
    require_nonempty: bool = True,
) -> pl.DataFrame:
    """Not supported by this source; always raises ``NotImplementedError``.

    The parameters match ``SQLSource.get_channels_list`` but are ignored here.
    """
    raise NotImplementedError()

903 

def get_amplitudes_list(self, run_number: int, meas_type: str) -> pl.DataFrame:
    """Not supported by this source; always raises ``NotImplementedError``."""
    raise NotImplementedError()

906 

def load_lab_environment_data(self) -> pl.DataFrame:
    """Return lab-environment data, forwarded from ``self.delta_source``."""
    source = self.delta_source
    return source.load_lab_environment_data()

909 

def load_monitoring_data(self, *run_numbers: int) -> pl.DataFrame:
    """Not supported by this source; always raises ``NotImplementedError``."""
    raise NotImplementedError()

912 

def get_measurements_list(self, run_number: int) -> List[int]:
    """Not supported by this source; always raises ``NotImplementedError``."""
    raise NotImplementedError()

915 

def get_measurement_types(self, run_number: int, measurement: Optional[int] = None) -> List[str]:
    """Not supported by this source; always raises ``NotImplementedError``."""
    raise NotImplementedError()

918 

def get_bad_samples_check(self, run_number: int) -> pl.DataFrame:
    """Not supported by this source; always raises ``NotImplementedError``."""
    raise NotImplementedError()

921 

def get_middle_attenuation_run(self, run_numbers: List[int]) -> int:
    """Not supported by this source; always raises ``NotImplementedError``."""
    raise NotImplementedError()

924 

925 

class SQLSource(DataSource):
    """``DataSource`` backed by a SQL database, queried through ``pl.read_database_uri``.

    Raw samples, frames, monitoring values, lab-environment readings and
    run/board/measurement metadata are read from the database at ``uri``.
    Derived data and coherent-noise data are delegated to a ``DeltaSource``
    built from ``derived_dir``. The queries use PostgreSQL syntax
    (``array_agg(... ORDER BY ...)``, ``smallint[]`` casts).

    ``pl.read_database_uri`` receives a finished query string, so arguments
    are interpolated into the SQL text. Integers are coerced with ``int()`` and
    strings are rendered with ``_sql_string`` (embedded single quotes doubled),
    so a value containing a quote cannot break or alter a query.
    NOTE(review): quote doubling assumes ``standard_conforming_strings`` is on
    (the PostgreSQL default), i.e. backslashes are not escape characters.
    """

    def __init__(self, uri: str, derived_dir: Optional[Path] = None) -> None:
        """Create a source reading from database ``uri``.

        Args:
            uri: Connection URI passed to ``pl.read_database_uri``.
            derived_dir: Directory given to the ``DeltaSource`` used for derived
                and coherent-noise data. Created with mode 0o775 if missing.
        """
        self.uri = uri
        self.derived_dir = derived_dir

        self.delta_source = DeltaSource(None, derived_dir)

        if derived_dir is not None and not derived_dir.exists():
            derived_dir.mkdir(exist_ok=True)
            # Set the mode explicitly; the mode applied by mkdir is subject to the umask.
            os.chmod(derived_dir, 0o775)

    @staticmethod
    def _sql_string(value: Any) -> str:
        """Render ``value`` as a single-quoted SQL string literal, doubling embedded quotes."""
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def _sql_int_list(values: Any) -> str:
        """Render an iterable of integers as a comma-separated SQL list, e.g. ``1,2,3``.

        ``int()`` rejects non-integer-like values, so no arbitrary text reaches the query.
        """
        return ",".join(str(int(v)) for v in values)

    def load_raw_data(
        self,
        *run_numbers: int,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
        ignore_boards: List[str] = [],  # never mutated, so the shared default is safe
    ) -> pl.DataFrame:
        """Load per-channel raw samples for ``run_numbers``.

        Joins runs, measurements, samples and boards. Returns every measurement
        column plus channel, gain, is_pulsed, samples, board_variant,
        board_version, githash and ``pas_mode`` (``alfe_mode`` when set,
        otherwise ``hps_ps_gain``). ``measurement_number`` and
        ``measurement_timestamp`` are renamed to ``measurement`` and
        ``timestamp``; the measurement ``id`` column is dropped.

        Args:
            run_numbers: Runs to load.
            require_positive: Keep only rows with ``samples_min > 0``.
            require_unsaturated: Keep only rows with ``samples_max < 2**15``.
            require_nonempty: Keep only rows with ``samples_len > 0``.
            ignore_boards: Board IDs to exclude.

        Returns:
            DataFrame sorted by run_number, channel, gain.
        """
        log.info("Loading raw data from SQLSource")

        run_number_string = self._sql_int_list(run_numbers)

        # Merge alfe_mode and hps gain into pas_mode, preferring alfe_mode
        query = f"""
            SELECT
                m.*,
                s.channel,
                s.gain,
                s.is_pulsed,
                s.samples,
                b.board_type as board_variant,
                b.board_version as board_version,
                r.githash,
                COALESCE(m.alfe_mode, m.hps_ps_gain) AS pas_mode
            FROM runs r
            JOIN measurements m
                ON r.run_number = m.run_number
            JOIN samples s
                ON m.id = s.measurement_id
            JOIN boards b
                ON m.board_id = b.board_id
            WHERE m.run_number in ({run_number_string})
        """
        if ignore_boards:
            log.warning(f"Will skip loading boards {ignore_boards}")
            ignore_boards_string = ",".join(self._sql_string(b) for b in ignore_boards)
            query += f"""
            AND b.board_id NOT IN ({ignore_boards_string})
            """

        if require_positive:
            query += "\nAND samples_min > 0"
        if require_unsaturated:
            query += f"\nAND samples_max < {2**15}"
        if require_nonempty:
            query += "\nAND samples_len > 0"

        # NOTE(review): unlike load_monitoring_data, no time-zone conversion is
        # applied to the timestamp here -- confirm the column's type.
        df = pl.read_database_uri(query, self.uri, partition_on="channel", partition_num=16)
        df = df.rename({"measurement_number": "measurement", "measurement_timestamp": "timestamp"}).drop("id")
        # Because the data is being loaded from multiple threads, sorting must be done outside of the SQL query
        df = df.sort(by=["run_number", "channel", "gain"])
        return df

    def load_derived_data(self, run_number: int, meas_type: str) -> pl.DataFrame:
        """Return derived data for ``run_number`` / ``meas_type`` from ``self.delta_source``."""
        return self.delta_source.load_derived_data(run_number, meas_type)

    def load_coherent_noise_data(self, run_number: int) -> pl.DataFrame:
        """Return coherent-noise data for ``run_number`` from ``self.delta_source``."""
        return self.delta_source.load_coherent_noise_data(run_number)

    def load_frame_data(
        self, *run_numbers: int, reject_single_adc: bool = False, non_empty: bool = True
    ) -> pl.DataFrame:
        """Load frame data for ``run_numbers``.

        Joins runs, measurements, frames and felix_headers, then pivots the
        ``frame`` values on ``channel``; the pivot columns for channels ``1``
        and ``8`` are renamed ``frame1`` and ``frame8``.

        Args:
            run_numbers: Runs to load.
            reject_single_adc: Keep only rows with ``frame_max < 1e5``.
            non_empty: Keep only rows with ``frame_len != 0``.

        Returns:
            DataFrame sorted by run_number, measurement, adc.
        """
        run_number_string = self._sql_int_list(run_numbers)
        query = f"""
            SELECT
                m.*,
                f.adc,
                f.channel,
                f.frame,
                h.felix_event_count,
                CAST(h.felix_bcid AS smallint[]) AS felix_bcid
            FROM runs r
            JOIN measurements m
                ON r.run_number = m.run_number
            JOIN frames f
                ON m.id = f.measurement_id
            JOIN felix_headers h
                ON m.id = h.measurement_id
            WHERE r.run_number in ({run_number_string})
        """
        if reject_single_adc:
            query += f"\nAND frame_max < {1e5}"
        if non_empty:
            query += "\nAND frame_len != 0"
        df = pl.read_database_uri(query, self.uri, partition_on="adc", partition_num=16)
        df = (
            df.drop("id")
            .pivot(on="channel", values="frame")
            .rename(
                {
                    "1": "frame1",
                    "8": "frame8",
                    "measurement_number": "measurement",
                    "measurement_timestamp": "timestamp",
                }
            )
        )
        return df.sort(by=["run_number", "measurement", "adc"])

    def save_derived_data(self, derived_data: pl.DataFrame, run_number: int, meas_type: str) -> None:
        """Persist ``derived_data`` for ``run_number`` / ``meas_type`` via ``self.delta_source``."""
        self.delta_source.save_derived_data(derived_data, run_number, meas_type)

    def save_coherent_noise_data(self, coherent_noise_data: pl.DataFrame, run_number: int) -> None:
        """Persist ``coherent_noise_data`` for ``run_number`` via ``self.delta_source``."""
        self.delta_source.save_coherent_noise_data(coherent_noise_data, run_number)

    def check_run_exists(self, run_number: int) -> bool:
        """Return whether ``runs`` has a row for ``run_number``."""
        query = f"SELECT run_number FROM runs WHERE run_number = {int(run_number)}"
        df = pl.read_database_uri(query, self.uri)
        return not df.is_empty()

    def check_measurement_exists(self, run_number: int, measurement: int) -> bool:
        """Return whether ``measurements`` has measurement ``measurement`` for ``run_number``."""
        query = f"""
            SELECT run_number
            FROM measurements
            WHERE run_number = {int(run_number)}
            AND measurement_number = {int(measurement)}
        """
        df = pl.read_database_uri(query, self.uri)
        return not df.is_empty()

    def get_runs_summary(self) -> pl.DataFrame:
        """Summarize every run, newest first.

        One row per (run, meas_type, pas_mode). The row holds the run
        timestamp and sorted arrays of the distinct board IDs, attenuation
        values and channels seen.
        """
        # Merge alfe_mode and hps gain into pas_mode, preferring alfe_mode
        query = """
            SELECT DISTINCT
                r.run_number,
                array_agg(DISTINCT m.board_id ORDER BY m.board_id) AS board_id,
                r.run_timestamp AS timestamp,
                m.meas_type,
                array_agg(DISTINCT m.att_val ORDER BY m.att_val) AS att_val,
                COALESCE(m.alfe_mode, m.hps_ps_gain) AS pas_mode,
                array_agg(DISTINCT s.channel ORDER BY s.channel) AS channel
            FROM runs r
            JOIN measurements m
                ON r.run_number = m.run_number
            JOIN samples s
                ON m.id = s.measurement_id
            GROUP BY r.run_number, r.run_timestamp, m.meas_type, COALESCE(m.alfe_mode, m.hps_ps_gain)
            ORDER BY r.run_number DESC
        """
        return pl.read_database_uri(query, self.uri)

    def get_boards_summary(self) -> pl.DataFrame:
        """Return the distinct (run_number, board_id, timestamp) triples, newest run first."""
        query = """
            SELECT DISTINCT
                r.run_number,
                m.board_id,
                r.run_timestamp as timestamp
            FROM runs r
            JOIN measurements m
                USING (run_number)
            ORDER BY run_number DESC
        """
        return pl.read_database_uri(query, self.uri)

    def get_boards_list(self, run_number: Optional[int] = None, meas_type: Optional[str] = None) -> pl.DataFrame:
        """Return distinct boards (board_id, board_version, board_type) that have measurements.

        Args:
            run_number: If given, only boards measured in this run.
            meas_type: If given, only boards with measurements of this type.
        """
        query = """
            SELECT DISTINCT
                boards.board_id,
                board_version,
                board_type
            FROM boards
            JOIN measurements m
                ON boards.board_id = m.board_id
        """

        conditions = []
        if run_number is not None:
            conditions.append(f"m.run_number = {int(run_number)}")
        if meas_type is not None:
            conditions.append(f"meas_type = {self._sql_string(meas_type)}")
        if conditions:
            query += "\nWHERE " + "\nAND ".join(conditions)

        query += "\nORDER BY boards.board_id"

        return pl.read_database_uri(query, self.uri)

    def get_runs_list(self) -> pl.DataFrame:
        """Return the distinct (run_number, meas_type) pairs, ordered by run number."""
        query = """
            SELECT DISTINCT
                run_number,
                meas_type
            FROM measurements
            ORDER BY run_number
        """
        return pl.read_database_uri(query, self.uri)

    def get_channels_list(
        self,
        run_number: int,
        meas_type: str,
        require_pulsed: bool = False,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
    ) -> pl.DataFrame:
        """Return the distinct channels with samples for ``run_number`` / ``meas_type``.

        Args:
            require_pulsed: Keep only rows with ``s.is_pulsed`` true.
            require_positive: Keep only rows with ``samples_min > 0``.
            require_unsaturated: Keep only rows with ``samples_max < 2**15``.
            require_nonempty: Keep only rows with ``samples_len > 0``.
        """
        query = f"""
            SELECT DISTINCT s.channel
            FROM measurements m
            JOIN samples s
                ON m.id = s.measurement_id
            WHERE m.run_number = {int(run_number)}
            AND m.meas_type = {self._sql_string(meas_type)}
        """

        if require_pulsed:
            query += "\nAND s.is_pulsed"
        if require_positive:
            query += "\nAND samples_min > 0"
        if require_unsaturated:
            query += f"\nAND samples_max < {2**15}"
        if require_nonempty:
            query += "\nAND samples_len > 0"

        query += "\nORDER BY channel"

        return pl.read_database_uri(query, self.uri)

    def get_run_boards_list(
        self,
        run_number: int,
        meas_type: str,
    ) -> pl.DataFrame:
        """Return the distinct board IDs measured in ``run_number`` with ``meas_type``."""
        query = f"""
            SELECT DISTINCT board_id
            FROM measurements
            WHERE run_number = {int(run_number)}
            AND meas_type = {self._sql_string(meas_type)}
            ORDER BY board_id
        """

        return pl.read_database_uri(query, self.uri)

    def get_amplitudes_list(self, run_number: int, meas_type: str) -> pl.DataFrame:
        """Return the distinct ``amp`` values in the derived data for ``run_number`` / ``meas_type``."""
        return self.load_derived_data(run_number, meas_type).select(pl.col("amp").unique())

    def load_monitoring_data(self, *run_numbers: int) -> pl.DataFrame:
        """Load monitor readings for ``run_numbers``.

        Each row carries the monitor's metadata (monitor, unit, monitor_type,
        group_name, ideal_value), the reading's value, run_number, board_id and
        timestamp, and the measurement number when the reading is linked to a
        measurement (LEFT OUTER JOIN, so it may be null). Timestamps are
        interpreted as America/New_York local time and converted to UTC.
        """
        run_number_string = self._sql_int_list(run_numbers)
        query = f"""
            SELECT
                mon.monitor,
                mon.unit,
                mon.monitor_type,
                mon.group_name,
                mon.ideal_value,
                v.value,
                v.run_number,
                v.monitoring_timestamp AS timestamp,
                v.board_id,
                meas.measurement_number AS measurement
            FROM monitors mon
            JOIN monitor_values v
                ON mon.id = v.monitor_id
            LEFT OUTER JOIN measurements meas
                ON v.measurement_id = meas.id
            WHERE v.run_number IN ({run_number_string})
        """
        data = pl.read_database_uri(query, self.uri)
        data_with_timezone = data.with_columns(
            pl.col("timestamp").dt.replace_time_zone("America/New_York").dt.convert_time_zone("UTC")
        )
        return data_with_timezone

    def get_measurements_list(self, run_number: int) -> List[int]:
        """Return the sorted distinct measurement numbers recorded for ``run_number``."""
        query = f"""
            SELECT DISTINCT measurement_number AS measurement
            FROM measurements
            WHERE run_number = {int(run_number)}
            ORDER BY measurement
        """
        return pl.read_database_uri(query, self.uri).to_series().to_list()

    def get_measurement_types(self, run_number: int, measurement: Optional[int] = None) -> List[str]:
        """Return the distinct meas_type values for ``run_number``.

        Args:
            measurement: If given, restrict to this measurement number.
        """
        query = f"""
            SELECT DISTINCT meas_type
            FROM measurements
            WHERE run_number = {int(run_number)}
        """

        if measurement is not None:
            query += f"\nAND measurement_number = {int(measurement)}"

        measurement_types = pl.read_database_uri(query, self.uri).to_series().to_list()

        return measurement_types

    def get_bad_samples_check(self, run_number: int) -> pl.DataFrame:
        """Return per-sample min/max/length summaries for ``run_number``.

        Columns: board_id, channel, gain, samples_min, samples_max, samples_len.
        """
        query = f"""
            SELECT
                m.board_id,
                s.channel,
                s.gain,
                s.samples_min,
                s.samples_max,
                s.samples_len
            FROM measurements m
            JOIN samples s
                ON m.id = s.measurement_id
            WHERE m.run_number = {int(run_number)}
        """
        return pl.read_database_uri(query, self.uri)

    def load_lab_environment_data(self) -> pl.DataFrame:
        """Return the ``lab_environment`` table, with ``datetime_utc`` exposed as ``timestamp``."""
        query = """
            SELECT datetime_utc AS timestamp, lab_name, humidity, pressure, lab_temp, crate_temp
            FROM lab_environment
        """
        return pl.read_database_uri(query, self.uri)

    def get_middle_attenuation_run(self, run_numbers: List[int]) -> int:
        """Return a run from ``run_numbers`` with a measurement at a middle attenuation.

        A run qualifies when one of its measurements has ``att_val`` in
        ``MIDDLE_ATTENUATIONS``; the lowest qualifying run number is returned.
        Logs an error and exits the process with status 1 if no run qualifies.
        """
        run_number_string = self._sql_int_list(run_numbers)
        attenuations_string = ", ".join(str(a) for a in MIDDLE_ATTENUATIONS)
        # ORDER BY makes the pick deterministic; without it LIMIT 1 may return any matching run.
        query = f"""
            SELECT DISTINCT
                run_number
            FROM measurements
            WHERE att_val IN ({attenuations_string})
            AND run_number IN ({run_number_string})
            ORDER BY run_number
            LIMIT 1
        """
        df = pl.read_database_uri(query, self.uri)
        if df.is_empty():
            log.error(f"Did not find a run with att_val in {MIDDLE_ATTENUATIONS} in runs {run_numbers}")
            sys.exit(1)
        middle_att_run_number = df["run_number"][0]
        return middle_att_run_number