examples/a121/algo/obstacle/detector.py

examples/a121/algo/obstacle/detector.py#

  1# Copyright (c) Acconeer AB, 2023
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8# Added here to force pyqtgraph to choose PySide
  9# import PySide6  # noqa: F401
 10# from PySide6.QtWidgets import QLabel, QPushButton, QVBoxLayout, QWidget
 11from PySide6.QtGui import QTransform
 12
 13import pyqtgraph as pg
 14
 15import acconeer.exptool as et
 16from acconeer.exptool import a121
 17from acconeer.exptool.a121.algo.obstacle import Detector, DetectorConfig, DetectorResult
 18
 19
 20SENSOR_IDS = [2, 3]
 21
 22
 23def main():
 24    args = a121.ExampleArgumentParser().parse_args()
 25    et.utils.config_logging(args)
 26
 27    client = a121.Client.open(**a121.get_client_args(args))
 28
 29    # Creating a list of subsweep configurations.
 30    subsweep_configurations = [
 31        a121.SubsweepConfig(
 32            start_point=24,  # ~6 cm
 33            num_points=96,
 34            step_length=1,
 35            profile=a121.Profile.PROFILE_1,
 36            hwaas=4,
 37        ),
 38        a121.SubsweepConfig(
 39            start_point=96,  # ~24 cm
 40            num_points=48,
 41            step_length=4,
 42            profile=a121.Profile.PROFILE_3,
 43            hwaas=16,
 44        ),
 45        a121.SubsweepConfig(
 46            start_point=300,  # ~75 cm
 47            num_points=48,
 48            step_length=4,
 49            profile=a121.Profile.PROFILE_3,
 50            hwaas=2,
 51        ),
 52    ]
 53
 54    # Configure the detector using multiple subsweeps
 55    detector_config = DetectorConfig(
 56        enable_bilateration=(len(SENSOR_IDS) == 2),
 57        bilateration_sensor_spacing_m=0.1,
 58        num_mean_threshold=1.5,
 59        num_std_threshold=4,
 60        subsweep_configurations=subsweep_configurations,
 61    )
 62
 63    detector = Detector(client=client, sensor_ids=SENSOR_IDS, detector_config=detector_config)
 64
 65    detector.calibrate_detector()
 66    detector.start()
 67
 68    pg_updater = PGUpdater(
 69        num_sensors=len(SENSOR_IDS),
 70        num_subsweeps=len(subsweep_configurations),
 71        enable_bilateration=detector_config.enable_bilateration,
 72    )
 73
 74    pg_process = et.PGProcess(pg_updater)
 75    pg_process.start()
 76
 77    interrupt_handler = et.utils.ExampleInterruptHandler()
 78    print("Press Ctrl-C to end session")
 79
 80    while not interrupt_handler.got_signal:
 81        # v_current = get_speed_from_some_Robot_API(), e.g ROS?
 82        # detector.update_robot_speed(v_current)
 83
 84        detector_result = detector.get_next()
 85
 86        try:
 87            pg_process.put_data(detector_result)
 88        except et.PGProccessDiedException:
 89            break
 90
 91    detector.stop()
 92
 93    print("Disconnecting...")
 94    client.close()
 95
 96
 97PLOT_HISTORY_FRAMES = 50
 98PLOT_THRESHOLDS = True
 99
100
101class PGUpdater:
102    def __init__(self, num_sensors, num_subsweeps, enable_bilateration):
103        self.num_sensors = num_sensors
104        self.num_subsweeps = num_subsweeps
105        self.enable_bilateration = enable_bilateration
106        self.obst_vel_ys = num_sensors * [np.nan * np.ones(PLOT_HISTORY_FRAMES)]
107        self.obst_dist_ys = num_sensors * [np.nan * np.ones(PLOT_HISTORY_FRAMES)]
108        self.obst_bil_ys = np.nan * np.ones(PLOT_HISTORY_FRAMES)
109        self.hist_x = np.linspace(-100, 0, PLOT_HISTORY_FRAMES)
110
111    def setup(self, win: pg.GraphicsLayout):
112        self.fftmap_plots: list[pg.PlotItem] = []
113        self.fftmap_images: list[pg.ImageItem] = []
114        self.range_hist_curves: list[pg.PlotDataItem] = []
115        self.angle_hist_curves: list[pg.PlotDataItem] = []
116
117        if PLOT_THRESHOLDS:
118            self.bin0_curves: list[pg.PlotDataItem] = []
119            self.bin0_threshold_curves: list[pg.PlotDataItem] = []
120            self.other_bins_curves: list[pg.PlotDataItem] = []
121            self.other_bins_threshold_curves: list[pg.PlotDataItem] = []
122
123        row_offset = 2 if PLOT_THRESHOLDS else 0
124
125        for i_s in range(self.num_sensors):
126            for i_ss in range(self.num_subsweeps):
127                col = i_s * self.num_subsweeps + i_ss
128                p = win.addPlot(
129                    row=0, col=col, title=f"FFT Map, sensor {SENSOR_IDS[i_s]}, subsweep {i_ss}"
130                )
131                im = pg.ImageItem(autoDownsample=True)
132                im.setLookupTable(et.utils.pg_mpl_cmap("viridis"))
133                self.fftmap_images.append(im)
134
135                p.setLabel("bottom", "Distance (cm)")
136                p.setLabel("left", "Velocity (cm/s)")
137                p.addItem(im)
138
139                self.fftmap_plots.append(p)
140
141            if PLOT_THRESHOLDS:
142                self.bin0 = pg.PlotItem(title="Zeroth velocity/angle bin")
143                self.bin0.showGrid(x=True, y=True)
144                self.bin0.setLabel("bottom", "Range (cm)")
145                self.bin0.setLabel("left", "Amplitude")
146                self.bin0.addLegend()
147
148                pen = et.utils.pg_pen_cycler(0)
149                brush = et.utils.pg_brush_cycler(0)
150                symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
151                feat_kw = dict(pen=pen, **symbol_kw)
152                self.bin0_curves += [self.bin0.plot(**feat_kw) for _ in range(self.num_subsweeps)]
153
154                pen = et.utils.pg_pen_cycler(1)
155                brush = et.utils.pg_brush_cycler(1)
156                symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
157                feat_kw = dict(pen=pen, **symbol_kw)
158                self.bin0_threshold_curves += [
159                    self.bin0.plot(**feat_kw) for _ in range(self.num_subsweeps)
160                ]
161
162                bin0_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
163                bin0_plot_legend.setParentItem(self.bin0)
164                bin0_plot_legend.addItem(self.bin0_curves[0], "Sweep")
165                bin0_plot_legend.addItem(self.bin0_threshold_curves[0], "Threshold")
166
167                sublayout = win.addLayout(
168                    row=1,
169                    col=i_s * self.num_subsweeps,
170                    colspan=self.num_subsweeps,
171                )
172                sublayout.layout.setColumnStretchFactor(0, self.num_subsweeps)
173                sublayout.addItem(self.bin0, row=0, col=0)
174
175                self.other_bins = pg.PlotItem(title="Other velocity/angle bins")
176                self.other_bins.showGrid(x=True, y=True)
177                self.other_bins.setLabel("bottom", "Range (cm)")
178                self.other_bins.setLabel("left", "Amplitude")
179                self.other_bins.addLegend()
180
181                pen = et.utils.pg_pen_cycler(0)
182                brush = et.utils.pg_brush_cycler(0)
183                symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
184                feat_kw = dict(pen=pen, **symbol_kw)
185                self.other_bins_curves += [
186                    self.other_bins.plot(**feat_kw) for _ in range(self.num_subsweeps)
187                ]
188
189                pen = et.utils.pg_pen_cycler(1)
190                brush = et.utils.pg_brush_cycler(1)
191                symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
192                feat_kw = dict(pen=pen, **symbol_kw)
193                self.other_bins_threshold_curves += [
194                    self.other_bins.plot(**feat_kw) for _ in range(self.num_subsweeps)
195                ]
196
197                other_bins_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
198                other_bins_plot_legend.setParentItem(self.other_bins)
199                other_bins_plot_legend.addItem(self.other_bins_curves[0], "Max Sweep")
200                other_bins_plot_legend.addItem(self.other_bins_threshold_curves[0], "Threshold")
201
202                sublayout = win.addLayout(
203                    row=2,
204                    col=i_s * self.num_subsweeps,
205                    colspan=self.num_subsweeps,
206                )
207                sublayout.layout.setColumnStretchFactor(0, self.num_subsweeps)
208                sublayout.addItem(self.other_bins, row=0, col=0)
209
210            self.angle_hist = pg.PlotItem(title="Angle/velocity history")
211            self.angle_hist.showGrid(x=True, y=True)
212            self.angle_hist.setLabel("bottom", "Time (frames)")
213            self.angle_hist.setLabel("left", "velocity (cm/s)")
214            self.angle_hist.setXRange(-100, 0)
215            self.angle_hist.addLegend()
216            self.angle_hist_curves.append(self.angle_hist.plot(symbolSize=5, symbol="o"))
217
218            sublayout = win.addLayout(
219                row=1 + row_offset,
220                col=i_s * self.num_subsweeps,
221                colspan=self.num_subsweeps,
222            )
223
224            sublayout.layout.setColumnStretchFactor(0, self.num_subsweeps)
225            sublayout.addItem(self.angle_hist, row=0, col=0)
226
227            self.range_hist = pg.PlotItem(title="Range history")
228            self.range_hist.showGrid(x=True, y=True)
229            self.range_hist.setLabel("bottom", "Time (frames)")
230            self.range_hist.setLabel("left", "Range (cm)")
231            self.range_hist.setXRange(-100, 0)
232            self.range_hist.addLegend()
233            self.range_hist_curves.append(self.range_hist.plot(symbolSize=5, symbol="o"))
234
235            sublayout = win.addLayout(
236                row=2 + row_offset,
237                col=i_s * self.num_subsweeps,
238                colspan=self.num_subsweeps,
239            )
240
241            sublayout.layout.setColumnStretchFactor(0, self.num_subsweeps)
242            sublayout.addItem(self.range_hist, row=0, col=0)
243
244        if self.enable_bilateration:
245            self.bil_hist_plot = pg.PlotItem(title="Bilateration history")
246
247            self.bil_hist_plot.showGrid(x=True, y=True)
248            self.bil_hist_plot.setLabel("bottom", "Time (frames)")
249            self.bil_hist_plot.setLabel("left", "Bilateration angle (deg)")
250            self.bil_hist_plot.setXRange(-100, 0)
251            self.bil_hist_plot.setYRange(-90, 90)
252            self.bil_hist_plot.addLegend()
253
254            self.bil_hist_curve = self.bil_hist_plot.plot(pen=et.utils.pg_pen_cycler(1))
255
256            sublayout = win.addLayout(row=3 + row_offset, col=0, colspan=2 * self.num_subsweeps)
257            sublayout.layout.setColumnStretchFactor(0, 2 * self.num_subsweeps)
258            sublayout.addItem(self.bil_hist_plot, row=0, col=0)
259
260    def update(self, detector_result: DetectorResult):
261        for i_s in range(self.num_sensors):
262            pr = detector_result.processor_results[SENSOR_IDS[i_s]]
263
264            for i_ss in range(self.num_subsweeps):
265                curve_idx = self.num_subsweeps * i_s + i_ss
266
267                fftmap = pr.subsweeps_extra_results[i_ss].fft_map
268                fftmap_threshold = pr.subsweeps_extra_results[i_ss].fft_map_threshold
269
270                # fftmap = 10*np.log10(fftmap)  # to dB
271
272                spf = fftmap.shape[0]
273                r = 100 * pr.subsweeps_extra_results[i_ss].r
274
275                transform = QTransform()
276                transform.translate(
277                    r[0], -100 * pr.extra_result.dv * spf / 2 - 0.5 * 100 * pr.extra_result.dv
278                )
279                transform.scale(r[1] - r[0], 100 * pr.extra_result.dv)
280
281                self.fftmap_images[curve_idx].setTransform(transform)
282
283                self.fftmap_images[curve_idx].updateImage(
284                    np.fft.fftshift(fftmap, 0).T,
285                    levels=(0, 1.05 * np.max(fftmap)),
286                )
287
288                if PLOT_THRESHOLDS:
289                    bin0 = fftmap[0, :]
290                    threshold_bin0 = fftmap_threshold[0, :]
291
292                    max_other_bins = np.max(fftmap[1:, :], axis=0)
293                    threshold_other_bins = fftmap_threshold[1, :]
294
295                    self.bin0_curves[curve_idx].setData(r, bin0)
296                    self.bin0_threshold_curves[curve_idx].setData(r, threshold_bin0)
297                    self.other_bins_curves[curve_idx].setData(r, max_other_bins)
298                    self.other_bins_threshold_curves[curve_idx].setData(r, threshold_other_bins)
299
300            v = pr.targets[0].velocity if pr.targets else np.nan
301
302            self.obst_vel_ys[i_s] = np.roll(self.obst_vel_ys[i_s], -1)
303            self.obst_vel_ys[i_s][-1] = 100 * v  # m/s -> cm/s
304
305            if np.isnan(self.obst_vel_ys[i_s]).all():
306                self.angle_hist_curves[i_s].setVisible(False)
307            else:
308                self.angle_hist_curves[i_s].setVisible(True)
309                self.angle_hist_curves[i_s].setData(
310                    self.hist_x, self.obst_vel_ys[i_s], connect="finite"
311                )
312
313            # TODO: should be known earlier
314            # r_min = min([er.r[0] for er in pr.subsweeps_extra_results])
315            # r_max = min([er.r[-1] for er in pr.subsweeps_extra_results])
316            # self.range_hist.setYRange(100*r_min, 100*r_max)
317
318            r_targets = pr.targets[0].distance if pr.targets else np.nan
319
320            self.obst_dist_ys[i_s] = np.roll(self.obst_dist_ys[i_s], -1)
321            self.obst_dist_ys[i_s][-1] = 100 * r_targets  # m -> cm
322
323            # print(f'{i_s}: r = {r_targets}, v = {v}')
324
325            if np.isnan(self.obst_dist_ys[i_s]).all():
326                self.range_hist_curves[i_s].setVisible(False)
327            else:
328                self.range_hist_curves[i_s].setVisible(True)
329                self.range_hist_curves[i_s].setData(
330                    self.hist_x, self.obst_dist_ys[i_s], connect="finite"
331                )
332
333        if self.enable_bilateration:
334            beta = (
335                detector_result.bilateration_result.beta_degs[0]
336                if detector_result.bilateration_result.beta_degs
337                else np.nan
338            )
339
340            self.obst_bil_ys = np.roll(self.obst_bil_ys, -1)
341            self.obst_bil_ys[-1] = beta
342
343            if np.isnan(self.obst_bil_ys).all():
344                self.bil_hist_curve.setVisible(False)
345            else:
346                self.bil_hist_curve.setVisible(True)
347                self.bil_hist_curve.setData(self.hist_x, self.obst_bil_ys, connect="finite")
348
349
350if __name__ == "__main__":
351    main()

View this example on GitHub: acconeer/acconeer-python-exploration