examples/a121/algo/smart_presence/ref_app.py

examples/a121/algo/smart_presence/ref_app.py#

  1# Copyright (c) Acconeer AB, 2023-2024
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6from typing import List, Tuple
  7
  8import numpy as np
  9import numpy.typing as npt
 10
 11from PySide6 import QtCore
 12
 13import pyqtgraph as pg
 14
 15import acconeer.exptool as et
 16from acconeer.exptool import a121
 17from acconeer.exptool.a121.algo.presence import Detector
 18from acconeer.exptool.a121.algo.smart_presence._ref_app import (
 19    PresenceWakeUpConfig,
 20    PresenceZoneConfig,
 21    RefApp,
 22    RefAppConfig,
 23    RefAppResult,
 24    _Mode,
 25)
 26
 27
 28def main():
 29    args = a121.ExampleArgumentParser().parse_args()
 30    et.utils.config_logging(args)
 31
 32    client = a121.Client.open(**a121.get_client_args(args))
 33
 34    ref_app_config = RefAppConfig(
 35        wake_up_mode=True,
 36        wake_up_config=PresenceWakeUpConfig(
 37            start_m=1.0,
 38            end_m=3.0,
 39            num_zones=5,
 40            num_zones_for_wake_up=2,
 41        ),
 42        nominal_config=PresenceZoneConfig(
 43            start_m=1.0,
 44            end_m=3.0,
 45            num_zones=3,
 46        ),
 47    )
 48
 49    ref_app = RefApp(client=client, sensor_id=1, ref_app_config=ref_app_config)
 50    ref_app.start()
 51
 52    nominal_sensor_config = Detector._get_sensor_config(ref_app.nominal_detector_config)
 53    distances = np.linspace(
 54        ref_app_config.nominal_config.start_m,
 55        ref_app_config.nominal_config.end_m,
 56        nominal_sensor_config.num_points,
 57    )
 58    nominal_zone_limits = ref_app.ref_app_processor.create_zones(
 59        distances, ref_app_config.nominal_config.num_zones
 60    )
 61
 62    pg_updater = PGUpdater(
 63        ref_app_config,
 64        ref_app.ref_app_context.wake_up_detector_context.estimated_frame_rate,
 65        nominal_zone_limits,
 66        ref_app.ref_app_processor.zone_limits,
 67    )
 68    pg_process = et.PGProcess(pg_updater)
 69    pg_process.start()
 70
 71    interrupt_handler = et.utils.ExampleInterruptHandler()
 72    print("Press Ctrl-C to end session")
 73
 74    while not interrupt_handler.got_signal:
 75        ref_app_result = ref_app.get_next()
 76        if ref_app_result.presence_detected:
 77            print(f"Presence in zone {ref_app_result.max_presence_zone}")
 78        else:
 79            print("No presence")
 80        try:
 81            pg_process.put_data(ref_app_result)
 82        except et.PGProccessDiedException:
 83            break
 84
 85    ref_app.stop()
 86
 87    print("Disconnecting...")
 88    client.close()
 89
 90
 91class PGUpdater:
 92    def __init__(
 93        self,
 94        ref_app_config: RefAppConfig,
 95        estimated_frame_rate: float,
 96        nominal_zone_limits: npt.NDArray[np.float64],
 97        wake_up_zone_limits: npt.NDArray[np.float64],
 98    ) -> None:
 99        self.ref_app_config = ref_app_config
100        self.nominal_config = ref_app_config.nominal_config
101        self.wake_up_config = ref_app_config.wake_up_config
102
103        self.show_all_detected_zones = ref_app_config.show_all_detected_zones
104        self.nominal_zone_limits = nominal_zone_limits
105        self.wake_up_zone_limits = wake_up_zone_limits
106        self.estimated_frame_rate = estimated_frame_rate
107
108        self.history_length_s = 5
109        self.time_fifo: List[float] = []
110        self.intra_fifo: List[float] = []
111        self.inter_fifo: List[float] = []
112
113        self.intra_limit_lines = []
114        self.inter_limit_lines = []
115
116        self.setup_is_done = False
117
118    def setup(self, win):
119        win.setWindowTitle("Acconeer smart presence example")
120
121        # Intra presence history plot
122
123        self.intra_hist_plot = win.addPlot(
124            row=0,
125            col=0,
126            title="Intra presence history (fast motions)",
127        )
128        self.intra_hist_plot.setMenuEnabled(False)
129        self.intra_hist_plot.setMouseEnabled(x=False, y=False)
130        self.intra_hist_plot.hideButtons()
131        self.intra_hist_plot.showGrid(x=True, y=True)
132        self.intra_hist_plot.setLabel("bottom", "Time (s)")
133        self.intra_hist_plot.setLabel("left", "Score")
134        self.intra_hist_plot.setXRange(-self.history_length_s, 0)
135        self.intra_history_smooth_max = et.utils.SmoothMax(self.estimated_frame_rate)
136        self.intra_hist_plot.setYRange(0, 10)
137        if not self.nominal_config.intra_enable:
138            intra_color = et.utils.color_cycler(1)
139            intra_color = f"{intra_color}50"
140            self.nominal_intra_dashed_pen = pg.mkPen(
141                intra_color, width=2.5, style=QtCore.Qt.DashLine
142            )
143            self.nominal_intra_pen = pg.mkPen(intra_color, width=2)
144        else:
145            self.nominal_intra_dashed_pen = et.utils.pg_pen_cycler(1, width=2.5, style="--")
146            self.nominal_intra_pen = et.utils.pg_pen_cycler(1)
147
148        self.intra_hist_curve = self.intra_hist_plot.plot(pen=self.nominal_intra_pen)
149        limit_line = pg.InfiniteLine(angle=0, pen=self.nominal_intra_dashed_pen)
150        self.intra_hist_plot.addItem(limit_line)
151        self.intra_limit_lines.append(limit_line)
152
153        for line in self.intra_limit_lines:
154            line.setPos(self.nominal_config.intra_detection_threshold)
155
156        # Inter presence history plot
157
158        self.inter_hist_plot = win.addPlot(
159            row=0,
160            col=1,
161            title="Inter presence history (slow motions)",
162        )
163        self.inter_hist_plot.setMenuEnabled(False)
164        self.inter_hist_plot.setMouseEnabled(x=False, y=False)
165        self.inter_hist_plot.hideButtons()
166        self.inter_hist_plot.showGrid(x=True, y=True)
167        self.inter_hist_plot.setLabel("bottom", "Time (s)")
168        self.inter_hist_plot.setLabel("left", "Score")
169        self.inter_hist_plot.setXRange(-self.history_length_s, 0)
170        self.inter_history_smooth_max = et.utils.SmoothMax(self.estimated_frame_rate)
171        self.inter_hist_plot.setYRange(0, 10)
172        if not self.nominal_config.inter_enable:
173            inter_color = et.utils.color_cycler(0)
174            inter_color = f"{inter_color}50"
175            self.nominal_inter_dashed_pen = pg.mkPen(
176                inter_color, width=2.5, style=QtCore.Qt.DashLine
177            )
178            self.nominal_inter_pen = pg.mkPen(inter_color, width=2)
179        else:
180            self.nominal_inter_pen = et.utils.pg_pen_cycler(0)
181            self.nominal_inter_dashed_pen = et.utils.pg_pen_cycler(0, width=2.5, style="--")
182
183        self.inter_hist_curve = self.inter_hist_plot.plot(pen=self.nominal_inter_pen)
184        limit_line = pg.InfiniteLine(angle=0, pen=self.nominal_inter_dashed_pen)
185        self.inter_hist_plot.addItem(limit_line)
186        self.inter_limit_lines.append(limit_line)
187
188        for line in self.inter_limit_lines:
189            line.setPos(self.nominal_config.inter_detection_threshold)
190
191        # Sector plot
192
193        if self.ref_app_config.wake_up_mode:
194            title = (
195                "Nominal config<br>"
196                "Detection type: fast (orange), slow (blue), both (green)<br>"
197                "Green background indicates active"
198            )
199        else:
200            title = "Nominal config<br>" "Detection type: fast (orange), slow (blue), both (green)"
201
202        self.nominal_sector_plot, self.nominal_sectors = self.create_sector_plot(
203            title,
204            self.ref_app_config.nominal_config.num_zones,
205            self.nominal_config.start_m,
206            self.nominal_zone_limits,
207        )
208
209        if not self.ref_app_config.wake_up_mode:
210            sublayout = win.addLayout(row=1, col=0, colspan=2)
211            sublayout.layout.setColumnStretchFactor(0, 2)
212            sublayout.addItem(self.nominal_sector_plot, row=0, col=0)
213        else:
214            assert self.wake_up_config is not None
215            sublayout = win.addLayout(row=1, col=0, colspan=2)
216            sublayout.addItem(self.nominal_sector_plot, row=0, col=1)
217
218            title = (
219                "Wake up config<br>"
220                "Detection type: fast (orange), slow (blue), both (green),<br>"
221                "lingering (light grey)<br>"
222                "Green background indicates active"
223            )
224            self.wake_up_sector_plot, self.wake_up_sectors = self.create_sector_plot(
225                title,
226                self.wake_up_config.num_zones,
227                self.wake_up_config.start_m,
228                self.wake_up_zone_limits,
229            )
230
231            sublayout.addItem(self.wake_up_sector_plot, row=0, col=0)
232
233            if self.wake_up_config.intra_enable:
234                self.wake_up_intra_dashed_pen = et.utils.pg_pen_cycler(1, width=2.5, style="--")
235                self.wake_up_intra_pen = et.utils.pg_pen_cycler(1)
236            else:
237                intra_color = et.utils.color_cycler(1)
238                intra_color = f"{intra_color}50"
239                self.wake_up_intra_dashed_pen = pg.mkPen(
240                    intra_color, width=2.5, style=QtCore.Qt.DashLine
241                )
242                self.wake_up_intra_pen = pg.mkPen(intra_color, width=2)
243
244            if self.wake_up_config.inter_enable:
245                self.wake_up_inter_pen = et.utils.pg_pen_cycler(0)
246                self.wake_up_inter_dashed_pen = et.utils.pg_pen_cycler(0, width=2.5, style="--")
247            else:
248                inter_color = et.utils.color_cycler(0)
249                inter_color = f"{inter_color}50"
250                self.wake_up_inter_dashed_pen = pg.mkPen(
251                    inter_color, width=2.5, style=QtCore.Qt.DashLine
252                )
253                self.wake_up_inter_pen = pg.mkPen(inter_color, width=2)
254
255    @staticmethod
256    def create_sector_plot(
257        title: str, num_sectors: int, start_m: float, zone_limits: npt.NDArray[np.float64]
258    ) -> Tuple[pg.PlotItem, List[pg.QtWidgets.QGraphicsEllipseItem]]:
259        sector_plot = pg.PlotItem(title=title)
260
261        sector_plot.setAspectLocked()
262        sector_plot.hideAxis("left")
263        sector_plot.hideAxis("bottom")
264
265        sectors = []
266        limit_text = []
267
268        range_html = (
269            '<div style="text-align: center">'
270            '<span style="color: #000000;font-size:12pt;">'
271            "{}</span></div>"
272        )
273
274        if start_m == zone_limits[0]:
275            x_offset = 0.7
276        else:
277            x_offset = 0
278
279        pen = pg.mkPen("k", width=1)
280        span_deg = 25
281        for r in np.flip(np.arange(1, num_sectors + 2)):
282            sector = pg.QtWidgets.QGraphicsEllipseItem(-r, -r, r * 2, r * 2)
283            sector.setStartAngle(-16 * span_deg)
284            sector.setSpanAngle(16 * span_deg * 2)
285            sector.setPen(pen)
286            sector_plot.addItem(sector)
287            sectors.append(sector)
288
289            if r != 1:
290                limit = pg.TextItem(html=range_html, anchor=(0.5, 0.5), angle=25)
291                x = r * np.cos(np.radians(span_deg))
292                y = r * np.sin(np.radians(span_deg))
293                limit.setPos(x - x_offset, y + 0.25)
294                sector_plot.addItem(limit)
295                limit_text.append(limit)
296
297        sectors.reverse()
298
299        if not start_m == zone_limits[0]:
300            start_limit_text = pg.TextItem(html=range_html, anchor=(0.5, 0.5), angle=25)
301            start_range_html = range_html.format(f"{start_m}")
302            start_limit_text.setHtml(start_range_html)
303            x = 1 * np.cos(np.radians(span_deg))
304            y = 1 * np.sin(np.radians(span_deg))
305
306            start_limit_text.setPos(x, y + 0.25)
307            sector_plot.addItem(start_limit_text)
308
309        unit_text = pg.TextItem(html=range_html, anchor=(0.5, 0.5))
310        unit_html = range_html.format("[m]")
311        unit_text.setHtml(unit_html)
312        x = (num_sectors + 2) * np.cos(np.radians(span_deg))
313        y = (num_sectors + 2) * np.sin(np.radians(span_deg))
314        unit_text.setPos(x - x_offset, y + 0.25)
315        sector_plot.addItem(unit_text)
316
317        for text_item, limit in zip(limit_text, np.flip(zone_limits)):
318            zone_range_html = range_html.format(np.around(limit, 1))
319            text_item.setHtml(zone_range_html)
320
321        return sector_plot, sectors
322
323    def update(self, data: RefAppResult) -> None:
324        if data.used_config == _Mode.NOMINAL_CONFIG:
325            inter_threshold = self.nominal_config.inter_detection_threshold
326            intra_threshold = self.nominal_config.intra_detection_threshold
327            intra_pen = self.nominal_intra_pen
328            intra_dashed_pen = self.nominal_intra_dashed_pen
329            inter_pen = self.nominal_inter_pen
330            inter_dashed_pen = self.nominal_inter_dashed_pen
331        else:
332            assert self.wake_up_config is not None
333            inter_threshold = self.wake_up_config.inter_detection_threshold
334            intra_threshold = self.wake_up_config.intra_detection_threshold
335            intra_pen = self.wake_up_intra_pen
336            intra_dashed_pen = self.wake_up_intra_dashed_pen
337            inter_pen = self.wake_up_inter_pen
338            inter_dashed_pen = self.wake_up_inter_dashed_pen
339
340        self.time_fifo.append(data.service_result.tick_time)
341
342        if data.switch_delay:
343            self.intra_fifo.append(float("nan"))
344            self.inter_fifo.append(float("nan"))
345        else:
346            self.intra_fifo.append(data.intra_presence_score)
347            self.inter_fifo.append(data.inter_presence_score)
348
349        while self.time_fifo[-1] - self.time_fifo[0] > self.history_length_s:
350            self.time_fifo.pop(0)
351            self.intra_fifo.pop(0)
352            self.inter_fifo.pop(0)
353
354        times = [t - self.time_fifo[-1] for t in self.time_fifo]
355
356        # Intra presence
357
358        if np.isnan(self.intra_fifo).all():
359            m_hist = intra_threshold
360        else:
361            m_hist = np.maximum(float(np.nanmax(self.intra_fifo)), intra_threshold * 1.05)
362
363        m_hist = self.intra_history_smooth_max.update(m_hist)
364
365        self.intra_hist_plot.setYRange(0, m_hist)
366        self.intra_hist_curve.setData(times, self.intra_fifo, connect="finite")
367        self.intra_hist_curve.setPen(intra_pen)
368
369        for line in self.intra_limit_lines:
370            line.setPos(intra_threshold)
371            line.setPen(intra_dashed_pen)
372
373        # Inter presence
374
375        if np.isnan(self.inter_fifo).all():
376            m_hist = inter_threshold
377        else:
378            m_hist = np.maximum(float(np.nanmax(self.inter_fifo)), inter_threshold * 1.05)
379
380        m_hist = self.inter_history_smooth_max.update(m_hist)
381
382        self.inter_hist_plot.setYRange(0, m_hist)
383        self.inter_hist_curve.setData(times, self.inter_fifo, connect="finite")
384        self.inter_hist_curve.setPen(inter_pen)
385
386        for line in self.inter_limit_lines:
387            line.setPos(inter_threshold)
388            line.setPen(inter_dashed_pen)
389
390        # Sector
391
392        brush = et.utils.pg_brush_cycler(7)
393        for sector in self.nominal_sectors:
394            sector.setBrush(brush)
395
396        if not self.ref_app_config.wake_up_mode:
397            sectors = self.nominal_sectors[1:]
398            show_all_zones = self.show_all_detected_zones
399            color_nominal = "white"
400        else:
401            if data.used_config == _Mode.WAKE_UP_CONFIG:
402                sectors = self.wake_up_sectors[1:]
403                show_all_zones = True
404                color_wake_up = "#DFF1D6"
405                color_nominal = "white"
406            else:
407                sectors = self.nominal_sectors[1:]
408                show_all_zones = self.show_all_detected_zones
409                color_wake_up = "white"
410                color_nominal = "#DFF1D6"
411
412            vb = self.nominal_sector_plot.getViewBox()
413            vb.setBackgroundColor(color_nominal)
414            vb = self.wake_up_sector_plot.getViewBox()
415            vb.setBackgroundColor(color_wake_up)
416
417            for sector in self.wake_up_sectors:
418                sector.setBrush(brush)
419
420        if data.presence_detected:
421            self.color_zones(data, show_all_zones, sectors)
422            self.switch_data = data
423        elif data.switch_delay:
424            self.color_zones(self.switch_data, True, self.wake_up_sectors[1:])
425
426        self.nominal_sectors[0].setPen(pg.mkPen(color_nominal, width=1))
427        self.nominal_sectors[0].setBrush(pg.mkBrush(color_nominal))
428
429        if self.ref_app_config.wake_up_mode:
430            self.wake_up_sectors[0].setPen(pg.mkPen(color_wake_up, width=1))
431            self.wake_up_sectors[0].setBrush(pg.mkBrush(color_wake_up))
432
433    @staticmethod
434    def color_zones(
435        data: RefAppResult,
436        show_all_detected_zones: bool,
437        sectors: List[pg.QtWidgets.QGraphicsEllipseItem],
438    ) -> None:
439        if show_all_detected_zones:
440            for zone, (inter_value, intra_value) in enumerate(
441                zip(data.inter_zone_detections, data.intra_zone_detections)
442            ):
443                if inter_value + intra_value == 2:
444                    sectors[zone].setBrush(et.utils.pg_brush_cycler(2))
445                elif inter_value == 1:
446                    sectors[zone].setBrush(et.utils.pg_brush_cycler(0))
447                elif intra_value == 1:
448                    sectors[zone].setBrush(et.utils.pg_brush_cycler(1))
449                elif data.used_config == _Mode.WAKE_UP_CONFIG:
450                    assert data.wake_up_detections is not None
451                    if data.wake_up_detections[zone] > 0:
452                        sectors[zone].setBrush(pg.mkBrush("#b5afa0"))
453        else:
454            assert data.max_presence_zone is not None
455            if data.max_presence_zone == data.max_intra_zone:
456                sectors[data.max_presence_zone].setBrush(et.utils.pg_brush_cycler(1))
457            else:
458                sectors[data.max_presence_zone].setBrush(et.utils.pg_brush_cycler(0))
459
460
461if __name__ == "__main__":
462    main()

View this example on GitHub: acconeer/acconeer-python-exploration