Coverage for src/aquasense/common/sensor.py: 83%
44 statements
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-20 10:23 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-20 10:23 +0000
1"""
2Module for common sensor code.
3"""
5from abc import abstractmethod
6from typing import List, TextIO
8import logging
9import re
11class SensorBase(object):
12 """
13 A base class for sensors.
15 Args:
16 in_out: File-like object for command-response I/O (e.g. serial)
17 out: File-like object to write output to.
18 sep: Output column separator; defaults to comma.
19 logger: A logger object; defaults to None.
20 verbose: Verbose mode flag.
21 """
22 def __init__(self,
23 in_out: TextIO,
24 out: TextIO,
25 sep: str,
26 logger: logging.Logger=None,
27 verbose: bool=False):
28 self.in_out = in_out
29 self.out = out
30 self.sep = sep
31 if logger is None: 31 ↛ 35line 31 didn't jump to line 35, because the condition on line 31 was never false
32 logging.basicConfig()
33 self.logger = logging.getLogger(self.__class__.__name__)
34 else:
35 self.logger = logger
36 self.verbose = verbose
37 if self.verbose: 37 ↛ exitline 37 didn't return from function '__init__', because the condition on line 37 was never false
38 self.logger.setLevel(logging.INFO)
41 @abstractmethod
42 def run(self):
43 """Read samples forever or until some some
44 implementation specific end state
45 """
46 raise NotImplementedError
49 def command_response(self, command: str,
50 response_patterns: List[str]=None,
51 eoln_out: str="\r\n") -> List[str]:
52 """Write a command to I/O channel, wait for one of a number of possible expected
53 response regular expressions, return any matched groups.
55 Args:
56 in_out: I/O channel (e.g. serial)
57 command: The command string to be sent.
58 response_patterns: Optional expected response regular expression list.
59 eoln_out: The end of line sequence to be added to commands.
61 Returns:
62 Values matched from response pattern.
64 Raises:
65 IOError: If an I/O error occurs or the expected responses are not received.
66 """
67 if self.verbose: 67 ↛ 70line 67 didn't jump to line 70, because the condition on line 67 was never false
68 self.logger.info(">> {}".format(command))
70 self.in_out.write("{}{}".format(command, eoln_out))
71 self.in_out.flush()
73 matched_values = None
75 if response_patterns is None or len(response_patterns) == 0:
76 self.consume_available(self.in_out)
77 else:
78 if self.verbose: 78 ↛ 80line 78 didn't jump to line 80, because the condition on line 78 was never false
79 self.logger.info("?? {}".format(response_patterns))
80 finished = False
81 patterns = [re.compile(resp_patt) for resp_patt in response_patterns]
82 while not finished:
83 response = self.in_out.readline()
84 if self.verbose and response.strip() != "": 84 ↛ 86line 84 didn't jump to line 86, because the condition on line 84 was never false
85 self.logger.info("<< {}".format(response))
86 for pattern in patterns: 86 ↛ 82line 86 didn't jump to line 82, because the loop on line 86 didn't complete
87 matcher = pattern.search(response.rstrip())
88 if matcher is not None: 88 ↛ 86line 88 didn't jump to line 86, because the condition on line 88 was never false
89 finished = True
90 matched_values = matcher.groups()
91 break
93 return matched_values
96 def consume_available(self, in_out: TextIO):
97 """Consume any incoming bytes waiting on serial port
98 before beginning.
100 Args:
101 in_out: I/O channel (file, serial port)
102 """
103 while len(in_out.readlines()) > 0: 103 ↛ 104line 103 didn't jump to line 104, because the condition on line 103 was never true
104 pass