# examples/a121/algo/obstacle/detector.py
1# Copyright (c) Acconeer AB, 2023-2024
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8# Added here to force pyqtgraph to choose PySide
9# import PySide6 # noqa: F401
10# from PySide6.QtWidgets import QLabel, QPushButton, QVBoxLayout, QWidget
11from PySide6.QtGui import QTransform
12
13import pyqtgraph as pg
14
15import acconeer.exptool as et
16from acconeer.exptool import a121
17from acconeer.exptool.a121.algo.obstacle import Detector, DetectorConfig, DetectorResult
18
19
# IDs of the sensors used in this example. The list is passed to the Detector
# as ``sensor_ids`` and is also used by the plotting code to label the plots
# and to look up each sensor's entry in the detector result.
SENSOR_IDS = [
    2,
    3,
]
21
22
def _subsweep_configurations():
    """Return the subsweep configurations used by this example.

    Three subsweeps are defined, starting at points 24, 96 and 300
    (approximately 6 cm, 24 cm and 75 cm), each with its own number of
    points, step length, profile and HWAAS setting.
    """
    return [
        a121.SubsweepConfig(
            start_point=24,  # ~6 cm
            num_points=96,
            step_length=1,
            profile=a121.Profile.PROFILE_1,
            hwaas=4,
        ),
        a121.SubsweepConfig(
            start_point=96,  # ~24 cm
            num_points=48,
            step_length=4,
            profile=a121.Profile.PROFILE_3,
            hwaas=16,
        ),
        a121.SubsweepConfig(
            start_point=300,  # ~75 cm
            num_points=48,
            step_length=4,
            profile=a121.Profile.PROFILE_3,
            hwaas=2,
        ),
    ]


def main():
    """Run the obstacle detector example with live plotting.

    Parses the example command line arguments, opens a client, configures an
    obstacle ``Detector`` with multiple subsweeps for the sensors listed in
    ``SENSOR_IDS``, calibrates and starts it, and forwards every detector
    result to a ``PGUpdater`` via ``et.PGProcess`` until Ctrl-C is pressed or
    the plot process dies.

    The plot process, the detector and the client are released in reverse
    order of acquisition even if an exception is raised along the way.
    """
    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    client = a121.Client.open(**a121.get_client_args(args))
    try:
        subsweep_configurations = _subsweep_configurations()

        # Configure the detector using multiple subsweeps
        detector_config = DetectorConfig(
            num_mean_threshold=1.5,
            num_std_threshold=4,
            subsweep_configurations=subsweep_configurations,
        )

        detector = Detector(client=client, sensor_ids=SENSOR_IDS, detector_config=detector_config)

        detector.calibrate_detector()
        detector.start()
        try:
            pg_updater = PGUpdater(
                num_sensors=len(SENSOR_IDS),
                num_subsweeps=len(subsweep_configurations),
            )

            pg_process = et.PGProcess(pg_updater)
            pg_process.start()
            try:
                interrupt_handler = et.utils.ExampleInterruptHandler()
                print("Press Ctrl-C to end session")

                while not interrupt_handler.got_signal:
                    # If the current robot speed is available from some robot
                    # API (e.g. ROS), it can be fed to the detector here:
                    # detector.update_robot_speed(v_current)

                    detector_result = detector.get_next()

                    try:
                        pg_process.put_data(detector_result)
                    except et.PGProccessDiedException:
                        # The plot process is gone; end the session.
                        break
            finally:
                # Shut the plot process down instead of leaving it running.
                pg_process.close()
        finally:
            detector.stop()
    finally:
        print("Disconnecting...")
        client.close()
92
93
# Number of samples kept in the per-sensor velocity and range history plots.
PLOT_HISTORY_FRAMES = 50
# When True, two extra rows with amplitude-vs-range curves and their
# thresholds are plotted for every sensor.
PLOT_THRESHOLDS = True


class PGUpdater:
    """Build and refresh the pyqtgraph plots for the obstacle detector example.

    For every sensor the layout contains, from top to bottom:

    * row 0: one FFT map image per subsweep,
    * rows 1-2 (only if ``PLOT_THRESHOLDS``): the zeroth bin and the maximum
      over the other bins, each together with its threshold,
    * next row: history of the first target's velocity,
    * last row: history of the first target's distance.

    ``setup`` creates the plot items and ``update`` feeds one
    ``DetectorResult`` into them (presumably both are invoked by
    ``et.PGProcess`` -- confirm against the exptool documentation).
    """

    def __init__(self, num_sensors, num_subsweeps):
        """Store the plot dimensions and allocate the history buffers.

        Args:
            num_sensors: Number of sensors to plot; must match ``SENSOR_IDS``.
            num_subsweeps: Number of subsweeps per sensor.
        """
        self.num_sensors = num_sensors
        self.num_subsweeps = num_subsweeps
        # One independent NaN-filled buffer per sensor. (List repetition would
        # make every entry refer to the same array object.)
        self.obst_vel_ys = [np.full(PLOT_HISTORY_FRAMES, np.nan) for _ in range(num_sensors)]
        self.obst_dist_ys = [np.full(PLOT_HISTORY_FRAMES, np.nan) for _ in range(num_sensors)]
        self.obst_bil_ys = np.full(PLOT_HISTORY_FRAMES, np.nan)
        # NOTE(review): the x axis always spans -100..0 (matching the
        # setXRange(-100, 0) calls in setup), independent of
        # PLOT_HISTORY_FRAMES -- confirm that this is intended.
        self.hist_x = np.linspace(-100, 0, PLOT_HISTORY_FRAMES)

    def setup(self, win: pg.GraphicsLayout):
        """Create all plot items inside ``win``.

        Args:
            win: The graphics layout to which the plots are added.
        """
        self.fftmap_plots: list[pg.PlotItem] = []
        self.fftmap_images: list[pg.ImageItem] = []
        self.range_hist_curves: list[pg.PlotDataItem] = []
        self.angle_hist_curves: list[pg.PlotDataItem] = []

        if PLOT_THRESHOLDS:
            self.bin0_curves: list[pg.PlotDataItem] = []
            self.bin0_threshold_curves: list[pg.PlotDataItem] = []
            self.other_bins_curves: list[pg.PlotDataItem] = []
            self.other_bins_threshold_curves: list[pg.PlotDataItem] = []

        # The history plots move down two rows when the threshold plots exist.
        row_offset = 2 if PLOT_THRESHOLDS else 0

        for i_s in range(self.num_sensors):
            # Leftmost layout column belonging to this sensor.
            first_col = i_s * self.num_subsweeps

            for i_ss in range(self.num_subsweeps):
                p = win.addPlot(
                    row=0,
                    col=first_col + i_ss,
                    title=f"FFT Map, sensor {SENSOR_IDS[i_s]}, subsweep {i_ss}",
                )
                im = pg.ImageItem(autoDownsample=True)
                im.setLookupTable(et.utils.pg_mpl_cmap("viridis"))
                self.fftmap_images.append(im)

                p.setLabel("bottom", "Distance (cm)")
                p.setLabel("left", "Velocity (cm/s)")
                p.addItem(im)

                self.fftmap_plots.append(p)

            if PLOT_THRESHOLDS:
                self.bin0 = self._add_threshold_plot(
                    win,
                    row=1,
                    col=first_col,
                    title="Zeroth velocity/angle bin",
                    sweep_label="Sweep",
                    curves=self.bin0_curves,
                    threshold_curves=self.bin0_threshold_curves,
                )
                self.other_bins = self._add_threshold_plot(
                    win,
                    row=2,
                    col=first_col,
                    title="Other velocity/angle bins",
                    sweep_label="Max Sweep",
                    curves=self.other_bins_curves,
                    threshold_curves=self.other_bins_threshold_curves,
                )

            self.angle_hist, angle_curve = self._add_history_plot(
                win,
                row=1 + row_offset,
                col=first_col,
                title="Angle/velocity history",
                y_label="velocity (cm/s)",
            )
            self.angle_hist_curves.append(angle_curve)

            self.range_hist, range_curve = self._add_history_plot(
                win,
                row=2 + row_offset,
                col=first_col,
                title="Range history",
                y_label="Range (cm)",
            )
            self.range_hist_curves.append(range_curve)

    def update(self, detector_result: DetectorResult):
        """Refresh all plots with the data of one detector result.

        Args:
            detector_result: Result whose ``processor_results`` holds one
                entry per ID in ``SENSOR_IDS``.
        """
        for i_s in range(self.num_sensors):
            pr = detector_result.processor_results[SENSOR_IDS[i_s]]
            # Velocity bin width scaled by 100 (m/s -> cm/s, as below).
            dv_cm = 100 * pr.extra_result.dv

            for i_ss in range(self.num_subsweeps):
                curve_idx = self.num_subsweeps * i_s + i_ss
                extra = pr.subsweeps_extra_results[i_ss]

                fftmap = extra.fft_map
                fftmap_threshold = extra.fft_map_threshold

                spf = fftmap.shape[0]
                # Scaled by 100 to match the cm axis labels (presumably
                # extra.r is in meters -- confirm).
                r = 100 * extra.r

                # Map image pixel indices to axis coordinates: x starts at the
                # first range value with one range step per pixel; y has one
                # velocity bin per pixel, offset so that the axis is centered
                # around zero velocity.
                transform = QTransform()
                transform.translate(r[0], -dv_cm * spf / 2 - 0.5 * dv_cm)
                transform.scale(r[1] - r[0], dv_cm)

                image = self.fftmap_images[curve_idx]
                image.setTransform(transform)

                # fftshift moves the zeroth bin to the middle of axis 0; the
                # transpose puts range on the image's first axis.
                image.updateImage(
                    np.fft.fftshift(fftmap, 0).T,
                    levels=(0, 1.05 * np.max(fftmap)),
                )

                if PLOT_THRESHOLDS:
                    bin0 = fftmap[0, :]
                    threshold_bin0 = fftmap_threshold[0, :]

                    max_other_bins = np.max(fftmap[1:, :], axis=0)
                    threshold_other_bins = fftmap_threshold[1, :]

                    self.bin0_curves[curve_idx].setData(r, bin0)
                    self.bin0_threshold_curves[curve_idx].setData(r, threshold_bin0)
                    self.other_bins_curves[curve_idx].setData(r, max_other_bins)
                    self.other_bins_threshold_curves[curve_idx].setData(r, threshold_other_bins)

            # Only the first target (if any) is tracked in the history plots.
            if pr.targets:
                first_target = pr.targets[0]
                velocity = first_target.velocity
                distance = first_target.distance
            else:
                velocity = distance = np.nan

            self._push_history(self.obst_vel_ys, i_s, 100 * velocity)  # m/s -> cm/s
            self._show_history(self.angle_hist_curves[i_s], self.hist_x, self.obst_vel_ys[i_s])

            # TODO: set a fixed y range on the range history plot; the range
            # limits of the subsweeps should be known earlier for that.

            self._push_history(self.obst_dist_ys, i_s, 100 * distance)  # m -> cm
            self._show_history(self.range_hist_curves[i_s], self.hist_x, self.obst_dist_ys[i_s])

    def _plot_curves(self, plot, cycler_index):
        """Add one curve per subsweep to ``plot`` and return the new curves.

        The pen and symbol brush are taken from the exptool cyclers at
        ``cycler_index``.
        """
        pen = et.utils.pg_pen_cycler(cycler_index)
        brush = et.utils.pg_brush_cycler(cycler_index)
        return [
            plot.plot(pen=pen, symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
            for _ in range(self.num_subsweeps)
        ]

    def _add_threshold_plot(
        self, win, row, col, title, sweep_label, curves, threshold_curves
    ):
        """Create an amplitude-vs-range plot with sweep and threshold curves.

        The new curves are appended to ``curves`` and ``threshold_curves``
        (one per subsweep each) and the plot is returned.
        """
        plot = pg.PlotItem(title=title)
        plot.showGrid(x=True, y=True)
        plot.setLabel("bottom", "Range (cm)")
        plot.setLabel("left", "Amplitude")
        plot.addLegend()

        sweep_curves = self._plot_curves(plot, 0)
        curves.extend(sweep_curves)
        limit_curves = self._plot_curves(plot, 1)
        threshold_curves.extend(limit_curves)

        # Use this plot's own curves as legend samples, so that the legend of
        # one sensor never refers to items that live in another sensor's plot.
        legend = pg.LegendItem(offset=(0.0, 0.5))
        legend.setParentItem(plot)
        legend.addItem(sweep_curves[0], sweep_label)
        legend.addItem(limit_curves[0], "Threshold")

        self._place_in_sublayout(win, plot, row, col)
        return plot

    def _add_history_plot(self, win, row, col, title, y_label):
        """Create a history plot and return it together with its single curve."""
        plot = pg.PlotItem(title=title)
        plot.showGrid(x=True, y=True)
        plot.setLabel("bottom", "Time (frames)")
        plot.setLabel("left", y_label)
        plot.setXRange(-100, 0)
        plot.addLegend()
        curve = plot.plot(symbolSize=5, symbol="o")

        self._place_in_sublayout(win, plot, row, col)
        return plot, curve

    def _place_in_sublayout(self, win, plot, row, col):
        """Put ``plot`` into a sublayout spanning all subsweep columns of a sensor."""
        sublayout = win.addLayout(row=row, col=col, colspan=self.num_subsweeps)
        sublayout.layout.setColumnStretchFactor(0, self.num_subsweeps)
        sublayout.addItem(plot, row=0, col=0)

    @staticmethod
    def _push_history(buffers, index, value):
        """Shift ``buffers[index]`` one step left and store ``value`` last.

        ``np.roll`` returns a new array, so the list entry is rebound rather
        than modified in place.
        """
        shifted = np.roll(buffers[index], -1)
        shifted[-1] = value
        buffers[index] = shifted

    @staticmethod
    def _show_history(curve, xs, ys):
        """Hide ``curve`` if ``ys`` is all NaN, otherwise show and update it."""
        if np.isnan(ys).all():
            curve.setVisible(False)
        else:
            curve.setVisible(True)
            # connect="finite" leaves gaps in the line where ys is NaN.
            curve.setData(xs, ys, connect="finite")
312
313
# Script entry point: run the example only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
# View this example on GitHub: acconeer/acconeer-python-exploration