Coverage for src/aquasense/ramses/driver.py: 0%
46 statements
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-20 10:23 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-20 10:23 +0000
1"""
2RAMSES I/O driver for integration testing.
3"""
5import argparse
6import os
7import sys
9from aquasense.common.arghelp import less_than_zero_check, usage
10from aquasense.ramses.ramses import RAMSES
# Integration times (in ms) accepted by --integration-time: 0 selects auto,
# followed by the powers of two from 2**3 (8) up to 2**13 (8192).
INTEGRATION_TIMES = [0, *(2 ** exponent for exponent in range(3, 14))]
def create_arg_parser() -> argparse.ArgumentParser:
    """
    Build the command-line parser for the RAMSES driver.

    Returns:
        A parser that accepts the serial port, output path, output field
        delimiter, integration time, repeat count, intra-sample delay and
        verbosity options.
    """
    # Rendered once so the help text lists every permitted integration time.
    allowed_times = ", ".join(map(str, INTEGRATION_TIMES))

    cli = argparse.ArgumentParser(
        description="Read RAMSES data from a serial port.")

    # Options below rely on argparse defaults where they match what is
    # wanted: string-typed values, a default of None, and a dest derived
    # from the long option name (dashes become underscores).
    cli.add_argument(
        "--port", "-p",
        help="Serial port name")

    # dest is spelled out here because it differs from the derived name.
    cli.add_argument(
        "--output-path", "-o",
        dest="out_path",
        help="Output file path")

    cli.add_argument(
        "--output-field-delimiter", "-d",
        default=",",
        help="Delimiter for fields in output rows (default: ,)")

    cli.add_argument(
        "--integration-time", "-i",
        type=int,
        default=0,
        help="Integration time in ms (default: 0 [auto]); can be one of "
             + allowed_times)

    cli.add_argument(
        "--repeats", "-r",
        type=int,
        default=1,
        help="Number of sampling repeats (default: 1)")

    cli.add_argument(
        "--intra-sample-delay", "-s",
        type=float,
        default=1,
        help="Delay between sampling in secs (default: 1)")

    cli.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Verbose output mode")

    return cli
def check_args(parser: argparse.ArgumentParser,
               args: argparse.Namespace) -> None:
    """
    Validate the parsed options and report every problem found.

    All checks are run before anything is reported, so the user sees the
    complete list of problems in one go.

    Args:
        parser: Command-line parser, passed on to usage() for reporting.
        args: Parsed command-line options.

    Returns:
        None. If any check fails, the collected messages are handed to
        usage(); that helper lives in aquasense.common.arghelp and is
        presumably responsible for printing them and exiting (confirm
        against its definition).
    """
    problems = []

    if args.port is None:
        problems.append("serial port name required")

    if args.out_path is not None:
        out_dir = os.path.dirname(args.out_path)
        # An empty dirname means "current directory", which needs no check.
        # isdir() rather than exists(): a parent path that exists but is a
        # regular file cannot hold the output file either, and would
        # otherwise only fail later when the file is opened.
        if out_dir != "" and not os.path.isdir(out_dir):
            problems.append("output file directory does not exist")

    if args.integration_time not in INTEGRATION_TIMES:
        problems.append("integration time must be one of {}".format(
            ", ".join(str(n) for n in INTEGRATION_TIMES)))

    # NOTE(review): less_than_zero_check is defined elsewhere; presumably it
    # appends a message to the list when the value is negative - confirm.
    less_than_zero_check(args.repeats, "repeats", problems)
    less_than_zero_check(args.intra_sample_delay, "intra sample delay",
                         problems)

    if problems:
        usage(parser, problems)
def main():
    """
    Parse the command line, run a RAMSES acquisition and report failures.

    Output goes to stdout unless --output-path is given, in which case the
    named file is opened for writing and closed again once the run finishes
    or fails.

    Exits with status 1 on a keyboard interrupt and status 2 on any other
    exception; both cases print a message to stderr first.
    """
    try:
        parser = create_arg_parser()
        args = parser.parse_args()

        check_args(parser, args)

        writes_to_file = args.out_path is not None
        out = open(args.out_path, "w") if writes_to_file else sys.stdout

        try:
            ramses = RAMSES(out=out,
                            sep=args.output_field_delimiter,
                            port=args.port,
                            integration_time=args.integration_time,
                            repeats=args.repeats,
                            intra_sample_delay=args.intra_sample_delay,
                            logger=None,
                            verbose=args.verbose)

            ramses.run()
        finally:
            # Close only a file opened here, never sys.stdout. Doing it in
            # finally ensures buffered rows are flushed even when the run is
            # interrupted or raises, before the handlers below exit.
            if writes_to_file:
                out.close()

    except KeyboardInterrupt:
        print("** keyboard interrupt", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report the error and turn it into an exit code.
        print("** exiting with error: {}".format(str(e)), file=sys.stderr)
        sys.exit(2)
# Run the driver only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()