examples/a121/plot.py
1# Copyright (c) Acconeer AB, 2022-2023
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8import acconeer.exptool as et
9from acconeer.exptool import a121
10
11
def main():
    """Stream results from an extended A121 session and plot them live.

    Parses the example command-line arguments, opens a client, sets up an
    extended session with two groups (both using sensor id 1), starts a
    plotting process driven by ``PGUpdater`` and then forwards every result
    from ``client.get_next()`` to that process until an interrupt signal is
    received or the plotting process has died.

    The plotting process and the client are always closed on the way out,
    including when an exception interrupts setup or streaming.
    """
    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    client = a121.Client.open(**a121.get_client_args(args))

    # Set only once the plot process has actually been started, so the
    # cleanup below never tries to close a process that was never running.
    pg_process = None

    try:
        session_config = a121.SessionConfig(
            [
                # Group 0: sensor 1 measures two subsweeps with different
                # ranges, profiles and HWAAS settings.
                {
                    1: a121.SensorConfig(
                        subsweeps=[
                            a121.SubsweepConfig(
                                start_point=25,
                                step_length=2,
                                num_points=30,
                                profile=a121.Profile.PROFILE_1,
                                hwaas=10,
                            ),
                            a121.SubsweepConfig(
                                start_point=75,
                                step_length=4,
                                num_points=25,
                                profile=a121.Profile.PROFILE_3,
                                hwaas=20,
                            ),
                        ],
                    )
                },
                # Group 1: sensor 1 again, with only the receiver gain
                # overridden.
                {
                    1: a121.SensorConfig(
                        receiver_gain=20,
                    ),
                },
            ],
            extended=True,
        )

        extended_metadata = client.setup_session(session_config)

        pg_updater = PGUpdater(session_config, extended_metadata)
        started_process = et.PGProcess(pg_updater)
        started_process.start()
        pg_process = started_process

        client.start_session()

        interrupt_handler = et.utils.ExampleInterruptHandler()
        print("Press Ctrl-C to end session")

        while not interrupt_handler.got_signal:
            extended_result = client.get_next()

            try:
                pg_process.put_data(extended_result)
            # NOTE(review): the double "c" spelling is kept exactly as in the
            # original; presumably it matches the name exported by the
            # library - confirm before "fixing" it.
            except et.PGProccessDiedException:
                # The plot window is gone; stop streaming.
                break
    finally:
        print("Disconnecting...")
        try:
            if pg_process is not None:
                pg_process.close()
        finally:
            # Close the client even if closing the plot process fails.
            client.close()
71
72
class PGUpdater:
    """Plot updater that draws one plot per (group, sensor) of an extended session.

    ``setup`` builds the plots, one curve per subsweep and one ``SmoothMax``
    per plot; ``update`` feeds a new extended result into those curves and
    rescales the y axis.
    """

    def __init__(
        self,
        session_config: a121.SessionConfig,
        extended_metadata: list[dict[int, a121.Metadata]],
    ) -> None:
        """Store the session configuration and the metadata for later use."""
        self.extended_metadata = extended_metadata
        self.session_config = session_config

    @staticmethod
    def _make_plot(win, title):
        """Add a non-interactive, gridded amplitude-vs-depth plot to ``win``."""
        plot = win.addPlot(title=title)
        plot.setMenuEnabled(False)
        plot.setMouseEnabled(x=False, y=False)
        plot.hideButtons()
        plot.showGrid(x=True, y=True)
        plot.setLabel("bottom", "Depth (m)")
        plot.setLabel("left", "Amplitude")
        return plot

    def setup(self, win):
        """Create plots, curves and y-range smoothers for every group and sensor.

        The results are stored in ``all_plots``, ``all_curves`` and
        ``all_smooth_maxs``: lists indexed by group, holding dicts keyed by
        sensor id.
        """
        self.all_plots = []
        self.all_curves = []
        self.all_smooth_maxs = []

        for group_idx, group in enumerate(self.session_config.groups):
            plots_by_sensor = {}
            curves_by_sensor = {}
            smoothers_by_sensor = {}

            for sensor_id, sensor_config in group.items():
                plot = self._make_plot(win, f"Group {group_idx} / Sensor {sensor_id}")

                # One curve per subsweep, each with its own pen from the cycler.
                plots_by_sensor[sensor_id] = plot
                curves_by_sensor[sensor_id] = [
                    plot.plot(pen=et.utils.pg_pen_cycler(subsweep_idx))
                    for subsweep_idx in range(sensor_config.num_subsweeps)
                ]
                smoothers_by_sensor[sensor_id] = et.utils.SmoothMax(
                    self.session_config.update_rate
                )

            self.all_plots.append(plots_by_sensor)
            self.all_curves.append(curves_by_sensor)
            self.all_smooth_maxs.append(smoothers_by_sensor)

    def update(self, extended_result: list[dict[int, a121.Result]]) -> None:
        """Redraw every curve from ``extended_result`` and rescale the y axes.

        For each subframe the plotted value is the mean of the absolute value
        along axis 0 (presumably the sweep axis - confirm). The upper y limit
        of each plot follows the largest plotted value, passed through that
        plot's ``SmoothMax``.
        """
        for group_idx, group in enumerate(extended_result):
            for sensor_id, result in group.items():
                sensor_curves = self.all_curves[group_idx][sensor_id]
                sensor_plot = self.all_plots[group_idx][sensor_id]

                peak = 0

                for sub_idx, subframe in enumerate(result.subframes):
                    sensor_config = self.session_config.groups[group_idx][sensor_id]
                    distances = get_distances_m(sensor_config.subsweeps[sub_idx])
                    amplitudes = np.abs(subframe).mean(axis=0)

                    sensor_curves[sub_idx].setData(distances, amplitudes)
                    peak = max(peak, np.max(amplitudes))

                smoother = self.all_smooth_maxs[group_idx][sensor_id]
                sensor_plot.setYRange(0, smoother.update(peak))
137
138
def get_distances_m(config):
    """Return the distance, in meters, of every point described by ``config``.

    ``config`` must expose ``start_point``, ``step_length`` and
    ``num_points``. The point indices are
    ``start_point + k * step_length`` for ``k`` in ``range(num_points)``, and
    each index is scaled by 2.5e-3 (presumably the approximate length of one
    point step in meters - confirm).
    """
    meters_per_point = 2.5e-3

    step_offsets = config.step_length * np.arange(config.num_points)
    point_indices = config.start_point + step_offsets

    return meters_per_point * point_indices
142
143
# Script entry point: run the example only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration