Coverage for src/aquasense/hydroscat/driver.py: 0%
76 statements
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-20 10:23 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-20 10:23 +0000
1"""
2HydroScat I/O driver for integration testing.
3"""
5import argparse
6import io
7import os
8import serial
9import sys
11from aquasense.common.arghelp import less_than_zero_check, usage
12from aquasense.hydroscat.hydroscat import HydroScat
def create_arg_parser() -> argparse.ArgumentParser:
    """
    Create and return the command-line parser.

    Every option has a default, so the parser accepts an empty command
    line.  Range validation of the parsed values is not done here.

    Returns:
        The configured argument parser.
    """
    parser = argparse.ArgumentParser(
        description="Read HydroScat data from a file or serial port.")

    # Input, calibration and output locations.
    parser.add_argument("--source", "-s",
                        dest="source",
                        type=str,
                        default="reference/data files/HydroScat-6/sample.raw",
                        help="Raw lines source (file path or serial port name)")

    parser.add_argument("--cal-path", "-c",
                        dest="cal_path",
                        type=str,
                        default="reference/Device files/HydroScat/HS080339 2021-10-16.cal",
                        help="Calibration file path")

    parser.add_argument("--output-path", "-o",
                        dest="out_path",
                        type=str,
                        default=None,
                        help="Output file path")

    parser.add_argument("--output-field-delimiter", "-d",
                        dest="output_field_delimiter",
                        type=str,
                        default=",",
                        help="Delimiter for fields in output rows (default: ,)")

    # Serial port settings.
    parser.add_argument("--baud-rate", "-b",
                        dest="baud_rate",
                        type=int,
                        default=9600,
                        help="Serial port baud rate (default: 9600)")

    parser.add_argument("--serial-timeout", "-t",
                        dest="timeout",
                        type=int,
                        default=1,
                        help="Serial port read timeout (default: 1 second)")

    # Instrument behaviour flags and timing parameters.
    parser.add_argument("--burst-mode", "-e",
                        dest="burst_mode",
                        default=False,
                        action="store_true",
                        help="Enable burst mode?")

    # The help text used to be a copy of the --burst-mode help.
    parser.add_argument("--sleep-on-memory-full", "-z",
                        dest="sleep_on_memory_full",
                        default=False,
                        action="store_true",
                        help="Sleep when memory is full?")

    parser.add_argument("--fluorescence-control", "-f",
                        dest="fluorescence_control",
                        type=int,
                        default=1,
                        help="Fluorescence channel setting (0, 1, 2) as "
                             "per section 8.3.12 of HydroScat user manual "
                             "(default: 1)")

    parser.add_argument("--start-delay", "-r",
                        dest="start_delay",
                        type=float,
                        default=1,
                        help="Start delay in seconds (default: 1)")

    # The help text used to be a copy of the --start-delay help.
    parser.add_argument("--warmup-time", "-w",
                        dest="warmup_time",
                        type=float,
                        default=0,
                        help="Warm-up time in seconds (default: 0)")

    parser.add_argument("--burst-duration", "-u",
                        dest="burst_duration",
                        type=float,
                        default=0,
                        help="Burst duration in seconds (default: 0)")

    parser.add_argument("--burst-cycle", "-y",
                        dest="burst_cycle",
                        type=float,
                        default=0.7,
                        help="Burst cycle in minutes (default: 0.7)")

    parser.add_argument("--total-duration", "-a",
                        dest="total_duration",
                        type=float,
                        default=0.4,
                        help="Total duration in minutes (default: 0.4)")

    parser.add_argument("--log-period", "-i",
                        dest="log_period",
                        type=float,
                        default=0.6,
                        help="Log period in seconds (default: 0.6)")

    # Output formatting and diagnostics.
    parser.add_argument("--output-cal-header", "-l",
                        dest="output_cal_header",
                        default=False,
                        action="store_true",
                        help="Output calibration header")

    parser.add_argument("--verbose", "-v",
                        dest="verbose",
                        default=False,
                        action="store_true",
                        help="Verbose output mode")

    return parser
def check_args(parser: argparse.ArgumentParser,
               args: argparse.Namespace) -> bool:
    """
    Check args and exit if any problem is found.

    All problems are collected first so they can be reported together
    through ``usage`` (imported from aquasense.common.arghelp; presumably
    it prints the messages and exits -- confirm against that module).

    Args:
        parser: Command-line parser
        args: Command-line options

    Returns:
        Whether or not the source is serial.
    """
    msgs = []

    if not os.path.exists(args.cal_path):
        msgs.append("calibration file does not exist")

    # A source that looks like a Windows COM port or a Unix tty device
    # is treated as a serial port; anything else must be an existing file.
    serial_mode = args.source.startswith(("COM", "/dev/tty"))
    if serial_mode:
        if args.baud_rate <= 0:
            msgs.append("baud rate must be a positive integer (e.g. 9600, 19200)")
    elif not os.path.exists(args.source):
        msgs.append("raw packet lines source does not exist")

    if args.out_path is not None:
        # dirname() is "" for a bare file name such as "out.csv"; that
        # means the current directory, which must not be rejected.
        out_dir = os.path.dirname(args.out_path) or os.curdir
        if not os.path.exists(out_dir):
            msgs.append("output file directory does not exist")

    if args.fluorescence_control not in (0, 1, 2):
        msgs.append("fluorescence control value must be 0, 1, or 2")

    # less_than_zero_check is a project helper; it is handed the message
    # list, presumably to append to it when the value is negative.
    less_than_zero_check(args.start_delay, "start delay", msgs)
    less_than_zero_check(args.warmup_time, "warm-up time", msgs)
    less_than_zero_check(args.burst_duration, "burst duration", msgs)
    less_than_zero_check(args.burst_cycle, "burst cycle", msgs)
    less_than_zero_check(args.total_duration, "total duration", msgs)
    less_than_zero_check(args.log_period, "log_period", msgs)

    if msgs:
        usage(parser, msgs)

    return serial_mode
def main():
    """
    Parse the command line, build a HydroScat instance and run it.

    Input comes from a serial port or a file, depending on what
    check_args reports for the source.  Output goes to the requested
    file, or to stdout when no output path is given.  Every stream opened
    here is closed again on the way out, whether the run succeeds or
    fails.

    Exits with status 1 on keyboard interrupt and 2 on any other error.
    """
    # Local import so the file's import block does not need to change.
    from contextlib import ExitStack

    try:
        parser = create_arg_parser()
        args = parser.parse_args()

        serial_mode = check_args(parser, args)

        # The stack closes registered streams in reverse order of
        # registration when the block is left, normally or by exception.
        with ExitStack() as stack:
            if serial_mode:
                ser = serial.Serial(port=args.source,
                                    baudrate=args.baud_rate,
                                    timeout=args.timeout)
                stack.callback(ser.close)
                in_out = io.TextIOWrapper(io.BufferedRWPair(ser, ser))
                stack.callback(in_out.close)
            else:
                in_out = stack.enter_context(open(args.source))

            if args.out_path is None:
                # stdout belongs to the interpreter; never close it here.
                out = sys.stdout
            else:
                out = stack.enter_context(open(args.out_path, "w"))

            hydroscat = HydroScat(cal_path=args.cal_path,
                                  in_out=in_out,
                                  out=out,
                                  sep=args.output_field_delimiter,
                                  serial_mode=serial_mode,
                                  burst_mode=args.burst_mode,
                                  sleep_on_memory_full=args.sleep_on_memory_full,
                                  fluorescence_control=args.fluorescence_control,
                                  start_delay=args.start_delay,
                                  warmup_time=args.warmup_time,
                                  burst_duration=args.burst_duration,
                                  burst_cycle=args.burst_cycle,
                                  total_duration=args.total_duration,
                                  log_period=args.log_period,
                                  output_cal_header=args.output_cal_header,
                                  logger=None,
                                  verbose=args.verbose)

            if serial_mode:
                # These commands are only issued for a serial source.
                # NOTE(review): "flourescence" is the spelling this file
                # already calls -- confirm it matches the HydroScat class
                # before renaming.
                hydroscat.flourescence_command()
                hydroscat.burst_command()

            hydroscat.run()

    except KeyboardInterrupt:
        print("** keyboard interrupt", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report the error and return a failure status.
        print("** exiting with error: {}".format(str(e)), file=sys.stderr)
        sys.exit(2)
226if __name__ == "__main__":
227 main()