examples/a121/algo/smart_presence/processor.py

examples/a121/algo/smart_presence/processor.py

  1# Copyright (c) Acconeer AB, 2023-2024
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7import numpy.typing as npt
  8
  9from PySide6 import QtCore
 10
 11import pyqtgraph as pg
 12
 13import acconeer.exptool as et
 14from acconeer.exptool import a121
 15from acconeer.exptool.a121.algo import presence, smart_presence
 16
 17
 18def main():
 19    args = a121.ExampleArgumentParser().parse_args()
 20    et.utils.config_logging(args)
 21
 22    client = a121.Client.open(**a121.get_client_args(args))
 23
 24    processor_config = smart_presence.ProcessorConfig(
 25        num_zones=3,
 26    )
 27
 28    detector_config = presence.DetectorConfig(
 29        start_m=1,
 30        end_m=3,
 31    )
 32
 33    presence_detector = presence.Detector(
 34        client=client, sensor_id=1, detector_config=detector_config
 35    )
 36    presence_detector.start()
 37
 38    smart_presence_processor = smart_presence.Processor(
 39        processor_config,
 40        detector_config,
 41        presence_detector.session_config,
 42        presence_detector.detector_metadata,
 43    )
 44
 45    pg_updater = PGUpdater(
 46        detector_config,
 47        processor_config,
 48        presence_detector._get_sensor_config(detector_config),
 49        presence_detector.estimated_frame_rate,
 50        smart_presence_processor.zone_limits,
 51    )
 52    pg_process = et.PGProcess(pg_updater)
 53    pg_process.start()
 54
 55    interrupt_handler = et.utils.ExampleInterruptHandler()
 56    print("Press Ctrl-C to end session")
 57
 58    while not interrupt_handler.got_signal:
 59        detector_result = presence_detector.get_next()
 60        processor_result = smart_presence_processor.process(detector_result)
 61        if detector_result.presence_detected:
 62            print(f"Presence in zone {processor_result.max_presence_zone}")
 63        else:
 64            print("No presence")
 65
 66        data = {
 67            "processor_result": processor_result,
 68            "detector_result": detector_result,
 69        }
 70        try:
 71            pg_process.put_data(data)
 72        except et.PGProccessDiedException:
 73            break
 74
 75    presence_detector.stop()
 76
 77    print("Disconnecting...")
 78    client.close()
 79
 80
 81class PGUpdater:
 82    def __init__(
 83        self,
 84        detector_config: presence.DetectorConfig,
 85        processor_config: smart_presence.ProcessorConfig,
 86        sensor_config: a121.SensorConfig,
 87        estimated_frame_rate: float,
 88        zone_limits: npt.NDArray[np.float64],
 89    ):
 90        self.detector_config = detector_config
 91
 92        self.history_length_s = 5
 93        self.estimated_frame_rate = estimated_frame_rate
 94        self.history_length_n = int(round(self.history_length_s * estimated_frame_rate))
 95        self.intra_history = np.zeros(self.history_length_n)
 96        self.inter_history = np.zeros(self.history_length_n)
 97
 98        self.num_sectors = min(processor_config.num_zones, sensor_config.num_points)
 99        self.sector_size = max(1, -(-sensor_config.num_points // self.num_sectors))
100
101        self.sector_offset = (self.num_sectors * self.sector_size - sensor_config.num_points) // 2
102        self.zone_limits = zone_limits
103
104        self.setup_is_done = False
105
106    def setup(self, win):
107        win.setWindowTitle("Acconeer smart presence example")
108
109        self.intra_limit_lines = []
110        self.inter_limit_lines = []
111
112        # Intra presence history plot
113
114        self.intra_hist_plot = win.addPlot(
115            row=0,
116            col=0,
117            title="Intra presence history (fast motions)",
118        )
119        self.intra_hist_plot.setMenuEnabled(False)
120        self.intra_hist_plot.setMouseEnabled(x=False, y=False)
121        self.intra_hist_plot.hideButtons()
122        self.intra_hist_plot.showGrid(x=True, y=True)
123        self.intra_hist_plot.setLabel("bottom", "Time (s)")
124        self.intra_hist_plot.setLabel("left", "Score")
125        self.intra_hist_plot.setXRange(-self.history_length_s, 0)
126        self.intra_history_smooth_max = et.utils.SmoothMax(self.estimated_frame_rate)
127        self.intra_hist_plot.setYRange(0, 10)
128        if not self.detector_config.intra_enable:
129            intra_color = et.utils.color_cycler(1)
130            intra_color = f"{intra_color}50"
131            intra_dashed_pen = pg.mkPen(intra_color, width=2.5, style=QtCore.Qt.DashLine)
132            intra_pen = pg.mkPen(intra_color, width=2)
133        else:
134            intra_dashed_pen = et.utils.pg_pen_cycler(1, width=2.5, style="--")
135            intra_pen = et.utils.pg_pen_cycler(1)
136
137        self.intra_hist_curve = self.intra_hist_plot.plot(pen=intra_pen)
138        limit_line = pg.InfiniteLine(angle=0, pen=intra_dashed_pen)
139        self.intra_hist_plot.addItem(limit_line)
140        self.intra_limit_lines.append(limit_line)
141
142        for line in self.intra_limit_lines:
143            line.setPos(self.detector_config.intra_detection_threshold)
144
145        # Inter presence history plot
146
147        self.inter_hist_plot = win.addPlot(
148            row=0,
149            col=1,
150            title="Inter presence history (slow motions)",
151        )
152        self.inter_hist_plot.setMenuEnabled(False)
153        self.inter_hist_plot.setMouseEnabled(x=False, y=False)
154        self.inter_hist_plot.hideButtons()
155        self.inter_hist_plot.showGrid(x=True, y=True)
156        self.inter_hist_plot.setLabel("bottom", "Time (s)")
157        self.inter_hist_plot.setLabel("left", "Score")
158        self.inter_hist_plot.setXRange(-self.history_length_s, 0)
159        self.inter_history_smooth_max = et.utils.SmoothMax(self.estimated_frame_rate)
160        self.inter_hist_plot.setYRange(0, 10)
161        if not self.detector_config.inter_enable:
162            inter_color = et.utils.color_cycler(0)
163            inter_color = f"{inter_color}50"
164            inter_dashed_pen = pg.mkPen(inter_color, width=2.5, style=QtCore.Qt.DashLine)
165            inter_pen = pg.mkPen(inter_color, width=2)
166        else:
167            inter_pen = et.utils.pg_pen_cycler(0)
168            inter_dashed_pen = et.utils.pg_pen_cycler(0, width=2.5, style="--")
169
170        self.inter_hist_curve = self.inter_hist_plot.plot(pen=inter_pen)
171        limit_line = pg.InfiniteLine(angle=0, pen=inter_dashed_pen)
172        self.inter_hist_plot.addItem(limit_line)
173        self.inter_limit_lines.append(limit_line)
174
175        for line in self.inter_limit_lines:
176            line.setPos(self.detector_config.inter_detection_threshold)
177
178        # Sector plot
179
180        self.sector_plot = pg.PlotItem(
181            title="Detection zone<br>Detection type: fast (orange), slow (blue), both (green)"
182        )
183        self.sector_plot.setAspectLocked()
184        self.sector_plot.hideAxis("left")
185        self.sector_plot.hideAxis("bottom")
186        self.sectors = []
187        self.limit_text = []
188
189        self.range_html = (
190            '<div style="text-align: center">'
191            '<span style="color: #000000;font-size:12pt;">'
192            "{}</span></div>"
193        )
194
195        pen = pg.mkPen("k", width=1)
196        span_deg = 25
197        for r in np.flip(np.arange(self.num_sectors) + 1):
198            sector = pg.QtWidgets.QGraphicsEllipseItem(-r, -r, r * 2, r * 2)
199            sector.setStartAngle(-16 * span_deg)
200            sector.setSpanAngle(16 * span_deg * 2)
201            sector.setPen(pen)
202            self.sector_plot.addItem(sector)
203            self.sectors.append(sector)
204
205            limit = pg.TextItem(html=self.range_html, anchor=(0.5, 0.5), angle=25)
206            x = r * np.cos(np.radians(span_deg))
207            y = r * np.sin(np.radians(span_deg))
208            limit.setPos(x, y + 0.25)
209            self.sector_plot.addItem(limit)
210            self.limit_text.append(limit)
211
212        self.sectors.reverse()
213
214        start_limit_text = pg.TextItem(html=self.range_html, anchor=(0.5, 0.5), angle=25)
215        range_html = self.range_html.format(f"{self.detector_config.start_m}")
216        start_limit_text.setHtml(range_html)
217        start_limit_text.setPos(0, 0.25)
218        self.sector_plot.addItem(start_limit_text)
219
220        unit_text = pg.TextItem(html=self.range_html, anchor=(0.5, 0.5))
221        unit_html = self.range_html.format("[m]")
222        unit_text.setHtml(unit_html)
223        unit_text.setPos(
224            self.num_sectors + 0.5, (self.num_sectors + 1) * np.sin(np.radians(span_deg))
225        )
226        self.sector_plot.addItem(unit_text)
227
228        for text_item, limit in zip(self.limit_text, np.flip(self.zone_limits)):
229            range_html = self.range_html.format(np.around(limit, 1))
230            text_item.setHtml(range_html)
231
232        sublayout = win.addLayout(row=1, col=0, colspan=2)
233        sublayout.layout.setColumnStretchFactor(0, 2)
234        sublayout.addItem(self.sector_plot, row=0, col=0)
235
236        self.setup_is_done = True
237
238    def update(self, data):
239        processor_result = data["processor_result"]
240        detector_result = data["detector_result"]
241
242        # Intra presence
243
244        move_hist_xs = np.linspace(-self.history_length_s, 0, self.history_length_n)
245
246        self.intra_history = np.roll(self.intra_history, -1)
247        self.intra_history[-1] = detector_result.intra_presence_score
248
249        m_hist = max(
250            float(np.max(self.intra_history)),
251            self.detector_config.intra_detection_threshold * 1.05,
252        )
253        m_hist = self.intra_history_smooth_max.update(m_hist)
254
255        self.intra_hist_plot.setYRange(0, m_hist)
256        self.intra_hist_curve.setData(move_hist_xs, self.intra_history)
257
258        # Inter presence
259
260        self.inter_history = np.roll(self.inter_history, -1)
261        self.inter_history[-1] = detector_result.inter_presence_score
262
263        m_hist = max(
264            float(np.max(self.inter_history)),
265            self.detector_config.inter_detection_threshold * 1.05,
266        )
267        m_hist = self.inter_history_smooth_max.update(m_hist)
268
269        self.inter_hist_plot.setYRange(0, m_hist)
270        self.inter_hist_curve.setData(move_hist_xs, self.inter_history)
271
272        # Sector
273
274        brush = et.utils.pg_brush_cycler(7)
275        for sector in self.sectors:
276            sector.setBrush(brush)
277
278        if detector_result.presence_detected:
279            assert processor_result.max_presence_zone is not None
280            if processor_result.max_presence_zone == processor_result.max_intra_zone:
281                self.sectors[processor_result.max_presence_zone].setBrush(
282                    et.utils.pg_brush_cycler(1)
283                )
284            else:
285                self.sectors[processor_result.max_presence_zone].setBrush(
286                    et.utils.pg_brush_cycler(0)
287                )
288
289
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

View this example on GitHub: acconeer/acconeer-python-exploration