Coverage for src/aquasense/hydroscat/hydroscat.py: 88%
142 statements
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-20 10:23 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-20 10:23 +0000
1"""
2HydroScat packet I/O, extraction, calibration class.
4See sections 9.5, 9.2.8, and 9.2.9 of User's Manual, Revision J.
5"""
7import configparser
9from aquasense.common.sensor import SensorBase
10from aquasense.hydroscat.calibrate import seconds_since_posix_epoch_to_excel_days
11from aquasense.hydroscat.calibrate import temperature, depth, voltage, beta
12from aquasense.hydroscat.extract_raw import extract_from_raw_line
14from collections import OrderedDict
16from datetime import datetime
18import logging
20from typing import Dict, List, TextIO, Tuple, Union
class HydroScat(SensorBase):
    """Read HydroScat raw packets, calibrate them and write delimited output.

    The packet source is either a file of previously captured raw lines or a
    serial port connected to the instrument. Calibration coefficients come
    from a HydroScat calibration file parsed with ``configparser``.

    See sections 9.5, 9.2.8, and 9.2.9 of User's Manual, Revision J.
    """

    # A parsed packet with at most this many fields is treated as a data
    # packet; anything larger is handled as a housekeeping packet.
    MAX_DATA_PACKET_FIELDS = 21

    def __init__(self,
                 cal_path: str,
                 in_out: TextIO,
                 out: TextIO,
                 sep: str,
                 serial_mode: bool,
                 burst_mode: bool,
                 sleep_on_memory_full: bool,
                 fluorescence_control: int,
                 start_delay: float,
                 warmup_time: float,
                 burst_duration: float,
                 burst_cycle: float,
                 total_duration: float,
                 log_period: float,
                 output_cal_header: bool = False,
                 logger: Union[logging.Logger, None] = None,
                 verbose: bool = False):
        """Configure initial HydroScat state.

        Args:
            cal_path: Path to a HydroScat calibration file.
            in_out: File-like object which is raw packet source and command destination.
            out: File-like object to write output to.
            sep: Output column separator.
            serial_mode: Is the in_out a serial port?
            burst_mode: Enable burst mode?
            sleep_on_memory_full: If the HydroScat's memory is full, should it go to sleep?
            fluorescence_control: As per section 8.3.12 of HydroScat user manual.
            start_delay: Start delay (seconds) in sending data.
            warmup_time: Warm-up time in seconds.
            burst_duration: Burst duration in seconds.
            burst_cycle: Burst cycle in minutes.
            total_duration: Total logging duration in minutes.
            log_period: Logging period in seconds.
            output_cal_header: Output header with information from calibration file?
            logger: A logger object; defaults to None.
            verbose: Verbose mode flag.
        """
        super().__init__(in_out, out, sep, logger, verbose)

        self.serial_mode = serial_mode
        self.burst_mode = burst_mode
        self.sleep_on_memory_full = sleep_on_memory_full
        self.fluorescence_control = fluorescence_control
        self.start_delay = start_delay
        self.warmup_time = warmup_time
        self.burst_duration = burst_duration
        self.burst_cycle = burst_cycle
        self.total_duration = total_duration
        self.log_period = log_period
        self.output_cal_header = output_cal_header

        # Most recent auxiliary readings, refreshed as packets are processed.
        self.aux_data = {"Time": None, "Depth": None, "Voltage": 0, "Temperature": None}

        # NOTE: ConfigParser.read() silently skips files it cannot open, so a
        # bad cal_path yields an empty configuration and zero channels.
        self.config = configparser.ConfigParser()
        self.config.read(cal_path)

        self.num_channels = len(self.channel_names())

        if serial_mode:
            self.init_serial()
            # No end-of-cast marker is looked for on a live serial source.
            self.exit_str = None
        else:
            self.exit_str = "'End of cast"

    def channel_names(self) -> List[str]:
        """Return a list of channel names for this device.

        Channels are the consecutive ``Channel 1``, ``Channel 2``, ... sections
        of the calibration file; enumeration stops at the first missing number.
        """
        names = []
        channel_num = 1
        section = "Channel {}".format(channel_num)
        while section in self.config:
            names.append(self.config[section]["Name"])
            channel_num += 1
            section = "Channel {}".format(channel_num)
        return names

    def run(self):
        """Read as many lines of data as the in_out offers
        after issuing the start command.

        Writes the header followed by one output line per packet that yields
        data. Reading stops at the end-of-cast marker or, when not in serial
        mode, at end of input. Both ``out`` and ``in_out`` are closed on exit,
        including when an exception propagates.
        """
        try:
            if self.serial_mode:
                self.start_command()

            line, dataline = self.next()

            print("\n".join(self.header_lines()), file=self.out)

            while not self._is_finished(line):
                if dataline is not None:
                    print("{}".format(dataline), file=self.out)
                line, dataline = self.next()
        finally:
            # Nested so that a failure closing one stream still closes the other.
            try:
                self.out.close()
            finally:
                self.in_out.close()

    def _is_finished(self, line: Union[str, None]) -> bool:
        """Return True when ``line`` signals that run() should stop reading."""
        if line is None:
            # Nothing was read: end of input for a file. In serial mode an
            # empty read is not treated as the end, so keep going.
            return not self.serial_mode
        return self.exit_str is not None and line.startswith(self.exit_str)

    def init_serial(self):
        """Issue serial initialisation commands."""
        self.consume_available(self.in_out)
        self.date_command()

    def date_command(self):
        """Issue DATE command with the current local date and time."""
        date_time = datetime.now().strftime("%m/%d/%Y %H:%M:%S")
        self.command_response("DATE,{}".format(date_time),
                              response_patterns=None)

    def fluorescence_command(self):
        """Issue FL command"""
        self.command_response("FL,{}".format(self.fluorescence_control),
                              response_patterns=None)

    def flourescence_command(self):
        """Issue FL command (original, misspelled name kept for existing callers)."""
        self.fluorescence_command()

    def burst_command(self):
        """Issue BURST command built from the configured burst parameters."""
        burst_mode = 1 if self.burst_mode else 0
        sleep_on_memory_full = 1 if self.sleep_on_memory_full else 0
        auto_start = 0

        command = "BURST,{},{},{},{},{},{},{},{},{}".format(
            burst_mode,
            self.warmup_time, self.burst_duration,
            self.burst_cycle, self.total_duration,
            self.log_period, self.start_delay,
            auto_start, sleep_on_memory_full)
        self.command_response(command, response_patterns=None)

    def start_command(self):
        """Issue START command and return the result of command_response()."""
        return self.command_response("START,{}".format(self.start_delay),
                                     response_patterns=["'Sampling starts in 1 second",
                                                        r"HS\d"])

    def stop_command(self):
        """Issue STOP command and return the result of command_response()."""
        return self.command_response("STOP",
                                     response_patterns=["'Sampling stopped."])

    def header_lines(self):
        """Returns the header lines.

        When ``output_cal_header`` is set, the calibration file sections are
        echoed first (skipping sections whose name contains "default" or
        "end"), with trailing ``//`` comments and parenthesised remarks cut
        from each value. The column headings and the ``[Data]`` marker follow.
        """
        header_lines = []

        if self.output_cal_header:
            for section in self.config:
                lowered = section.lower()
                if "default" in lowered or "end" in lowered:
                    continue
                header_lines.append("[{}]".format(section))
                for param in self.config[section]:
                    val = self.config[section][param]
                    # Keep only the text before a "//" comment, then before "(".
                    val = val.partition("//")[0]
                    val = val.partition("(")[0]
                    header_lines.append("{}={}".format(param, val))

        header_lines.append("[ColumnHeadings]")
        names = ["Time", "Depth"]
        for index in range(1, self.num_channels + 1):
            channel_name = self.config["Channel {}".format(index)]["Name"]
            if channel_name[0:2] in ("bb", "fl"):
                channel_name = "beta{}".format(channel_name)
            names.append(channel_name)
        # Use the configured separator so headings line up with the data rows.
        header_lines.append(self.sep.join(names))
        header_lines.append("[Data]")

        return header_lines

    def next(self) -> Tuple[Union[str, None], Union[str, None]]:
        """Returns the next data line from the source.

        Returns:
            A ``(rawline, dataline)`` pair. ``rawline`` is the line read with
            trailing whitespace removed and ``dataline`` the calibrated output
            line, or None if the line produced no data. Both are None when
            nothing could be read (end of input).
        """
        rawline = self.in_out.readline()

        # readline() reports exhaustion with an empty string rather than None;
        # a blank line in the input still carries its line terminator.
        if not rawline:
            return None, None

        rawline = rawline.rstrip()
        return rawline, self.rawline2dataline(rawline)

    def rawline2dataline(self, line: str) -> Union[str, None]:
        """Given a line containing a raw packet, return a line of
        calibrated output data.

        Args:
            line: A line corresponding to a raw format packet, ending in CRLF.

        Returns:
            A line of data, or None if no values were extracted from the line.

        Raises:
            ValueError if packet checksum field not same as computed checksum value.
        """
        data_dict = self.rawline2datadict(line)
        if data_dict is None:
            return None
        return self.sep.join(str(value) for value in data_dict.values())

    def rawline2datadict(self, line: str) -> Union[Dict[str, Union[int, float]], None]:
        """Given a line containing a raw packet, return a dictionary of calibrated
        data or housekeeping packet fields. Also updates auxiliary data dictionary.

        Args:
            line: A line corresponding to a raw format packet, ending in CRLF.

        Returns:
            An ordered dictionary of named calibrated data values, the
            housekeeping fields without the checksum, or None if nothing was
            extracted from the line.

        Raises:
            ValueError if packet checksum field not same as computed checksum value.
        """
        raw_values_dict = extract_from_raw_line(line, self.num_channels)

        if raw_values_dict is None:
            return None

        if len(raw_values_dict) <= self.MAX_DATA_PACKET_FIELDS:
            # from data packet
            return self.raw2datadict(raw_values_dict)

        # from housekeeping packet
        data_dict = {key: value
                     for key, value in raw_values_dict.items()
                     if key != "Checksum"}
        # as per section 9.4.5 of user manual, HydroScat manual Rev J,
        # HydroScat only draws power from supply with highest voltage,
        # so find maximum
        highest_supply = max(data_dict["VsupA"], data_dict["VsupB"], data_dict["Vback"])
        self.aux_data["Voltage"] = voltage(highest_supply)

        return data_dict

    def raw2datadict(self, raw_values: Dict[str, Union[int, float]]) -> Dict[str, Union[int, float]]:
        """Given a dictionary of raw data packet fields, return a dictionary of calibrated
        data packet fields. Also updates auxiliary data dictionary.

        Args:
            raw_values: An ordered dictionary of the numeric fields extracted from the packet.

        Returns:
            An ordered dictionary of named calibrated data values: "Time",
            "Depth", then one entry per channel keyed by the channel name.

        Raises:
            ValueError if packet checksum field not same as computed checksum value.
        """
        general = self.config["General"]
        data_dict = OrderedDict()

        self.aux_data["Time"] = raw_values["Time"]
        data_dict["Time"] = seconds_since_posix_epoch_to_excel_days(raw_values["Time"])

        data_dict["Depth"] = depth(raw_values["RawDepth"],
                                   general.getfloat("DepthCal"),
                                   general.getfloat("DepthOff"))
        self.aux_data["Depth"] = data_dict["Depth"]

        instr_temperature = temperature(raw_values["TempRaw"])
        self.aux_data["Temperature"] = instr_temperature

        calibration_temperature = general.getfloat("CalTemp")

        for index in range(1, self.num_channels + 1):
            s_norm = raw_values["Snorm{}".format(index)]
            gain_status = raw_values["GainStatus{}".format(index)]
            # The low two bits select the gain; bit 2 carries the status flag,
            # which is not used in the calibration below.
            # NOTE(review): a two-bit mask can only yield gains 0-3 - confirm
            # against the manual's packet description that no higher gain
            # setting can be reported.
            gain_index = gain_status & 3
            channel_config = self.config["Channel {}".format(index)]
            if gain_index != 0:
                beta_value = beta(
                    s_norm=s_norm,
                    mu=channel_config.getfloat("Mu"),
                    temp_coeff=channel_config.getfloat("TempCoeff"),
                    instr_temp=instr_temperature,
                    cal_temp=calibration_temperature,
                    gain_ratio=channel_config.getfloat("Gain{}".format(gain_index)),
                    r_nom=channel_config.getfloat("RNominal"))
            else:
                # Gain index 0 is reported as zero rather than calibrated.
                beta_value = 0

            data_dict[channel_config["Name"]] = beta_value

        return data_dict