Coverage for src/aquasense/ramses/ramses.py: 0%

43 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2026-04-20 10:23 +0000

1""" 

2RAMSES sampling class using PyTrios library functionality. 

3 

4G1 devices currently supported and tested. 

5""" 

6 

7import datetime 

8import logging 

9import time 

10 

11from aquasense.common.sensor import SensorBase 

12 

13import pytrios.radman as radiometer_manager 

14 

15from typing import List, TextIO, Tuple 

16 

17class RAMSES(SensorBase): 

18 

19 def __init__(self, 

20 out: TextIO, 

21 sep: str, 

22 port: str, 

23 integration_time: int = 0, 

24 repeats: int = 1, 

25 intra_sample_delay: float = 1, 

26 logger: logging.Logger=None, 

27 verbose: bool=False): 

28 

29 """Initialise RAMSES object. 

30  

31 Args: 

32 out: File-like object to write output to. 

33 sep: Output column separator. 

34 port: serial port name. 

35 integration_time: integration time for sensor data acquisition. 

36 repeats: number of times to repeat sample acquisition; None means forever. 

37 intra_sample_delay: delay between sample acquisitions. 

38 logger: A logger object; defaults to None. 

39 verbose: Verbose mode flag. 

40 """ 

41 devtype = "G1" 

42 in_out = None # PyTrios creates communication channels 

43 SensorBase.__init__(self, in_out, out, sep, logger, verbose) 

44 self.integration_time = integration_time 

45 self.repeats = repeats 

46 self.intra_sample_delay = intra_sample_delay 

47 

48 self.logger.info("Starting {} radiometry manager".format(devtype)) 

49 self.rad_manager = radiometer_manager.TriosManager(port) 

50 

51 

52 def single_sample(self) -> Tuple[datetime.datetime, List[int], List[int], List[int]]: 

53 """Read and return a single sample for each connected device.""" 

54 self.logger.info("Trigger measurement") 

55 

56 trig_time, specs, sids, itimes, _, _, _ = \ 

57 self.rad_manager.sample_all(datetime.datetime.now(), 

58 inttime=self.integration_time) 

59 

60 return trig_time, specs, sids, itimes 

61 

62 

63 def run(self): 

64 """Read samples from one or more RAMSES devices.""" 

65 if self.rad_manager.ready: 

66 if self.repeats is None: 

67 self.logger.info("Starting measurements (press CTRL-C to interrupt)") 

68 else: 

69 self.logger.info("Starting {repeats} measurements (press CTRL-C to interrupt)") 

70 

71 finished = False 

72 while not finished: 

73 try: 

74 trig_time, specs, sids, itimes = self.single_sample() 

75 

76 for i, sid in enumerate(sids): 

77 self.logger.info("Received spectrum from {sid}: {trig_time} | int-time: {itimes[i]} ms | Spectrum: {specs[i][0:3]}...{specs[i][-3::]}") 

78 print("{}\t{}\t{}\t{}\n".format(sid, trig_time.isoformat(), 

79 itimes[i], 

80 ','.join([str(s) for s in specs[i]])), 

81 file=self.out) 

82 

83 if self.repeats > 0: 

84 self.repeats = self.repeats - 1 

85 

86 finished = self.repeats == 0 

87 

88 time.sleep(self.intra_sample_delay) 

89 

90 except KeyboardInterrupt: 

91 finished = True 

92 else: 

93 self.logger.warning("Radiometry manager not ready. Exiting") 

94 

95 self.logger.info("Stopping radiometry manager.") 

96 if self.rad_manager is not None: 

97 self.rad_manager.stop() 

98 self.logger.info("Done.")