Coverage for src/aquasense/hydroscat/driver.py: 0%

76 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2026-04-20 10:23 +0000

1""" 

2HydroScat I/O driver for integration testing. 

3""" 

4 

5import argparse 

6import io 

7import os 

8import serial 

9import sys 

10 

11from aquasense.common.arghelp import less_than_zero_check, usage 

12from aquasense.hydroscat.hydroscat import HydroScat 

13 

14 

def create_arg_parser() -> argparse.ArgumentParser:
    """
    Create and return the command-line parser.

    The parser only declares the options, their types and defaults;
    range and existence validation is not performed here.

    Returns:
        A parser whose parsed namespace carries the attributes
        source, cal_path, out_path, output_field_delimiter, baud_rate,
        timeout, burst_mode, sleep_on_memory_full, fluorescence_control,
        start_delay, warmup_time, burst_duration, burst_cycle,
        total_duration, log_period, output_cal_header and verbose.
    """
    parser = argparse.ArgumentParser(
        description="Read HydroScat data from a file or serial port.")

    parser.add_argument("--source", "-s",
                        dest="source",
                        type=str,
                        default="reference/data files/HydroScat-6/sample.raw",
                        help="Raw lines source (file path or serial port name)")

    parser.add_argument("--cal-path", "-c",
                        dest="cal_path",
                        type=str,
                        default="reference/Device files/HydroScat/HS080339 2021-10-16.cal",
                        help="Calibration file path")

    parser.add_argument("--output-path", "-o",
                        dest="out_path",
                        type=str,
                        default=None,
                        help="Output file path")

    parser.add_argument("--output-field-delimiter", "-d",
                        dest="output_field_delimiter",
                        type=str,
                        default=",",
                        help="Delimiter for fields in output rows (default: ,)")

    parser.add_argument("--baud-rate", "-b",
                        dest="baud_rate",
                        type=int,
                        default=9600,
                        help="Serial port baud rate (default: 9600)")

    parser.add_argument("--serial-timeout", "-t",
                        dest="timeout",
                        type=int,
                        default=1,
                        help="Serial port read timeout (default: 1 second)")

    parser.add_argument("--burst-mode", "-e",
                        dest="burst_mode",
                        default=False,
                        action="store_true",
                        help="Enable burst mode?")

    # The help text here used to be a copy of the --burst-mode help.
    parser.add_argument("--sleep-on-memory-full", "-z",
                        dest="sleep_on_memory_full",
                        default=False,
                        action="store_true",
                        help="Enable sleep on memory full?")

    parser.add_argument("--fluorescence-control", "-f",
                        dest="fluorescence_control",
                        type=int,
                        default=1,
                        help="Fluorescence channel setting (0, 1, 2) as "
                             "per section 8.3.12 of HydroScat user manual "
                             "(default: 1)")

    parser.add_argument("--start-delay", "-r",
                        dest="start_delay",
                        type=float,
                        default=1,
                        help="Start delay in seconds (default: 1)")

    # The help text here used to be a copy of the --start-delay help.
    parser.add_argument("--warmup-time", "-w",
                        dest="warmup_time",
                        type=float,
                        default=0,
                        help="Warm-up time in seconds (default: 0)")

    parser.add_argument("--burst-duration", "-u",
                        dest="burst_duration",
                        type=float,
                        default=0,
                        help="Burst duration in seconds (default: 0)")

    parser.add_argument("--burst-cycle", "-y",
                        dest="burst_cycle",
                        type=float,
                        default=0.7,
                        help="Burst cycle in minutes (default: 0.7)")

    parser.add_argument("--total-duration", "-a",
                        dest="total_duration",
                        type=float,
                        default=0.4,
                        help="Total duration in minutes (default: 0.4)")

    parser.add_argument("--log-period", "-i",
                        dest="log_period",
                        type=float,
                        default=0.6,
                        help="Log period in seconds (default: 0.6)")

    parser.add_argument("--output-cal-header", "-l",
                        dest="output_cal_header",
                        default=False,
                        action="store_true",
                        help="Output calibration header")

    parser.add_argument("--verbose", "-v",
                        dest="verbose",
                        default=False,
                        action="store_true",
                        help="Verbose output mode")

    return parser

127 

128 

def check_args(parser: argparse.ArgumentParser,
               args: argparse.Namespace) -> bool:
    """
    Check args and exit if any problem is found.

    All problems are collected first and then handed to usage() in a
    single call, so the user sees every issue at once. usage() is
    expected to terminate the program (assumption based on the original
    contract of this function; confirm against aquasense.common.arghelp).

    Args:
        parser: Command-line parser
        args: Command-line options

    Returns:
        Whether or not the source is serial.
    """
    msgs = []

    if not os.path.exists(args.cal_path):
        msgs.append("calibration file does not exist")

    # A source named like a Windows COM port or a POSIX tty device is
    # treated as a serial port; anything else is treated as a file path.
    serial_mode = args.source.startswith(("COM", "/dev/tty"))
    if serial_mode:
        if args.baud_rate <= 0:
            msgs.append("baud rate must be a positive integer (e.g. 9600, 19200)")
    elif not os.path.exists(args.source):
        msgs.append("raw packet lines source does not exist")

    if args.out_path is not None:
        # dirname() yields "" for a bare file name such as "out.csv", and
        # os.path.exists("") is False; fall back to the current directory
        # so that writing into the working directory is accepted.
        out_dir = os.path.dirname(args.out_path) or os.curdir
        if not os.path.exists(out_dir):
            msgs.append("output file directory does not exist")

    if args.fluorescence_control not in (0, 1, 2):
        msgs.append("fluorescence control value must be 0, 1, or 2")

    less_than_zero_check(args.start_delay, "start delay", msgs)
    less_than_zero_check(args.warmup_time, "warm-up time", msgs)
    less_than_zero_check(args.burst_duration, "burst duration", msgs)
    less_than_zero_check(args.burst_cycle, "burst cycle", msgs)
    less_than_zero_check(args.total_duration, "total duration", msgs)
    # Label spelled like the other human-readable labels above
    # (it was previously the identifier "log_period").
    less_than_zero_check(args.log_period, "log period", msgs)

    if msgs:
        usage(parser, msgs)

    return serial_mode

172 

173 

def main():
    """
    Run the HydroScat driver from the command line.

    Parses and validates the command-line options, opens the input
    (serial port or raw file) and the output (file or stdout), builds a
    HydroScat instance and runs it. In serial mode the fluorescence and
    burst commands are issued before running.

    Every stream opened here is closed again when the run finishes or
    fails; sys.stdout is never closed.

    Exit status:
        1 on keyboard interrupt, 2 on any other exception.
    """
    # Function-scope import so this block does not depend on a change to
    # the module's import list.
    import contextlib

    try:
        parser = create_arg_parser()
        args = parser.parse_args()

        serial_mode = check_args(parser, args)

        # The stack unwinds in reverse order on normal exit, on error and
        # on Ctrl-C, so nothing opened below is leaked.
        with contextlib.ExitStack() as stack:
            if serial_mode:
                ser = serial.Serial(port=args.source,
                                    baudrate=args.baud_rate,
                                    timeout=args.timeout)
                # Registered first so the port is released even if
                # wrapping it in text I/O below raises.
                stack.callback(ser.close)
                in_out = stack.enter_context(
                    io.TextIOWrapper(io.BufferedRWPair(ser, ser)))
            else:
                in_out = stack.enter_context(open(args.source))

            if args.out_path is None:
                out = sys.stdout
            else:
                out = stack.enter_context(open(args.out_path, "w"))

            hydroscat = HydroScat(cal_path=args.cal_path,
                                  in_out=in_out,
                                  out=out,
                                  sep=args.output_field_delimiter,
                                  serial_mode=serial_mode,
                                  burst_mode=args.burst_mode,
                                  sleep_on_memory_full=args.sleep_on_memory_full,
                                  fluorescence_control=args.fluorescence_control,
                                  start_delay=args.start_delay,
                                  warmup_time=args.warmup_time,
                                  burst_duration=args.burst_duration,
                                  burst_cycle=args.burst_cycle,
                                  total_duration=args.total_duration,
                                  log_period=args.log_period,
                                  output_cal_header=args.output_cal_header,
                                  logger=None,
                                  verbose=args.verbose)

            if serial_mode:
                # Method name kept exactly as this driver has always
                # called it (sic "flourescence").
                hydroscat.flourescence_command()
                hydroscat.burst_command()

            hydroscat.run()

    except KeyboardInterrupt:
        print("** keyboard interrupt", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print("** exiting with error: {}".format(str(e)), file=sys.stderr)
        sys.exit(2)

224 

225 

# Script entry point: run the driver only when this module is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()