examples/a121/algo/obstacle/detector.py#
1# Copyright (c) Acconeer AB, 2023
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8# Added here to force pyqtgraph to choose PySide
9# import PySide6 # noqa: F401
10# from PySide6.QtWidgets import QLabel, QPushButton, QVBoxLayout, QWidget
11from PySide6.QtGui import QTransform
12
13import pyqtgraph as pg
14
15import acconeer.exptool as et
16from acconeer.exptool import a121
17from acconeer.exptool.a121.algo.obstacle import Detector, DetectorConfig, DetectorResult
18
19
20SENSOR_IDS = [2, 3]
21
22
23def main():
24 args = a121.ExampleArgumentParser().parse_args()
25 et.utils.config_logging(args)
26
27 client = a121.Client.open(**a121.get_client_args(args))
28
29 # Creating a list of subsweep configurations.
30 subsweep_configurations = [
31 a121.SubsweepConfig(
32 start_point=24, # ~6 cm
33 num_points=96,
34 step_length=1,
35 profile=a121.Profile.PROFILE_1,
36 hwaas=4,
37 ),
38 a121.SubsweepConfig(
39 start_point=96, # ~24 cm
40 num_points=48,
41 step_length=4,
42 profile=a121.Profile.PROFILE_3,
43 hwaas=16,
44 ),
45 a121.SubsweepConfig(
46 start_point=300, # ~75 cm
47 num_points=48,
48 step_length=4,
49 profile=a121.Profile.PROFILE_3,
50 hwaas=2,
51 ),
52 ]
53
54 # Configure the detector using multiple subsweeps
55 detector_config = DetectorConfig(
56 enable_bilateration=(len(SENSOR_IDS) == 2),
57 bilateration_sensor_spacing_m=0.1,
58 num_mean_threshold=1.5,
59 num_std_threshold=4,
60 subsweep_configurations=subsweep_configurations,
61 )
62
63 detector = Detector(client=client, sensor_ids=SENSOR_IDS, detector_config=detector_config)
64
65 detector.calibrate_detector()
66 detector.start()
67
68 pg_updater = PGUpdater(
69 num_sensors=len(SENSOR_IDS),
70 num_subsweeps=len(subsweep_configurations),
71 enable_bilateration=detector_config.enable_bilateration,
72 )
73
74 pg_process = et.PGProcess(pg_updater)
75 pg_process.start()
76
77 interrupt_handler = et.utils.ExampleInterruptHandler()
78 print("Press Ctrl-C to end session")
79
80 while not interrupt_handler.got_signal:
81 # v_current = get_speed_from_some_Robot_API(), e.g ROS?
82 # detector.update_robot_speed(v_current)
83
84 detector_result = detector.get_next()
85
86 try:
87 pg_process.put_data(detector_result)
88 except et.PGProccessDiedException:
89 break
90
91 detector.stop()
92
93 print("Disconnecting...")
94 client.close()
95
96
97PLOT_HISTORY_FRAMES = 50
98PLOT_THRESHOLDS = True
99
100
101class PGUpdater:
102 def __init__(self, num_sensors, num_subsweeps, enable_bilateration):
103 self.num_sensors = num_sensors
104 self.num_subsweeps = num_subsweeps
105 self.enable_bilateration = enable_bilateration
106 self.obst_vel_ys = num_sensors * [np.nan * np.ones(PLOT_HISTORY_FRAMES)]
107 self.obst_dist_ys = num_sensors * [np.nan * np.ones(PLOT_HISTORY_FRAMES)]
108 self.obst_bil_ys = np.nan * np.ones(PLOT_HISTORY_FRAMES)
109 self.hist_x = np.linspace(-100, 0, PLOT_HISTORY_FRAMES)
110
111 def setup(self, win: pg.GraphicsLayout):
112 self.fftmap_plots: list[pg.PlotItem] = []
113 self.fftmap_images: list[pg.ImageItem] = []
114 self.range_hist_curves: list[pg.PlotDataItem] = []
115 self.angle_hist_curves: list[pg.PlotDataItem] = []
116
117 if PLOT_THRESHOLDS:
118 self.bin0_curves: list[pg.PlotDataItem] = []
119 self.bin0_threshold_curves: list[pg.PlotDataItem] = []
120 self.other_bins_curves: list[pg.PlotDataItem] = []
121 self.other_bins_threshold_curves: list[pg.PlotDataItem] = []
122
123 row_offset = 2 if PLOT_THRESHOLDS else 0
124
125 for i_s in range(self.num_sensors):
126 for i_ss in range(self.num_subsweeps):
127 col = i_s * self.num_subsweeps + i_ss
128 p = win.addPlot(
129 row=0, col=col, title=f"FFT Map, sensor {SENSOR_IDS[i_s]}, subsweep {i_ss}"
130 )
131 im = pg.ImageItem(autoDownsample=True)
132 im.setLookupTable(et.utils.pg_mpl_cmap("viridis"))
133 self.fftmap_images.append(im)
134
135 p.setLabel("bottom", "Distance (cm)")
136 p.setLabel("left", "Velocity (cm/s)")
137 p.addItem(im)
138
139 self.fftmap_plots.append(p)
140
141 if PLOT_THRESHOLDS:
142 self.bin0 = pg.PlotItem(title="Zeroth velocity/angle bin")
143 self.bin0.showGrid(x=True, y=True)
144 self.bin0.setLabel("bottom", "Range (cm)")
145 self.bin0.setLabel("left", "Amplitude")
146 self.bin0.addLegend()
147
148 pen = et.utils.pg_pen_cycler(0)
149 brush = et.utils.pg_brush_cycler(0)
150 symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
151 feat_kw = dict(pen=pen, **symbol_kw)
152 self.bin0_curves += [self.bin0.plot(**feat_kw) for _ in range(self.num_subsweeps)]
153
154 pen = et.utils.pg_pen_cycler(1)
155 brush = et.utils.pg_brush_cycler(1)
156 symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
157 feat_kw = dict(pen=pen, **symbol_kw)
158 self.bin0_threshold_curves += [
159 self.bin0.plot(**feat_kw) for _ in range(self.num_subsweeps)
160 ]
161
162 bin0_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
163 bin0_plot_legend.setParentItem(self.bin0)
164 bin0_plot_legend.addItem(self.bin0_curves[0], "Sweep")
165 bin0_plot_legend.addItem(self.bin0_threshold_curves[0], "Threshold")
166
167 sublayout = win.addLayout(
168 row=1,
169 col=i_s * self.num_subsweeps,
170 colspan=self.num_subsweeps,
171 )
172 sublayout.layout.setColumnStretchFactor(0, self.num_subsweeps)
173 sublayout.addItem(self.bin0, row=0, col=0)
174
175 self.other_bins = pg.PlotItem(title="Other velocity/angle bins")
176 self.other_bins.showGrid(x=True, y=True)
177 self.other_bins.setLabel("bottom", "Range (cm)")
178 self.other_bins.setLabel("left", "Amplitude")
179 self.other_bins.addLegend()
180
181 pen = et.utils.pg_pen_cycler(0)
182 brush = et.utils.pg_brush_cycler(0)
183 symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
184 feat_kw = dict(pen=pen, **symbol_kw)
185 self.other_bins_curves += [
186 self.other_bins.plot(**feat_kw) for _ in range(self.num_subsweeps)
187 ]
188
189 pen = et.utils.pg_pen_cycler(1)
190 brush = et.utils.pg_brush_cycler(1)
191 symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
192 feat_kw = dict(pen=pen, **symbol_kw)
193 self.other_bins_threshold_curves += [
194 self.other_bins.plot(**feat_kw) for _ in range(self.num_subsweeps)
195 ]
196
197 other_bins_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
198 other_bins_plot_legend.setParentItem(self.other_bins)
199 other_bins_plot_legend.addItem(self.other_bins_curves[0], "Max Sweep")
200 other_bins_plot_legend.addItem(self.other_bins_threshold_curves[0], "Threshold")
201
202 sublayout = win.addLayout(
203 row=2,
204 col=i_s * self.num_subsweeps,
205 colspan=self.num_subsweeps,
206 )
207 sublayout.layout.setColumnStretchFactor(0, self.num_subsweeps)
208 sublayout.addItem(self.other_bins, row=0, col=0)
209
210 self.angle_hist = pg.PlotItem(title="Angle/velocity history")
211 self.angle_hist.showGrid(x=True, y=True)
212 self.angle_hist.setLabel("bottom", "Time (frames)")
213 self.angle_hist.setLabel("left", "velocity (cm/s)")
214 self.angle_hist.setXRange(-100, 0)
215 self.angle_hist.addLegend()
216 self.angle_hist_curves.append(self.angle_hist.plot(symbolSize=5, symbol="o"))
217
218 sublayout = win.addLayout(
219 row=1 + row_offset,
220 col=i_s * self.num_subsweeps,
221 colspan=self.num_subsweeps,
222 )
223
224 sublayout.layout.setColumnStretchFactor(0, self.num_subsweeps)
225 sublayout.addItem(self.angle_hist, row=0, col=0)
226
227 self.range_hist = pg.PlotItem(title="Range history")
228 self.range_hist.showGrid(x=True, y=True)
229 self.range_hist.setLabel("bottom", "Time (frames)")
230 self.range_hist.setLabel("left", "Range (cm)")
231 self.range_hist.setXRange(-100, 0)
232 self.range_hist.addLegend()
233 self.range_hist_curves.append(self.range_hist.plot(symbolSize=5, symbol="o"))
234
235 sublayout = win.addLayout(
236 row=2 + row_offset,
237 col=i_s * self.num_subsweeps,
238 colspan=self.num_subsweeps,
239 )
240
241 sublayout.layout.setColumnStretchFactor(0, self.num_subsweeps)
242 sublayout.addItem(self.range_hist, row=0, col=0)
243
244 if self.enable_bilateration:
245 self.bil_hist_plot = pg.PlotItem(title="Bilateration history")
246
247 self.bil_hist_plot.showGrid(x=True, y=True)
248 self.bil_hist_plot.setLabel("bottom", "Time (frames)")
249 self.bil_hist_plot.setLabel("left", "Bilateration angle (deg)")
250 self.bil_hist_plot.setXRange(-100, 0)
251 self.bil_hist_plot.setYRange(-90, 90)
252 self.bil_hist_plot.addLegend()
253
254 self.bil_hist_curve = self.bil_hist_plot.plot(pen=et.utils.pg_pen_cycler(1))
255
256 sublayout = win.addLayout(row=3 + row_offset, col=0, colspan=2 * self.num_subsweeps)
257 sublayout.layout.setColumnStretchFactor(0, 2 * self.num_subsweeps)
258 sublayout.addItem(self.bil_hist_plot, row=0, col=0)
259
260 def update(self, detector_result: DetectorResult):
261 for i_s in range(self.num_sensors):
262 pr = detector_result.processor_results[SENSOR_IDS[i_s]]
263
264 for i_ss in range(self.num_subsweeps):
265 curve_idx = self.num_subsweeps * i_s + i_ss
266
267 fftmap = pr.subsweeps_extra_results[i_ss].fft_map
268 fftmap_threshold = pr.subsweeps_extra_results[i_ss].fft_map_threshold
269
270 # fftmap = 10*np.log10(fftmap) # to dB
271
272 spf = fftmap.shape[0]
273 r = 100 * pr.subsweeps_extra_results[i_ss].r
274
275 transform = QTransform()
276 transform.translate(
277 r[0], -100 * pr.extra_result.dv * spf / 2 - 0.5 * 100 * pr.extra_result.dv
278 )
279 transform.scale(r[1] - r[0], 100 * pr.extra_result.dv)
280
281 self.fftmap_images[curve_idx].setTransform(transform)
282
283 self.fftmap_images[curve_idx].updateImage(
284 np.fft.fftshift(fftmap, 0).T,
285 levels=(0, 1.05 * np.max(fftmap)),
286 )
287
288 if PLOT_THRESHOLDS:
289 bin0 = fftmap[0, :]
290 threshold_bin0 = fftmap_threshold[0, :]
291
292 max_other_bins = np.max(fftmap[1:, :], axis=0)
293 threshold_other_bins = fftmap_threshold[1, :]
294
295 self.bin0_curves[curve_idx].setData(r, bin0)
296 self.bin0_threshold_curves[curve_idx].setData(r, threshold_bin0)
297 self.other_bins_curves[curve_idx].setData(r, max_other_bins)
298 self.other_bins_threshold_curves[curve_idx].setData(r, threshold_other_bins)
299
300 v = pr.targets[0].velocity if pr.targets else np.nan
301
302 self.obst_vel_ys[i_s] = np.roll(self.obst_vel_ys[i_s], -1)
303 self.obst_vel_ys[i_s][-1] = 100 * v # m/s -> cm/s
304
305 if np.isnan(self.obst_vel_ys[i_s]).all():
306 self.angle_hist_curves[i_s].setVisible(False)
307 else:
308 self.angle_hist_curves[i_s].setVisible(True)
309 self.angle_hist_curves[i_s].setData(
310 self.hist_x, self.obst_vel_ys[i_s], connect="finite"
311 )
312
313 # TODO: should be known earlier
314 # r_min = min([er.r[0] for er in pr.subsweeps_extra_results])
315 # r_max = min([er.r[-1] for er in pr.subsweeps_extra_results])
316 # self.range_hist.setYRange(100*r_min, 100*r_max)
317
318 r_targets = pr.targets[0].distance if pr.targets else np.nan
319
320 self.obst_dist_ys[i_s] = np.roll(self.obst_dist_ys[i_s], -1)
321 self.obst_dist_ys[i_s][-1] = 100 * r_targets # m -> cm
322
323 # print(f'{i_s}: r = {r_targets}, v = {v}')
324
325 if np.isnan(self.obst_dist_ys[i_s]).all():
326 self.range_hist_curves[i_s].setVisible(False)
327 else:
328 self.range_hist_curves[i_s].setVisible(True)
329 self.range_hist_curves[i_s].setData(
330 self.hist_x, self.obst_dist_ys[i_s], connect="finite"
331 )
332
333 if self.enable_bilateration:
334 beta = (
335 detector_result.bilateration_result.beta_degs[0]
336 if detector_result.bilateration_result.beta_degs
337 else np.nan
338 )
339
340 self.obst_bil_ys = np.roll(self.obst_bil_ys, -1)
341 self.obst_bil_ys[-1] = beta
342
343 if np.isnan(self.obst_bil_ys).all():
344 self.bil_hist_curve.setVisible(False)
345 else:
346 self.bil_hist_curve.setVisible(True)
347 self.bil_hist_curve.setData(self.hist_x, self.obst_bil_ys, connect="finite")
348
349
350if __name__ == "__main__":
351 main()
View this example on GitHub: acconeer/acconeer-python-exploration