examples/a121/algo/obstacle/detector.py

examples/a121/algo/obstacle/detector.py#

  1# Copyright (c) Acconeer AB, 2023-2024
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8# Added here to force pyqtgraph to choose PySide
  9# import PySide6  # noqa: F401
 10# from PySide6.QtWidgets import QLabel, QPushButton, QVBoxLayout, QWidget
 11from PySide6.QtGui import QTransform
 12
 13import pyqtgraph as pg
 14
 15import acconeer.exptool as et
 16from acconeer.exptool import a121
 17from acconeer.exptool.a121.algo.obstacle import Detector, DetectorConfig, DetectorResult
 18
 19
 20SENSOR_IDS = [2, 3]
 21
 22
 23def main():
 24    args = a121.ExampleArgumentParser().parse_args()
 25    et.utils.config_logging(args)
 26
 27    client = a121.Client.open(**a121.get_client_args(args))
 28
 29    # Creating a list of subsweep configurations.
 30    subsweep_configurations = [
 31        a121.SubsweepConfig(
 32            start_point=24,  # ~6 cm
 33            num_points=96,
 34            step_length=1,
 35            profile=a121.Profile.PROFILE_1,
 36            hwaas=4,
 37        ),
 38        a121.SubsweepConfig(
 39            start_point=96,  # ~24 cm
 40            num_points=48,
 41            step_length=4,
 42            profile=a121.Profile.PROFILE_3,
 43            hwaas=16,
 44        ),
 45        a121.SubsweepConfig(
 46            start_point=300,  # ~75 cm
 47            num_points=48,
 48            step_length=4,
 49            profile=a121.Profile.PROFILE_3,
 50            hwaas=2,
 51        ),
 52    ]
 53
 54    # Configure the detector using multiple subsweeps
 55    detector_config = DetectorConfig(
 56        num_mean_threshold=1.5,
 57        num_std_threshold=4,
 58        subsweep_configurations=subsweep_configurations,
 59    )
 60
 61    detector = Detector(client=client, sensor_ids=SENSOR_IDS, detector_config=detector_config)
 62
 63    detector.calibrate_detector()
 64    detector.start()
 65
 66    pg_updater = PGUpdater(
 67        num_sensors=len(SENSOR_IDS),
 68        num_subsweeps=len(subsweep_configurations),
 69    )
 70
 71    pg_process = et.PGProcess(pg_updater)
 72    pg_process.start()
 73
 74    interrupt_handler = et.utils.ExampleInterruptHandler()
 75    print("Press Ctrl-C to end session")
 76
 77    while not interrupt_handler.got_signal:
 78        # v_current = get_speed_from_some_Robot_API(), e.g ROS?
 79        # detector.update_robot_speed(v_current)
 80
 81        detector_result = detector.get_next()
 82
 83        try:
 84            pg_process.put_data(detector_result)
 85        except et.PGProccessDiedException:
 86            break
 87
 88    detector.stop()
 89
 90    print("Disconnecting...")
 91    client.close()
 92
 93
 94PLOT_HISTORY_FRAMES = 50
 95PLOT_THRESHOLDS = True
 96
 97
 98class PGUpdater:
 99    def __init__(self, num_sensors, num_subsweeps):
100        self.num_sensors = num_sensors
101        self.num_subsweeps = num_subsweeps
102        self.obst_vel_ys = num_sensors * [np.nan * np.ones(PLOT_HISTORY_FRAMES)]
103        self.obst_dist_ys = num_sensors * [np.nan * np.ones(PLOT_HISTORY_FRAMES)]
104        self.obst_bil_ys = np.nan * np.ones(PLOT_HISTORY_FRAMES)
105        self.hist_x = np.linspace(-100, 0, PLOT_HISTORY_FRAMES)
106
107    def setup(self, win: pg.GraphicsLayout):
108        self.fftmap_plots: list[pg.PlotItem] = []
109        self.fftmap_images: list[pg.ImageItem] = []
110        self.range_hist_curves: list[pg.PlotDataItem] = []
111        self.angle_hist_curves: list[pg.PlotDataItem] = []
112
113        if PLOT_THRESHOLDS:
114            self.bin0_curves: list[pg.PlotDataItem] = []
115            self.bin0_threshold_curves: list[pg.PlotDataItem] = []
116            self.other_bins_curves: list[pg.PlotDataItem] = []
117            self.other_bins_threshold_curves: list[pg.PlotDataItem] = []
118
119        row_offset = 2 if PLOT_THRESHOLDS else 0
120
121        for i_s in range(self.num_sensors):
122            for i_ss in range(self.num_subsweeps):
123                col = i_s * self.num_subsweeps + i_ss
124                p = win.addPlot(
125                    row=0, col=col, title=f"FFT Map, sensor {SENSOR_IDS[i_s]}, subsweep {i_ss}"
126                )
127                im = pg.ImageItem(autoDownsample=True)
128                im.setLookupTable(et.utils.pg_mpl_cmap("viridis"))
129                self.fftmap_images.append(im)
130
131                p.setLabel("bottom", "Distance (cm)")
132                p.setLabel("left", "Velocity (cm/s)")
133                p.addItem(im)
134
135                self.fftmap_plots.append(p)
136
137            if PLOT_THRESHOLDS:
138                self.bin0 = pg.PlotItem(title="Zeroth velocity/angle bin")
139                self.bin0.showGrid(x=True, y=True)
140                self.bin0.setLabel("bottom", "Range (cm)")
141                self.bin0.setLabel("left", "Amplitude")
142                self.bin0.addLegend()
143
144                pen = et.utils.pg_pen_cycler(0)
145                brush = et.utils.pg_brush_cycler(0)
146                symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
147                feat_kw = dict(pen=pen, **symbol_kw)
148                self.bin0_curves += [self.bin0.plot(**feat_kw) for _ in range(self.num_subsweeps)]
149
150                pen = et.utils.pg_pen_cycler(1)
151                brush = et.utils.pg_brush_cycler(1)
152                symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
153                feat_kw = dict(pen=pen, **symbol_kw)
154                self.bin0_threshold_curves += [
155                    self.bin0.plot(**feat_kw) for _ in range(self.num_subsweeps)
156                ]
157
158                bin0_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
159                bin0_plot_legend.setParentItem(self.bin0)
160                bin0_plot_legend.addItem(self.bin0_curves[0], "Sweep")
161                bin0_plot_legend.addItem(self.bin0_threshold_curves[0], "Threshold")
162
163                sublayout = win.addLayout(
164                    row=1,
165                    col=i_s * self.num_subsweeps,
166                    colspan=self.num_subsweeps,
167                )
168                sublayout.layout.setColumnStretchFactor(0, self.num_subsweeps)
169                sublayout.addItem(self.bin0, row=0, col=0)
170
171                self.other_bins = pg.PlotItem(title="Other velocity/angle bins")
172                self.other_bins.showGrid(x=True, y=True)
173                self.other_bins.setLabel("bottom", "Range (cm)")
174                self.other_bins.setLabel("left", "Amplitude")
175                self.other_bins.addLegend()
176
177                pen = et.utils.pg_pen_cycler(0)
178                brush = et.utils.pg_brush_cycler(0)
179                symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
180                feat_kw = dict(pen=pen, **symbol_kw)
181                self.other_bins_curves += [
182                    self.other_bins.plot(**feat_kw) for _ in range(self.num_subsweeps)
183                ]
184
185                pen = et.utils.pg_pen_cycler(1)
186                brush = et.utils.pg_brush_cycler(1)
187                symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
188                feat_kw = dict(pen=pen, **symbol_kw)
189                self.other_bins_threshold_curves += [
190                    self.other_bins.plot(**feat_kw) for _ in range(self.num_subsweeps)
191                ]
192
193                other_bins_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
194                other_bins_plot_legend.setParentItem(self.other_bins)
195                other_bins_plot_legend.addItem(self.other_bins_curves[0], "Max Sweep")
196                other_bins_plot_legend.addItem(self.other_bins_threshold_curves[0], "Threshold")
197
198                sublayout = win.addLayout(
199                    row=2,
200                    col=i_s * self.num_subsweeps,
201                    colspan=self.num_subsweeps,
202                )
203                sublayout.layout.setColumnStretchFactor(0, self.num_subsweeps)
204                sublayout.addItem(self.other_bins, row=0, col=0)
205
206            self.angle_hist = pg.PlotItem(title="Angle/velocity history")
207            self.angle_hist.showGrid(x=True, y=True)
208            self.angle_hist.setLabel("bottom", "Time (frames)")
209            self.angle_hist.setLabel("left", "velocity (cm/s)")
210            self.angle_hist.setXRange(-100, 0)
211            self.angle_hist.addLegend()
212            self.angle_hist_curves.append(self.angle_hist.plot(symbolSize=5, symbol="o"))
213
214            sublayout = win.addLayout(
215                row=1 + row_offset,
216                col=i_s * self.num_subsweeps,
217                colspan=self.num_subsweeps,
218            )
219
220            sublayout.layout.setColumnStretchFactor(0, self.num_subsweeps)
221            sublayout.addItem(self.angle_hist, row=0, col=0)
222
223            self.range_hist = pg.PlotItem(title="Range history")
224            self.range_hist.showGrid(x=True, y=True)
225            self.range_hist.setLabel("bottom", "Time (frames)")
226            self.range_hist.setLabel("left", "Range (cm)")
227            self.range_hist.setXRange(-100, 0)
228            self.range_hist.addLegend()
229            self.range_hist_curves.append(self.range_hist.plot(symbolSize=5, symbol="o"))
230
231            sublayout = win.addLayout(
232                row=2 + row_offset,
233                col=i_s * self.num_subsweeps,
234                colspan=self.num_subsweeps,
235            )
236
237            sublayout.layout.setColumnStretchFactor(0, self.num_subsweeps)
238            sublayout.addItem(self.range_hist, row=0, col=0)
239
240    def update(self, detector_result: DetectorResult):
241        for i_s in range(self.num_sensors):
242            pr = detector_result.processor_results[SENSOR_IDS[i_s]]
243
244            for i_ss in range(self.num_subsweeps):
245                curve_idx = self.num_subsweeps * i_s + i_ss
246
247                fftmap = pr.subsweeps_extra_results[i_ss].fft_map
248                fftmap_threshold = pr.subsweeps_extra_results[i_ss].fft_map_threshold
249
250                # fftmap = 10*np.log10(fftmap)  # to dB
251
252                spf = fftmap.shape[0]
253                r = 100 * pr.subsweeps_extra_results[i_ss].r
254
255                transform = QTransform()
256                transform.translate(
257                    r[0], -100 * pr.extra_result.dv * spf / 2 - 0.5 * 100 * pr.extra_result.dv
258                )
259                transform.scale(r[1] - r[0], 100 * pr.extra_result.dv)
260
261                self.fftmap_images[curve_idx].setTransform(transform)
262
263                self.fftmap_images[curve_idx].updateImage(
264                    np.fft.fftshift(fftmap, 0).T,
265                    levels=(0, 1.05 * np.max(fftmap)),
266                )
267
268                if PLOT_THRESHOLDS:
269                    bin0 = fftmap[0, :]
270                    threshold_bin0 = fftmap_threshold[0, :]
271
272                    max_other_bins = np.max(fftmap[1:, :], axis=0)
273                    threshold_other_bins = fftmap_threshold[1, :]
274
275                    self.bin0_curves[curve_idx].setData(r, bin0)
276                    self.bin0_threshold_curves[curve_idx].setData(r, threshold_bin0)
277                    self.other_bins_curves[curve_idx].setData(r, max_other_bins)
278                    self.other_bins_threshold_curves[curve_idx].setData(r, threshold_other_bins)
279
280            v = pr.targets[0].velocity if pr.targets else np.nan
281
282            self.obst_vel_ys[i_s] = np.roll(self.obst_vel_ys[i_s], -1)
283            self.obst_vel_ys[i_s][-1] = 100 * v  # m/s -> cm/s
284
285            if np.isnan(self.obst_vel_ys[i_s]).all():
286                self.angle_hist_curves[i_s].setVisible(False)
287            else:
288                self.angle_hist_curves[i_s].setVisible(True)
289                self.angle_hist_curves[i_s].setData(
290                    self.hist_x, self.obst_vel_ys[i_s], connect="finite"
291                )
292
293            # TODO: should be known earlier
294            # r_min = min([er.r[0] for er in pr.subsweeps_extra_results])
295            # r_max = min([er.r[-1] for er in pr.subsweeps_extra_results])
296            # self.range_hist.setYRange(100*r_min, 100*r_max)
297
298            r_targets = pr.targets[0].distance if pr.targets else np.nan
299
300            self.obst_dist_ys[i_s] = np.roll(self.obst_dist_ys[i_s], -1)
301            self.obst_dist_ys[i_s][-1] = 100 * r_targets  # m -> cm
302
303            # print(f'{i_s}: r = {r_targets}, v = {v}')
304
305            if np.isnan(self.obst_dist_ys[i_s]).all():
306                self.range_hist_curves[i_s].setVisible(False)
307            else:
308                self.range_hist_curves[i_s].setVisible(True)
309                self.range_hist_curves[i_s].setData(
310                    self.hist_x, self.obst_dist_ys[i_s], connect="finite"
311                )
312
313
314if __name__ == "__main__":
315    main()

View this example on GitHub: acconeer/acconeer-python-exploration