Coverage for polars_analysis / data_sources.py: 69%
542 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 17:11 -0400
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 17:11 -0400
1import logging
2import os
3import sys
4from collections import defaultdict
5from datetime import datetime
6from glob import glob
7from pathlib import Path
8from typing import Any, Dict, List, Optional, Protocol, cast
10import numpy as np
11import polars as pl
12import polars.selectors as cs
14from polars_analysis.analysis.constants import MIDDLE_ATTENUATIONS
# Module-level logger, named after this module
log: logging.Logger = logging.getLogger(__name__)
class DataSource(Protocol):
    """Common interface for the backends that load and store run data.

    Every loader returns a Polars DataFrame (or a plain list for the small
    ``get_measurements_list`` / ``get_measurement_types`` queries).
    """

    def load_raw_data(
        self,
        *run_numbers: int,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
        ignore_boards: List[str] = [],
    ) -> pl.DataFrame:
        """Load raw sample rows for ``run_numbers``.

        The ``require_*`` flags drop rows whose ``samples`` list has a minimum <= 0,
        a maximum >= 2**15, or no entries; rows whose ``board_id`` is listed in
        ``ignore_boards`` are dropped as well.
        """
        ...

    def load_derived_data(self, run_number: int, meas_type: str) -> pl.DataFrame:
        """Load the derived values of type ``meas_type`` for ``run_number``."""
        ...

    def load_coherent_noise_data(self, run_number: int) -> pl.DataFrame:
        """Load the coherent-noise results for ``run_number``."""
        ...

    def load_frame_data(
        self, *run_numbers: int, reject_single_adc: bool = False, non_empty: bool = True
    ) -> pl.DataFrame:
        """Load frame data for ``run_numbers``.

        ``reject_single_adc`` and ``non_empty`` filter rows on the length of the
        ``frame8`` list.
        """
        ...

    def load_monitoring_data(self, *run_numbers: int) -> pl.DataFrame:
        """Load monitoring data for ``run_numbers``."""
        ...

    def load_lab_environment_data(self) -> pl.DataFrame:
        """Load the lab environment readings."""
        ...

    def save_derived_data(self, derived_data: pl.DataFrame, run_number: int, meas_type: str) -> None:
        """Persist ``derived_data`` for ``run_number`` / ``meas_type``."""
        ...

    def save_coherent_noise_data(self, coherent_noise_data: pl.DataFrame, run_number: int) -> None:
        """Persist the coherent-noise results for ``run_number``."""
        ...

    def check_run_exists(self, run_number: int) -> bool:
        """Return True if raw data for ``run_number`` is available."""
        ...

    def check_measurement_exists(self, run_number: int, measurement: int) -> bool:
        """Return True if ``measurement`` exists within ``run_number``."""
        ...

    def get_runs_summary(self) -> pl.DataFrame:
        """Summarise runs: one row per run_number, meas_type and pas_mode."""
        ...

    def get_boards_summary(self) -> pl.DataFrame:
        """One row per board_id with a timestamp."""
        ...

    def get_boards_list(self, run_number: Optional[int] = None, meas_type: Optional[str] = None) -> pl.DataFrame:
        """Distinct boards, optionally restricted to a run and/or measurement type."""
        ...

    def get_runs_list(self) -> pl.DataFrame:
        """Distinct (run_number, meas_type) pairs."""
        ...

    def get_channels_list(
        self,
        run_number: int,
        meas_type: str,
        require_pulsed: bool = False,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
    ) -> pl.DataFrame:
        """Distinct channels of ``run_number`` / ``meas_type`` that pass the optional sample filters."""
        ...

    def get_amplitudes_list(self, run_number: int, meas_type: str) -> pl.DataFrame:
        """Distinct ``amp`` values in the derived data of ``run_number`` / ``meas_type``."""
        ...

    def get_measurements_list(self, run_number: int) -> List[int]:
        """Measurement numbers present in ``run_number``."""
        ...

    def get_measurement_types(self, run_number: int, measurement: Optional[int] = None) -> List[str]:
        """Measurement types in ``run_number``, optionally restricted to one ``measurement``."""
        ...

    def get_bad_samples_check(self, run_number: int) -> pl.DataFrame:
        """
        Returns a Polars DataFrame with columns
        [channel, gain, board_id, samples_min, samples_max, samples_len]
        (one row per raw samples row of ``run_number``).
        For use when checking if any samples arrays contain values outside the expected range
        """
        ...

    def get_middle_attenuation_run(self, run_numbers: List[int]) -> int:
        """Return a run from ``run_numbers`` that has an ``att_val`` listed in MIDDLE_ATTENUATIONS."""
        ...
class DeltaSource(DataSource):
    """DataSource backed by Delta Lake tables plus parquet files for derived results.

    Raw samples, frame data and monitoring data are read from Delta tables; derived
    values and coherent-noise results are read from / written to parquet files in
    ``derived_dir``. Timestamps from the raw and monitoring tables are localised to
    America/New_York and converted to UTC.
    """

    def __init__(
        self,
        raw_data_dir: Optional[Path],
        derived_dir: Optional[Path],
        frame_dir: Optional[Path] = None,
        monitoring_dir: Optional[Path] = None,
        crate_lab_path: Path = Path("/data/feb2/lab-env/env_monitoring_crate_lab.csv"),
        test_stand_path: Path = Path("/data/feb2/lab-env/env_monitoring_test_stand.csv"),
    ) -> None:
        """
        Args:
            raw_data_dir: Delta table with raw samples (needed by every raw-data query).
            derived_dir: directory for derived / coherent-noise parquet files; it is
                created (mode 0o775) if it does not exist yet.
            frame_dir: Delta table with frame data.
            monitoring_dir: Delta table with monitoring data.
            crate_lab_path: CSV with crate-lab environment readings.
            test_stand_path: CSV with test-stand environment readings.
        """
        self.raw_dir = raw_data_dir
        self.derived_dir = derived_dir
        self.frame_dir = frame_dir
        self.monitoring_dir = monitoring_dir
        self.crate_lab_path = crate_lab_path
        self.test_stand_path = test_stand_path

        if derived_dir is not None and not derived_dir.exists():
            derived_dir.mkdir(exist_ok=True)
            os.chmod(derived_dir, 0o775)

    # ----------------------------------------------------------------- helpers

    def _require_raw_dir(self, message: str = "Raw data directory must be set") -> Path:
        """Return ``self.raw_dir``; log ``message`` and exit if it is unset."""
        if self.raw_dir is None:
            log.error(message)
            sys.exit(1)
        return self.raw_dir

    @staticmethod
    def _sample_predicates(require_positive: bool, require_unsaturated: bool, require_nonempty: bool) -> List[pl.Expr]:
        """Build the optional row filters on the ``samples`` list column."""
        predicates: List[pl.Expr] = []
        if require_positive:
            predicates.append(pl.col("samples").list.min() > 0)
        if require_unsaturated:
            # Samples at or above 2**15 count as saturated
            predicates.append(pl.col("samples").list.max() < 2**15)
        if require_nonempty:
            predicates.append(pl.col("samples").list.len() != 0)
        return predicates

    @staticmethod
    def _pas_mode_expr() -> pl.Expr:
        """Merge alfe_mode and hps_ps_gain into ``pas_mode``, preferring alfe_mode."""
        return (
            pl.when(pl.col("alfe_mode").is_not_null())
            .then(pl.col("alfe_mode"))
            .otherwise(pl.col("hps_ps_gain"))
            .alias("pas_mode")
        )

    @staticmethod
    def _partition_run_number(path: Path) -> Optional[int]:
        """Return N for a partition path ending in ``run_number=N``, else None."""
        _, sep, suffix = str(path).rpartition("run_number=")
        if not sep:
            return None
        try:
            return int(suffix)
        except ValueError:
            return None

    def _scan_frame_partitions(self, filters: List[pl.Expr], *run_numbers: int) -> pl.DataFrame:
        """Read frame data partition by partition with scan_parquet.

        Fallback for when scan_delta fails or returns nothing. For every
        ``run_number=N`` entry in ``frame_dir`` with N in ``run_numbers``, the only
        file (or the largest one) is read. Entries that are not run partitions,
        empty partitions and runs raising a SchemaError are skipped.

        Returns:
            The diagonal concatenation of the per-run frames, or an empty DataFrame.
        """
        wanted = set(run_numbers)
        frames: List[pl.DataFrame] = []
        for run_dir in Path(str(self.frame_dir)).glob("*run*"):
            run_number = self._partition_run_number(run_dir)
            if run_number is None or run_number not in wanted:
                continue
            log.debug(f"Scanning: {run_dir}")

            files = glob(f"{str(run_dir)}/*")
            if not files:
                log.warning(f"No files in {run_dir}, skipping run: {run_number}")
                continue
            if len(files) == 1:
                file_path = files[0]
            else:
                # single ADC data is merged into one file
                log.info(f"Multiple files in {run_dir}, picking the largest for frame data.")
                file_path = files[int(np.argmax([os.path.getsize(f) for f in files]))]

            try:
                lazy_frame = pl.scan_parquet(file_path)
                # filter() must not receive an empty predicate list
                if filters:
                    lazy_frame = lazy_frame.filter(filters)
                frames.append(
                    lazy_frame.with_columns(
                        # felix_bcid and felix_evt_count are UInt in a few runs around 2170 ish
                        cs.by_dtype(pl.List(pl.UInt16())).cast(pl.List(pl.Int16())),
                        pl.lit(run_number).alias("run_number"),
                    ).collect()
                )
            except pl.exceptions.SchemaError as e:
                # This run can't be loaded due to the datatype in the felix_evt_count column (eg 2204)
                log.warning(f"{e}")
                log.warning(f"Schema error when scanning {file_path}")
                log.warning(f"Skipping run: {run_number}")

        if not frames:
            return pl.DataFrame()
        return pl.concat(frames, how="diagonal")

    # ----------------------------------------------------------------- loaders

    def load_raw_data(
        self,
        *run_numbers: int,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
        ignore_boards: List[str] = [],
    ) -> pl.DataFrame:
        """Load raw sample rows for ``run_numbers`` from the raw Delta table.

        Args:
            *run_numbers: runs to load.
            require_positive: drop rows whose smallest sample is <= 0.
            require_unsaturated: drop rows whose largest sample is >= 2**15.
            require_nonempty: drop rows with an empty ``samples`` list.
            ignore_boards: board_ids removed from the result (never modified).

        Returns:
            The rows with ``timestamp`` converted to UTC and a ``pas_mode`` column;
            ``board_version`` ("") and ``hps_ps_gain`` (null) are added when missing.

        Exits the process if ``raw_dir`` is unset or no rows remain after the sample filters.
        """
        log.info(f"Loading raw data from DeltaSource {self.raw_dir}")
        raw_dir = self._require_raw_dir("Raw data directory must be set to load raw data")

        raw_data = pl.scan_delta(str(raw_dir)).filter(pl.col("run_number").is_in(run_numbers)).collect()
        predicates = self._sample_predicates(require_positive, require_unsaturated, require_nonempty)
        if predicates:
            raw_data = raw_data.filter(predicates)

        if "board_version" not in raw_data.columns:
            raw_data = raw_data.with_columns(pl.lit("").alias("board_version"))

        if raw_data.is_empty():
            log.error(f"No raw data for {run_numbers=} found in {self.raw_dir}, exiting...")
            sys.exit(1)

        # Add Null hps_ps_gain to handle old data without this column
        if "hps_ps_gain" not in raw_data.columns:
            raw_data = raw_data.with_columns(pl.lit(None).alias("hps_ps_gain"))

        raw_data = raw_data.with_columns(
            # Stored timestamps are treated as naive America/New_York wall-clock times
            pl.col("timestamp").dt.replace_time_zone("America/New_York").dt.convert_time_zone("UTC"),
            self._pas_mode_expr(),
        )

        if len(ignore_boards) > 0:
            log.warning(f"Will skip loading boards {ignore_boards}")
            return raw_data.filter(~pl.col("board_id").is_in(ignore_boards))
        return raw_data

    def load_derived_data(
        self,
        run_number: int,
        meas_type: str,
    ) -> pl.DataFrame:
        """Load ``{meas_type}_derived_values*{run_number}.parquet`` rows for ``run_number``.

        The glob uses the unpadded run number, so it can also match other runs'
        files (e.g. run 5 matches ``run0015``); the run_number filter discards those.
        Exits the process if ``derived_dir`` is unset, no file matches, or no rows
        for the run are found.
        """
        log.debug(f"Loading derived data from {self.derived_dir}")
        if self.derived_dir is None:
            log.error("Derived directory must be set to load derived data")
            sys.exit(1)

        pattern = f"{meas_type}_derived_values*{run_number}.parquet"
        derived_df = pl.DataFrame()
        # scan_parquet raises on a glob that matches nothing; check first so the
        # user gets the friendly message below instead of a traceback
        if any(self.derived_dir.glob(pattern)):
            derived_df = (
                pl.scan_parquet(self.derived_dir / pattern).filter(pl.col("run_number") == run_number).collect()
            )

        if derived_df.is_empty():
            log.error(f"No derived {meas_type} data for run {run_number} in {self.derived_dir}, exiting...")
            log.error("Have you run [yellow]calc-save-runs[/yellow]?")
            sys.exit(1)

        return derived_df

    def load_coherent_noise_data(self, run_number: int) -> pl.DataFrame:
        """Load ``coherent_noise_*{run_number}.parquet`` rows for ``run_number``.

        Files are concatenated diagonally (missing columns filled with nulls).
        Exits the process if ``derived_dir`` is unset or nothing is found.
        """
        log.debug(f"Loading coherent noise data from {self.derived_dir}")
        if self.derived_dir is None:
            log.error("Derived directory must be set to load coherent noise data")
            sys.exit(1)

        frames = [
            pl.scan_parquet(p).filter(pl.col("run_number") == run_number).collect()
            for p in Path(self.derived_dir).glob(f"coherent_noise_*{run_number}.parquet")
        ]
        # pl.concat raises on an empty list, which would hide the message below
        coherent_noise_df = pl.concat(frames, how="diagonal") if frames else pl.DataFrame()

        if coherent_noise_df.is_empty():
            log.error(f"No coherent noise data for run {run_number} found in {self.derived_dir}, exiting...")
            log.error("Have you run [yellow]calc-save-runs[/yellow]?")
            log.error(f"Is {run_number} a pedestal run?")
            sys.exit(1)

        return coherent_noise_df

    def load_frame_data(
        self,
        *run_numbers: int,
        reject_single_adc: bool = False,
        non_empty: bool = True,
    ) -> pl.DataFrame:
        """Load frame data for ``run_numbers``.

        For fewer than 50 runs the frame Delta table is scanned directly; if that
        is skipped, fails with a SchemaError, or yields nothing, each run partition
        is read with scan_parquet instead.

        Args:
            *run_numbers: runs to load.
            reject_single_adc: drop rows whose ``frame8`` list has >= 1e5 entries
                (presumably single-ADC data — confirm).
            non_empty: drop rows with an empty ``frame8`` list.

        Returns:
            The frame rows with ``adc`` parsed to an integer (its first 3 characters dropped).

        Exits the process if ``frame_dir`` is unset or no frame data is found.
        """
        log.debug(f"Loading frame data from {self.frame_dir} with {reject_single_adc=} and {non_empty=}")
        if self.frame_dir is None:
            log.error("Frame directory must be set to load frame data")
            sys.exit(1)

        filters: List[pl.Expr] = []
        if reject_single_adc:
            filters.append(pl.col("frame8").list.len() < 1e5)
        if non_empty:
            filters.append(pl.col("frame8").list.len() != 0)

        frame_data = pl.DataFrame()

        if len(run_numbers) < 50:
            # Collecting a bunch of runs with scan_delta (and maybe pyarrow) uses a lot of memory
            # So don't do this if we're trying to get many runs. 50 is ad-hoc from testing
            try:
                # use_pyarrow=True avoids schema errors
                log.debug("Scan_delta")
                df_lazy = pl.scan_delta(str(self.frame_dir), use_pyarrow=True)
                if filters:
                    df_lazy = df_lazy.filter(filters)
                frame_data = df_lazy.filter(pl.col("run_number").is_in(run_numbers)).collect()
            except pl.exceptions.SchemaError:
                log.warning(
                    f"Runs {run_numbers} caused a schema error with scan_delta. "
                    "Trying scan_parquet and picking largest file"
                )

        if frame_data.is_empty():  # pragma: no cover
            log.debug("Empty frame, trying scan_parquet")
            frame_data = self._scan_frame_partitions(filters, *run_numbers)

        if frame_data.is_empty():
            log.error(
                f"No frame data for run {run_numbers} in {self.frame_dir} with {reject_single_adc=} and {non_empty=}, exiting..."  # noqa: E501
            )
            sys.exit(1)

        return frame_data.with_columns(pl.col("adc").str.slice(3).str.to_integer().alias("adc"))

    def load_monitoring_data(
        self,
        *run_numbers: int,
    ) -> pl.DataFrame:
        """Load monitoring rows for ``run_numbers`` with ``timestamp`` converted to UTC.

        Exits the process if ``monitoring_dir`` is unset or nothing is found.
        """
        log.debug(f"Loading monitoring data from {self.monitoring_dir}")
        if self.monitoring_dir is None:
            log.error("Monitoring data directory must be set to load monitoring data")
            sys.exit(1)
        monitoring_data = (
            pl.scan_delta(str(self.monitoring_dir)).filter(pl.col("run_number").is_in(run_numbers)).collect()
        )
        if monitoring_data.is_empty():
            log.error(f"No monitoring data for {run_numbers=} found in {self.monitoring_dir}, exiting...")
            sys.exit(1)
        return monitoring_data.with_columns(
            pl.col("timestamp").dt.replace_time_zone("America/New_York").dt.convert_time_zone("UTC")
        )

    def save_derived_data(self, derived_data: pl.DataFrame, run_number: int, meas_type: str) -> None:
        """Write ``{meas_type}_derived_values_run{NNNN}.parquet`` (zstd) into ``derived_dir``."""
        if self.derived_dir is None:
            log.error("Derived directory must be set to save derived data")
            sys.exit(1)
        derived_data.write_parquet(
            self.derived_dir / f"{meas_type}_derived_values_run{run_number:04}.parquet", compression="zstd"
        )

    def load_lab_environment_data(self) -> pl.DataFrame:
        """Concatenate the crate-lab and test-stand CSVs, parsing ``datetime_utc`` into a UTC ``timestamp``."""
        lab_env_data_UTC = pl.concat([pl.read_csv(self.crate_lab_path), pl.read_csv(self.test_stand_path)])
        return lab_env_data_UTC.with_columns(
            pl.col("datetime_utc").str.to_datetime(time_zone="UTC").alias("timestamp")
        ).drop("datetime_utc")

    def save_coherent_noise_data(self, coherent_noise_data: pl.DataFrame, run_number: int) -> None:
        """Write ``coherent_noise_run{NNNN}.parquet`` (zstd) into ``derived_dir``, without ``data_sum``."""
        if self.derived_dir is None:
            log.error("Derived directory must be set to save coherent noise data")
            sys.exit(1)

        # TODO data_sum could be added to a pytest and this moved into a less obscure place
        # Don't save data_sum, a column with type List[Float64] and length n_samples
        coherent_noise_data.select(pl.exclude("data_sum")).write_parquet(
            self.derived_dir / f"coherent_noise_run{run_number:04}.parquet", compression="zstd"
        )

    # ----------------------------------------------------------------- queries

    def check_run_exists(self, run_number: int) -> bool:
        """
        Check if a run exists in the raw data directory.

        Args:
            run_number: the run number, eg 770

        Returns:
            True if the run exists, False otherwise (including when the Delta
            table cannot be read; the error is logged)
        """
        raw_dir = self._require_raw_dir()
        try:
            return not pl.scan_delta(str(raw_dir)).filter(pl.col("run_number") == run_number).collect().is_empty()
        except Exception as e:
            log.error(f"check_run_exists error for {run_number=} and raw_dir={self.raw_dir}")
            log.error("You might have an incomplete data dir, eg missing the _delta_log directory")
            log.error(type(e))
            log.error(e)
            return False

    def check_measurement_exists(self, run_number: int, measurement: int) -> bool:
        """
        Check if a measurement exists in a run in the raw data directory.

        Args:
            run_number: the run number, eg 770
            measurement: the measurement number, eg 1

        Returns:
            True if the measurement exists, False otherwise
        """
        raw_dir = self._require_raw_dir()
        return (
            not pl.scan_delta(str(raw_dir))
            .filter(pl.col("run_number") == run_number, pl.col("measurement") == measurement)
            .collect()
            .is_empty()
        )

    def get_runs_list(self) -> pl.DataFrame:
        """Distinct (run_number, meas_type) pairs sorted by run_number."""
        raw_dir = self._require_raw_dir()
        return (
            pl.scan_delta(str(raw_dir))
            .select([pl.col("run_number"), pl.col("meas_type")])
            .unique()
            .sort("run_number")
            .collect()
        )

    def get_runs_summary(self) -> pl.DataFrame:
        """One row per (run_number, meas_type, pas_mode) in descending run_number order.

        Rows are first de-duplicated per (run_number, meas_type, att_val, board_id,
        channel), keeping the lowest measurement; each group then lists its sorted
        distinct att_val, board_id and channel values plus one timestamp.
        NOTE(review): unlike load_raw_data, the timestamp here is not converted to UTC.
        """
        raw_dir = self._require_raw_dir()

        df = (
            pl.scan_delta(str(raw_dir))
            .sort("measurement")
            .unique(["run_number", "meas_type", "att_val", "board_id", "channel"], keep="first")
        )

        # Add Null hps_ps_gain to handle old data without this column
        if "hps_ps_gain" not in df.collect_schema().names():
            df = df.with_columns(pl.lit(None).alias("hps_ps_gain"))

        return (
            df.with_columns(self._pas_mode_expr())
            .group_by(["run_number", "meas_type", "pas_mode"])
            .agg(
                pl.col("att_val").unique().sort(),
                pl.col("board_id").unique().sort(),
                pl.col("timestamp").first(),
                pl.col("channel").unique().sort(),
            )
            .select(["run_number", "board_id", "meas_type", "att_val", "pas_mode", "timestamp", "channel"])
            .sort("run_number", descending=True)
            .collect()
        )

    def get_channels_list(
        self,
        run_number: int,
        meas_type: str,
        require_pulsed: bool = False,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
    ) -> pl.DataFrame:
        """Distinct channels of ``run_number`` / ``meas_type`` passing the optional filters.

        ``require_pulsed`` keeps only rows whose ``is_pulsed`` column is true; the
        other flags behave as in load_raw_data.
        """
        raw_dir = self._require_raw_dir()
        run_rows = pl.scan_delta(str(raw_dir)).filter(pl.col("run_number") == run_number).collect()

        predicates = [pl.col("meas_type") == meas_type]
        predicates += self._sample_predicates(require_positive, require_unsaturated, require_nonempty)
        if require_pulsed:
            predicates.append(pl.col("is_pulsed"))

        return run_rows.filter(predicates).select(pl.col("channel").unique())

    def get_amplitudes_list(self, run_number: int, meas_type: str) -> pl.DataFrame:
        """Distinct ``amp`` values in the derived data of ``run_number`` / ``meas_type``."""
        return self.load_derived_data(run_number, meas_type).select(pl.col("amp").unique())

    def get_boards_summary(self) -> pl.DataFrame:
        """One row per board_id with the timestamp of a row from its highest-numbered run."""
        raw_dir = self._require_raw_dir()
        return (
            pl.scan_delta(str(raw_dir))
            .sort("run_number", descending=True)
            .unique(["board_id"], keep="first")
            .select(["board_id", "timestamp"])
            .collect()
        )

    def get_boards_list(self, run_number: Optional[int] = None, meas_type: Optional[str] = None) -> pl.DataFrame:
        """Distinct boards, optionally restricted to ``run_number`` and/or ``meas_type``.

        Returns board_id, board_version and board_type (from board_variant) when the
        table has board_version, otherwise only board_id.
        """
        raw_dir = self._require_raw_dir()

        filters = []
        if run_number is not None:
            filters.append(pl.col("run_number") == run_number)
        if meas_type is not None:
            filters.append(pl.col("meas_type") == meas_type)

        df_lazy = pl.scan_delta(str(raw_dir))
        if filters:
            df_lazy = df_lazy.filter(filters)

        if "board_version" in df_lazy.collect_schema().names():
            return (
                df_lazy.select("board_id", "board_version", pl.col("board_variant").alias("board_type"))
                .unique()
                .collect()
            )
        # BNL data doesn't have board_version
        return df_lazy.select("board_id").unique().collect()

    def get_measurements_list(self, run_number: int) -> List[int]:
        """Distinct measurement numbers in ``run_number`` (unordered)."""
        raw_dir = self._require_raw_dir()
        df = pl.scan_delta(str(raw_dir)).filter(pl.col("run_number") == run_number).select("measurement").collect()
        return df.select(pl.col("measurement").unique()).to_series().to_list()

    def get_measurement_types(self, run_number: int, measurement: Optional[int] = None) -> List[str]:
        """Distinct meas_type values in ``run_number``, optionally only for ``measurement``."""
        raw_dir = self._require_raw_dir()
        df = (
            pl.scan_delta(str(raw_dir))
            .filter(pl.col("run_number") == run_number)
            .select("meas_type", "measurement")
            .collect()
        )
        if measurement is not None:
            df = df.filter(pl.col("measurement") == measurement)
        return df.select(pl.col("meas_type").unique()).to_series().to_list()

    def get_middle_attenuation_run(self, run_numbers: List[int]) -> int:
        """Return the first run in ``run_numbers`` with an att_val in MIDDLE_ATTENUATIONS.

        Exits the process if no such run exists (including for an empty list).
        """
        raw_dir = self._require_raw_dir()
        # One scan for all runs instead of one scan per run
        matching_runs = set(
            pl.scan_delta(str(raw_dir))
            .filter(
                pl.col("run_number").is_in(run_numbers),
                pl.col("att_val").is_in(MIDDLE_ATTENUATIONS),
            )
            .select("run_number")
            .unique()
            .collect()
            .to_series()
            .to_list()
        )
        # Preserve the caller's ordering when picking the run
        for run_number in run_numbers:
            if run_number in matching_runs:
                return run_number
        log.error(f"Did not find a run with att_val in {MIDDLE_ATTENUATIONS} in runs {run_numbers}")
        sys.exit(1)

    def get_bad_samples_check(self, run_number: int) -> pl.DataFrame:
        """Per-row samples statistics for ``run_number``.

        Returns columns [channel, gain, board_id, samples_min, samples_max, samples_len].
        """
        raw_dir = self._require_raw_dir()
        return (
            pl.scan_delta(str(raw_dir))
            .filter(pl.col("run_number") == run_number)
            .select(
                "channel",
                "gain",
                "board_id",
                pl.col("samples").list.min().alias("samples_min"),
                pl.col("samples").list.max().alias("samples_max"),
                pl.col("samples").list.len().alias("samples_len"),
            )
            .collect()
        )
class ParquetSource(DeltaSource):
    """
    DataSource that loads raw data from one specific parquet file.

    Derived data come from a single parquet file (``derived_path``). Methods not
    overridden here are inherited from DeltaSource; lab-environment and monitoring
    loading fall back to an internal DeltaSource when the configured path is not a
    ``.parquet`` file.
    """

    def __init__(
        self,
        raw_data_path: Optional[Path],
        derived_path: Optional[Path],
        frame_path: Optional[Path] = None,
        monitoring_dir: Optional[Path] = None,
        crate_lab_path: Path = Path("/data/feb2/lab-env/env_monitoring_crate_lab.csv"),
        test_stand_path: Path = Path("/data/feb2/lab-env/env_monitoring_test_stand.csv"),
    ) -> None:
        """
        Args:
            raw_data_path: parquet file holding the raw samples of a single run.
            derived_path: parquet file with derived values; its parent directory
                becomes ``derived_dir`` (``derived/`` when not given).
            frame_path: frame data location, passed through to DeltaSource.
            monitoring_dir: a ``.parquet`` file or a monitoring Delta table.
            crate_lab_path: crate-lab environment data (CSV, or ``.parquet``).
            test_stand_path: test-stand environment data (CSV).
        """
        self.raw_dir = raw_data_path
        self.derived_path = derived_path
        self.derived_dir = Path("derived/") if derived_path is None else derived_path.parent
        self.frame_dir = frame_path
        self.monitoring_dir = monitoring_dir
        self.crate_lab_path = crate_lab_path
        self.test_stand_path = test_stand_path

        self.delta_source = DeltaSource(
            raw_data_path,
            self.derived_dir,
            frame_path,
            monitoring_dir,
            crate_lab_path,
            test_stand_path,
        )

        if self.derived_dir is not None and not self.derived_dir.exists():
            self.derived_dir.mkdir(exist_ok=True)
            os.chmod(self.derived_dir, 0o775)

    def load_raw_data(
        self,
        *run_numbers: int,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
        ignore_boards: List[str] = [],
    ) -> pl.DataFrame:
        """Load every row of the raw parquet file and label it with the one requested run.

        Exactly one run number must be given; it overwrites any ``run_number`` column
        in the file. Rows with an empty ``samples`` list are always dropped here,
        whatever ``require_nonempty`` says. The other flags and ``ignore_boards``
        behave as in DeltaSource.load_raw_data. Exits the process on a missing or
        directory path, a wrong number of runs, or an empty result.
        """
        log.info(f"Loading raw data from ParquetSource {self.raw_dir}")
        if self.raw_dir is None:
            log.error("Raw data directory must be set to load raw data")
            sys.exit(1)

        if Path(self.raw_dir).is_dir():
            log.error(f"{self.raw_dir=} must be a parquet filepath")
            sys.exit(1)

        if len(run_numbers) != 1:
            log.error("Only one run number supported for direct parquet files.")
            sys.exit(1)
        (run_number,) = run_numbers

        table = pl.scan_parquet(self.raw_dir).collect().filter(pl.col("samples").list.len() != 0)

        # Older files lack hps_ps_gain; add it as nulls so pas_mode can be built
        if "hps_ps_gain" not in table.columns:
            table = table.with_columns(pl.lit(None).alias("hps_ps_gain"))

        pas_mode = (
            pl.when(pl.col("alfe_mode").is_not_null())
            .then(pl.col("alfe_mode"))
            .otherwise(pl.col("hps_ps_gain"))
            .alias("pas_mode")
        )
        table = table.with_columns(pl.lit(run_number).alias("run_number"), pas_mode)

        sample_checks = []
        if require_positive:
            sample_checks.append(pl.col("samples").list.min() > 0)
        if require_unsaturated:
            sample_checks.append(pl.col("samples").list.max() < 2**15)
        if require_nonempty:
            sample_checks.append(pl.col("samples").list.len() != 0)
        for check in sample_checks:
            table = table.filter(check)

        if "board_version" not in table.columns:
            table = table.with_columns(pl.lit("").alias("board_version"))

        if table.is_empty():
            log.error(f"No raw data for {run_numbers=} found in {self.raw_dir}, exiting...")
            sys.exit(1)

        # Stored timestamps are treated as naive America/New_York wall-clock times
        table = table.with_columns(
            pl.col("timestamp").dt.replace_time_zone("America/New_York").dt.convert_time_zone("UTC")
        )
        if len(ignore_boards) == 0:
            return table
        log.warning(f"Will skip loading boards {ignore_boards}")
        return table.filter(~pl.col("board_id").is_in(ignore_boards))

    def load_derived_data(
        self,
        run_number: int,
        meas_type: str,
    ) -> pl.DataFrame:
        """Read ``derived_path`` and keep the rows of ``run_number``.

        ``meas_type`` is only used in the error message. Exits the process when
        ``derived_path`` is unset or no rows match.
        """
        log.debug(f"Loading derived data from {self.derived_path}")
        if self.derived_path is None:
            log.error("Derived data path must be set to load derived data")
            sys.exit(1)

        run_rows = pl.read_parquet(self.derived_path).filter(pl.col("run_number") == run_number)
        if not run_rows.is_empty():
            return run_rows

        log.error(f"No derived {meas_type} data for run {run_number} in {self.derived_path}, exiting...")
        log.error("Have you run [yellow]calc-save-runs[/yellow]?")
        sys.exit(1)

    def get_boards_list(self, run_number: Optional[int] = None, meas_type: Optional[str] = None) -> pl.DataFrame:
        """Distinct boards in the raw parquet file, optionally filtered by run and/or meas_type."""
        if self.raw_dir is None:
            log.error("Raw data directory must be set")
            sys.exit(1)

        conditions = []
        if run_number is not None:
            conditions.append(pl.col("run_number") == run_number)
        if meas_type is not None:
            conditions.append(pl.col("meas_type") == meas_type)

        lazy_raw = pl.scan_parquet(str(self.raw_dir))
        if conditions:
            lazy_raw = lazy_raw.filter(conditions)

        if "board_version" not in lazy_raw.collect_schema().names():
            # BNL data doesn't have board_version
            return lazy_raw.select("board_id").unique().collect()
        return (
            lazy_raw.select("board_id", "board_version", pl.col("board_variant").alias("board_type"))
            .unique()
            .collect()
        )

    def check_run_exists(self, run_number: int) -> bool:
        """
        Does nothing for direct parquet files; always returns True
        """
        del run_number
        return True

    def load_lab_environment_data(self) -> pl.DataFrame:
        """Read lab environment data from a ``.parquet`` crate_lab_path, else via DeltaSource.

        Exits the process if the result is empty.
        """
        # Allow for reading parquet file direct to help with tests
        from_parquet = str(self.crate_lab_path).endswith(".parquet")
        lab_env_data = (
            pl.read_parquet(self.crate_lab_path) if from_parquet else self.delta_source.load_lab_environment_data()
        )

        if lab_env_data.is_empty():
            log.error(f"No lab environment data in {self.crate_lab_path} or {self.test_stand_path}, exiting...")
            sys.exit(1)
        return lab_env_data

    def load_monitoring_data(self, *run_numbers: int) -> pl.DataFrame:
        """Read monitoring rows for ``run_numbers`` from a ``.parquet`` file, else via DeltaSource.

        Exits the process if ``monitoring_dir`` is unset or nothing is found.
        """
        if not self.monitoring_dir:
            log.error("monitoring_dir must be set to load monitoring data")
            sys.exit(1)

        if not str(self.monitoring_dir).endswith(".parquet"):
            monitoring_df = self.delta_source.load_monitoring_data(*run_numbers)
        else:
            monitoring_df = pl.read_parquet(self.monitoring_dir).filter(pl.col("run_number").is_in(run_numbers))

        if monitoring_df.is_empty():
            log.error(f"No monitoring data for runs {run_numbers} in {self.monitoring_dir}, exiting...")
            sys.exit(1)
        return monitoring_df
class HDF5Source(DataSource):
    """DataSource that reads raw samples from one HDF5 file per run (``run{NNNN}.hdf5``).

    Derived data, coherent-noise data, frame data and lab-environment data are
    delegated to a DeltaSource rooted at ``raw_data_dir / "samples"``. Summary and
    listing queries are not implemented. h5py is imported inside the methods
    that need it.
    """

    def __init__(
        self,
        raw_data_dir: Path,
        derived_dir: Optional[Path],
        crate_lab_path: Path = Path("/data/feb2/lab-env/env_monitoring_crate_lab.csv"),
        test_stand_path: Path = Path("/data/feb2/lab-env/env_monitoring_test_stand.csv"),
    ) -> None:
        self.raw_dir: Path = raw_data_dir
        self.derived_dir = derived_dir
        self.crate_lab_path = crate_lab_path
        self.test_stand_path = test_stand_path
        self.delta_source = DeltaSource(
            raw_data_dir / "samples",
            derived_dir,
            crate_lab_path=crate_lab_path,
            test_stand_path=test_stand_path,
        )

        if self.derived_dir is not None and not self.derived_dir.exists():
            self.derived_dir.mkdir(exist_ok=True)
            os.chmod(self.derived_dir, 0o775)

    @staticmethod
    def get_dataset_keys(f):
        """Return the names of every h5py Dataset reachable from ``f`` (via ``f.visit``)."""
        import h5py  # type: ignore

        keys = []
        f.visit(lambda key: keys.append(key) if isinstance(f[key], h5py.Dataset) else None)
        return keys

    def load_raw_data(
        self,
        *run_numbers: int,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
        ignore_boards: List[str] = [],
    ) -> pl.DataFrame:
        """Load the raw samples of a single run from ``run{NNNN}.hdf5``.

        Each dataset at depth four (``measurement/channel/gain/samples``) becomes one
        row combining the measurement-group attributes, the channel-group
        attributes and the samples array. Rows with empty samples are always
        dropped; the ``require_*`` flags and ``ignore_boards`` behave as in
        DeltaSource.load_raw_data. ``timestamp`` is converted to UTC.

        Exits the process if more than one run is requested, the file is missing,
        or it contains no depth-four datasets.
        """
        import h5py

        log.info("Loading raw data from HDF5Source")

        if len(run_numbers) != 1:
            log.critical("Only single run numbers implemented for HDF5, exiting...")
            sys.exit(1)
        run_number = run_numbers[0]
        filename = str(self.raw_dir / f"run{run_number:04d}.hdf5")
        try:
            # Open once; the handle is closed by the `with` below
            h5_file = h5py.File(filename)
        except FileNotFoundError:
            log.critical(f"Could not find input file {filename}")
            sys.exit(1)

        samples_dict: Dict[str, List[Any]] = defaultdict(list)
        with h5_file as f:
            dataset_keys = self.get_dataset_keys(f)
            for dataset_key in dataset_keys:
                key_list = dataset_key.split("/")
                if len(key_list) != 4:
                    continue
                measurement, channel, gain, samples = key_list
                for key, value in f[measurement].attrs.items():
                    if value == "":
                        samples_dict[key].append(None)
                    elif key == "alfe_mode" and value == -99:
                        # -99 sentinel is stored as null
                        samples_dict[key].append(None)
                    elif key == "awg_amp":
                        samples_dict[key].append(float(value))
                    elif key == "board_id":
                        samples_dict[key].append(str(value))
                    elif key == "timestamp":
                        samples_dict[key].append(
                            datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f").replace(microsecond=0)
                        )
                    elif key == "meas_chan":
                        # Channels are identified by their last three characters
                        samples_dict[key].append(int(value[-3:]))
                        samples_dict["is_pulsed"].append(int(channel[-3:]) == int(value[-3:]))
                    else:
                        samples_dict[key].append(value)
                meas_group = cast(h5py.Group, f[measurement])
                for key, value in meas_group[channel].attrs.items():
                    # is_pulsed is filled based on meas_chan
                    if key == "is_pulsed":
                        continue
                    samples_dict[key].append(value)
                samples_dict["measurement"].append(int(measurement.split("_")[1]))
                samples_dict["channel"].append(int(channel[-3:]))
                samples_dict["gain"].append(gain)
                meas_chan_group = cast(h5py.Group, meas_group[channel])
                meas_chan_gain_group = cast(h5py.Group, meas_chan_group[gain])
                samples_dataset = cast(h5py.Dataset, meas_chan_gain_group[samples])
                samples_dict["samples"].append(samples_dataset[()])

        if not samples_dict:
            # Without this, the filter below fails with a missing "samples" column
            log.critical(f"No sample datasets found in {filename}, exiting...")
            sys.exit(1)

        raw_data = pl.DataFrame(samples_dict).filter(pl.col("samples").list.len() != 0)
        if require_positive:
            raw_data = raw_data.filter(pl.col("samples").list.min() > 0)
        if require_unsaturated:
            raw_data = raw_data.filter(pl.col("samples").list.max() < 2**15)
        if require_nonempty:
            raw_data = raw_data.filter(pl.col("samples").list.len() != 0)
        raw_data = raw_data.with_columns(
            pl.col("timestamp").dt.replace_time_zone("America/New_York").dt.convert_time_zone("UTC")
        )
        log.debug(f"raw_data timestamp: {raw_data.select('timestamp').head(1)}")
        if len(ignore_boards) > 0:
            log.warning(f"Will skip loading boards {ignore_boards}")
            return raw_data.filter(~pl.col("board_id").is_in(ignore_boards))
        return raw_data

    def load_derived_data(self, run_number: int, meas_type: str) -> pl.DataFrame:
        """Delegate to DeltaSource.load_derived_data."""
        return self.delta_source.load_derived_data(run_number, meas_type)

    def load_coherent_noise_data(self, run_number: int) -> pl.DataFrame:
        """Delegate to DeltaSource.load_coherent_noise_data."""
        return self.delta_source.load_coherent_noise_data(run_number)

    def load_frame_data(
        self, *run_numbers: int, reject_single_adc: bool = False, non_empty: bool = True
    ) -> pl.DataFrame:
        """Delegate to DeltaSource.load_frame_data."""
        return self.delta_source.load_frame_data(*run_numbers, reject_single_adc=reject_single_adc, non_empty=non_empty)

    def save_derived_data(self, derived_data: pl.DataFrame, run_number: int, meas_type: str) -> None:
        """Delegate to DeltaSource.save_derived_data."""
        self.delta_source.save_derived_data(derived_data, run_number, meas_type)

    def save_coherent_noise_data(self, coherent_noise_data: pl.DataFrame, run_number: int) -> None:
        """Delegate to DeltaSource.save_coherent_noise_data."""
        self.delta_source.save_coherent_noise_data(coherent_noise_data, run_number)

    def check_run_exists(self, run_number: int) -> bool:
        """Return True if ``run{NNNN}.hdf5`` can be opened in ``raw_dir``."""
        import h5py

        filename = str(self.raw_dir / f"run{run_number:04d}.hdf5")
        try:
            # Close the handle immediately; we only need to know it opens
            with h5py.File(filename):
                pass
        except FileNotFoundError:
            log.error(f"Could not find HDF5 input file {filename} in {self.raw_dir}")
            return False
        return True

    def check_measurement_exists(self, run_number: int, measurement: int) -> bool:
        """Return True if group ``Measurement_{NNN}`` exists in the run's HDF5 file."""
        import h5py

        filename = str(self.raw_dir / f"run{run_number:04d}.hdf5")
        if not self.check_run_exists(run_number):
            return False
        with h5py.File(filename) as f:
            return f"Measurement_{measurement:03d}" in f.keys()

    def get_runs_summary(self) -> pl.DataFrame:
        raise NotImplementedError

    def get_boards_summary(self) -> pl.DataFrame:
        raise NotImplementedError

    def get_boards_list(self, run_number: Optional[int] = None, meas_type: Optional[str] = None) -> pl.DataFrame:
        raise NotImplementedError

    def get_runs_list(self) -> pl.DataFrame:
        raise NotImplementedError

    def get_channels_list(
        self,
        run_number: int,
        meas_type: str,
        require_pulsed: bool = False,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
    ) -> pl.DataFrame:
        raise NotImplementedError

    def get_amplitudes_list(self, run_number: int, meas_type: str) -> pl.DataFrame:
        raise NotImplementedError

    def load_lab_environment_data(self) -> pl.DataFrame:
        """Delegate to DeltaSource.load_lab_environment_data."""
        return self.delta_source.load_lab_environment_data()

    def load_monitoring_data(self, *run_numbers: int) -> pl.DataFrame:
        raise NotImplementedError

    def get_measurements_list(self, run_number: int) -> List[int]:
        raise NotImplementedError

    def get_measurement_types(self, run_number: int, measurement: Optional[int] = None) -> List[str]:
        raise NotImplementedError

    def get_bad_samples_check(self, run_number: int) -> pl.DataFrame:
        raise NotImplementedError

    def get_middle_attenuation_run(self, run_numbers: List[int]) -> int:
        raise NotImplementedError
class SQLSource(DataSource):
    """DataSource that loads raw data from a SQL database via ``pl.read_database_uri``.

    Derived and coherent-noise data are delegated to a DeltaSource configured
    with only ``derived_dir``.
    """

    def __init__(self, uri: str, derived_dir: Optional[Path] = None) -> None:
        """
        Args:
            uri: database connection URI passed to ``pl.read_database_uri``.
            derived_dir: directory for derived parquet files; created (mode 0o775)
                if it does not exist.
        """
        self.uri = uri
        self.derived_dir = derived_dir

        self.delta_source = DeltaSource(None, derived_dir)

        if derived_dir is not None and not derived_dir.exists():
            derived_dir.mkdir(exist_ok=True)
            os.chmod(derived_dir, 0o775)

    @staticmethod
    def _sql_string_literal(value: object) -> str:
        """Render ``value`` as a single-quoted SQL literal, doubling embedded quotes."""
        return "'" + str(value).replace("'", "''") + "'"

    def load_raw_data(
        self,
        *run_numbers: int,
        require_positive: bool = False,
        require_unsaturated: bool = False,
        require_nonempty: bool = True,
        ignore_boards: List[str] = [],
    ) -> pl.DataFrame:
        """Load raw samples for ``run_numbers`` by joining runs, measurements, samples and boards.

        ``pas_mode`` is alfe_mode, falling back to hps_ps_gain. The ``require_*``
        flags filter on the samples_min / samples_max / samples_len columns.
        Rows from ``ignore_boards`` are excluded in the query. The result is
        sorted by run_number, channel and gain.

        Exits the process if no run number is given.
        """
        log.info("Loading raw data from SQLSource")
        if len(run_numbers) == 0:
            # An empty IN () list is invalid SQL
            log.error("At least one run number must be given to load raw data from SQLSource")
            sys.exit(1)

        # Values are inlined into the query text, so coerce/escape them first
        run_number_string = ",".join(self._sql_string_literal(int(r)) for r in run_numbers)
        ignore_boards_string = ",".join(self._sql_string_literal(b) for b in ignore_boards)

        # Merge alfe_mode and hps gain into pas_mode, preferring alfe_mode
        query = f"""
        SELECT
            m.*,
            s.channel,
            s.gain,
            s.is_pulsed,
            s.samples,
            b.board_type as board_variant,
            b.board_version as board_version,
            r.githash,
            COALESCE(m.alfe_mode, m.hps_ps_gain) AS pas_mode
        FROM runs r
        JOIN measurements m
            ON r.run_number = m.run_number
        JOIN samples s
            ON m.id = s.measurement_id
        JOIN boards b
            ON m.board_id = b.board_id
        WHERE m.run_number in ({run_number_string})
        """
        if len(ignore_boards) > 0:
            log.warning(f"Will skip loading boards {ignore_boards}")
            query += f"""
            AND b.board_id NOT IN ({ignore_boards_string})
            """

        if require_positive:
            query += "\nAND samples_min > 0"
        if require_unsaturated:
            query += f"\nAND samples_max < {2**15}"
        if require_nonempty:
            query += "\nAND samples_len > 0"

        df = pl.read_database_uri(query, self.uri, partition_on="channel", partition_num=16)
        df = df.rename({"measurement_number": "measurement", "measurement_timestamp": "timestamp"}).drop("id")
        # Because the data is loaded from multiple threads, sorting must be done outside of the SQL query
        return df.sort(by=["run_number", "channel", "gain"])

    def load_derived_data(self, run_number: int, meas_type: str) -> pl.DataFrame:
        """Delegate to DeltaSource.load_derived_data."""
        return self.delta_source.load_derived_data(run_number, meas_type)

    def load_coherent_noise_data(self, run_number: int) -> pl.DataFrame:
        """Delegate to DeltaSource.load_coherent_noise_data."""
        return self.delta_source.load_coherent_noise_data(run_number)
996 def load_frame_data(
997 self, *run_numbers: int, reject_single_adc: bool = False, non_empty: bool = True
998 ) -> pl.DataFrame:
999 run_number_string = ",".join(str(r) for r in run_numbers)
1000 query = f"""
1001 SELECT
1002 m.*,
1003 f.adc,
1004 f.channel,
1005 f.frame,
1006 h.felix_event_count,
1007 CAST(h.felix_bcid AS smallint[]) AS felix_bcid
1008 FROM runs r
1009 JOIN measurements m
1010 ON r.run_number = m.run_number
1011 JOIN frames f
1012 ON m.id = f.measurement_id
1013 JOIN felix_headers h
1014 ON m.id = h.measurement_id
1015 WHERE r.run_number in ({run_number_string})
1016 """
1017 if reject_single_adc:
1018 query += f"\nAND frame_max < {1e5}"
1019 if non_empty:
1020 query += "\nAND frame_len != 0"
1021 df = pl.read_database_uri(query, self.uri, partition_on="adc", partition_num=16)
1022 df = (
1023 df.drop("id")
1024 .pivot(on="channel", values="frame")
1025 .rename(
1026 {
1027 "1": "frame1",
1028 "8": "frame8",
1029 "measurement_number": "measurement",
1030 "measurement_timestamp": "timestamp",
1031 }
1032 )
1033 )
1034 return df.sort(by=["run_number", "measurement", "adc"])
1036 def save_derived_data(self, derived_data: pl.DataFrame, run_number: int, meas_type: str) -> None:
1037 self.delta_source.save_derived_data(derived_data, run_number, meas_type)
1039 def save_coherent_noise_data(self, coherent_noise_data: pl.DataFrame, run_number: int) -> None:
1040 self.delta_source.save_coherent_noise_data(coherent_noise_data, run_number)
1042 def check_run_exists(self, run_number: int) -> bool:
1043 query = f"SELECT run_number FROM runs WHERE run_number = {run_number}"
1044 df = pl.read_database_uri(query, self.uri)
1045 return not df.is_empty()
1047 def check_measurement_exists(self, run_number: int, measurement: int) -> bool:
1048 query = f"""
1049 SELECT run_number
1050 FROM measurements
1051 WHERE run_number = {run_number}
1052 AND measurement_number = {measurement}
1053 """
1054 df = pl.read_database_uri(query, self.uri)
1055 return not df.is_empty()
1057 def get_runs_summary(self) -> pl.DataFrame:
1058 # Merge alfe_mode and hps gain into pas_mode, preferring alfe_mode
1059 query = """
1060 SELECT DISTINCT
1061 r.run_number,
1062 array_agg(DISTINCT m.board_id ORDER BY m.board_id) AS board_id,
1063 r.run_timestamp AS timestamp,
1064 m.meas_type,
1065 array_agg(DISTINCT m.att_val ORDER BY m.att_val) AS att_val,
1066 COALESCE(m.alfe_mode, m.hps_ps_gain) AS pas_mode,
1067 array_agg(DISTINCT s.channel ORDER BY s.channel) AS channel
1068 FROM runs r
1069 JOIN measurements m
1070 ON r.run_number = m.run_number
1071 JOIN samples s
1072 ON m.id = s.measurement_id
1073 GROUP BY r.run_number, r.run_timestamp, m.meas_type, COALESCE(m.alfe_mode, m.hps_ps_gain)
1074 ORDER BY r.run_number DESC
1075 """
1076 return pl.read_database_uri(query, self.uri)
1078 def get_boards_summary(self) -> pl.DataFrame:
1079 query = """
1080 SELECT DISTINCT
1081 r.run_number,
1082 m.board_id,
1083 r.run_timestamp as timestamp
1084 FROM runs r
1085 JOIN measurements m
1086 USING (run_number)
1087 ORDER BY run_number DESC
1088 """
1089 return pl.read_database_uri(query, self.uri)
1091 def get_boards_list(self, run_number: Optional[int] = None, meas_type: Optional[str] = None) -> pl.DataFrame:
1092 query = """
1093 SELECT DISTINCT
1094 boards.board_id,
1095 board_version,
1096 board_type
1097 FROM boards
1098 JOIN measurements m
1099 ON boards.board_id = m.board_id
1100 """
1102 if run_number is not None:
1103 query += f"""
1104 WHERE m.run_number = {run_number}
1105 """
1107 if meas_type is not None:
1108 query += f"""
1109 {"AND" if run_number is not None else "WHERE"} meas_type = '{meas_type}'
1110 """
1111 query += """
1112 ORDER BY boards.board_id
1113 """
1115 return pl.read_database_uri(query, self.uri)
1117 def get_runs_list(self) -> pl.DataFrame:
1118 query = """
1119 SELECT DISTINCT
1120 run_number,
1121 meas_type
1122 FROM measurements
1123 ORDER BY run_number
1124 """
1125 return pl.read_database_uri(query, self.uri)
1127 def get_channels_list(
1128 self,
1129 run_number: int,
1130 meas_type: str,
1131 require_pulsed: bool = False,
1132 require_positive: bool = False,
1133 require_unsaturated: bool = False,
1134 require_nonempty: bool = True,
1135 ) -> pl.DataFrame:
1136 query = f"""
1137 SELECT DISTINCT s.channel
1138 FROM measurements m
1139 JOIN samples s
1140 ON m.id = s.measurement_id
1141 WHERE m.run_number = {run_number}
1142 AND m.meas_type = '{meas_type}'
1143 """
1145 if require_pulsed:
1146 query += "\nAND s.is_pulsed"
1147 if require_positive:
1148 query += "\nAND samples_min > 0"
1149 if require_unsaturated:
1150 query += f"\nAND samples_max < {2**15}"
1151 if require_nonempty:
1152 query += "\nAND samples_len > 0"
1154 query += "\nORDER BY channel"
1156 return pl.read_database_uri(query, self.uri)
1158 def get_run_boards_list(
1159 self,
1160 run_number: int,
1161 meas_type: str,
1162 ) -> pl.DataFrame:
1163 query = f"""
1164 SELECT DISTINCT board_id
1165 FROM measurements
1166 WHERE run_number = {run_number}
1167 AND meas_type = '{meas_type}'
1168 ORDER BY board_id
1169 """
1171 return pl.read_database_uri(query, self.uri)
1173 def get_amplitudes_list(self, run_number: int, meas_type: str) -> pl.DataFrame:
1174 return self.load_derived_data(run_number, meas_type).select(pl.col("amp").unique())
1176 def load_monitoring_data(self, *run_numbers: int) -> pl.DataFrame:
1177 run_number_string = ",".join(str(r) for r in run_numbers)
1178 query = f"""
1179 SELECT
1180 mon.monitor,
1181 mon.unit,
1182 mon.monitor_type,
1183 mon.group_name,
1184 mon.ideal_value,
1185 v.value,
1186 v.run_number,
1187 v.monitoring_timestamp AS timestamp,
1188 v.board_id,
1189 meas.measurement_number AS measurement
1190 FROM monitors mon
1191 JOIN monitor_values v
1192 ON mon.id = v.monitor_id
1193 LEFT OUTER JOIN measurements meas
1194 ON v.measurement_id = meas.id
1195 WHERE v.run_number IN ({run_number_string})
1196 """
1197 data = pl.read_database_uri(query, self.uri)
1198 data_with_timezone = data.with_columns(
1199 pl.col("timestamp").dt.replace_time_zone("America/New_York").dt.convert_time_zone("UTC")
1200 )
1201 return data_with_timezone
1203 def get_measurements_list(self, run_number: int) -> List[int]:
1204 query = f"""
1205 SELECT DISTINCT measurement_number AS measurement
1206 FROM measurements
1207 WHERE run_number = {run_number}
1208 ORDER BY measurement
1209 """
1210 return pl.read_database_uri(query, self.uri).to_series().to_list()
1212 def get_measurement_types(self, run_number: int, measurement: Optional[int] = None) -> List[str]:
1213 query = f"""
1214 SELECT DISTINCT meas_type
1215 FROM measurements
1216 WHERE run_number = {run_number}
1217 """
1219 if measurement is not None:
1220 query += f"\nAND measurement_number = {measurement}"
1222 measurement_types = pl.read_database_uri(query, self.uri).to_series().to_list()
1224 return measurement_types
1226 def get_bad_samples_check(self, run_number: int) -> pl.DataFrame:
1227 query = f"""
1228 SELECT
1229 m.board_id,
1230 s.channel,
1231 s.gain,
1232 s.samples_min,
1233 s.samples_max,
1234 s.samples_len
1235 FROM measurements m
1236 JOIN samples s
1237 ON m.id = s.measurement_id
1238 WHERE m.run_number = {run_number}
1239 """
1240 return pl.read_database_uri(query, self.uri)
1242 def load_lab_environment_data(self) -> pl.DataFrame:
1243 query = """
1244 SELECT datetime_utc AS timestamp, lab_name, humidity, pressure, lab_temp, crate_temp
1245 FROM lab_environment
1246 """
1247 return pl.read_database_uri(query, self.uri)
1249 def get_middle_attenuation_run(self, run_numbers: List[int]) -> int:
1250 run_number_string = ",".join(str(r) for r in run_numbers)
1251 attenuations_string = ", ".join(str(a) for a in MIDDLE_ATTENUATIONS)
1252 query = f"""
1253 SELECT DISTINCT
1254 run_number
1255 FROM measurements
1256 WHERE att_val IN ({attenuations_string})
1257 AND run_number IN ({run_number_string})
1258 LIMIT 1
1259 """
1260 df = pl.read_database_uri(query, self.uri)
1261 if df.is_empty():
1262 log.error(f"Did not find a run with att_val in {MIDDLE_ATTENUATIONS} in runs {run_numbers}")
1263 sys.exit(1)
1264 min_att_run_number = df["run_number"][0]
1265 return min_att_run_number