Coverage for src/aquasense/common/sensor.py: 83%

44 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2026-04-20 10:23 +0000

1""" 

2Module for common sensor code. 

3""" 

4 

5from abc import abstractmethod 

6from typing import List, TextIO 

7 

8import logging 

9import re 

10 

class SensorBase(object):
    """
    A base class for sensors.

    Provides command/response handling over a line-oriented I/O channel.
    Subclasses are expected to override run().

    Args:
        in_out: File-like object for command-response I/O (e.g. serial)
        out: File-like object to write output to.
        sep: Output column separator; defaults to comma.
        logger: A logger object; defaults to None, in which case a logger
            named after the concrete class is created.
        verbose: Verbose mode flag. When set, the logger level is set to
            INFO (this also applies to a logger passed in by the caller).
    """
    def __init__(self,
                 in_out: TextIO,
                 out: TextIO,
                 sep: str=",",
                 logger: logging.Logger=None,
                 verbose: bool=False):
        self.in_out = in_out
        self.out = out
        self.sep = sep
        if logger is None:
            # No logger supplied: make sure the root logger has a handler
            # and use a logger named after the concrete (sub)class.
            logging.basicConfig()
            self.logger = logging.getLogger(self.__class__.__name__)
        else:
            self.logger = logger
        self.verbose = verbose
        if self.verbose:
            self.logger.setLevel(logging.INFO)

    # NOTE: this class does not use ABCMeta, so @abstractmethod is not
    # enforced at instantiation time; the NotImplementedError below is what
    # actually guards against calling an un-overridden run().
    @abstractmethod
    def run(self):
        """Read samples forever or until some
        implementation specific end state
        """
        raise NotImplementedError

    def command_response(self, command: str,
                         response_patterns: List[str]=None,
                         eoln_out: str="\r\n",
                         max_empty_reads: int=1) -> List[str]:
        """Write a command to I/O channel, wait for one of a number of possible expected
        response regular expressions, return any matched groups.

        When no response patterns are given, any pending input is consumed
        and discarded instead.

        Args:
            command: The command string to be sent.
            response_patterns: Optional expected response regular expression list.
            eoln_out: The end of line sequence to be added to commands.
            max_empty_reads: Number of consecutive empty reads (end of
                stream, or a read that returned no data) tolerated before
                giving up. Pass None to keep waiting indefinitely.

        Returns:
            The groups captured by the first pattern that matches a response
            line (a tuple, empty if the pattern has no groups), or None when
            no response patterns were supplied.

        Raises:
            IOError: If an I/O error occurs or the expected responses are not received.
        """
        if self.verbose:
            self.logger.info(">> %s", command)

        self.in_out.write("{}{}".format(command, eoln_out))
        self.in_out.flush()

        if not response_patterns:
            # Nothing specific to wait for: just drain whatever came back.
            self.consume_available(self.in_out)
            return None

        if self.verbose:
            self.logger.info("?? %s", response_patterns)

        patterns = [re.compile(resp_patt) for resp_patt in response_patterns]
        empty_reads = 0
        while True:
            response = self.in_out.readline()
            if self.verbose and response.strip():
                self.logger.info("<< %s", response)

            line = response.rstrip()
            for pattern in patterns:
                matcher = pattern.search(line)
                if matcher is not None:
                    return matcher.groups()

            # readline() returns an empty string (not even a newline) once
            # the channel has nothing more to give. Without this check the
            # loop would spin forever on a closed or exhausted channel.
            if response:
                empty_reads = 0
                continue
            empty_reads += 1
            if max_empty_reads is not None and empty_reads >= max_empty_reads:
                raise IOError(
                    "No response matching {} received for command {!r}".format(
                        response_patterns, command))

    def consume_available(self, in_out: TextIO):
        """Consume any incoming bytes waiting on serial port
        before beginning.

        Args:
            in_out: I/O channel (file, serial port)
        """
        # Keep reading until a call yields no lines at all.
        while in_out.readlines():
            pass

104 pass