examples/a121/algo/bilateration/bilaterator.py

examples/a121/algo/bilateration/bilaterator.py

  1# Copyright (c) Acconeer AB, 2023-2024
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8# Added here to force pyqtgraph to choose PySide
  9import PySide6  # noqa: F401
 10
 11import pyqtgraph as pg
 12
 13import acconeer.exptool as et
 14from acconeer.exptool import a121
 15from acconeer.exptool.a121.algo.bilateration import Processor, ProcessorConfig
 16from acconeer.exptool.a121.algo.distance import Detector, DetectorConfig, ThresholdMethod
 17
 18
# IDs of the two sensors used by this example; the same list is handed to both the
# distance detector and the bilateration processor in main().
_SENSOR_IDS = [2, 3]
 20
 21
 22def main():
 23    args = a121.ExampleArgumentParser().parse_args()
 24    et.utils.config_logging(args)
 25
 26    client = a121.Client.open(**a121.get_client_args(args))
 27
 28    detector_config = DetectorConfig(
 29        start_m=0.25,
 30        end_m=1.0,
 31        signal_quality=25.0,
 32        max_profile=a121.Profile.PROFILE_1,
 33        max_step_length=2,
 34        threshold_method=ThresholdMethod.CFAR,
 35    )
 36
 37    detector = Detector(client=client, sensor_ids=_SENSOR_IDS, detector_config=detector_config)
 38
 39    session_config = detector.session_config
 40
 41    processor_config = ProcessorConfig()
 42
 43    processor = Processor(
 44        session_config=session_config, processor_config=processor_config, sensor_ids=_SENSOR_IDS
 45    )
 46
 47    detector.calibrate_detector()
 48
 49    detector.start()
 50
 51    pg_updater = PlotPlugin(detector_config=detector_config, bilateration_config=processor_config)
 52    pg_process = et.PGProcess(pg_updater)
 53    pg_process.start()
 54
 55    interrupt_handler = et.utils.ExampleInterruptHandler()
 56    print("Press Ctrl-C to end session")
 57
 58    while not interrupt_handler.got_signal:
 59        detector_result = detector.get_next()
 60        processor_result = processor.process(detector_result)
 61
 62        try:
 63            pg_process.put_data(
 64                {"detector_result": detector_result, "processor_result": processor_result}
 65            )
 66        except et.PGProccessDiedException:
 67            break
 68
 69    detector.stop()
 70
 71    print("Disconnecting...")
 72    client.close()
 73
 74
 75class PlotPlugin:
 76    _NUM_POINTS_ON_CIRCLE = 100
 77
 78    def __init__(
 79        self,
 80        bilateration_config: ProcessorConfig,
 81        detector_config: DetectorConfig,
 82    ) -> None:
 83        self.num_curves = 5
 84        self.detector_config = detector_config
 85        self.sensor_half_spacing_m = bilateration_config.sensor_spacing_m / 2
 86
 87    def setup(self, win):
 88        # Define sweep plot.
 89        self.sweep_plot = win.addPlot(row=0, col=0)
 90        self.sweep_plot.setMenuEnabled(False)
 91        self.sweep_plot.showGrid(x=True, y=True)
 92        self.sweep_plot.addLegend()
 93        self.sweep_plot.setLabel("left", "Amplitude")
 94        self.sweep_plot.setLabel("bottom", "Distance (m)")
 95        self.sweep_plot.addItem(pg.PlotDataItem())
 96
 97        pen_sweep_0 = et.utils.pg_pen_cycler(0)
 98        pen_sweep_1 = et.utils.pg_pen_cycler(1)
 99
100        # Add sweep curves for the two sensors.
101        feat_kw_1 = dict(pen=pen_sweep_0)
102        feat_kw_2 = dict(pen=pen_sweep_1)
103        self.sweep_curves_1 = [self.sweep_plot.plot(**feat_kw_1) for _ in range(self.num_curves)]
104        self.sweep_curves_2 = [self.sweep_plot.plot(**feat_kw_2) for _ in range(self.num_curves)]
105
106        pen_sweep_0 = et.utils.pg_pen_cycler(0, "--")
107        pen_sweep_1 = et.utils.pg_pen_cycler(1, "--")
108
109        # Add threshold curves for the two sensors.
110        feat_kw_1 = dict(pen=pen_sweep_0)
111        feat_kw_2 = dict(pen=pen_sweep_1)
112        self.threshold_curves_1 = [
113            self.sweep_plot.plot(**feat_kw_1) for _ in range(self.num_curves)
114        ]
115        self.threshold_curves_2 = [
116            self.sweep_plot.plot(**feat_kw_2) for _ in range(self.num_curves)
117        ]
118
119        # Add legends.
120        sweep_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
121        sweep_plot_legend.setParentItem(self.sweep_plot)
122        sweep_plot_legend.addItem(self.sweep_curves_1[0], "Sweep - Left sensor")
123        sweep_plot_legend.addItem(self.threshold_curves_1[0], "Threshold - Left sensor")
124        sweep_plot_legend.addItem(self.sweep_curves_2[0], "Sweep - Right sensor")
125        sweep_plot_legend.addItem(self.threshold_curves_2[0], "Threshold - Right sensor")
126
127        self.sweep_smooth_max = et.utils.SmoothMax()
128
129        # Define obstacle plot.
130        self.obstacle_location_plot = win.addPlot(row=1, col=0)
131        self.obstacle_location_plot.setMenuEnabled(False)
132        self.obstacle_location_plot.setAspectLocked()
133        self.obstacle_location_plot.showGrid(x=True, y=True)
134        self.obstacle_location_plot.addLegend()
135        self.obstacle_location_plot.setLabel("left", "Y (m)")
136        self.obstacle_location_plot.setLabel("bottom", "X (m)")
137        self.obstacle_location_plot.addItem(pg.PlotDataItem())
138        self.obstacle_location_plot.setXRange(
139            -self.detector_config.end_m, self.detector_config.end_m
140        )
141        self.obstacle_location_plot.setYRange(0.0, self.detector_config.end_m)
142
143        pen_sweep_0 = et.utils.pg_pen_cycler(0)
144        brush_sweep_0 = et.utils.pg_brush_cycler(0)
145        symbol_kw = dict(symbol="o", symbolSize=10, symbolBrush=brush_sweep_0, symbolPen=None)
146        feat_kw = dict(pen=None, **symbol_kw)
147        self.obstacle_location_curve = self.obstacle_location_plot.plot(**feat_kw)
148        feat_kw = dict(pen=pen_sweep_0)
149        self.obstacle_location_half_curve = [
150            self.obstacle_location_plot.plot(**feat_kw) for _ in range(Processor._MAX_NUM_OBJECTS)
151        ]
152
153    def update(self, data) -> None:
154        detector_result = data["detector_result"]
155        processor_result = data["processor_result"]
156
157        # Plot sweep data from both distance detectors.
158        max_val = 0.0
159        for distance_detector_result, sweep_curves, threshold_curves in zip(
160            detector_result.values(),
161            [self.sweep_curves_1, self.sweep_curves_2],
162            [self.threshold_curves_1, self.threshold_curves_2],
163        ):
164            for idx, distance_processor_result in enumerate(
165                distance_detector_result.processor_results
166            ):
167                abs_sweep = distance_processor_result.extra_result.abs_sweep
168                threshold = distance_processor_result.extra_result.used_threshold
169                distances_m = distance_processor_result.extra_result.distances_m
170
171                assert abs_sweep is not None
172                assert threshold is not None
173                assert distances_m is not None
174
175                sweep_curves[idx].setData(distances_m, abs_sweep)
176                threshold_curves[idx].setData(distances_m, threshold)
177
178                if max_val < np.max(abs_sweep):
179                    max_val = float(np.max(abs_sweep))
180
181                if max_val < np.max(threshold):
182                    max_val = float(np.max(threshold))
183
184        if max_val != 0.0:
185            self.sweep_plot.setYRange(0.0, self.sweep_smooth_max.update(max_val))
186
187        # Plot result from bilateration processor.
188        # Start with the points.
189        points = processor_result.points
190        xs = [point.x_coord for point in points]
191        ys = [point.y_coord for point in points]
192        self.obstacle_location_curve.setData(xs, ys)
193
194        # Plot objects without counter part.
195        objects_without_counterpart = processor_result.objects_without_counterpart
196        for i, o in enumerate(objects_without_counterpart):
197            x = np.cos(np.linspace(0, np.pi, self._NUM_POINTS_ON_CIRCLE)) * o.distance
198            y = np.sin(np.linspace(0, np.pi, self._NUM_POINTS_ON_CIRCLE)) * o.distance
199
200            # Offset circle to center around the sensor that detected the object.
201            if o.sensor_position == Processor._SENSOR_POSITION_LEFT:
202                x -= self.sensor_half_spacing_m
203            elif o.sensor_position == Processor._SENSOR_POSITION_RIGHT:
204                x += self.sensor_half_spacing_m
205            else:
206                msg = "Invalid sensor position."
207                raise ValueError(msg)
208
209            self.obstacle_location_half_curve[i].setData(x, y)
210
211        # Remove curves that does not have an object to visualize.
212        for i in range(len(objects_without_counterpart), Processor._MAX_NUM_OBJECTS):
213            self.obstacle_location_half_curve[i].setData([], [])
214
215
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()

View this example on GitHub: acconeer/acconeer-python-exploration