Coverage for polars_analysis / utils.py: 40%
193 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 14:47 -0400
1import json
2import logging
3import os
4import re
5import sys
6from itertools import combinations
7from pathlib import Path
8from typing import Any, Dict, List, Literal, Optional, Tuple
10import polars as pl
11import requests
12from dotenv import dotenv_values
14from polars_analysis import data_sources
16# Instantiate logger
17log = logging.getLogger(__name__)
def parse_skip_channels(input: List[str]) -> Optional[Tuple[List[int], List[int]]]:
    """Parse channel-skip specifiers into (lo, hi) channel lists.

    Each entry must contain at least one run of digits; the first run is the
    channel number.  An "l" or "h" anywhere in the entry (case-insensitive)
    restricts the skip to the lo- or hi-gain list; an entry with neither tag
    skips the channel in both lists.

    Returns:
        (skip_channels_lo, skip_channels_hi), or None if any entry carries
        no digits at all.
    """
    lo: List[int] = []
    hi: List[int] = []
    for spec in input:
        digits = re.findall(r"[0-9]+", spec)
        if not digits:
            # malformed entry: signal the caller rather than guessing
            return None
        channel = int(digits[0])
        lowered = spec.lower()
        tagged = False
        if "l" in lowered:
            lo.append(channel)
            tagged = True
        if "h" in lowered:
            hi.append(channel)
            tagged = True
        if not tagged:
            # untagged entries apply to both gains
            lo.append(channel)
            hi.append(channel)
    if lo:
        log.info(f"Skipping lo channels: {lo}")
    if hi:
        log.info(f"Skipping hi channels: {hi}")
    return lo, hi
def get_columns_or_exit(df: pl.DataFrame, columns: List[str]) -> pl.DataFrame:
    """Select *columns* from *df*, or log the mismatch and exit the process.

    On a missing column this logs the expected/found/missing sets at CRITICAL
    level and terminates with exit code 1 instead of raising to the caller.
    """
    try:
        return df.select(columns)
    except pl.exceptions.ColumnNotFoundError:
        log.critical("Could not find all needed columns in dataframe")
        log.critical(f"Expected: {columns}")
        log.critical(f"Found: {df.columns}")
        log.critical(f"Missing: {set(columns).difference(df.columns)}")
        sys.exit(1)
def check_missing_www_runs(run_numbers: List[int], www_dir: Path) -> List[int]:
    """Return the runs in *run_numbers* that have no HTML entry in *www_dir*.

    Any directory entry whose name contains "html" (case-insensitive)
    contributes its first run of digits as a run number; everything else is
    ignored.  The result is sorted ascending.
    """
    found: set = set()
    for entry in os.listdir(www_dir):
        if "html" not in entry.lower():
            continue
        match = re.search(r"\d+", entry)
        if match is not None:
            found.add(int(match.group(0)))
    return sorted(set(run_numbers).difference(found))
def clear_run_info(board_id: str, plot_dir: Path) -> None:
    """Delete the per-board run-info JSON file, if present.

    Args:
        board_id: board identifier embedded in the file name.
        plot_dir: directory holding ``run_info_<board_id>.json``.
    """
    json_path = Path(plot_dir) / f"run_info_{board_id}.json"
    # missing_ok avoids the exists()/remove() race of the previous
    # exists-then-remove implementation (file could vanish in between)
    json_path.unlink(missing_ok=True)
def add_run_info(name: str, info, board_id: str, plot_dir: Path, print_to_website: bool = True) -> None:
    """Record one named entry in the per-board run-info JSON file.

    Loads ``run_info_<board_id>.json`` from *plot_dir* (starting fresh if the
    file is absent or unreadable), stores ``{"info": info,
    "print_to_website": print_to_website}`` under *name*, and writes the file
    back with indent=4.
    """
    json_path = os.path.join(plot_dir, f"run_info_{board_id}.json")
    run_info: Dict[str, Any] = {}
    if os.path.exists(json_path):
        try:
            with open(json_path, "r") as f:
                run_info = json.load(f)
        except json.decoder.JSONDecodeError:
            # corrupt file: warn and start over rather than crash
            log.warning(f"Could not decode {json_path}, creating new file")
            run_info = {}
    run_info[name] = {"info": info, "print_to_website": print_to_website}
    with open(json_path, "w") as f:
        json.dump(run_info, f, indent=4)
def get_run_info_str(json_file_path: Path) -> str:
    """Render the run-info JSON file as an HTML snippet.

    Each entry whose "print_to_website" flag is truthy contributes one
    ``key: info<br>`` segment, in file order; entries missing the flag are
    logged and skipped.  Returns "" (after a warning) when the file does not
    exist.
    """
    if not os.path.exists(json_file_path):
        log.warning(f"Could not find {json_file_path}")
        return ""
    with open(json_file_path, "r") as f:
        json_data = json.load(f)
    parts: List[str] = []
    for key, value in json_data.items():
        try:
            if value["print_to_website"]:
                parts.append(f"{key}: {value['info']}<br>")
        except KeyError:
            log.warning(f"Key 'print_to_website' not found in {key}")
    return "".join(parts)
def check_bad_samples(loader: data_sources.DataSource, run_number: int, run_plot_dir: Path) -> None:
    """Scan a run's raw-sample statistics and record suspicious channels.

    Resets run_info.json in *run_plot_dir* to an empty object, then, for each
    board in the run, flags channel/gain pairs whose sample extrema look
    saturated, negative, or stuck at zero, logging each finding and recording
    it via add_run_info() so it appears on the run's web page.

    Args:
        loader: data source providing per-channel sample statistics.
        run_number: run to check.
        run_plot_dir: directory holding this run's plots and run-info JSON.
    """
    # create a blank run_info.json file
    with open(os.path.join(run_plot_dir, "run_info.json"), "w") as f:
        json.dump({}, f, indent=4)

    df_raw_samples = loader.get_bad_samples_check(run_number)

    for board_id in df_raw_samples["board_id"].unique().to_list():
        # samples_max >= 2**15 marks a channel railed at the top of the ADC
        # range (presumably a 16-bit ADC — TODO confirm against hardware spec).
        # unique(subset="channel") keeps a single gain entry per channel.
        saturated_gain_channels = [
            f"{row['channel']}-{row['gain']}"
            for row in df_raw_samples.filter(pl.col("samples_max") >= 2**15)
            .filter(pl.col("board_id") == board_id)
            .select(pl.col("channel"), pl.col("gain"))
            .unique(subset="channel")
            .sort("channel")
            .iter_rows(named=True)
        ]
        if len(saturated_gain_channels) > 0:
            log.warning(f"Saturated Channels: {saturated_gain_channels}")
            add_run_info("Saturated Channels", saturated_gain_channels, board_id, run_plot_dir, True)

        # Negative sample values are unexpected; flagged as possible saturation
        # per the recorded label below.
        negative_gain_channels = [
            f"{row['channel']}-{row['gain']}"
            for row in df_raw_samples.filter(pl.col("samples_min") < 0)
            .filter(pl.col("board_id") == board_id)
            .select(pl.col("channel"), pl.col("gain"))
            .unique(subset="channel")
            .sort("channel")
            .iter_rows(named=True)
        ]
        if len(negative_gain_channels) > 0:
            log.warning(f"Negative (Saturated?) Channels: {negative_gain_channels}")
            add_run_info("Negative (Saturated?) Channels", negative_gain_channels, board_id, run_plot_dir, True)

        """
        Probably don't want this for now as it will flag single ADC
        and crosstalk runs until something more specific is done
        """
        # empty_gain_channels = [
        #     f"{row['channel']}-{row['gain']}"
        #     for row in df_raw_samples.filter(pl.col("samples_len") == 0)
        #     .select(pl.col("channel"), pl.col("gain"))
        #     .iter_rows(named=True)
        # ]
        # if len(empty_gain_channels) > 0:
        #     log.warning(f"Empty Channels: {empty_gain_channels}")
        #     add_run_info("Empty Channels", empty_gain_channels, board_id, run_plot_dir, True)

        # A minimum of exactly zero is treated as suspicious (e.g. a stuck or
        # disconnected channel — NOTE(review): confirm zero is truly abnormal
        # for all run types).
        zero_gain_channels = [
            f"{row['channel']}-{row['gain']}"
            for row in df_raw_samples.filter(pl.col("samples_min") == 0)
            .filter(pl.col("board_id") == board_id)
            .select(pl.col("channel"), pl.col("gain"))
            .unique(subset="channel")
            .sort("channel")
            .iter_rows(named=True)
        ]
        if len(zero_gain_channels) > 0:
            log.warning(f"Zero Value Samples Channels: {zero_gain_channels}")
            add_run_info("Zero Value Samples Channels", zero_gain_channels, board_id, run_plot_dir, True)
def get_board_combinations(boards: List[str], ignore_boards: Optional[List[str]] = None) -> List[List[str]]:
    """
    Get all combinations of boards with the first entry being all boards together.

    Args:
        boards: board identifiers; all output lists use sorted order.
        ignore_boards: boards excluded from one extra "all minus ignored"
            combination; only honoured when more than three boards are present.

    Returns:
        [] for a single board, [[both boards]] for exactly two, otherwise the
        full sorted list first, then (when ignore_boards applies) the filtered
        list, then every 2-board combination.
    """
    # None sentinel instead of a mutable [] default argument, which is shared
    # across calls; passing None or [] behaves identically to the old default.
    if ignore_boards is None:
        ignore_boards = []
    boards_sorted = sorted(boards)
    if len(boards_sorted) == 1:
        return []
    if len(boards_sorted) == 2:
        return [boards_sorted]
    pairs = [list(c) for c in combinations(boards_sorted, 2)]
    if len(boards_sorted) > 3 and ignore_boards:
        log.warning(f"Ignoring boards {ignore_boards} in combinations")
        filtered = [board for board in boards_sorted if board not in ignore_boards]
        return [boards_sorted, filtered, *pairs]
    return [boards_sorted, *pairs]
def notify_mattermost(url: str, msg: str):
    """Post *msg* as an attachment to a Mattermost incoming-webhook *url*.

    Errors are logged and swallowed (best-effort notification): HTTP errors
    from the webhook and any other request failure never propagate.
    """
    payload = {
        "attachments": [
            {
                "title": "Analysis Webserver",
                "title_link": "https://www.nevis.columbia.edu/feb2/FEB2/feb2_home.html",
                "text": msg,
            }
        ]
    }
    try:
        response = requests.post(url, data=json.dumps(payload))
        response.raise_for_status()
    except requests.exceptions.HTTPError as http_err:
        log.error(f"Mattermost HTTP error occurred: {http_err}")
    except Exception as err:
        log.error(f"Mattermost: other error occurred: {err}")
212def non_nevis_config_overrider(data_location: Literal["bnl", "cern"]) -> tuple:
213 bnl_data = False
215 if data_location.lower() == "bnl":
216 bnl_data = True
217 log.warning("bnl_data flag does not overwite env variables loaded in sub function calls.")
218 if Path(".env-bnl").exists():
219 config = dotenv_values(".env-bnl")
220 if config["DATA_DIR"]:
221 data_dir = Path(config["DATA_DIR"])
222 else:
223 log.error("DATA_DIR not set in .env-bnl")
224 raise Exception("DATA_DIR not set in .env-bnl")
225 if config["DERIVED_DIR"]:
226 derived_dir = Path(config["DERIVED_DIR"])
227 else:
228 log.error("DERIVED_DIR not set in .env-bnl")
229 raise Exception("DERIVED_DIR not set in .env-bnl")
230 if config["RUNS_PLOT_DIR"]:
231 plot_dir = Path(config["RUNS_PLOT_DIR"])
232 else:
233 log.error("RUNS_PLOT_DIR not set in .env-bnl")
234 raise Exception("RUNS_PLOT_DIR not set in .env-bnl")
235 if config["RENDERED_DIR"]:
236 rendered_dir = Path(config["RENDERED_DIR"])
237 else:
238 log.error("RENDERED_DIR not set in .env-bnl")
239 raise Exception("RENDERED_DIR not set in .env-bnl")
240 if config["FRAME_DIR"]:
241 frame_dir = Path(config["FRAME_DIR"])
242 else:
243 log.error("FRAME_DIR not set in .env-bnl")
244 raise Exception("FRAME_DIR not set in .env-bnl")
245 if config["MONITORING_DIR"]:
246 monitoring_dir = Path(config["MONITORING_DIR"])
247 else:
248 log.error("MONITORING_DIR not set in .env-bnl")
249 raise Exception("MONITORING_DIR not set in .env-bnl")
250 postgres_uri = None
251 postgres_prod_uri = None
252 upload_to_prod_db = False
253 data_source = "deltalake"
254 else:
255 log.error("bnl_data is true, but .env-bnl does not exist")
256 raise FileNotFoundError(".env-bnl")
257 elif data_location.lower() == "cern":
258 log.warning("cern_data flag does not overwite env variables loaded in sub function calls.")
259 if Path(".env-cern").exists():
260 config = dotenv_values(".env-cern")
261 if config["DATA_DIR"]:
262 data_dir = Path(config["DATA_DIR"])
263 else:
264 log.error("DATA_DIR not set in .env-cern")
265 raise Exception("DATA_DIR not set in .env-cern")
266 if config["DERIVED_DIR"]:
267 derived_dir = Path(config["DERIVED_DIR"])
268 else:
269 log.error("DERIVED_DIR not set in .env-cern")
270 raise Exception("DERIVED_DIR not set in .env-cern")
271 if config["RUNS_PLOT_DIR"]:
272 plot_dir = Path(config["RUNS_PLOT_DIR"])
273 else:
274 log.error("RUNS_PLOT_DIR not set in .env-cern")
275 raise Exception("RUNS_PLOT_DIR not set in .env-cern")
276 if config["RENDERED_DIR"]:
277 rendered_dir = Path(config["RENDERED_DIR"])
278 else:
279 log.error("RENDERED_DIR not set in .env-cern")
280 raise Exception("RENDERED_DIR not set in .env-cern")
281 if config["FRAME_DIR"]:
282 frame_dir = Path(config["FRAME_DIR"])
283 else:
284 log.error("FRAME_DIR not set in .env-cern")
285 raise Exception("FRAME_DIR not set in .env-cern")
286 if config["MONITORING_DIR"]:
287 monitoring_dir = Path(config["MONITORING_DIR"])
288 else:
289 log.error("MONITORING_DIR not set in .env-cern")
290 raise Exception("MONITORING_DIR not set in .env-cern")
291 postgres_uri = None
292 postgres_prod_uri = None
293 upload_to_prod_db = False
294 data_source = "deltalake"
295 else:
296 log.error("cern_data is true, but .env-cern does not exist")
297 raise FileNotFoundError(".env-cern")
298 else:
299 raise Exception(f"Incorrect data_location {data_location} for config override.")
301 return (
302 bnl_data,
303 data_dir,
304 derived_dir,
305 plot_dir,
306 rendered_dir,
307 frame_dir,
308 monitoring_dir,
309 postgres_uri,
310 postgres_prod_uri,
311 upload_to_prod_db,
312 data_source,
313 )