Coverage for polars_analysis / utils.py: 40%

193 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 14:47 -0400

1import json 

2import logging 

3import os 

4import re 

5import sys 

6from itertools import combinations 

7from pathlib import Path 

8from typing import Any, Dict, List, Literal, Optional, Tuple 

9 

10import polars as pl 

11import requests 

12from dotenv import dotenv_values 

13 

14from polars_analysis import data_sources 

15 

16# Instantiate logger 

17log = logging.getLogger(__name__) 

18 

19 

def parse_skip_channels(input: List[str]) -> Optional[Tuple[List[int], List[int]]]:
    """Parse user-supplied channel specifiers into (lo, hi) skip lists.

    Each entry must contain a channel number and may be tagged with 'l'
    (skip lo gain) and/or 'h' (skip hi gain); an entry with neither tag
    skips both gains. Returns None as soon as an entry contains no digits.
    """
    lo_skips: List[int] = []
    hi_skips: List[int] = []
    for entry in input:
        digits = re.findall(r"[0-9]+", entry)
        if not digits:
            return None  # a malformed specifier aborts the whole parse
        channel = int(digits[0])
        tag = entry.lower()
        wants_lo = "l" in tag
        wants_hi = "h" in tag
        if wants_lo:
            lo_skips.append(channel)
        if wants_hi:
            hi_skips.append(channel)
        if not wants_lo and not wants_hi:
            # untagged entry: skip the channel on both gains
            lo_skips.append(channel)
            hi_skips.append(channel)
    if lo_skips:
        log.info(f"Skipping lo channels: {lo_skips}")
    if hi_skips:
        log.info(f"Skipping hi channels: {hi_skips}")

    return lo_skips, hi_skips

41 

42 

def get_columns_or_exit(df: pl.DataFrame, columns: List[str]) -> pl.DataFrame:
    """Select *columns* from *df*, terminating the program if any are missing.

    On a missing column the expected/found/missing sets are logged at
    critical level and the process exits with status 1.
    """
    try:
        return df.select(columns)
    except pl.exceptions.ColumnNotFoundError:
        log.critical("Could not find all needed columns in dataframe")
        log.critical(f"Expected: {columns}")
        log.critical(f"Found: {df.columns}")
        log.critical(f"Missing: {set(columns).difference(df.columns)}")
        sys.exit(1)

53 

54 

def check_missing_www_runs(run_numbers: List[int], www_dir: Path) -> List[int]:
    """Return the runs from *run_numbers* that have no HTML page in *www_dir*.

    Only directory entries whose name contains "html" (case-insensitive) are
    considered; the first run of digits in such a name is taken as its run
    number. The result is sorted ascending.
    """
    # Build the set of published runs directly instead of list -> set later.
    www_runs = set()
    for entry in os.listdir(www_dir):
        if "html" not in entry.lower():
            continue  # anything that is not an HTML page is irrelevant
        match = re.search(r"\d+", entry)
        if match:
            www_runs.add(int(match.group(0)))

    # set difference, then sort; sorted() already returns a list
    return sorted(set(run_numbers) - www_runs)

66 

67 

def clear_run_info(board_id: str, plot_dir: Path) -> None:
    """Delete the per-board run_info_<board_id>.json file; no-op if absent."""
    json_path = Path(plot_dir) / f"run_info_{board_id}.json"
    # missing_ok avoids the check-then-remove race of os.path.exists + os.remove
    json_path.unlink(missing_ok=True)

72 

73 

def add_run_info(name: str, info, board_id: str, plot_dir: Path, print_to_website: bool = True) -> None:
    """Record one named entry in the per-board run_info_<board_id>.json file.

    Existing entries are preserved; a corrupt (undecodable) file is replaced
    with a fresh one. ``print_to_website`` marks whether the entry should be
    rendered on the web page later.
    """
    json_path = os.path.join(plot_dir, f"run_info_{board_id}.json")

    run_info: Dict[str, Any] = {}
    if os.path.exists(json_path):
        try:
            with open(json_path, "r") as f:
                run_info = json.load(f)
        except json.decoder.JSONDecodeError:
            # keep the empty dict and start the file over
            log.warning(f"Could not decode {json_path}, creating new file")

    run_info[name] = {"info": info, "print_to_website": print_to_website}

    with open(json_path, "w") as f:
        json.dump(run_info, f, indent=4)

90 

91 

def get_run_info_str(json_file_path: Path) -> str:
    """Render the run-info JSON file as an HTML snippet.

    Each entry flagged print_to_website contributes one "name: info<br>"
    segment, in file order. Returns "" (after a warning) when the file is
    missing; entries without the flag are warned about and skipped.
    """
    if not os.path.exists(json_file_path):
        log.warning(f"Could not find {json_file_path}")
        return ""

    with open(json_file_path, "r") as f:
        entries = json.load(f)

    segments = []
    for name, entry in entries.items():
        try:
            if entry["print_to_website"]:
                segments.append(f"{name}: {entry['info']}<br>")
        except KeyError:
            # also triggered when 'info' is absent on a flagged entry
            log.warning(f"Key 'print_to_website' not found in {name}")
            continue
    return "".join(segments)

107 

108 

def _flagged_gain_channels(df: pl.DataFrame, board_id: str, predicate: pl.Expr) -> List[str]:
    """Return sorted 'channel-gain' labels for rows of *df* on *board_id* matching *predicate*."""
    return [
        f"{row['channel']}-{row['gain']}"
        for row in df.filter(predicate)
        .filter(pl.col("board_id") == board_id)
        .select(pl.col("channel"), pl.col("gain"))
        # NOTE(review): unique(subset="channel") keeps a single gain per
        # channel — confirm a channel flagged on both gains should appear once.
        .unique(subset="channel")
        .sort("channel")
        .iter_rows(named=True)
    ]


def check_bad_samples(loader: data_sources.DataSource, run_number: int, run_plot_dir: Path):
    """Scan the raw samples of *run_number* for pathological values per board.

    Writes a blank run_info.json first, then for every board flags channels
    whose sample maxima saturate a 16-bit range (>= 2**15), whose minima go
    negative, or whose minima touch zero; each non-empty finding is logged
    and recorded via add_run_info.
    """
    # create a blank run_info.json file
    with open(os.path.join(run_plot_dir, "run_info.json"), "w") as f:
        json.dump({}, f, indent=4)

    df_raw_samples = loader.get_bad_samples_check(run_number)

    # (label, predicate) pairs, applied per board in this order; the shared
    # query logic lives in _flagged_gain_channels.
    checks = [
        ("Saturated Channels", pl.col("samples_max") >= 2**15),
        ("Negative (Saturated?) Channels", pl.col("samples_min") < 0),
        ("Zero Value Samples Channels", pl.col("samples_min") == 0),
    ]
    # An "empty channel" check (samples_len == 0) is deliberately omitted:
    # it would flag single-ADC and crosstalk runs until something more
    # specific is done.
    for board_id in df_raw_samples["board_id"].unique().to_list():
        for label, predicate in checks:
            flagged = _flagged_gain_channels(df_raw_samples, board_id, predicate)
            if flagged:
                log.warning(f"{label}: {flagged}")
                add_run_info(label, flagged, board_id, run_plot_dir, True)

169 

170 

def get_board_combinations(boards: List[str], ignore_boards: Optional[List[str]] = None) -> List[List[str]]:
    """
    Get all combinations of boards with the first entry being all boards together.

    Returns [] for a single board and just the sorted pair for two boards.
    With more than three boards and a non-empty *ignore_boards*, the second
    entry is the board list with the ignored boards removed; all unordered
    pairs follow in either case.
    """
    # None sentinel instead of a mutable [] default (shared-state pitfall).
    if ignore_boards is None:
        ignore_boards = []

    boards_sorted = sorted(boards)
    if len(boards_sorted) == 1:
        return []
    if len(boards_sorted) == 2:
        return [boards_sorted]

    pairs = [list(c) for c in combinations(boards_sorted, 2)]
    if len(boards_sorted) > 3 and ignore_boards:
        # NOTE(review): the ignore list only applies for >3 boards — with
        # exactly 3 boards it is silently ignored; confirm that is intended.
        log.warning(f"Ignoring boards {ignore_boards} in combinations")
        filtered = [board for board in boards_sorted if board not in ignore_boards]
        return [boards_sorted, filtered, *pairs]
    return [boards_sorted, *pairs]

190 

191 

def notify_mattermost(url: str, msg: str, timeout: float = 30.0):
    """Post *msg* to a Mattermost incoming webhook at *url*.

    Failures are logged, never raised, so a notification problem cannot
    take down the caller.

    Args:
        url: Incoming-webhook URL.
        msg: Message text for the attachment body.
        timeout: Seconds to wait for the HTTP round-trip; without one,
            requests.post can block forever on a stuck server.
    """
    payload = {
        "attachments": [
            {
                "title": "Analysis Webserver",
                "title_link": "https://www.nevis.columbia.edu/feb2/FEB2/feb2_home.html",
                "text": msg,
            }
        ]
    }

    try:
        # json= serializes the payload and sets the Content-Type header,
        # replacing the manual data=json.dumps(...) form.
        r = requests.post(url, json=payload, timeout=timeout)
        r.raise_for_status()
    except requests.exceptions.HTTPError as http_err:
        log.error(f"Mattermost HTTP error occurred: {http_err}")
    except Exception as err:
        log.error(f"Mattermost: other error occurred: {err}")

210 

211 

def non_nevis_config_overrider(data_location: Literal["bnl", "cern"]) -> tuple:
    """Override directory configuration from a site-specific dotenv file.

    Loads ``.env-bnl`` or ``.env-cern`` (depending on *data_location*) and
    returns the 11-tuple (bnl_data, data_dir, derived_dir, plot_dir,
    rendered_dir, frame_dir, monitoring_dir, postgres_uri, postgres_prod_uri,
    upload_to_prod_db, data_source). Database URIs are forced to None,
    prod-DB upload is disabled, and the data source is "deltalake".

    Raises:
        Exception: unknown *data_location*, or a required key missing/empty
            in the env file.
        FileNotFoundError: the site env file does not exist.
    """
    loc = data_location.lower()
    if loc not in ("bnl", "cern"):
        raise Exception(f"Incorrect data_location {data_location} for config override.")

    bnl_data = loc == "bnl"
    env_file = f".env-{loc}"
    log.warning(f"{loc}_data flag does not overwite env variables loaded in sub function calls.")

    if not Path(env_file).exists():
        log.error(f"{loc}_data is true, but {env_file} does not exist")
        raise FileNotFoundError(env_file)

    config = dotenv_values(env_file)

    def _required_dir(key: str) -> Path:
        # .get (not []) so a key absent from the env file is reported the
        # same way as an empty one, instead of raising an unlogged KeyError.
        value = config.get(key)
        if not value:
            log.error(f"{key} not set in {env_file}")
            raise Exception(f"{key} not set in {env_file}")
        return Path(value)

    data_dir = _required_dir("DATA_DIR")
    derived_dir = _required_dir("DERIVED_DIR")
    plot_dir = _required_dir("RUNS_PLOT_DIR")
    rendered_dir = _required_dir("RENDERED_DIR")
    frame_dir = _required_dir("FRAME_DIR")
    monitoring_dir = _required_dir("MONITORING_DIR")

    return (
        bnl_data,
        data_dir,
        derived_dir,
        plot_dir,
        rendered_dir,
        frame_dir,
        monitoring_dir,
        None,  # postgres_uri
        None,  # postgres_prod_uri
        False,  # upload_to_prod_db
        "deltalake",  # data_source
    )