Coverage for src/aquasense/hydroscat/extract_raw.py: 97%
93 statements
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-20 10:23 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-20 10:23 +0000
1"""
2HydroScat raw data extraction functions.
4D, T, and H packets are handled.
6See HydroScat-6P Spectral Backscattering Sensor & Fluorometer
7 User's Manual, Revision J, Raw Data Formats section.
8"""
10from typing import Dict, List, Tuple, Union
11import numpy as np
14def extract_from_raw_line(line: str, num_channels: int) -> List[Union[int,float]]:
15 """Extract the numeric fields from the packet in a line.
17 Args:
18 line: A line corresponding to a raw format packet, ending in CRLF.
19 num_channels: The number of HydroScat channels.
21 Returns:
22 A list of the numeric fields extracted from the packet.
24 Raises:
25 ValueError if packet checksum field not same as computed checksum value.
26 """
27 packet_handlers = {
28 "*D":extract_from_data_packet,
29 "*T":extract_from_data_packet,
30 "*H":extract_from_H_packet
31 }
33 fields = None
35 if len(line) > 2:
36 line = line.rstrip()
37 packet_prefix = line[0:2]
39 if packet_prefix in packet_handlers: 39 ↛ 45line 39 didn't jump to line 45, because the condition on line 39 was never false
40 fields = packet_handlers[packet_prefix](line, num_channels=8)
42 if fields["Checksum"] != checksum(line):
43 raise ValueError()
45 return fields
48def extract_time(packet: str, start: int, frac_time: bool) -> Tuple[int,
49 Union[int,float]]:
50 """Extract the time from the packet.
52 Args:
53 packet: A raw D, T or H format packet.
54 start: The start index in the packet string.
55 frac_time: Include fractional time (T packet)?
57 Returns:
58 A tuple containing the next packet string index and
59 the integer or decimal time.
60 """
61 # Extract integer time and, optionally fractional time
62 #
63 # Note: packet tables in user manual say signed 32 bit
64 # but that is not validated by detailed description, which
65 # indicates time starts from zero, so assuming unsigned 32 bit
66 Time = packet[start:start+8] # 8 signed bytes
67 decimalTime = unsigned32(Time)
69 if frac_time:
70 FractionalTime = packet[start+8:start+10] # 2 unsigned bytes
71 decimalTime += unsigned8(FractionalTime)/100
72 index = start+10
73 else:
74 index = start+8
76 return index, decimalTime
79def extract_from_data_packet(packet: str, num_channels: int) -> Dict[str,Union[int,float]]:
80 """Extract the numeric fields from the raw data packet.
82 Args:
83 packet: A raw D or T format packet.
84 num_channels: The number of HydroScat channels.
86 Returns:
87 An ordered dictionary of the numeric fields extracted from the data packet.
88 Note that a gain/status value of zero implies that the corresponding
89 channel is disabled and its data should be ignored (see section 9.2.7
90 in HydroScat manual Rev J, Gain/Status).
91 """
92 fields = {}
94 start, Time = extract_time(packet, 2, packet[1] == "T")
95 fields["Time"] = Time
97 # Snorm1..N (16 bits each)
98 for i in range(0, num_channels):
99 Snorm = packet[start:start+4]
100 fields["Snorm{}".format(i+1)] = signed16(Snorm)
101 start += 4
103 # Gain/Status1..N (1 nibble each)
104 for i in range(0, num_channels):
105 GainStatus = packet[start]
106 fields["GainStatus{}".format(i+1)] = int(GainStatus, 16)
107 start += 1
109 # Raw depth/temperature, error, checksum
111 RawDepth = packet[start:start+4]
112 fields["RawDepth"] = signed16(RawDepth)
113 start += 4
115 TempRaw = packet[start:start+2]
116 fields["TempRaw"] = unsigned8(TempRaw)
117 start += 2
119 # 1 byte
120 Error = packet[start:start+2]
121 fields["Error"] = int(Error, 16)
122 start += 2
124 # 1 byte
125 Checksum = packet[start:start+2]
126 fields["Checksum"] = int(Checksum, 16)
127 start += 2
129 return fields
132def extract_from_H_packet(packet: str, num_channels: int) -> Dict[str,int]:
133 """Extract the numeric fields from the raw H packet.
135 Args:
136 packet: A raw H format packet without the 2 byte packet prefix.
137 num_channels: The number of HydroScat channels.
139 Returns:
140 An ordered dictionary of the numeric fields extracted from the H packet.
141 """
142 fields = {}
144 start, Time = extract_time(packet, 2, packet[1] == "T")
145 fields["Time"] = Time
147 # SigOff1..N (16 bits), Ref1..N (16 bits), RefOff1..N (16 bits), Back1..N (16 bits)
149 for i in range(0, num_channels):
150 SigOff = packet[start:start+4]
151 fields["SigOff{}".format(i+1)] = signed16(SigOff)
152 start += 4
154 Ref = packet[start:start+4]
155 fields["Ref{}".format(i+1)] = signed16(Ref)
156 start += 4
158 RefOff = packet[start:start+4]
159 fields["RefOff{}".format(i+1)] = signed16(RefOff)
160 start += 4
162 Back = packet[start:start+2]
163 fields["Back{}".format(i+1)] = signed8(Back)
164 start += 2
166 VsupA = packet[start:start+2]
167 fields["VsupA"] = unsigned8(VsupA)
168 start += 2
170 VsupB = packet[start:start+2]
171 fields["VsupB"] = unsigned8(VsupB)
172 start += 2
174 Vback = packet[start:start+2]
175 fields["Vback"] = unsigned8(Vback)
176 start += 2
178 VAux = packet[start:start+4]
179 fields["VAux"] = signed16(VAux)
180 start += 4
182 # 1 byte
183 Checksum = packet[start:start+2]
184 fields["Checksum"] = int(Checksum, 16)
186 return fields
189def checksum(packet: str) -> int:
190 """Sum all bytes except the packet flag, preceding the checksum byte,
191 returning the value of the least significant byte of the sum.
193 Args:
194 packet: A string corresponding to a raw format packet, NOT ending in CRLF.
196 Returns:
197 Integer value of the least significant byte of the unsigned sum.
198 """
199 return sum([ord(byte) for byte in packet[1:-2]]) & 0xFF
202# Helpers
204# These functions should be used where we have an explicit need
205# for signed/unsigned values vs char, byte, nibble
207def signed8(num_str: str, base=16) -> int:
208 return int(np.int8(int(num_str, base)))
211def unsigned8(num_str: str, base=16) -> int:
212 return int(np.uint8(int(num_str, base)))
215def signed16(num_str: str, base=16) -> int:
216 return int(np.int16(int(num_str, base)))
219def unsigned16(num_str: str, base=16) -> int:
220 return int(np.uint16(int(num_str, base)))
223def signed32(num_str: str, base=16) -> int:
224 return int(np.int32(int(num_str, base)))
227def unsigned32(num_str: str, base=16) -> int:
228 return int(np.uint32(int(num_str, base)))