examples/a121/algo/parking/parking.py

examples/a121/algo/parking/parking.py#

  1# Copyright (c) Acconeer AB, 2023-2024
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8# Added here to force pyqtgraph to choose PySide
  9import PySide6  # noqa: F401
 10from PySide6 import QtCore
 11
 12import pyqtgraph as pg
 13
 14import acconeer.exptool as et
 15from acconeer.exptool import a121
 16from acconeer.exptool.a121.algo._utils import get_distances_m
 17from acconeer.exptool.a121.algo.parking import (
 18    ObstructionProcessor,
 19    RefApp,
 20    RefAppConfig,
 21    RefAppResult,
 22)
 23from acconeer.exptool.a121.algo.parking._processors import MAX_AMPLITUDE
 24from acconeer.exptool.a121.algo.parking._ref_app import get_sensor_configs
 25
 26
 27SENSOR_ID = 1
 28
 29
 30def main():
 31    args = a121.ExampleArgumentParser().parse_args()
 32    et.utils.config_logging(args)
 33
 34    client = a121.Client.open(**a121.get_client_args(args))
 35    ref_app_config = RefAppConfig(
 36        range_start_m=0.1,
 37        range_end_m=0.4,
 38        hwaas=24,
 39        profile=a121.Profile.PROFILE_1,
 40        update_rate=5,
 41        queue_length_n=3,
 42        amplitude_threshold=8.0,
 43        weighted_distance_threshold_m=0.1,
 44        obstruction_detection=True,
 45        obstruction_start_m=0.03,
 46        obstruction_end_m=0.05,
 47        obstruction_distance_threshold=0.06,
 48    )
 49
 50    ref_app = RefApp(client=client, sensor_id=SENSOR_ID, ref_app_config=ref_app_config)
 51    ref_app.calibrate_ref_app()
 52    ref_app.start()
 53
 54    pg_updater = PGUpdater(
 55        ref_app_config,
 56        ref_app.metadata,
 57        SENSOR_ID,
 58        ref_app.session_config,
 59    )
 60
 61    pg_process = et.PGProcess(pg_updater)
 62    pg_process.start()
 63
 64    interrupt_handler = et.utils.ExampleInterruptHandler()
 65    print("Press Ctrl-C to end session")
 66
 67    while not interrupt_handler.got_signal:
 68        ref_app_result = ref_app.get_next()
 69        if ref_app_result.car_detected:
 70            print("Car detected")
 71        else:
 72            print("No car detected")
 73        if ref_app_config.obstruction_detection:
 74            if ref_app_result.obstruction_detected:
 75                print("Obstruction detected")
 76            else:
 77                print("No obstruction detected")
 78        try:
 79            pg_process.put_data(ref_app_result)
 80        except et.PGProccessDiedException:
 81            break
 82
 83    ref_app.stop()
 84
 85    print("Disconnecting...")
 86    client.close()
 87
 88
 89class PGUpdater:
 90    def __init__(
 91        self,
 92        ref_app_config: RefAppConfig,
 93        metadata: a121.Metadata,
 94        sensor_id: int,
 95        session_config: a121.SessionConfig,
 96    ):
 97        self.metadata = metadata
 98        self.ref_app_config = ref_app_config
 99        sensor_configs = get_sensor_configs(ref_app_config)
100        display_config = sensor_configs["base_config"]
101        self.sensor_id = sensor_id
102        self.session_config = session_config
103        self.sensor_config = display_config
104        self.distances = get_distances_m(display_config, metadata)
105
106        if self.ref_app_config.obstruction_detection:
107            obstruction_config = sensor_configs["obstruction_config"]
108            self.obs_distances = get_distances_m(obstruction_config, metadata)
109            self.obs_x_thres, self.obs_y_thres = ObstructionProcessor.get_thresholds(
110                ref_app_config.obstruction_distance_threshold, self.obs_distances
111            )
112
113        self.setup_is_done = False
114
115    def setup(self, win):
116        win.setWindowTitle("Acconeer parking detection example")
117        # Define pens and font.
118        blue_color = et.utils.color_cycler(0)
119        orange_color = et.utils.color_cycler(1)
120        brush = et.utils.pg_brush_cycler(0)
121
122        self.blue = dict(
123            pen=pg.mkPen(blue_color, width=2),
124            symbol="o",
125            symbolSize=1,
126            symbolBrush=brush,
127            symbolPen="k",
128        )
129
130        self.orange = dict(
131            pen=pg.mkPen(orange_color, width=2),
132            symbol="o",
133            symbolSize=1,
134            symbolBrush=brush,
135            symbolPen="k",
136        )
137        self.blue_transparent_pen = pg.mkPen(f"{blue_color}50", width=2)
138        self.orange_transparent_pen = pg.mkPen(f"{orange_color}50", width=2)
139
140        brush_dot = et.utils.pg_brush_cycler(1)
141
142        # Signature plot.
143        self.sig_plot = win.addPlot(row=0, col=0, colspan=2)
144        self.sig_plot.setTitle("Sampled Signatures")
145        self.sig_plot.setMenuEnabled(False)
146        self.sig_plot.showGrid(x=True, y=True)
147        self.sig_plot.addLegend()
148        self.sig_plot.setLabel("left", "Normalized energy")
149        self.sig_plot.setLabel("bottom", "Distance (m)")
150        self.sig_plot.addItem(pg.PlotDataItem())
151        self.sig_plot_x_range = (
152            min(self.distances),
153            max(self.distances) + self.ref_app_config.weighted_distance_threshold_m,
154        )
155
156        self.sig_plot.setXRange(self.sig_plot_x_range[0], self.sig_plot_x_range[1])
157        self.sig_plot.setYRange(0, 100)
158        symbol_kw_main = dict(
159            symbol="o", symbolSize=7, symbolBrush=brush, symbolPen=None, pen=None
160        )
161        self.sig_plot_curve = self.sig_plot.plot(**symbol_kw_main)
162        energy_threshold_line = pg.InfiniteLine(
163            angle=0, pen=pg.mkPen("k", width=1.5, style=QtCore.Qt.PenStyle.DashLine)
164        )
165        energy_threshold_line.setVisible(True)
166        energy_threshold_line.setPos(self.ref_app_config.amplitude_threshold)
167        self.sig_plot.addItem(energy_threshold_line)
168
169        self.sig_plot_cluster_start = pg.InfiniteLine(angle=90, pen=pg.mkPen("k", width=1.5))
170        self.sig_plot_cluster_start.setVisible(True)
171        self.sig_plot.addItem(self.sig_plot_cluster_start)
172
173        self.sig_plot_cluster_end = pg.InfiniteLine(angle=90, pen=pg.mkPen("k", width=1.5))
174        self.sig_plot_cluster_end.setVisible(True)
175        self.sig_plot.addItem(self.sig_plot_cluster_end)
176
177        self.sig_plot_smooth_max = et.utils.SmoothMax(self.session_config.update_rate)
178
179        parking_car_html = (
180            '<div style="text-align: center">'
181            '<span style="color: #FFFFFF;font-size:15pt;">'
182            "{}</span></div>".format("Parked car detected!")
183        )
184        self.parking_car_text_item = pg.TextItem(
185            html=parking_car_html,
186            fill=orange_color,
187            anchor=(0.5, 0),
188        )
189        parking_no_car_html = (
190            '<div style="text-align: center">'
191            '<span style="color: #FFFFFF;font-size:15pt;">'
192            "{}</span></div>".format("No car detected.")
193        )
194        self.parking_no_car_text_item = pg.TextItem(
195            html=parking_no_car_html,
196            fill=blue_color,
197            anchor=(0.5, 0),
198        )
199
200        self.sig_plot.addItem(self.parking_car_text_item)
201        self.sig_plot.addItem(self.parking_no_car_text_item)
202        self.parking_car_text_item.hide()
203        self.parking_no_car_text_item.hide()
204
205        self.cluster_width = self.ref_app_config.weighted_distance_threshold_m
206
207        # Obstruction plot.
208        if self.ref_app_config.obstruction_detection:
209            self.obstruction_plot = win.addPlot(row=1, col=1)
210            self.obstruction_plot.setTitle("Obstruction Detection Signatures")
211            self.obstruction_plot.setMenuEnabled(False)
212            self.obstruction_plot.showGrid(x=True, y=True)
213            self.obstruction_plot.addLegend()
214            self.obstruction_plot.setLabel("left", "Average energy")
215            self.obstruction_plot.setLabel("bottom", "Distance (m)")
216            self.obstruction_plot.addItem(pg.PlotDataItem())
217            self.obstruction_plot.setXRange(min(self.obs_distances), max(self.obs_distances))
218            self.obstruction_plot.setYRange(0, MAX_AMPLITUDE)  # Set to standard
219            self.obstruction_plot_curve = self.obstruction_plot.plot(**self.orange)
220
221            symbol_obstruction_dot = dict(
222                symbol="o",
223                symbolSize=7,
224                symbolBrush=brush_dot,
225                symbolPen=None,
226                pen=None,
227            )
228            self.obstruction_plot_point = self.obstruction_plot.plot(**symbol_obstruction_dot)
229
230            symbol_kw_main = dict(
231                symbol="o", symbolSize=7, symbolBrush=brush, symbolPen=None, pen=None
232            )
233            self.obstruction_plot_center = self.obstruction_plot.plot(**symbol_kw_main)
234
235            self.obstruction_center_rect = pg.QtWidgets.QGraphicsRectItem(0, 0, 0.01, 0.01)
236            self.obstruction_center_rect.setPen(self.orange_transparent_pen)
237            self.obstruction_plot.addItem(self.obstruction_center_rect)
238
239            obstruction_html = (
240                '<div style="text-align: center">'
241                '<span style="color: #FFFFFF;font-size:15pt;">'
242                "{}</span></div>".format("Obstruction detected!")
243            )
244            self.obstruction_text_item = pg.TextItem(
245                html=obstruction_html,
246                fill=orange_color,
247                anchor=(0.5, 0),
248            )
249            no_obstruction_html = (
250                '<div style="text-align: center">'
251                '<span style="color: #FFFFFF;font-size:15pt;">'
252                "{}</span></div>".format("No obstruction detected.")
253            )
254            self.no_obstruction_text_item = pg.TextItem(
255                html=no_obstruction_html,
256                fill=blue_color,
257                anchor=(0.5, 0),
258            )
259
260            obs_text_x_pos = (
261                min(self.obs_distances) + (max(self.obs_distances) - min(self.obs_distances)) * 0.5
262            )
263            obs_text_y_pos = MAX_AMPLITUDE * 0.9
264
265            self.obstruction_text_item.setPos(obs_text_x_pos, obs_text_y_pos)
266            self.no_obstruction_text_item.setPos(obs_text_x_pos, obs_text_y_pos)
267
268            self.obstruction_plot.addItem(self.obstruction_text_item)
269            self.obstruction_plot.addItem(self.no_obstruction_text_item)
270            self.obstruction_text_item.hide()
271            self.no_obstruction_text_item.hide()
272
273        # Parking info plot.
274        self.parking_plot = win.addPlot(row=1, col=0)
275        self.parking_plot.setTitle("Noise adjusted amplitude")
276        self.parking_plot.setMenuEnabled(False)
277        self.parking_plot.showGrid(x=True, y=True)
278        self.parking_plot.setLabel("left", "Normalized energy")
279        self.parking_plot.setLabel("bottom", "Distance (m)")
280        self.parking_plot.addItem(pg.PlotDataItem())
281        self.parking_plot.setXRange(min(self.distances), max(self.distances))
282        self.parking_plot.setYRange(0, 100)  # Set to standard
283        self.parking_plot_curve = self.parking_plot.plot(**self.blue)
284        self.parking_smooth_max = et.utils.SmoothMax(self.session_config.update_rate)
285
286    def update_obstruction_text(self) -> None:
287        self.obstruction_text_timeout -= 1
288        if self.obstruction_text_timeout < 0:
289            self.obstruction_text_timeout = 0
290            self.obstruction_text_item.hide()
291
292    def show_obstruction_text(self) -> None:
293        self.obstruction_text_timeout = 5
294        self.obstruction_text_item.show()
295
296    def update(self, ref_app_result: RefAppResult) -> None:
297        signatures = ref_app_result.extra_result.signature_history
298        parking_data = ref_app_result.extra_result.parking_data
299
300        signature_x = [elm[0] for elm in signatures]
301        signature_y = [elm[1] for elm in signatures]
302
303        cluster_start = ref_app_result.extra_result.closest_object_dist
304        cluster_end = cluster_start + self.cluster_width
305        self.sig_plot_curve.setData(x=signature_x, y=signature_y)
306        self.sig_plot_cluster_start.setPos(cluster_start)
307        self.sig_plot_cluster_end.setPos(cluster_end)
308        self.sig_plot_cluster_start.setVisible(False)
309        self.sig_plot_cluster_end.setVisible(False)
310
311        sig_max = self.sig_plot_smooth_max.update(max(signature_y))
312        self.sig_plot.setYRange(0, sig_max)
313
314        sig_text_x_pos = (
315            self.sig_plot_x_range[0] + (self.sig_plot_x_range[1] - self.sig_plot_x_range[0]) * 0.5
316        )
317        sig_text_y_pos = sig_max * 0.9
318
319        self.parking_car_text_item.setPos(sig_text_x_pos, sig_text_y_pos)
320        self.parking_no_car_text_item.setPos(sig_text_x_pos, sig_text_y_pos)
321
322        if ref_app_result.car_detected:
323            self.parking_no_car_text_item.hide()
324            self.parking_car_text_item.show()
325            self.sig_plot_cluster_start.setVisible(True)
326            self.sig_plot_cluster_end.setVisible(True)
327        else:
328            self.parking_car_text_item.hide()
329            self.parking_no_car_text_item.show()
330
331        if self.ref_app_config.obstruction_detection:
332            obstruction_data = ref_app_result.extra_result.obstruction_data
333            self.obstruction_plot_curve.setData(self.obs_distances, obstruction_data)
334            point_x, point_y = ref_app_result.extra_result.obstruction_signature
335            center_x, center_y = ref_app_result.extra_result.obstruction_center
336            rect_x = center_x - self.obs_x_thres
337            rect_y = center_y - self.obs_y_thres
338            rect_w = 2 * self.obs_x_thres
339            rect_h = 2 * self.obs_y_thres
340
341            self.obstruction_center_rect.setRect(rect_x, rect_y, rect_w, rect_h)
342
343            self.obstruction_plot_point.setData(x=[point_x], y=[point_y])
344            self.obstruction_plot_center.setData(x=[center_x], y=[center_y])
345
346            if ref_app_result.obstruction_detected:
347                self.no_obstruction_text_item.hide()
348                self.obstruction_text_item.show()
349            else:
350                self.obstruction_text_item.hide()
351                self.no_obstruction_text_item.show()
352
353        park_max = np.amax(parking_data)
354        park_max = self.parking_smooth_max.update(park_max)
355        self.parking_plot.setYRange(0, park_max)
356        self.parking_plot_curve.setData(self.distances, parking_data)
357
358
359if __name__ == "__main__":
360    main()