examples/a121/algo/smart_presence/ref_app.py
1# Copyright (c) Acconeer AB, 2023-2024
2# All rights reserved
3
from __future__ import annotations

from typing import List, Optional, Tuple

import numpy as np
import numpy.typing as npt

from PySide6 import QtCore

import pyqtgraph as pg

import acconeer.exptool as et
from acconeer.exptool import a121
from acconeer.exptool.a121.algo.presence import Detector
from acconeer.exptool.a121.algo.smart_presence._ref_app import (
    PresenceWakeUpConfig,
    PresenceZoneConfig,
    RefApp,
    RefAppConfig,
    RefAppResult,
    _Mode,
)
26
27
def main():
    """Run the smart presence reference application with live plotting.

    Opens a client from the command line arguments, starts a ``RefApp`` in
    wake up mode on sensor 1, and forwards every result to a plot process
    until Ctrl-C is pressed or the plot process dies.

    The reference application is stopped and the client is closed even if
    an error occurs while the example is running.
    """
    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    client = a121.Client.open(**a121.get_client_args(args))

    try:
        ref_app_config = RefAppConfig(
            wake_up_mode=True,
            wake_up_config=PresenceWakeUpConfig(
                start_m=1.0,
                end_m=3.0,
                num_zones=5,
                num_zones_for_wake_up=2,
            ),
            nominal_config=PresenceZoneConfig(
                start_m=1.0,
                end_m=3.0,
                num_zones=3,
            ),
        )

        ref_app = RefApp(client=client, sensor_id=1, ref_app_config=ref_app_config)
        ref_app.start()

        try:
            # The processor exposes the zone limits of the wake up config
            # (passed to PGUpdater below); the nominal zone limits are
            # computed here from the nominal range and its number of points.
            nominal_sensor_config = Detector._get_sensor_config(ref_app.nominal_detector_config)
            distances = np.linspace(
                ref_app_config.nominal_config.start_m,
                ref_app_config.nominal_config.end_m,
                nominal_sensor_config.num_points,
            )
            nominal_zone_limits = ref_app.ref_app_processor.create_zones(
                distances, ref_app_config.nominal_config.num_zones
            )

            pg_updater = PGUpdater(
                ref_app_config,
                ref_app.ref_app_context.wake_up_detector_context.estimated_frame_rate,
                nominal_zone_limits,
                ref_app.ref_app_processor.zone_limits,
            )
            pg_process = et.PGProcess(pg_updater)
            pg_process.start()

            interrupt_handler = et.utils.ExampleInterruptHandler()
            print("Press Ctrl-C to end session")

            while not interrupt_handler.got_signal:
                ref_app_result = ref_app.get_next()
                if ref_app_result.presence_detected:
                    print(f"Presence in zone {ref_app_result.max_presence_zone}")
                else:
                    print("No presence")

                try:
                    pg_process.put_data(ref_app_result)
                except et.PGProccessDiedException:
                    # The plot process is gone; end the session.
                    break
        finally:
            ref_app.stop()
    finally:
        print("Disconnecting...")
        client.close()
89
90
class PGUpdater:
    """Plot updater for the smart presence example.

    Draws two score history plots (intra and inter presence) and one sector
    plot per zone configuration: the nominal one and, in wake up mode, the
    wake up one.
    """

    def __init__(
        self,
        ref_app_config: RefAppConfig,
        estimated_frame_rate: float,
        nominal_zone_limits: npt.NDArray[np.float64],
        wake_up_zone_limits: npt.NDArray[np.float64],
    ) -> None:
        """Store the configuration and initialize empty plot state.

        :param ref_app_config: Reference application configuration; its
            nominal and wake up configs provide thresholds, ranges and
            number of zones.
        :param estimated_frame_rate: Passed to the ``SmoothMax`` helpers
            that smooth the y-range of the history plots.
        :param nominal_zone_limits: Zone limits labelled in the nominal
            sector plot.
        :param wake_up_zone_limits: Zone limits labelled in the wake up
            sector plot.
        """
        self.ref_app_config = ref_app_config
        self.nominal_config = ref_app_config.nominal_config
        self.wake_up_config = ref_app_config.wake_up_config

        self.show_all_detected_zones = ref_app_config.show_all_detected_zones
        self.nominal_zone_limits = nominal_zone_limits
        self.wake_up_zone_limits = wake_up_zone_limits
        self.estimated_frame_rate = estimated_frame_rate

        # Length of the time window shown in the history plots.
        self.history_length_s = 5
        self.time_fifo: List[float] = []
        self.intra_fifo: List[float] = []
        self.inter_fifo: List[float] = []

        self.intra_limit_lines = []
        self.inter_limit_lines = []

        # Last result with presence detected. It is redrawn during a switch
        # delay; None until such a result has been seen.
        self.switch_data: Optional[RefAppResult] = None

        self.setup_is_done = False

    def setup(self, win):
        """Create all plots, curves, threshold lines and pens in ``win``.

        :param win: Object providing ``setWindowTitle``, ``addPlot`` and
            ``addLayout``; presumably the pyqtgraph layout window supplied
            by the plot process.
        """
        win.setWindowTitle("Acconeer smart presence example")

        # Intra presence history plot

        self.intra_hist_plot = self._create_history_plot(
            win, col=0, title="Intra presence history (fast motions)"
        )
        self.intra_history_smooth_max = et.utils.SmoothMax(self.estimated_frame_rate)
        self.nominal_intra_pen, self.nominal_intra_dashed_pen = self._create_pens(
            1, self.nominal_config.intra_enable
        )

        self.intra_hist_curve = self.intra_hist_plot.plot(pen=self.nominal_intra_pen)
        limit_line = pg.InfiniteLine(angle=0, pen=self.nominal_intra_dashed_pen)
        self.intra_hist_plot.addItem(limit_line)
        self.intra_limit_lines.append(limit_line)

        for line in self.intra_limit_lines:
            line.setPos(self.nominal_config.intra_detection_threshold)

        # Inter presence history plot

        self.inter_hist_plot = self._create_history_plot(
            win, col=1, title="Inter presence history (slow motions)"
        )
        self.inter_history_smooth_max = et.utils.SmoothMax(self.estimated_frame_rate)
        self.nominal_inter_pen, self.nominal_inter_dashed_pen = self._create_pens(
            0, self.nominal_config.inter_enable
        )

        self.inter_hist_curve = self.inter_hist_plot.plot(pen=self.nominal_inter_pen)
        limit_line = pg.InfiniteLine(angle=0, pen=self.nominal_inter_dashed_pen)
        self.inter_hist_plot.addItem(limit_line)
        self.inter_limit_lines.append(limit_line)

        for line in self.inter_limit_lines:
            line.setPos(self.nominal_config.inter_detection_threshold)

        # Sector plot

        title = "Nominal config<br>Detection type: fast (orange), slow (blue), both (green)"
        if self.ref_app_config.wake_up_mode:
            title += "<br>Green background indicates active"

        self.nominal_sector_plot, self.nominal_sectors = self.create_sector_plot(
            title,
            self.ref_app_config.nominal_config.num_zones,
            self.nominal_config.start_m,
            self.nominal_zone_limits,
        )

        if not self.ref_app_config.wake_up_mode:
            sublayout = win.addLayout(row=1, col=0, colspan=2)
            sublayout.layout.setColumnStretchFactor(0, 2)
            sublayout.addItem(self.nominal_sector_plot, row=0, col=0)
        else:
            assert self.wake_up_config is not None
            # Wake up plot to the left (col 0), nominal plot to the right (col 1).
            sublayout = win.addLayout(row=1, col=0, colspan=2)
            sublayout.addItem(self.nominal_sector_plot, row=0, col=1)

            title = (
                "Wake up config<br>"
                "Detection type: fast (orange), slow (blue), both (green),<br>"
                "lingering (light grey)<br>"
                "Green background indicates active"
            )
            self.wake_up_sector_plot, self.wake_up_sectors = self.create_sector_plot(
                title,
                self.wake_up_config.num_zones,
                self.wake_up_config.start_m,
                self.wake_up_zone_limits,
            )

            sublayout.addItem(self.wake_up_sector_plot, row=0, col=0)

            self.wake_up_intra_pen, self.wake_up_intra_dashed_pen = self._create_pens(
                1, self.wake_up_config.intra_enable
            )
            self.wake_up_inter_pen, self.wake_up_inter_dashed_pen = self._create_pens(
                0, self.wake_up_config.inter_enable
            )

    def _create_history_plot(self, win, col: int, title: str):
        """Add a non-interactive score history plot to row 0 of ``win``."""
        plot = win.addPlot(row=0, col=col, title=title)
        plot.setMenuEnabled(False)
        plot.setMouseEnabled(x=False, y=False)
        plot.hideButtons()
        plot.showGrid(x=True, y=True)
        plot.setLabel("bottom", "Time (s)")
        plot.setLabel("left", "Score")
        plot.setXRange(-self.history_length_s, 0)
        plot.setYRange(0, 10)
        return plot

    @staticmethod
    def _create_pens(color_index: int, enabled: bool) -> Tuple[pg.QtGui.QPen, pg.QtGui.QPen]:
        """Return the ``(solid, dashed)`` pens for one detection type.

        :param color_index: Index into the exptool color/pen cyclers.
        :param enabled: Whether the detection type is enabled in the config.
            When disabled, "50" is appended to the color string (presumably
            an alpha suffix that fades the color).
        """
        if enabled:
            solid_pen = et.utils.pg_pen_cycler(color_index)
            dashed_pen = et.utils.pg_pen_cycler(color_index, width=2.5, style="--")
        else:
            color = f"{et.utils.color_cycler(color_index)}50"
            solid_pen = pg.mkPen(color, width=2)
            dashed_pen = pg.mkPen(color, width=2.5, style=QtCore.Qt.DashLine)

        return solid_pen, dashed_pen

    @staticmethod
    def create_sector_plot(
        title: str, num_sectors: int, start_m: float, zone_limits: npt.NDArray[np.float64]
    ) -> Tuple[pg.PlotItem, List[pg.QtWidgets.QGraphicsEllipseItem]]:
        """Create a plot with concentric pie-slice sectors and range labels.

        ``num_sectors + 1`` slices with radii ``1..num_sectors + 1`` are
        drawn, largest first so that smaller ones end up on top.

        :param title: Plot title (HTML).
        :param num_sectors: Number of zones to draw.
        :param start_m: Start of the measured range; labelled separately
            when it differs from the first zone limit.
        :param zone_limits: Zone limits used as labels, rounded to one
            decimal.
        :returns: The plot item and the sectors ordered from innermost
            (index 0) to outermost.
        """
        sector_plot = pg.PlotItem(title=title)

        sector_plot.setAspectLocked()
        sector_plot.hideAxis("left")
        sector_plot.hideAxis("bottom")

        sectors = []
        limit_texts = []

        range_html = (
            '<div style="text-align: center">'
            '<span style="color: #000000;font-size:12pt;">'
            "{}</span></div>"
        )

        starts_at_first_limit = start_m == zone_limits[0]
        x_offset = 0.7 if starts_at_first_limit else 0

        pen = pg.mkPen("k", width=1)
        span_deg = 25
        cos_span = np.cos(np.radians(span_deg))
        sin_span = np.sin(np.radians(span_deg))

        for r in np.flip(np.arange(1, num_sectors + 2)):
            sector = pg.QtWidgets.QGraphicsEllipseItem(-r, -r, r * 2, r * 2)
            # Qt expresses ellipse angles in 1/16 of a degree.
            sector.setStartAngle(-16 * span_deg)
            sector.setSpanAngle(16 * span_deg * 2)
            sector.setPen(pen)
            sector_plot.addItem(sector)
            sectors.append(sector)

            if r != 1:
                # Label placed at the upper edge of the slice; its text is
                # filled in once all labels have been created.
                limit_text = pg.TextItem(html=range_html, anchor=(0.5, 0.5), angle=span_deg)
                limit_text.setPos(r * cos_span - x_offset, r * sin_span + 0.25)
                sector_plot.addItem(limit_text)
                limit_texts.append(limit_text)

        sectors.reverse()

        if not starts_at_first_limit:
            start_limit_text = pg.TextItem(html=range_html, anchor=(0.5, 0.5), angle=span_deg)
            start_limit_text.setHtml(range_html.format(f"{start_m}"))
            start_limit_text.setPos(cos_span, sin_span + 0.25)
            sector_plot.addItem(start_limit_text)

        unit_text = pg.TextItem(html=range_html, anchor=(0.5, 0.5))
        unit_text.setHtml(range_html.format("[m]"))
        unit_radius = num_sectors + 2
        unit_text.setPos(unit_radius * cos_span - x_offset, unit_radius * sin_span + 0.25)
        sector_plot.addItem(unit_text)

        # Labels were created from the outermost sector inwards, so pair them
        # with the zone limits in reverse order.
        for text_item, zone_limit in zip(limit_texts, np.flip(zone_limits)):
            text_item.setHtml(range_html.format(np.around(zone_limit, 1)))

        return sector_plot, sectors

    def update(self, data: RefAppResult) -> None:
        """Redraw the history plots and sector plots for a new result.

        Thresholds and pens are taken from the config that produced
        ``data`` (nominal or wake up). During a switch delay, NaN is stored
        in the score histories and the zones of the last result with
        presence are shown in the wake up sector plot.

        :param data: Latest reference application result.
        """
        if data.used_config == _Mode.NOMINAL_CONFIG:
            inter_threshold = self.nominal_config.inter_detection_threshold
            intra_threshold = self.nominal_config.intra_detection_threshold
            intra_pen = self.nominal_intra_pen
            intra_dashed_pen = self.nominal_intra_dashed_pen
            inter_pen = self.nominal_inter_pen
            inter_dashed_pen = self.nominal_inter_dashed_pen
        else:
            assert self.wake_up_config is not None
            inter_threshold = self.wake_up_config.inter_detection_threshold
            intra_threshold = self.wake_up_config.intra_detection_threshold
            intra_pen = self.wake_up_intra_pen
            intra_dashed_pen = self.wake_up_intra_dashed_pen
            inter_pen = self.wake_up_inter_pen
            inter_dashed_pen = self.wake_up_inter_dashed_pen

        self.time_fifo.append(data.service_result.tick_time)

        if data.switch_delay:
            # NaN leaves a gap in the curves (they are drawn with connect="finite").
            self.intra_fifo.append(float("nan"))
            self.inter_fifo.append(float("nan"))
        else:
            self.intra_fifo.append(data.intra_presence_score)
            self.inter_fifo.append(data.inter_presence_score)

        # Drop samples older than the history window.
        while self.time_fifo[-1] - self.time_fifo[0] > self.history_length_s:
            self.time_fifo.pop(0)
            self.intra_fifo.pop(0)
            self.inter_fifo.pop(0)

        # Times relative to the newest sample, so the newest is at x = 0.
        times = [t - self.time_fifo[-1] for t in self.time_fifo]

        # Intra presence

        if np.isnan(self.intra_fifo).all():
            m_hist = intra_threshold
        else:
            # Keep the threshold line visible with a small margin above it.
            m_hist = np.maximum(float(np.nanmax(self.intra_fifo)), intra_threshold * 1.05)

        m_hist = self.intra_history_smooth_max.update(m_hist)

        self.intra_hist_plot.setYRange(0, m_hist)
        self.intra_hist_curve.setData(times, self.intra_fifo, connect="finite")
        self.intra_hist_curve.setPen(intra_pen)

        for line in self.intra_limit_lines:
            line.setPos(intra_threshold)
            line.setPen(intra_dashed_pen)

        # Inter presence

        if np.isnan(self.inter_fifo).all():
            m_hist = inter_threshold
        else:
            m_hist = np.maximum(float(np.nanmax(self.inter_fifo)), inter_threshold * 1.05)

        m_hist = self.inter_history_smooth_max.update(m_hist)

        self.inter_hist_plot.setYRange(0, m_hist)
        self.inter_hist_curve.setData(times, self.inter_fifo, connect="finite")
        self.inter_hist_curve.setPen(inter_pen)

        for line in self.inter_limit_lines:
            line.setPos(inter_threshold)
            line.setPen(inter_dashed_pen)

        # Sector

        # Reset all sectors to the neutral brush before coloring detections.
        brush = et.utils.pg_brush_cycler(7)
        for sector in self.nominal_sectors:
            sector.setBrush(brush)

        if not self.ref_app_config.wake_up_mode:
            sectors = self.nominal_sectors[1:]
            show_all_zones = self.show_all_detected_zones
            color_nominal = "white"
        else:
            # The plot of the config currently in use gets a green background.
            if data.used_config == _Mode.WAKE_UP_CONFIG:
                sectors = self.wake_up_sectors[1:]
                show_all_zones = True
                color_wake_up = "#DFF1D6"
                color_nominal = "white"
            else:
                sectors = self.nominal_sectors[1:]
                show_all_zones = self.show_all_detected_zones
                color_wake_up = "white"
                color_nominal = "#DFF1D6"

            vb = self.nominal_sector_plot.getViewBox()
            vb.setBackgroundColor(color_nominal)
            vb = self.wake_up_sector_plot.getViewBox()
            vb.setBackgroundColor(color_wake_up)

            for sector in self.wake_up_sectors:
                sector.setBrush(brush)

        if data.presence_detected:
            self.color_zones(data, show_all_zones, sectors)
            self.switch_data = data
        elif data.switch_delay and self.switch_data is not None:
            # Keep showing the last detection while switching config. The
            # None check covers a switch-delay result arriving before any
            # result with presence has been seen by this updater.
            self.color_zones(self.switch_data, True, self.wake_up_sectors[1:])

        # Paint the innermost sector in the background color to hide it.
        self.nominal_sectors[0].setPen(pg.mkPen(color_nominal, width=1))
        self.nominal_sectors[0].setBrush(pg.mkBrush(color_nominal))

        if self.ref_app_config.wake_up_mode:
            self.wake_up_sectors[0].setPen(pg.mkPen(color_wake_up, width=1))
            self.wake_up_sectors[0].setBrush(pg.mkBrush(color_wake_up))

    @staticmethod
    def color_zones(
        data: RefAppResult,
        show_all_detected_zones: bool,
        sectors: List[pg.QtWidgets.QGraphicsEllipseItem],
    ) -> None:
        """Color the sectors according to the detections in ``data``.

        :param data: Result whose zone detections are visualized.
        :param show_all_detected_zones: If true, every zone with a detection
            is colored by detection type; otherwise only the zone with
            maximum presence is colored.
        :param sectors: Sectors indexed by zone.
        """
        if show_all_detected_zones:
            for zone, (inter_value, intra_value) in enumerate(
                zip(data.inter_zone_detections, data.intra_zone_detections)
            ):
                if inter_value + intra_value == 2:
                    # Both inter and intra detection in this zone.
                    sectors[zone].setBrush(et.utils.pg_brush_cycler(2))
                elif inter_value == 1:
                    sectors[zone].setBrush(et.utils.pg_brush_cycler(0))
                elif intra_value == 1:
                    sectors[zone].setBrush(et.utils.pg_brush_cycler(1))
                elif data.used_config == _Mode.WAKE_UP_CONFIG:
                    # No current detection; mark zones that still count
                    # towards wake up ("lingering" in the plot title).
                    assert data.wake_up_detections is not None
                    if data.wake_up_detections[zone] > 0:
                        sectors[zone].setBrush(pg.mkBrush("#b5afa0"))
        else:
            assert data.max_presence_zone is not None
            if data.max_presence_zone == data.max_intra_zone:
                sectors[data.max_presence_zone].setBrush(et.utils.pg_brush_cycler(1))
            else:
                sectors[data.max_presence_zone].setBrush(et.utils.pg_brush_cycler(0))
459
460
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration