examples/a121/algo/bilateration/bilaterator.py
1# Copyright (c) Acconeer AB, 2023-2024
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8# Added here to force pyqtgraph to choose PySide
9import PySide6 # noqa: F401
10
11import pyqtgraph as pg
12
13import acconeer.exptool as et
14from acconeer.exptool import a121
15from acconeer.exptool.a121.algo.bilateration import Processor, ProcessorConfig
16from acconeer.exptool.a121.algo.distance import Detector, DetectorConfig, ThresholdMethod
17
18
# IDs of the two sensors used by this example. The same list is passed to both the
# Detector and the bilateration Processor in main().
# NOTE(review): the plot labels the first detector result "Left sensor" and the second
# "Right sensor" - presumably ID 2 is the left sensor and ID 3 the right; confirm.
_SENSOR_IDS = [2, 3]
20
21
def main() -> None:
    """Run the bilateration example until Ctrl-C or until the plot process dies.

    Opens a client from the command-line arguments, sets up a distance Detector on
    the sensors in ``_SENSOR_IDS`` and a bilateration Processor on the detector's
    session config, then repeatedly fetches a detector result, processes it and
    sends both results to a plot process running ``PlotPlugin``.

    The plot process and the client are released even if an error occurs while
    setting up or streaming.
    """
    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    client = a121.Client.open(**a121.get_client_args(args))
    try:
        detector_config = DetectorConfig(
            start_m=0.25,
            end_m=1.0,
            signal_quality=25.0,
            max_profile=a121.Profile.PROFILE_1,
            max_step_length=2,
            threshold_method=ThresholdMethod.CFAR,
        )
        detector = Detector(
            client=client, sensor_ids=_SENSOR_IDS, detector_config=detector_config
        )

        # The bilateration processor is built from the session config the detector exposes.
        processor_config = ProcessorConfig()
        processor = Processor(
            session_config=detector.session_config,
            processor_config=processor_config,
            sensor_ids=_SENSOR_IDS,
        )

        detector.calibrate_detector()
        detector.start()

        pg_updater = PlotPlugin(
            detector_config=detector_config, bilateration_config=processor_config
        )
        pg_process = et.PGProcess(pg_updater)
        pg_process.start()
        try:
            interrupt_handler = et.utils.ExampleInterruptHandler()
            print("Press Ctrl-C to end session")

            while not interrupt_handler.got_signal:
                detector_result = detector.get_next()
                processor_result = processor.process(detector_result)

                try:
                    pg_process.put_data(
                        {"detector_result": detector_result, "processor_result": processor_result}
                    )
                except et.PGProccessDiedException:
                    # The plot process is gone (e.g. window closed); stop streaming.
                    break
        finally:
            # Always shut down the plot process that was started above.
            pg_process.close()

        detector.stop()
    finally:
        # Release the connection regardless of how the session ended.
        print("Disconnecting...")
        client.close()
73
74
class PlotPlugin:
    """Plot updater for the bilateration example.

    An instance is handed to ``et.PGProcess`` in ``main()``. ``setup`` builds two
    plots in the given window (sweeps/thresholds on top, obstacle locations below)
    and ``update`` refreshes them from a dict with the keys ``"detector_result"``
    and ``"processor_result"``.
    """

    # Number of samples used to draw each half circle in the obstacle plot.
    _NUM_POINTS_ON_CIRCLE = 100

    def __init__(
        self,
        bilateration_config: ProcessorConfig,
        detector_config: DetectorConfig,
    ) -> None:
        """Store the configuration values needed for plotting.

        :param bilateration_config: Provides ``sensor_spacing_m``; half of it is used
            to offset the half circles to the detecting sensor.
        :param detector_config: Provides ``end_m``, used for the obstacle plot ranges.
        """
        # Number of sweep/threshold curves created per sensor in setup().
        self.num_curves = 5
        self.detector_config = detector_config
        self.sensor_half_spacing_m = bilateration_config.sensor_spacing_m / 2

    @classmethod
    def _unit_half_circle(cls) -> tuple[np.ndarray, np.ndarray]:
        """Return x and y coordinates of a unit half circle spanning angles 0 to pi."""
        angles = np.linspace(0, np.pi, cls._NUM_POINTS_ON_CIRCLE)
        return np.cos(angles), np.sin(angles)

    def setup(self, win) -> None:
        """Create the sweep plot and the obstacle location plot in ``win``.

        NOTE(review): presumably invoked by ``et.PGProcess`` in the plot process
        before any call to ``update`` - confirm against the exptool documentation.
        """
        # Define sweep plot.
        self.sweep_plot = win.addPlot(row=0, col=0)
        self.sweep_plot.setMenuEnabled(False)
        self.sweep_plot.showGrid(x=True, y=True)
        self.sweep_plot.addLegend()
        self.sweep_plot.setLabel("left", "Amplitude")
        self.sweep_plot.setLabel("bottom", "Distance (m)")
        self.sweep_plot.addItem(pg.PlotDataItem())

        def make_curves(pen):
            # One curve per possible processor result, all sharing the same pen.
            return [self.sweep_plot.plot(pen=pen) for _ in range(self.num_curves)]

        # Add sweep curves for the two sensors (solid lines).
        solid_pen_1 = et.utils.pg_pen_cycler(0)
        solid_pen_2 = et.utils.pg_pen_cycler(1)
        self.sweep_curves_1 = make_curves(solid_pen_1)
        self.sweep_curves_2 = make_curves(solid_pen_2)

        # Add threshold curves for the two sensors (same colors, dashed lines).
        dashed_pen_1 = et.utils.pg_pen_cycler(0, "--")
        dashed_pen_2 = et.utils.pg_pen_cycler(1, "--")
        self.threshold_curves_1 = make_curves(dashed_pen_1)
        self.threshold_curves_2 = make_curves(dashed_pen_2)

        # Add legends. Only the first curve of each group is listed.
        sweep_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
        sweep_plot_legend.setParentItem(self.sweep_plot)
        sweep_plot_legend.addItem(self.sweep_curves_1[0], "Sweep - Left sensor")
        sweep_plot_legend.addItem(self.threshold_curves_1[0], "Threshold - Left sensor")
        sweep_plot_legend.addItem(self.sweep_curves_2[0], "Sweep - Right sensor")
        sweep_plot_legend.addItem(self.threshold_curves_2[0], "Threshold - Right sensor")

        self.sweep_smooth_max = et.utils.SmoothMax()

        # Define obstacle plot.
        self.obstacle_location_plot = win.addPlot(row=1, col=0)
        self.obstacle_location_plot.setMenuEnabled(False)
        self.obstacle_location_plot.setAspectLocked()
        self.obstacle_location_plot.showGrid(x=True, y=True)
        self.obstacle_location_plot.addLegend()
        self.obstacle_location_plot.setLabel("left", "Y (m)")
        self.obstacle_location_plot.setLabel("bottom", "X (m)")
        self.obstacle_location_plot.addItem(pg.PlotDataItem())
        end_m = self.detector_config.end_m
        self.obstacle_location_plot.setXRange(-end_m, end_m)
        self.obstacle_location_plot.setYRange(0.0, end_m)

        location_pen = et.utils.pg_pen_cycler(0)
        location_brush = et.utils.pg_brush_cycler(0)

        # Bilaterated points are drawn as filled circles without a connecting line.
        self.obstacle_location_curve = self.obstacle_location_plot.plot(
            pen=None,
            symbol="o",
            symbolSize=10,
            symbolBrush=location_brush,
            symbolPen=None,
        )

        # One half-circle curve per object that may lack a counterpart.
        self.obstacle_location_half_curve = [
            self.obstacle_location_plot.plot(pen=location_pen)
            for _ in range(Processor._MAX_NUM_OBJECTS)
        ]

    def update(self, data) -> None:
        """Redraw both plots from the latest results.

        :param data: Dict with the keys ``"detector_result"`` and ``"processor_result"``,
            as sent by ``main()``.
        :raises ValueError: If an object without counterpart reports a sensor position
            that is neither the processor's left nor right position.
        """
        detector_result = data["detector_result"]
        processor_result = data["processor_result"]

        # Plot sweep data from both distance detectors. The first entry is drawn with
        # the "Left sensor" curves and the second with the "Right sensor" curves.
        curves_per_sensor = (
            (self.sweep_curves_1, self.threshold_curves_1),
            (self.sweep_curves_2, self.threshold_curves_2),
        )
        max_val = 0.0
        for sensor_result, (sweep_curves, threshold_curves) in zip(
            detector_result.values(), curves_per_sensor
        ):
            for idx, distance_processor_result in enumerate(sensor_result.processor_results):
                extra_result = distance_processor_result.extra_result
                abs_sweep = extra_result.abs_sweep
                threshold = extra_result.used_threshold
                distances_m = extra_result.distances_m

                # Guard against missing extra results before plotting them.
                assert abs_sweep is not None
                assert threshold is not None
                assert distances_m is not None

                sweep_curves[idx].setData(distances_m, abs_sweep)
                threshold_curves[idx].setData(distances_m, threshold)

                # Track the largest plotted value; a NaN maximum never replaces the
                # current value, exactly as with a plain "<" comparison.
                max_val = max(max_val, float(np.max(abs_sweep)), float(np.max(threshold)))

        if max_val != 0.0:
            self.sweep_plot.setYRange(0.0, self.sweep_smooth_max.update(max_val))

        # Plot result from bilateration processor.
        # Start with the points.
        points = processor_result.points
        self.obstacle_location_curve.setData(
            [point.x_coord for point in points],
            [point.y_coord for point in points],
        )

        # Plot objects without counterpart as half circles around the detecting sensor.
        objects_without_counterpart = processor_result.objects_without_counterpart
        # The unit half circle is the same for every object, so compute it once.
        unit_x, unit_y = self._unit_half_circle()
        for i, obj in enumerate(objects_without_counterpart):
            # Offset circle to center around the sensor that detected the object.
            if obj.sensor_position == Processor._SENSOR_POSITION_LEFT:
                center_x = -self.sensor_half_spacing_m
            elif obj.sensor_position == Processor._SENSOR_POSITION_RIGHT:
                center_x = self.sensor_half_spacing_m
            else:
                msg = "Invalid sensor position."
                raise ValueError(msg)

            self.obstacle_location_half_curve[i].setData(
                unit_x * obj.distance + center_x, unit_y * obj.distance
            )

        # Clear the curves that do not have an object to visualize.
        for curve in self.obstacle_location_half_curve[len(objects_without_counterpart) :]:
            curve.setData([], [])
214
215
# Run the example only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration