Coverage for src/aquasense/ramses/driver.py: 0%

46 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2026-04-20 10:23 +0000

1""" 

2RAMSES I/O driver for integration testing. 

3""" 

4 

5import argparse 

6import os 

7import sys 

8 

9from aquasense.common.arghelp import less_than_zero_check, usage 

10from aquasense.ramses.ramses import RAMSES 

11 

12 

# Integration times accepted by --integration-time: 0 plus the powers of
# two from 2**3 (8) up to 2**13 (8192). The option's help text describes
# these as milliseconds, with 0 meaning "auto".
INTEGRATION_TIMES = [0]
INTEGRATION_TIMES.extend(1 << exponent for exponent in range(3, 14))

14 

15 

def create_arg_parser() -> argparse.ArgumentParser:
    """
    Build the command-line parser for the RAMSES driver.

    Returns:
        A parser that accepts the serial port name, output file path,
        output field delimiter, integration time, number of repeats,
        intra-sample delay and a verbose flag.
    """
    # Render the permitted integration times once for the help text.
    allowed_times = ", ".join(map(str, INTEGRATION_TIMES))
    integration_help = (
        "Integration time in ms (default: 0 [auto]); can be one of "
        + allowed_times
    )

    cli = argparse.ArgumentParser(
        description="Read RAMSES data from a serial port.")
    add_option = cli.add_argument

    add_option("--port", "-p",
               dest="port", type=str, default=None,
               help="Serial port name")

    add_option("--output-path", "-o",
               dest="out_path", type=str, default=None,
               help="Output file path")

    add_option("--output-field-delimiter", "-d",
               dest="output_field_delimiter", type=str, default=",",
               help="Delimiter for fields in output rows (default: ,)")

    add_option("--integration-time", "-i",
               dest="integration_time", type=int, default=0,
               help=integration_help)

    add_option("--repeats", "-r",
               dest="repeats", type=int, default=1,
               help="Number of sampling repeats (default: 1)")

    add_option("--intra-sample-delay", "-s",
               dest="intra_sample_delay", type=float, default=1,
               help="Delay between sampling in secs (default: 1)")

    add_option("--verbose", "-v",
               dest="verbose", default=False, action="store_true",
               help="Verbose output mode")

    return cli

68 

69 

def check_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> None:
    """
    Check args and exit if any problem is found.

    All problems are collected into a single list first, so that they are
    handed to ``usage`` together rather than one at a time.

    Args:
        parser: Command-line parser
        args: Command-line options

    Returns:
        None. When at least one problem is found, ``usage(parser, msgs)``
        is called; it is defined in ``aquasense.common.arghelp`` and is
        presumably what reports the messages and exits (not visible here).
    """
    msgs = []

    if args.port is None:
        msgs.append("serial port name required")

    if args.out_path is not None:
        out_dir = os.path.dirname(args.out_path)
        # An empty dirname means the file goes in the current directory,
        # so there is nothing to check. Otherwise the parent must be an
        # actual directory: a regular file at that path is not acceptable.
        if out_dir != "" and not os.path.isdir(out_dir):
            msgs.append("output file directory does not exist")

    if args.integration_time not in INTEGRATION_TIMES:
        msgs.append("integration time must be one of {}".format(
            ", ".join(str(n) for n in INTEGRATION_TIMES)))

    # NOTE(review): less_than_zero_check is presumed to append a message
    # to msgs when the value is negative -- confirm against arghelp.
    less_than_zero_check(args.repeats, "repeats", msgs)
    less_than_zero_check(args.intra_sample_delay, "intra sample delay", msgs)

    if msgs:
        usage(parser, msgs)

100 

101 

def main():
    """
    Command-line entry point: parse and check options, then run RAMSES.

    Output rows go to standard output unless an output path was given, in
    which case that file is opened for writing and closed again once the
    run finishes or fails.

    Exits with status 1 on a keyboard interrupt and status 2 on any other
    exception, after printing a message to standard error.
    """
    try:
        parser = create_arg_parser()
        args = parser.parse_args()

        check_args(parser, args)

        if args.out_path is None:
            out = sys.stdout
        else:
            out = open(args.out_path, "w")

        try:
            ramses = RAMSES(out=out,
                            sep=args.output_field_delimiter,
                            port=args.port,
                            integration_time=args.integration_time,
                            repeats=args.repeats,
                            intra_sample_delay=args.intra_sample_delay,
                            logger=None,
                            verbose=args.verbose)

            ramses.run()
        finally:
            # Close only a file opened here; sys.stdout must stay open
            # for the rest of the process. Closing a file that is already
            # closed is a no-op, so this is safe even if RAMSES closed it.
            # NOTE(review): assumes run() has finished writing to `out`
            # by the time it returns -- confirm against RAMSES.
            if out is not sys.stdout:
                out.close()

    except KeyboardInterrupt:
        print("** keyboard interrupt", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # SystemExit (e.g. raised by argparse) is not an Exception
        # subclass, so it passes through with its own exit status.
        print("** exiting with error: {}".format(str(e)), file=sys.stderr)
        sys.exit(2)

131 

132 

# Run the driver only when this module is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()