examples/a121/algo/cargo/example_app.py#
1# Copyright (c) Acconeer AB, 2025
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8# Added here to force pyqtgraph to choose PySide
9import PySide6 # noqa: F401
10from PySide6.QtGui import QFont
11
12import pyqtgraph as pg
13
14import acconeer.exptool as et
15from acconeer.exptool import a121
16from acconeer.exptool.a121.algo.cargo import (
17 CargoPresenceConfig,
18 ExApp,
19 ExAppConfig,
20 ExAppContext,
21 ExAppResult,
22 UtilizationLevelConfig,
23)
24
25# These are some of the presets available.
26# Replace 'ex_app_config' further down to use one of the presets
27from acconeer.exptool.a121.algo.cargo._configs import (
28 get_10_ft_container_config,
29 get_20_ft_container_config,
30 get_40_ft_container_config,
31)
32from acconeer.exptool.a121.algo.cargo._ex_app import PRESENCE_RUN_TIME_S, ContainerSize, _Mode
33
34
35SENSOR_ID = 1
36
37
38def main():
39 args = a121.ExampleArgumentParser().parse_args()
40 et.utils.config_logging(args)
41
42 client = a121.Client.open(**a121.get_client_args(args))
43
44 # All configurable parameters are displayed here.
45 # These settings correspond to the "No lens" preset.
46 # There are also other preset configurations available by the imports!
47
48 utilization_level_config = UtilizationLevelConfig(
49 update_rate=5,
50 threshold_sensitivity=0.5,
51 signal_quality=25,
52 )
53
54 cargo_presence_config = CargoPresenceConfig(
55 burst_rate=0.1,
56 update_rate=6,
57 signal_quality=30,
58 sweeps_per_frame=12,
59 inter_detection_threshold=2,
60 intra_detection_threshold=2,
61 )
62
63 ex_app_config = ExAppConfig(
64 activate_presence=True,
65 cargo_presence_config=cargo_presence_config,
66 activate_utilization_level=True,
67 utilization_level_config=utilization_level_config,
68 container_size=ContainerSize.CONTAINER_20_FT,
69 )
70
71 example_app = ExApp(
72 client=client,
73 sensor_id=SENSOR_ID,
74 ex_app_config=ex_app_config,
75 )
76 example_app.start()
77
78 pg_updater = PGUpdater(
79 ex_app_config,
80 example_app.ex_app_context,
81 )
82
83 pg_process = et.PGProcess(pg_updater)
84 pg_process.start()
85
86 interrupt_handler = et.utils.ExampleInterruptHandler()
87 print("Press Ctrl-C to end session")
88
89 while not interrupt_handler.got_signal:
90 example_app_result = example_app.get_next()
91 if example_app_result.mode == _Mode.DISTANCE:
92 if example_app_result.level_m is not None:
93 result_str = (
94 f"Utilization level: {np.around(example_app_result.level_m, 2)} m, "
95 f"{int(np.around(example_app_result.level_percent, 0))}%"
96 )
97 else:
98 result_str = "No utilization level detected"
99 else:
100 if example_app_result.presence_detected:
101 result_str = "Presence detected"
102 else:
103 result_str = "Presence not detected"
104 print(f"Running detector: {example_app_result.mode.name}\n" f"{result_str}\n")
105
106 try:
107 pg_process.put_data(example_app_result)
108 except et.PGProccessDiedException:
109 break
110
111 print("Disconnecting...")
112 pg_process.close()
113 client.close()
114
115
116class PGUpdater:
117 def __init__(self, ex_app_config: ExAppConfig, ex_app_context: ExAppContext):
118 self.ex_app_config = ex_app_config
119 self.ex_app_context = ex_app_context
120
121 def setup(self, win) -> None:
122 self.utilization_level_config = self.ex_app_config.utilization_level_config
123 self.cargo_presence_config = self.ex_app_config.cargo_presence_config
124
125 self.num_rects = 16
126
127 # Utilization level
128
129 if self.ex_app_config.activate_utilization_level:
130 # Distance sweep plot
131
132 self.sweep_plot = win.addPlot(row=0, col=0, title="Distance sweep")
133 self.sweep_plot.setMenuEnabled(False)
134 self.sweep_plot.showGrid(x=True, y=True)
135 self.sweep_plot.addLegend()
136 self.sweep_plot.setLabel("left", "Amplitude")
137 self.sweep_plot.setLabel("bottom", "Distance (m)")
138 self.sweep_plot.addItem(pg.PlotDataItem())
139
140 self.num_curves = 4
141
142 pen = et.utils.pg_pen_cycler(0)
143 brush = et.utils.pg_brush_cycler(0)
144 symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
145 feat_kw = dict(pen=pen, **symbol_kw)
146 self.sweep_curves = [self.sweep_plot.plot(**feat_kw) for _ in range(self.num_curves)]
147
148 pen = et.utils.pg_pen_cycler(1)
149 brush = et.utils.pg_brush_cycler(1)
150 symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
151 feat_kw = dict(pen=pen, **symbol_kw)
152 self.threshold_curves = [
153 self.sweep_plot.plot(**feat_kw) for _ in range(self.num_curves)
154 ]
155
156 sweep_plot_legend = pg.LegendItem(offset=(-30, 30))
157 sweep_plot_legend.setParentItem(self.sweep_plot.graphicsItem())
158 sweep_plot_legend.addItem(self.sweep_curves[0], "Sweep")
159 sweep_plot_legend.addItem(self.threshold_curves[0], "Threshold")
160
161 font = QFont()
162 font.setPixelSize(16)
163 self.sweep_text_item = pg.TextItem(
164 fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
165 anchor=(0.5, 0),
166 color=pg.mkColor(0xFF, 0xFF, 0xFF, 200),
167 )
168 self.sweep_text_item.setFont(font)
169 self.sweep_text_item.hide()
170 self.sweep_plot.addItem(self.sweep_text_item)
171
172 self.sweep_main_peak_line = pg.InfiniteLine(pen=pg.mkPen("k", width=1.5, dash=[2, 8]))
173 self.sweep_main_peak_line.hide()
174 self.sweep_plot.addItem(self.sweep_main_peak_line)
175
176 self.sweep_smooth_max = et.utils.SmoothMax()
177
178 # Utilization level plot
179
180 self.rect_plot = win.addPlot(row=1, col=0, title="Utilization level")
181 self.rect_plot.setAspectLocked()
182 self.rect_plot.hideAxis("left")
183 self.rect_plot.hideAxis("bottom")
184 self.rects = []
185
186 pen = pg.mkPen(None)
187 rect_height = self.num_rects / 2.0
188 for r in np.arange(self.num_rects) + 1:
189 rect = pg.QtWidgets.QGraphicsRectItem(r, rect_height, 1, rect_height)
190 rect.setPen(pen)
191 rect.setBrush(et.utils.pg_brush_cycler(7))
192 self.rect_plot.addItem(rect)
193 self.rects.append(rect)
194
195 self.level_html_format = (
196 '<div style="text-align: center">'
197 '<span style="color: #FFFFFF;font-size:12pt;">'
198 "{}</span></div>"
199 )
200
201 self.level_text_item = pg.TextItem(
202 fill=pg.mkColor(0, 150, 0),
203 anchor=(0.5, 0),
204 )
205
206 no_detection_html = (
207 '<div style="text-align: center">'
208 '<span style="color: #FFFFFF;font-size:12pt;">'
209 "{}</span></div>".format("No level detected")
210 )
211
212 self.no_detection_text_item = pg.TextItem(
213 html=no_detection_html,
214 fill=pg.mkColor("r"),
215 anchor=(0.5, 0),
216 )
217
218 self.rect_plot.addItem(self.level_text_item)
219 self.rect_plot.addItem(self.no_detection_text_item)
220 self.level_text_item.setPos(self.num_rects / 2.0 + 1, self.num_rects + 2)
221 self.level_text_item.hide()
222 self.no_detection_text_item.setPos(self.num_rects / 2.0 + 1, self.num_rects + 2)
223 self.no_detection_text_item.show()
224
225 # Presence
226
227 if self.ex_app_config.activate_presence:
228 estimated_frame_rate = self.ex_app_context.presence_context.estimated_frame_rate
229
230 self.history_length_n = int(round(PRESENCE_RUN_TIME_S * estimated_frame_rate) + 1)
231 self.intra_history = np.zeros(self.history_length_n)
232 self.inter_history = np.zeros(self.history_length_n)
233
234 # Presence history plot
235
236 self.presence_hist_plot = win.addPlot(
237 row=0,
238 col=1,
239 title="Presence history",
240 )
241 self.presence_hist_plot.setMenuEnabled(False)
242 self.presence_hist_plot.setMouseEnabled(x=False, y=False)
243 self.presence_hist_plot.hideButtons()
244 self.presence_hist_plot.showGrid(x=True, y=True)
245 self.presence_hist_plot.setLabel("bottom", "Time (s)")
246 self.presence_hist_plot.setLabel("left", "Score")
247 self.presence_hist_plot.setXRange(-PRESENCE_RUN_TIME_S, 0)
248 self.presence_history_smooth_max = et.utils.SmoothMax(estimated_frame_rate)
249 self.presence_hist_plot.setYRange(0, 10)
250
251 self.intra_dashed_pen = et.utils.pg_pen_cycler(1, width=2.5, style="--")
252 self.intra_pen = et.utils.pg_pen_cycler(1)
253
254 self.intra_hist_curve = self.presence_hist_plot.plot(pen=self.intra_pen)
255 self.intra_limit_line = pg.InfiniteLine(angle=0, pen=self.intra_dashed_pen)
256 self.presence_hist_plot.addItem(self.intra_limit_line)
257 self.intra_limit_line.setPos(self.cargo_presence_config.intra_detection_threshold)
258 self.intra_limit_line.setPen(self.intra_dashed_pen)
259
260 self.inter_pen = et.utils.pg_pen_cycler(0)
261 self.inter_dashed_pen = et.utils.pg_pen_cycler(0, width=2.5, style="--")
262
263 self.inter_hist_curve = self.presence_hist_plot.plot(pen=self.inter_pen)
264 self.inter_limit_line = pg.InfiniteLine(angle=0, pen=self.inter_dashed_pen)
265 self.presence_hist_plot.addItem(self.inter_limit_line)
266 self.inter_limit_line.setPos(self.cargo_presence_config.inter_detection_threshold)
267 self.inter_limit_line.setPen(self.inter_dashed_pen)
268
269 self.hist_xs = np.linspace(-PRESENCE_RUN_TIME_S, 0, self.history_length_n)
270
271 # Presence detection plot
272
273 self.presence_detection_plot = win.addPlot(row=1, col=1, title="Presence")
274 self.presence_detection_plot.setAspectLocked()
275 self.presence_detection_plot.hideAxis("left")
276 self.presence_detection_plot.hideAxis("bottom")
277
278 present_html_format = (
279 '<div style="text-align: center">'
280 '<span style="color: #FFFFFF;font-size:12pt;">'
281 "{}</span></div>".format("Presence detected")
282 )
283 not_present_html = (
284 '<div style="text-align: center">'
285 '<span style="color: #FFFFFF;font-size:12pt;">'
286 "{}</span></div>".format("No presence detected")
287 )
288 self.present_text_item = pg.TextItem(
289 html=present_html_format,
290 fill=pg.mkColor(0, 150, 0),
291 anchor=(0.65, 0),
292 )
293 self.not_present_text_item = pg.TextItem(
294 html=not_present_html,
295 fill=pg.mkColor("r"),
296 anchor=(0.6, 0),
297 )
298
299 self.presence_detection_plot.addItem(self.present_text_item)
300 self.presence_detection_plot.addItem(self.not_present_text_item)
301 self.present_text_item.setPos(self.num_rects / 2.0 + 1, self.num_rects + 2)
302 self.not_present_text_item.setPos(self.num_rects / 2.0 + 1, self.num_rects + 2)
303 self.present_text_item.hide()
304
305 pen = pg.mkPen(None)
306 rect_height = self.num_rects / 2.0
307 rect_length = self.num_rects
308
309 self.presence_rect = pg.QtWidgets.QGraphicsRectItem(
310 0, rect_height, rect_length, rect_height
311 )
312 self.presence_rect.setPen(pen)
313 self.presence_rect.setBrush(et.utils.pg_brush_cycler(7))
314 self.presence_detection_plot.addItem(self.presence_rect)
315
316 def update(self, result: ExAppResult) -> None:
317 if result.mode == _Mode.DISTANCE:
318 if self.ex_app_config.activate_presence:
319 self.intra_history = np.zeros(self.history_length_n)
320 self.inter_history = np.zeros(self.history_length_n)
321
322 # Sweep plot
323
324 distance = result.distance
325 max_val_in_plot = 0
326 if result.distance_processor_result is not None:
327 for idx, processor_result in enumerate(result.distance_processor_result):
328 abs_sweep = processor_result.extra_result.abs_sweep
329 threshold = processor_result.extra_result.used_threshold
330 distances_m = processor_result.extra_result.distances_m
331
332 self.sweep_curves[idx].setData(distances_m, abs_sweep)
333 self.threshold_curves[idx].setData(distances_m, threshold)
334
335 max_val_in_subsweep = max(max(threshold), max(abs_sweep))
336 if max_val_in_plot < max_val_in_subsweep:
337 max_val_in_plot = max_val_in_subsweep
338
339 self.sweep_plot.setYRange(0, self.sweep_smooth_max.update(max_val_in_plot))
340
341 if distance is not None:
342 text_y_pos = self.sweep_plot.getAxis("left").range[1] * 0.95
343 text_x_pos = (
344 self.sweep_plot.getAxis("bottom").range[1]
345 + self.sweep_plot.getAxis("bottom").range[0]
346 ) / 2.0
347 self.sweep_text_item.setPos(text_x_pos, text_y_pos)
348 self.sweep_text_item.setHtml("Main peak distance: {:.3f} m".format(distance))
349 self.sweep_text_item.show()
350
351 self.sweep_main_peak_line.setPos(distance)
352 self.sweep_main_peak_line.show()
353 else:
354 self.sweep_text_item.hide()
355 self.sweep_main_peak_line.hide()
356
357 # Utilization level plot
358
359 # Show the percentage level plot if the plot width is greater than 400 pixels,
360 # otherwise display the level as text.
361
362 if result.level_percent is None: # No detection
363 for rect in self.rects:
364 rect.setBrush(et.utils.pg_brush_cycler(7))
365 self.level_text_item.hide()
366 self.no_detection_text_item.show()
367 else:
368 self.bar_loc = int(
369 np.around(self.num_rects - result.level_percent / 100 * self.num_rects)
370 )
371 for rect in self.rects[self.bar_loc :]:
372 rect.setBrush(et.utils.pg_brush_cycler(0))
373
374 for rect in self.rects[: self.bar_loc]:
375 rect.setBrush(et.utils.pg_brush_cycler(7))
376
377 level_text = "Utilization level: {:.2f} m, {:.0f} %".format(
378 result.level_m,
379 result.level_percent,
380 )
381 level_html = self.level_html_format.format(level_text)
382 self.level_text_item.setHtml(level_html)
383 self.level_text_item.show()
384 self.no_detection_text_item.hide()
385
386 for rect in self.rects:
387 rect.setVisible(True)
388
389 else:
390 # Presence history
391
392 self.intra_history = np.roll(self.intra_history, -1)
393 self.intra_history[-1] = result.intra_presence_score
394 self.intra_hist_curve.setData(self.hist_xs, self.intra_history)
395
396 self.inter_history = np.roll(self.inter_history, -1)
397 self.inter_history[-1] = result.inter_presence_score
398 self.inter_hist_curve.setData(self.hist_xs, self.inter_history)
399
400 # Set y-range
401
402 if np.isnan(self.intra_history).all():
403 intra_m_hist = self.cargo_presence_config.intra_detection_threshold
404 else:
405 intra_m_hist = max(
406 float(np.nanmax(self.intra_history)),
407 self.cargo_presence_config.intra_detection_threshold * 1.05,
408 )
409
410 if np.isnan(self.inter_history).all():
411 inter_m_hist = self.cargo_presence_config.inter_detection_threshold
412 else:
413 inter_m_hist = max(
414 float(np.nanmax(self.inter_history)),
415 self.cargo_presence_config.inter_detection_threshold * 1.05,
416 )
417
418 m_hist = max(intra_m_hist, inter_m_hist)
419 m_hist = self.presence_history_smooth_max.update(m_hist)
420 self.presence_hist_plot.setYRange(0, m_hist)
421
422 # Presence detection plot
423
424 if result.presence_detected:
425 self.present_text_item.show()
426 self.not_present_text_item.hide()
427 self.presence_rect.setBrush(pg.mkColor(0, 150, 0))
428 else:
429 self.present_text_item.hide()
430 self.not_present_text_item.show()
431 self.presence_rect.setBrush(et.utils.pg_brush_cycler(7))
432
433
434if __name__ == "__main__":
435 main()
View this example on GitHub: acconeer/acconeer-python-exploration