Coverage for src/aquasense/hydroscat/hydroscat.py: 88%

142 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2026-04-20 10:23 +0000

1""" 

2HydroScat packet I/O, extraction, calibration class. 

3 

4See sections 9.5, 9.2.8, and 9.2.9 of User's Manual, Revision J. 

5""" 

6 

7import configparser 

8 

9from aquasense.common.sensor import SensorBase 

10from aquasense.hydroscat.calibrate import seconds_since_posix_epoch_to_excel_days 

11from aquasense.hydroscat.calibrate import temperature, depth, voltage, beta 

12from aquasense.hydroscat.extract_raw import extract_from_raw_line 

13 

14from collections import OrderedDict 

15 

16from datetime import datetime 

17 

18import logging 

19 

20from typing import Dict, List, TextIO, Tuple, Union 

21 

class HydroScat(SensorBase):
    """HydroScat packet reader, extractor and calibrator.

    Reads raw packet lines from ``in_out``, converts them to calibrated
    values using the calibration file given at construction time, and writes
    header and data lines to ``out``. In serial mode it also issues
    instrument commands over ``in_out``.
    """

    # Raw packets with at most this many extracted fields are treated as data
    # packets; anything larger is treated as a housekeeping packet.
    _DATA_PACKET_MAX_FIELDS = 21

    def __init__(self,
                 cal_path: str,
                 in_out: TextIO,
                 out: TextIO,
                 sep: str,
                 serial_mode: bool,
                 burst_mode: bool,
                 sleep_on_memory_full: bool,
                 fluorescence_control: int,
                 start_delay: float,
                 warmup_time: float,
                 burst_duration: float,
                 burst_cycle: float,
                 total_duration: float,
                 log_period: float,
                 output_cal_header: bool = False,
                 logger: Union[logging.Logger, None] = None,
                 verbose: bool = False):
        """Configure initial HydroScat state.

        Args:
            cal_path: Path to a HydroScat calibration file.
            in_out: File-like object which is the raw packet source and the
                command destination.
            out: File-like object to write output to.
            sep: Output column separator.
            serial_mode: Is in_out a serial port?
            burst_mode: Enable burst mode?
            sleep_on_memory_full: If the HydroScat's memory is full, should it
                go to sleep?
            fluorescence_control: As per section 8.3.12 of HydroScat user manual.
            start_delay: Start delay (seconds) in sending data.
            warmup_time: Warm-up time in seconds.
            burst_duration: Burst duration in seconds.
            burst_cycle: Burst cycle in minutes.
            total_duration: Total logging duration in minutes.
            log_period: Logging period in seconds.
            output_cal_header: Output header with information from calibration
                file?
            logger: A logger object; defaults to None.
            verbose: Verbose mode flag.
        """
        SensorBase.__init__(self, in_out, out, sep, logger, verbose)

        self.serial_mode = serial_mode
        self.burst_mode = burst_mode
        self.sleep_on_memory_full = sleep_on_memory_full
        self.fluorescence_control = fluorescence_control
        self.start_delay = start_delay
        self.warmup_time = warmup_time
        self.burst_duration = burst_duration
        self.burst_cycle = burst_cycle
        self.total_duration = total_duration
        self.log_period = log_period
        self.output_cal_header = output_cal_header

        # Latest auxiliary readings, refreshed as packets are processed.
        self.aux_data = {"Time": None, "Depth": None,
                         "Voltage": 0, "Temperature": None}

        # Set by next(): True when the most recent read returned an empty
        # string, which is how readline() reports end-of-file.
        self._at_eof = False

        self.config = configparser.ConfigParser()
        self.config.read(cal_path)

        self.num_channels = len(self.channel_names())

        if serial_mode:
            self.init_serial()
            # No terminator line is looked for in serial mode.
            self.exit_str = None
        else:
            self.exit_str = "'End of cast"

    def channel_names(self) -> List[str]:
        """Return a list of channel names for this device.

        Walks the consecutively numbered "Channel N" sections of the
        calibration configuration, starting at 1, and stops at the first
        missing number.
        """
        # No emptiness check on the config is needed: a ConfigParser always
        # contains its DEFAULT section, so such a check could never be false.
        names = []
        channel_num = 1
        while True:
            section = "Channel {}".format(channel_num)
            if section not in self.config:
                break
            names.append(self.config[section]["Name"])
            channel_num += 1
        return names

    def run(self):
        """Read as many lines of data as in_out offers after issuing the
        start command (serial mode only), writing the header followed by each
        calibrated data line to out.

        When a terminator string is configured (file mode), reading stops at
        the first line starting with it, or at end-of-file if the terminator
        never appears. Without a terminator (serial mode) the loop runs until
        interrupted by an exception.
        """
        if self.serial_mode:
            self.start_command()

        line, dataline = self.next()

        print("\n".join(self.header_lines()), file=self.out)

        while True:
            if dataline is not None:
                print(dataline, file=self.out)

            line, dataline = self.next()

            if self.exit_str is None:
                # Serial mode: an empty read is not treated as end of input.
                continue
            if line is not None and line.startswith(self.exit_str):
                break
            if getattr(self, "_at_eof", False):
                # Input ended without the terminator line; without this
                # check readline() would return "" forever and the loop
                # would never exit.
                break

        # NOTE(review): the streams are closed only on normal exit; an
        # exception raised while reading leaves them open for the caller.
        self.out.close()
        self.in_out.close()

    def init_serial(self):
        """Issue serial initialisation commands."""
        self.consume_available(self.in_out)
        self.date_command()

    def date_command(self):
        """Issue DATE command with the current local date and time."""
        date_time = datetime.now().strftime("%m/%d/%Y %H:%M:%S")
        self.command_response("DATE,{}".format(date_time),
                              response_patterns=None)

    def flourescence_command(self):
        """Issue FL command.

        The misspelled method name is kept for backward compatibility.
        """
        self.command_response("FL,{}".format(self.fluorescence_control),
                              response_patterns=None)

    def burst_command(self):
        """Issue BURST command built from the configured burst parameters."""
        burst_mode = 1 if self.burst_mode else 0
        sleep_on_memory_full = 1 if self.sleep_on_memory_full else 0
        auto_start = 0

        command = "BURST,{},{},{},{},{},{},{},{},{}".format(
            burst_mode,
            self.warmup_time, self.burst_duration,
            self.burst_cycle, self.total_duration,
            self.log_period, self.start_delay,
            auto_start, sleep_on_memory_full)
        self.command_response(command, response_patterns=None)

    def start_command(self):
        """Issue START command and return the result of command_response."""
        return self.command_response(
            "START,{}".format(self.start_delay),
            response_patterns=["'Sampling starts in 1 second",
                               r"HS\d"])

    def stop_command(self):
        """Issue STOP command and return the result of command_response."""
        return self.command_response(
            "STOP",
            response_patterns=["'Sampling stopped."])

    def header_lines(self):
        """Returns the header lines.

        When output_cal_header is set, every calibration section whose name
        contains neither "default" nor "end" (case-insensitive) is echoed as
        "[section]" followed by "param=value" lines. The column headings and
        the "[Data]" marker are always appended.
        """
        header_lines = []

        if self.output_cal_header:
            for section in self.config:
                lowered = section.lower()
                if "default" in lowered or "end" in lowered:
                    continue
                header_lines.append("[{}]".format(section))
                for param in self.config[section]:
                    val = self.config[section][param]
                    # Keep only the text before a "//" marker, then only the
                    # text before an opening parenthesis.
                    val = val.partition("//")[0]
                    val = val.partition("(")[0]
                    header_lines.append("{}={}".format(param, val))

        header_lines.append("[ColumnHeadings]")
        names = ["Time", "Depth"]
        for index in range(1, self.num_channels + 1):
            channel_name = self.config["Channel {}".format(index)]["Name"]
            if channel_name.startswith(("bb", "fl")):
                channel_name = "beta{}".format(channel_name)
            names.append(channel_name)
        # NOTE(review): headings are always comma-separated whereas data
        # lines use self.sep; confirm whether that is intended.
        header_lines.append(",".join(names))
        header_lines.append("[Data]")

        return header_lines

    def next(self) -> Tuple[str, Union[str, None]]:
        """Returns the next raw line and its calibrated data line.

        Returns:
            A (rawline, dataline) pair. rawline has trailing whitespace
            removed; dataline is None when the line yields no data.
        """
        rawline = self.in_out.readline()

        # Record end-of-file before stripping: a blank line still carries its
        # newline, whereas readline() returns "" only at end-of-file.
        self._at_eof = rawline is not None and len(rawline) == 0

        dataline = None

        if rawline is not None:
            rawline = rawline.rstrip()
            dataline = self.rawline2dataline(rawline)

        return rawline, dataline

    def rawline2dataline(self, line: str) -> Union[str, None]:
        """Given a line containing a raw packet, return a line of
        calibrated output data.

        Args:
            line: A line corresponding to a raw format packet.

        Returns:
            The packet's values joined with the output separator, or None if
            no values could be extracted from the line.

        Raises:
            ValueError: Per the original documentation, if the packet
                checksum field differs from the computed checksum
                (presumably raised by extract_from_raw_line).
        """
        data_dict = self.rawline2datadict(line)

        if data_dict is None:
            return None

        return self.sep.join(str(value) for value in data_dict.values())

    def rawline2datadict(self, line: str) -> Union[Dict[str, Union[int, float]], None]:
        """Given a line containing a raw packet, return a dictionary of
        calibrated data or housekeeping packet fields. Also updates the
        auxiliary data dictionary.

        Args:
            line: A line corresponding to a raw format packet.

        Returns:
            A dictionary of named values: calibrated values for a data
            packet, or the raw fields minus "Checksum" for a housekeeping
            packet. None if nothing could be extracted from the line.

        Raises:
            ValueError: Per the original documentation, if the packet
                checksum field differs from the computed checksum
                (presumably raised by extract_from_raw_line).
        """
        raw_values_dict = extract_from_raw_line(line, self.num_channels)

        if raw_values_dict is None:
            return None

        if len(raw_values_dict) <= self._DATA_PACKET_MAX_FIELDS:
            # from data packet
            return self.raw2datadict(raw_values_dict)

        # from housekeeping packet
        data_dict = {key: raw_values_dict[key]
                     for key in raw_values_dict if key != "Checksum"}
        # as per section 9.4.5 of user manual, HydroScat manual Rev J,
        # HydroScat only draws power from supply with highest voltage,
        # so find maximum
        highest_supply = max(data_dict["VsupA"],
                             data_dict["VsupB"],
                             data_dict["Vback"])
        self.aux_data["Voltage"] = voltage(highest_supply)

        return data_dict

    def raw2datadict(self, raw_values: Dict[str, Union[int, float]]) -> Dict[str, Union[int, float]]:
        """Given a dictionary of raw data packet fields, return a dictionary
        of calibrated data packet fields. Also updates the auxiliary data
        dictionary (Time, Depth and Temperature).

        Args:
            raw_values: A dictionary of the numeric fields extracted from the
                packet.

        Returns:
            An ordered dictionary of named calibrated data values: "Time",
            "Depth", then one entry per channel keyed by the channel name.

        Raises:
            KeyError: If an expected raw field or calibration section is
                missing.
        """
        general = self.config["General"]
        data_dict = OrderedDict()

        self.aux_data["Time"] = raw_values["Time"]
        data_dict["Time"] = seconds_since_posix_epoch_to_excel_days(raw_values["Time"])

        data_dict["Depth"] = depth(raw_values["RawDepth"],
                                   general.getfloat("DepthCal"),
                                   general.getfloat("DepthOff"))
        self.aux_data["Depth"] = data_dict["Depth"]

        instr_temperature = temperature(raw_values["TempRaw"])
        self.aux_data["Temperature"] = instr_temperature

        calibration_temperature = general.getfloat("CalTemp")

        for index in range(1, self.num_channels + 1):
            s_norm = raw_values["Snorm{}".format(index)]
            gain_status = raw_values["GainStatus{}".format(index)]
            gain_index = gain_status & 3  # gain is lower two bits
            channel_config = self.config["Channel {}".format(index)]

            if gain_index != 0:
                beta_value = beta(
                    s_norm=s_norm,
                    mu=channel_config.getfloat("Mu"),
                    temp_coeff=channel_config.getfloat("TempCoeff"),
                    instr_temp=instr_temperature,
                    cal_temp=calibration_temperature,
                    gain_ratio=channel_config.getfloat("Gain{}".format(gain_index)),
                    r_nom=channel_config.getfloat("RNominal"))
            else:
                # A gain index of zero produces a zero output value.
                beta_value = 0

            data_dict[channel_config["Name"]] = beta_value

        return data_dict

324 data_dict[channel_config["Name"]] = beta_value 

325 

326 return data_dict