Coverage for src/aquasense/hydroscat/extract_raw.py: 97%

93 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2026-04-20 10:23 +0000

1""" 

2HydroScat raw data extraction functions. 

3 

4D, T, and H packets are handled. 

5 

6See HydroScat-6P Spectral Backscattering Sensor & Fluorometer 

7 User's Manual, Revision J, Raw Data Formats section. 

8""" 

9 

10from typing import Dict, List, Tuple, Union 

11import numpy as np 

12 

13 

def extract_from_raw_line(line: str, num_channels: int) -> Union[Dict[str, Union[int, float]], None]:
    """Extract the numeric fields from the packet in a line.

    Args:
        line: A line corresponding to a raw format packet, ending in CRLF.
        num_channels: The number of HydroScat channels. It is forwarded to
            the packet handler so the per-channel fields are sliced with
            the caller's channel count.

    Returns:
        A dictionary mapping field names to the numeric values extracted
        from the packet, or None when the line has two or fewer characters
        or does not start with a handled prefix ("*D", "*T" or "*H").

    Raises:
        ValueError: If the packet checksum field is not the same as the
            computed checksum value.
    """
    packet_handlers = {
        "*D": extract_from_data_packet,
        "*T": extract_from_data_packet,
        "*H": extract_from_H_packet,
    }

    # A line this short cannot hold a prefix plus any payload.
    if len(line) <= 2:
        return None

    # Trailing whitespace (including the CRLF) is removed before parsing
    # because checksum() expects a packet that does not end in CRLF.
    line = line.rstrip()

    handler = packet_handlers.get(line[0:2])
    if handler is None:
        return None

    # Pass the caller's channel count through rather than a fixed value.
    fields = handler(line, num_channels=num_channels)

    computed = checksum(line)
    if fields["Checksum"] != computed:
        raise ValueError(
            "Packet checksum field 0x{:02X} does not match computed "
            "checksum 0x{:02X}".format(fields["Checksum"], computed)
        )

    return fields

46 

47 

def extract_time(packet: str, start: int, frac_time: bool) -> Tuple[int, Union[int, float]]:
    """Read the time field that begins at ``start`` in the packet.

    The time field is 8 hexadecimal characters, decoded with unsigned32.
    The packet tables in the user manual describe it as signed 32 bit, but
    the detailed description indicates time starts from zero, so it is
    treated as unsigned here.

    Args:
        packet: A raw D, T or H format packet.
        start: The start index in the packet string.
        frac_time: Whether a fractional time field follows (T packet).

    Returns:
        A tuple of the index of the first character after the time
        field(s) and the time value: an int when frac_time is false,
        otherwise a float with the fractional part added.
    """
    whole_end = start + 8
    time_value = unsigned32(packet[start:whole_end])

    if not frac_time:
        return whole_end, time_value

    # The fractional field is 2 hexadecimal characters decoded with
    # unsigned8; it is divided by 100 before being added to the time.
    frac_end = whole_end + 2
    fraction = unsigned8(packet[whole_end:frac_end])
    return frac_end, time_value + fraction / 100

77 

78 

def extract_from_data_packet(packet: str, num_channels: int) -> Dict[str, Union[int, float]]:
    """Decode the numeric fields of a raw D or T data packet.

    Fields are read left to right, starting just after the two-character
    packet prefix. Widths below are counted in hexadecimal characters.

    Args:
        packet: A raw D or T format packet.
        num_channels: The number of HydroScat channels.

    Returns:
        A dictionary, in packet order, with keys "Time", "Snorm1".."SnormN",
        "GainStatus1".."GainStatusN", "RawDepth", "TempRaw", "Error" and
        "Checksum". Note that a gain/status value of zero implies that the
        corresponding channel is disabled and its data should be ignored
        (see section 9.2.7 in HydroScat manual Rev J, Gain/Status).
    """
    def hex_value(text):
        # Plain hexadecimal parse with no width or sign handling.
        return int(text, 16)

    # A "T" packet carries the extra fractional-time field.
    cursor, timestamp = extract_time(packet, 2, packet[1] == "T")
    fields = {"Time": timestamp}

    # Snorm1..N: 4 characters each, signed 16 bit.
    for channel in range(1, num_channels + 1):
        fields["Snorm{}".format(channel)] = signed16(packet[cursor:cursor + 4])
        cursor += 4

    # Gain/Status1..N: a single character (one nibble) each.
    for channel in range(1, num_channels + 1):
        fields["GainStatus{}".format(channel)] = hex_value(packet[cursor])
        cursor += 1

    # Fixed trailer: (key, width, decoder), in packet order.
    trailer = (
        ("RawDepth", 4, signed16),
        ("TempRaw", 2, unsigned8),
        ("Error", 2, hex_value),
        ("Checksum", 2, hex_value),
    )
    for key, width, decode in trailer:
        fields[key] = decode(packet[cursor:cursor + width])
        cursor += width

    return fields

130 

131 

def extract_from_H_packet(packet: str, num_channels: int) -> Dict[str, int]:
    """Decode the numeric fields of a raw H packet.

    Fields are read left to right starting at index 2, i.e. the packet is
    expected to still carry its two-character prefix. Widths below are
    counted in hexadecimal characters.

    Args:
        packet: A raw H format packet.
        num_channels: The number of HydroScat channels.

    Returns:
        A dictionary, in packet order, with keys "Time", then for each
        channel i "SigOff{i}", "Ref{i}", "RefOff{i}", "Back{i}", followed by
        "VsupA", "VsupB", "Vback", "VAux" and "Checksum".
    """
    def hex_value(text):
        # Plain hexadecimal parse with no width or sign handling.
        return int(text, 16)

    # The fractional-time flag is derived from the prefix letter, so it is
    # False for a packet whose second character is "H".
    cursor, timestamp = extract_time(packet, 2, packet[1] == "T")
    fields = {"Time": timestamp}

    # Per-channel group: (key template, width, decoder), in packet order.
    # NOTE(review): Back is read as 2 characters / signed 8 bit, although an
    # earlier comment in this function described it as 16 bits; confirm
    # against the manual.
    per_channel = (
        ("SigOff{}", 4, signed16),
        ("Ref{}", 4, signed16),
        ("RefOff{}", 4, signed16),
        ("Back{}", 2, signed8),
    )
    for channel in range(1, num_channels + 1):
        for template, width, decode in per_channel:
            fields[template.format(channel)] = decode(packet[cursor:cursor + width])
            cursor += width

    # Fixed trailer after the channel groups.
    trailer = (
        ("VsupA", 2, unsigned8),
        ("VsupB", 2, unsigned8),
        ("Vback", 2, unsigned8),
        ("VAux", 4, signed16),
        ("Checksum", 2, hex_value),
    )
    for key, width, decode in trailer:
        fields[key] = decode(packet[cursor:cursor + width])
        cursor += width

    return fields

187 

188 

def checksum(packet: str) -> int:
    """Compute the one-byte checksum of a raw format packet.

    The character codes of everything between the leading packet flag
    (first character) and the trailing two-character checksum field are
    summed, and only the least significant byte of that sum is kept.

    Args:
        packet: A string corresponding to a raw format packet, NOT ending
            in CRLF.

    Returns:
        Integer value of the least significant byte of the unsigned sum.
    """
    # Skip the first character and the last two characters.
    payload = packet[1:-2]
    total = sum(map(ord, payload))
    return total & 0xFF

200 

201 

202# Helpers 

203 

204# These functions should be used where we have an explicit need 

205# for signed/unsigned values vs char, byte, nibble 

206 

def signed8(num_str: str, base=16) -> int:
    """Parse a number string as a signed (two's complement) 8-bit integer.

    The conversion is done with plain integer arithmetic rather than by
    passing an out-of-range Python int to a NumPy signed type, because that
    wrap-around is deprecated in NumPy 1.24 and raises OverflowError in
    NumPy 2.

    Args:
        num_str: The textual number, e.g. "FF".
        base: The numeric base of num_str (hexadecimal by default).

    Returns:
        The value of the low 8 bits interpreted as two's complement,
        in the range -128..127.
    """
    raw = int(num_str, base) & 0xFF
    # Sign bit set: the value represents raw - 2**8.
    return raw - 0x100 if raw & 0x80 else raw

209 

210 

def unsigned8(num_str: str, base=16) -> int:
    """Parse a number string and return it as an unsigned 8-bit value.

    Args:
        num_str: The textual number, e.g. "FF".
        base: The numeric base of num_str (hexadecimal by default).

    Returns:
        The parsed value, passed through numpy's uint8 and converted back
        to a Python int.
    """
    parsed = int(num_str, base)
    as_uint8 = np.uint8(parsed)
    return int(as_uint8)

213 

214 

def signed16(num_str: str, base=16) -> int:
    """Parse a number string as a signed (two's complement) 16-bit integer.

    The conversion is done with plain integer arithmetic rather than by
    passing an out-of-range Python int to a NumPy signed type, because that
    wrap-around is deprecated in NumPy 1.24 and raises OverflowError in
    NumPy 2.

    Args:
        num_str: The textual number, e.g. "FFFF".
        base: The numeric base of num_str (hexadecimal by default).

    Returns:
        The value of the low 16 bits interpreted as two's complement,
        in the range -32768..32767.
    """
    raw = int(num_str, base) & 0xFFFF
    # Sign bit set: the value represents raw - 2**16.
    return raw - 0x10000 if raw & 0x8000 else raw

217 

218 

def unsigned16(num_str: str, base=16) -> int:
    """Parse a number string and return it as an unsigned 16-bit value.

    Args:
        num_str: The textual number, e.g. "FFFF".
        base: The numeric base of num_str (hexadecimal by default).

    Returns:
        The parsed value, passed through numpy's uint16 and converted back
        to a Python int.
    """
    parsed = int(num_str, base)
    as_uint16 = np.uint16(parsed)
    return int(as_uint16)

221 

222 

def signed32(num_str: str, base=16) -> int:
    """Parse a number string as a signed (two's complement) 32-bit integer.

    The conversion is done with plain integer arithmetic rather than by
    passing an out-of-range Python int to a NumPy signed type, because that
    wrap-around is deprecated in NumPy 1.24 and raises OverflowError in
    NumPy 2.

    Args:
        num_str: The textual number, e.g. "FFFFFFFF".
        base: The numeric base of num_str (hexadecimal by default).

    Returns:
        The value of the low 32 bits interpreted as two's complement,
        in the range -2147483648..2147483647.
    """
    raw = int(num_str, base) & 0xFFFFFFFF
    # Sign bit set: the value represents raw - 2**32.
    return raw - 0x100000000 if raw & 0x80000000 else raw

225 

226 

def unsigned32(num_str: str, base=16) -> int:
    """Parse a number string and return it as an unsigned 32-bit value.

    Args:
        num_str: The textual number, e.g. "FFFFFFFF".
        base: The numeric base of num_str (hexadecimal by default).

    Returns:
        The parsed value, passed through numpy's uint32 and converted back
        to a Python int.
    """
    parsed = int(num_str, base)
    as_uint32 = np.uint32(parsed)
    return int(as_uint32)