Coverage for src/aquasense/ramses/ramses.py: 0%
43 statements
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-20 10:23 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-20 10:23 +0000
1"""
2RAMSES sampling class using PyTrios library functionality.
4G1 devices currently supported and tested.
5"""
7import datetime
8import logging
9import time
11from aquasense.common.sensor import SensorBase
13import pytrios.radman as radiometer_manager
15from typing import List, TextIO, Tuple
17class RAMSES(SensorBase):
19 def __init__(self,
20 out: TextIO,
21 sep: str,
22 port: str,
23 integration_time: int = 0,
24 repeats: int = 1,
25 intra_sample_delay: float = 1,
26 logger: logging.Logger=None,
27 verbose: bool=False):
29 """Initialise RAMSES object.
31 Args:
32 out: File-like object to write output to.
33 sep: Output column separator.
34 port: serial port name.
35 integration_time: integration time for sensor data acquisition.
36 repeats: number of times to repeat sample acquisition; None means forever.
37 intra_sample_delay: delay between sample acquisitions.
38 logger: A logger object; defaults to None.
39 verbose: Verbose mode flag.
40 """
41 devtype = "G1"
42 in_out = None # PyTrios creates communication channels
43 SensorBase.__init__(self, in_out, out, sep, logger, verbose)
44 self.integration_time = integration_time
45 self.repeats = repeats
46 self.intra_sample_delay = intra_sample_delay
48 self.logger.info("Starting {} radiometry manager".format(devtype))
49 self.rad_manager = radiometer_manager.TriosManager(port)
52 def single_sample(self) -> Tuple[datetime.datetime, List[int], List[int], List[int]]:
53 """Read and return a single sample for each connected device."""
54 self.logger.info("Trigger measurement")
56 trig_time, specs, sids, itimes, _, _, _ = \
57 self.rad_manager.sample_all(datetime.datetime.now(),
58 inttime=self.integration_time)
60 return trig_time, specs, sids, itimes
63 def run(self):
64 """Read samples from one or more RAMSES devices."""
65 if self.rad_manager.ready:
66 if self.repeats is None:
67 self.logger.info("Starting measurements (press CTRL-C to interrupt)")
68 else:
69 self.logger.info("Starting {repeats} measurements (press CTRL-C to interrupt)")
71 finished = False
72 while not finished:
73 try:
74 trig_time, specs, sids, itimes = self.single_sample()
76 for i, sid in enumerate(sids):
77 self.logger.info("Received spectrum from {sid}: {trig_time} | int-time: {itimes[i]} ms | Spectrum: {specs[i][0:3]}...{specs[i][-3::]}")
78 print("{}\t{}\t{}\t{}\n".format(sid, trig_time.isoformat(),
79 itimes[i],
80 ','.join([str(s) for s in specs[i]])),
81 file=self.out)
83 if self.repeats > 0:
84 self.repeats = self.repeats - 1
86 finished = self.repeats == 0
88 time.sleep(self.intra_sample_delay)
90 except KeyboardInterrupt:
91 finished = True
92 else:
93 self.logger.warning("Radiometry manager not ready. Exiting")
95 self.logger.info("Stopping radiometry manager.")
96 if self.rad_manager is not None:
97 self.rad_manager.stop()
98 self.logger.info("Done.")