examples/a121/algo/cargo/example_app.py

examples/a121/algo/cargo/example_app.py

  1# Copyright (c) Acconeer AB, 2025
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8# Added here to force pyqtgraph to choose PySide
  9import PySide6  # noqa: F401
 10from PySide6.QtGui import QFont
 11
 12import pyqtgraph as pg
 13
 14import acconeer.exptool as et
 15from acconeer.exptool import a121
 16from acconeer.exptool.a121.algo.cargo import (
 17    CargoPresenceConfig,
 18    ExApp,
 19    ExAppConfig,
 20    ExAppContext,
 21    ExAppResult,
 22    UtilizationLevelConfig,
 23)
 24
 25# These are some of the presets available.
 26# Replace 'ex_app_config' further down to use one of the presets
 27from acconeer.exptool.a121.algo.cargo._configs import (
 28    get_10_ft_container_config,
 29    get_20_ft_container_config,
 30    get_40_ft_container_config,
 31)
 32from acconeer.exptool.a121.algo.cargo._ex_app import PRESENCE_RUN_TIME_S, ContainerSize, _Mode
 33
 34
 35SENSOR_ID = 1
 36
 37
 38def main():
 39    args = a121.ExampleArgumentParser().parse_args()
 40    et.utils.config_logging(args)
 41
 42    client = a121.Client.open(**a121.get_client_args(args))
 43
 44    # All configurable parameters are displayed here.
 45    # These settings correspond to the "No lens" preset.
 46    # There are also other preset configurations available by the imports!
 47
 48    utilization_level_config = UtilizationLevelConfig(
 49        update_rate=5,
 50        threshold_sensitivity=0.5,
 51        signal_quality=25,
 52    )
 53
 54    cargo_presence_config = CargoPresenceConfig(
 55        burst_rate=0.1,
 56        update_rate=6,
 57        signal_quality=30,
 58        sweeps_per_frame=12,
 59        inter_detection_threshold=2,
 60        intra_detection_threshold=2,
 61    )
 62
 63    ex_app_config = ExAppConfig(
 64        activate_presence=True,
 65        cargo_presence_config=cargo_presence_config,
 66        activate_utilization_level=True,
 67        utilization_level_config=utilization_level_config,
 68        container_size=ContainerSize.CONTAINER_20_FT,
 69    )
 70
 71    example_app = ExApp(
 72        client=client,
 73        sensor_id=SENSOR_ID,
 74        ex_app_config=ex_app_config,
 75    )
 76    example_app.start()
 77
 78    pg_updater = PGUpdater(
 79        ex_app_config,
 80        example_app.ex_app_context,
 81    )
 82
 83    pg_process = et.PGProcess(pg_updater)
 84    pg_process.start()
 85
 86    interrupt_handler = et.utils.ExampleInterruptHandler()
 87    print("Press Ctrl-C to end session")
 88
 89    while not interrupt_handler.got_signal:
 90        example_app_result = example_app.get_next()
 91        if example_app_result.mode == _Mode.DISTANCE:
 92            if example_app_result.level_m is not None:
 93                result_str = (
 94                    f"Utilization level: {np.around(example_app_result.level_m, 2)} m, "
 95                    f"{int(np.around(example_app_result.level_percent, 0))}%"
 96                )
 97            else:
 98                result_str = "No utilization level detected"
 99        else:
100            if example_app_result.presence_detected:
101                result_str = "Presence detected"
102            else:
103                result_str = "Presence not detected"
104        print(f"Running detector: {example_app_result.mode.name}\n" f"{result_str}\n")
105
106        try:
107            pg_process.put_data(example_app_result)
108        except et.PGProccessDiedException:
109            break
110
111    print("Disconnecting...")
112    pg_process.close()
113    client.close()
114
115
class PGUpdater:
    """Plot updater for the cargo example app.

    ``setup`` creates the plots for the parts that are activated in the
    ``ExAppConfig`` and ``update`` redraws them from one ``ExAppResult``.
    """

    # HTML wrapper shared by all status texts: centered, white, 12 pt.
    _STATUS_HTML_FORMAT = (
        '<div style="text-align: center">'
        '<span style="color: #FFFFFF;font-size:12pt;">'
        "{}</span></div>"
    )

    def __init__(self, ex_app_config: ExAppConfig, ex_app_context: ExAppContext):
        """Store the config and context; all plot items are created in ``setup``."""
        self.ex_app_config = ex_app_config
        self.ex_app_context = ex_app_context

    def setup(self, win) -> None:
        """Create the plots in ``win`` for every activated part of the example app.

        Utilization level plots are placed in column 0 and presence plots in
        column 1.
        """
        self.utilization_level_config = self.ex_app_config.utilization_level_config
        self.cargo_presence_config = self.ex_app_config.cargo_presence_config

        # Number of rectangles in the utilization level bar. Also used to size
        # the presence rectangle and to position the status texts.
        self.num_rects = 16

        if self.ex_app_config.activate_utilization_level:
            self._setup_utilization_level(win)

        if self.ex_app_config.activate_presence:
            self._setup_presence(win)

    def update(self, result: ExAppResult) -> None:
        """Redraw the plots that belong to the mode ``result`` was produced in."""
        if result.mode == _Mode.DISTANCE:
            self._update_utilization_level(result)
        else:
            self._update_presence(result)

    @staticmethod
    def _first_filled_rect(level_percent, num_rects: int) -> int:
        """Return the index of the first filled rectangle in the level bar.

        Rectangles from the returned index and up are drawn as filled. The
        index is clamped to ``[0, num_rects]``; without the clamp a level above
        100 % gives a negative index, which slicing interprets as counting from
        the end and therefore fills only the last few rectangles.
        """
        index = int(np.around(num_rects - level_percent / 100 * num_rects))
        return min(max(index, 0), num_rects)

    @staticmethod
    def _history_upper_bound(history, threshold):
        """Return the y-value that a score history and its threshold line need to fit.

        If the history is all NaN the threshold itself is returned, otherwise
        the larger of the history's NaN-ignoring maximum and the threshold with
        5 % headroom.
        """
        if np.isnan(history).all():
            return threshold

        return max(float(np.nanmax(history)), threshold * 1.05)

    def _add_sweep_plot_curves(self, color_index: int) -> list:
        """Add ``self.num_curves`` identically styled curves to the sweep plot."""
        pen = et.utils.pg_pen_cycler(color_index)
        brush = et.utils.pg_brush_cycler(color_index)
        curve_kw = dict(pen=pen, symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
        return [self.sweep_plot.plot(**curve_kw) for _ in range(self.num_curves)]

    def _setup_utilization_level(self, win) -> None:
        """Create the distance sweep plot (row 0) and the level bar plot (row 1)."""
        # Distance sweep plot

        self.sweep_plot = win.addPlot(row=0, col=0, title="Distance sweep")
        self.sweep_plot.setMenuEnabled(False)
        self.sweep_plot.showGrid(x=True, y=True)
        self.sweep_plot.addLegend()
        self.sweep_plot.setLabel("left", "Amplitude")
        self.sweep_plot.setLabel("bottom", "Distance (m)")
        self.sweep_plot.addItem(pg.PlotDataItem())

        # One sweep curve and one threshold curve per distance processor result
        # that update() may draw.
        self.num_curves = 4

        self.sweep_curves = self._add_sweep_plot_curves(0)
        self.threshold_curves = self._add_sweep_plot_curves(1)

        # Only the first curve of each kind gets a legend entry, since all
        # curves of a kind share the same style.
        sweep_plot_legend = pg.LegendItem(offset=(-30, 30))
        sweep_plot_legend.setParentItem(self.sweep_plot.graphicsItem())
        sweep_plot_legend.addItem(self.sweep_curves[0], "Sweep")
        sweep_plot_legend.addItem(self.threshold_curves[0], "Threshold")

        font = QFont()
        font.setPixelSize(16)
        self.sweep_text_item = pg.TextItem(
            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
            anchor=(0.5, 0),
            color=pg.mkColor(0xFF, 0xFF, 0xFF, 200),
        )
        self.sweep_text_item.setFont(font)
        self.sweep_text_item.hide()
        self.sweep_plot.addItem(self.sweep_text_item)

        self.sweep_main_peak_line = pg.InfiniteLine(pen=pg.mkPen("k", width=1.5, dash=[2, 8]))
        self.sweep_main_peak_line.hide()
        self.sweep_plot.addItem(self.sweep_main_peak_line)

        self.sweep_smooth_max = et.utils.SmoothMax()

        # Utilization level plot

        self.rect_plot = win.addPlot(row=1, col=0, title="Utilization level")
        self.rect_plot.setAspectLocked()
        self.rect_plot.hideAxis("left")
        self.rect_plot.hideAxis("bottom")
        self.rects = []

        pen = pg.mkPen(None)
        rect_height = self.num_rects / 2.0
        for r in np.arange(self.num_rects) + 1:
            rect = pg.QtWidgets.QGraphicsRectItem(r, rect_height, 1, rect_height)
            rect.setPen(pen)
            rect.setBrush(et.utils.pg_brush_cycler(7))
            self.rect_plot.addItem(rect)
            self.rects.append(rect)

        self.level_html_format = self._STATUS_HTML_FORMAT

        self.level_text_item = pg.TextItem(
            fill=pg.mkColor(0, 150, 0),
            anchor=(0.5, 0),
        )

        no_detection_html = self._STATUS_HTML_FORMAT.format("No level detected")

        self.no_detection_text_item = pg.TextItem(
            html=no_detection_html,
            fill=pg.mkColor("r"),
            anchor=(0.5, 0),
        )

        # Both texts share one position; exactly one of them is visible at a time.
        self.rect_plot.addItem(self.level_text_item)
        self.rect_plot.addItem(self.no_detection_text_item)
        self.level_text_item.setPos(self.num_rects / 2.0 + 1, self.num_rects + 2)
        self.level_text_item.hide()
        self.no_detection_text_item.setPos(self.num_rects / 2.0 + 1, self.num_rects + 2)
        self.no_detection_text_item.show()

    def _setup_presence(self, win) -> None:
        """Create the presence history plot (row 0) and the presence status plot (row 1)."""
        estimated_frame_rate = self.ex_app_context.presence_context.estimated_frame_rate

        self.history_length_n = int(round(PRESENCE_RUN_TIME_S * estimated_frame_rate) + 1)
        self.intra_history = np.zeros(self.history_length_n)
        self.inter_history = np.zeros(self.history_length_n)

        # Presence history plot

        self.presence_hist_plot = win.addPlot(
            row=0,
            col=1,
            title="Presence history",
        )
        self.presence_hist_plot.setMenuEnabled(False)
        self.presence_hist_plot.setMouseEnabled(x=False, y=False)
        self.presence_hist_plot.hideButtons()
        self.presence_hist_plot.showGrid(x=True, y=True)
        self.presence_hist_plot.setLabel("bottom", "Time (s)")
        self.presence_hist_plot.setLabel("left", "Score")
        self.presence_hist_plot.setXRange(-PRESENCE_RUN_TIME_S, 0)
        self.presence_history_smooth_max = et.utils.SmoothMax(estimated_frame_rate)
        self.presence_hist_plot.setYRange(0, 10)

        # Each score gets a solid history curve and a dashed horizontal line at
        # its detection threshold, both in the same colour.
        self.intra_dashed_pen = et.utils.pg_pen_cycler(1, width=2.5, style="--")
        self.intra_pen = et.utils.pg_pen_cycler(1)

        self.intra_hist_curve = self.presence_hist_plot.plot(pen=self.intra_pen)
        self.intra_limit_line = pg.InfiniteLine(angle=0, pen=self.intra_dashed_pen)
        self.presence_hist_plot.addItem(self.intra_limit_line)
        self.intra_limit_line.setPos(self.cargo_presence_config.intra_detection_threshold)
        self.intra_limit_line.setPen(self.intra_dashed_pen)

        self.inter_pen = et.utils.pg_pen_cycler(0)
        self.inter_dashed_pen = et.utils.pg_pen_cycler(0, width=2.5, style="--")

        self.inter_hist_curve = self.presence_hist_plot.plot(pen=self.inter_pen)
        self.inter_limit_line = pg.InfiniteLine(angle=0, pen=self.inter_dashed_pen)
        self.presence_hist_plot.addItem(self.inter_limit_line)
        self.inter_limit_line.setPos(self.cargo_presence_config.inter_detection_threshold)
        self.inter_limit_line.setPen(self.inter_dashed_pen)

        self.hist_xs = np.linspace(-PRESENCE_RUN_TIME_S, 0, self.history_length_n)

        # Presence detection plot

        self.presence_detection_plot = win.addPlot(row=1, col=1, title="Presence")
        self.presence_detection_plot.setAspectLocked()
        self.presence_detection_plot.hideAxis("left")
        self.presence_detection_plot.hideAxis("bottom")

        present_html_format = self._STATUS_HTML_FORMAT.format("Presence detected")
        not_present_html = self._STATUS_HTML_FORMAT.format("No presence detected")

        self.present_text_item = pg.TextItem(
            html=present_html_format,
            fill=pg.mkColor(0, 150, 0),
            anchor=(0.65, 0),
        )
        self.not_present_text_item = pg.TextItem(
            html=not_present_html,
            fill=pg.mkColor("r"),
            anchor=(0.6, 0),
        )

        # Both texts share one position; "not present" is the one shown initially.
        self.presence_detection_plot.addItem(self.present_text_item)
        self.presence_detection_plot.addItem(self.not_present_text_item)
        self.present_text_item.setPos(self.num_rects / 2.0 + 1, self.num_rects + 2)
        self.not_present_text_item.setPos(self.num_rects / 2.0 + 1, self.num_rects + 2)
        self.present_text_item.hide()

        pen = pg.mkPen(None)
        rect_height = self.num_rects / 2.0
        rect_length = self.num_rects

        self.presence_rect = pg.QtWidgets.QGraphicsRectItem(
            0, rect_height, rect_length, rect_height
        )
        self.presence_rect.setPen(pen)
        self.presence_rect.setBrush(et.utils.pg_brush_cycler(7))
        self.presence_detection_plot.addItem(self.presence_rect)

    def _update_utilization_level(self, result: ExAppResult) -> None:
        """Redraw the sweep plot and the level bar from a distance mode result."""
        if self.ex_app_config.activate_presence:
            # Clear the presence score histories whenever a distance result arrives.
            self.intra_history = np.zeros(self.history_length_n)
            self.inter_history = np.zeros(self.history_length_n)

        # Sweep plot

        distance = result.distance
        max_val_in_plot = 0
        if result.distance_processor_result is not None:
            for idx, processor_result in enumerate(result.distance_processor_result):
                abs_sweep = processor_result.extra_result.abs_sweep
                threshold = processor_result.extra_result.used_threshold
                distances_m = processor_result.extra_result.distances_m

                self.sweep_curves[idx].setData(distances_m, abs_sweep)
                self.threshold_curves[idx].setData(distances_m, threshold)

                max_val_in_subsweep = max(max(threshold), max(abs_sweep))
                if max_val_in_plot < max_val_in_subsweep:
                    max_val_in_plot = max_val_in_subsweep

            self.sweep_plot.setYRange(0, self.sweep_smooth_max.update(max_val_in_plot))

        if distance is not None:
            # Place the text horizontally centered, near the top of the current view.
            text_y_pos = self.sweep_plot.getAxis("left").range[1] * 0.95
            text_x_pos = (
                self.sweep_plot.getAxis("bottom").range[1]
                + self.sweep_plot.getAxis("bottom").range[0]
            ) / 2.0
            self.sweep_text_item.setPos(text_x_pos, text_y_pos)
            self.sweep_text_item.setHtml("Main peak distance: {:.3f} m".format(distance))
            self.sweep_text_item.show()

            self.sweep_main_peak_line.setPos(distance)
            self.sweep_main_peak_line.show()
        else:
            self.sweep_text_item.hide()
            self.sweep_main_peak_line.hide()

        # Utilization level plot

        # The level is shown both as a bar of filled rectangles and as text.

        if result.level_percent is None:  # No detection
            for rect in self.rects:
                rect.setBrush(et.utils.pg_brush_cycler(7))
            self.level_text_item.hide()
            self.no_detection_text_item.show()
        else:
            self.bar_loc = self._first_filled_rect(result.level_percent, self.num_rects)

            for rect in self.rects[self.bar_loc :]:
                rect.setBrush(et.utils.pg_brush_cycler(0))

            for rect in self.rects[: self.bar_loc]:
                rect.setBrush(et.utils.pg_brush_cycler(7))

            level_text = "Utilization level: {:.2f} m, {:.0f} %".format(
                result.level_m,
                result.level_percent,
            )
            level_html = self.level_html_format.format(level_text)
            self.level_text_item.setHtml(level_html)
            self.level_text_item.show()
            self.no_detection_text_item.hide()

        for rect in self.rects:
            rect.setVisible(True)

    def _update_presence(self, result: ExAppResult) -> None:
        """Append the presence scores to the history plot and show the detection state."""
        # Presence history

        # Shift the histories one step and put the newest score last.
        self.intra_history = np.roll(self.intra_history, -1)
        self.intra_history[-1] = result.intra_presence_score
        self.intra_hist_curve.setData(self.hist_xs, self.intra_history)

        self.inter_history = np.roll(self.inter_history, -1)
        self.inter_history[-1] = result.inter_presence_score
        self.inter_hist_curve.setData(self.hist_xs, self.inter_history)

        # Set y-range

        intra_m_hist = self._history_upper_bound(
            self.intra_history,
            self.cargo_presence_config.intra_detection_threshold,
        )
        inter_m_hist = self._history_upper_bound(
            self.inter_history,
            self.cargo_presence_config.inter_detection_threshold,
        )

        m_hist = max(intra_m_hist, inter_m_hist)
        m_hist = self.presence_history_smooth_max.update(m_hist)
        self.presence_hist_plot.setYRange(0, m_hist)

        # Presence detection plot

        if result.presence_detected:
            self.present_text_item.show()
            self.not_present_text_item.hide()
            self.presence_rect.setBrush(pg.mkColor(0, 150, 0))
        else:
            self.present_text_item.hide()
            self.not_present_text_item.show()
            self.presence_rect.setBrush(et.utils.pg_brush_cycler(7))
433
# Run the example only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

View this example on GitHub: acconeer/acconeer-python-exploration