examples/a121/algo/presence/detector.py

examples/a121/algo/presence/detector.py

  1# Copyright (c) Acconeer AB, 2022-2023
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8# Added here to force pyqtgraph to choose PySide
  9import PySide6  # noqa: F401
 10from PySide6 import QtCore
 11
 12import pyqtgraph as pg
 13
 14import acconeer.exptool as et
 15from acconeer.exptool import a121
 16from acconeer.exptool.a121._core.entities.configs.sensor_config import SensorConfig
 17from acconeer.exptool.a121.algo.presence import Detector, DetectorConfig
 18
 19
 20def main():
 21    args = a121.ExampleArgumentParser().parse_args()
 22    et.utils.config_logging(args)
 23
 24    client = a121.Client.open(**a121.get_client_args(args))
 25
 26    detector_config = DetectorConfig(
 27        start_m=1.0,
 28        end_m=3.0,
 29    )
 30
 31    detector = Detector(client=client, sensor_id=1, detector_config=detector_config)
 32    detector.start()
 33
 34    pg_updater = PGUpdater(
 35        detector_config,
 36        detector._get_sensor_config(detector_config),
 37        detector.estimated_frame_rate,
 38    )
 39    pg_process = et.PGProcess(pg_updater)
 40    pg_process.start()
 41
 42    interrupt_handler = et.utils.ExampleInterruptHandler()
 43    print("Press Ctrl-C to end session")
 44
 45    while not interrupt_handler.got_signal:
 46        detector_result = detector.get_next()
 47        s = "Presence! " if detector_result.presence_detected else "No presence. "
 48        s += (
 49            f"Intra presence score {detector_result.intra_presence_score:.3f}, "
 50            f"inter presence score {detector_result.inter_presence_score:.3f}, "
 51            f"presence at {detector_result.presence_distance:.3f} m"
 52        )
 53        print(s)
 54        try:
 55            pg_process.put_data(detector_result)
 56        except et.PGProccessDiedException:
 57            break
 58
 59    detector.stop()
 60
 61    print("Disconnecting...")
 62    client.close()
 63
 64
 65class PGUpdater:
 66    def __init__(
 67        self,
 68        detector_config: DetectorConfig,
 69        sensor_config: SensorConfig,
 70        estimated_frame_rate: float,
 71    ):
 72        self.detector_config = detector_config
 73        self.distances = np.linspace(
 74            detector_config.start_m, detector_config.end_m, sensor_config.num_points
 75        )
 76
 77        self.history_length_s = 5
 78        self.history_length_n = int(round(self.history_length_s * estimated_frame_rate))
 79        self.intra_history = np.zeros(self.history_length_n)
 80        self.inter_history = np.zeros(self.history_length_n)
 81
 82        self.setup_is_done = False
 83
 84    def setup(self, win):
 85        win.setWindowTitle("Acconeer presence detection example")
 86
 87        self.intra_limit_lines = []
 88        self.inter_limit_lines = []
 89
 90        # Noise estimation plot
 91
 92        self.noise_plot = win.addPlot(
 93            row=0,
 94            col=0,
 95            title="Noise",
 96        )
 97        self.noise_plot.setMenuEnabled(False)
 98        self.noise_plot.setMouseEnabled(x=False, y=False)
 99        self.noise_plot.hideButtons()
100        self.noise_plot.showGrid(x=True, y=True)
101        self.noise_plot.setLabel("bottom", "Distance (m)")
102        self.noise_plot.setLabel("left", "Amplitude")
103        self.noise_plot.setVisible(False)
104        self.noise_curve = self.noise_plot.plot(pen=et.utils.pg_pen_cycler())
105        self.noise_smooth_max = et.utils.SmoothMax(self.detector_config.frame_rate)
106
107        # Depthwise presence plot
108
109        self.move_plot = pg.PlotItem(title="Depthwise presence")
110        self.move_plot.setMenuEnabled(False)
111        self.move_plot.setMouseEnabled(x=False, y=False)
112        self.move_plot.hideButtons()
113        self.move_plot.showGrid(x=True, y=True)
114        self.move_plot.setLabel("bottom", "Distance (m)")
115        self.move_plot.setLabel("left", "Norm. ampl.")
116        self.move_plot.setXRange(self.distances[0], self.distances[-1])
117        self.intra_curve = self.move_plot.plot(pen=et.utils.pg_pen_cycler(1))
118        if not self.detector_config.intra_enable:
119            self.intra_curve.hide()
120
121        self.inter_curve = self.move_plot.plot(pen=et.utils.pg_pen_cycler(0))
122        if not self.detector_config.inter_enable:
123            self.inter_curve.hide()
124
125        self.move_smooth_max = et.utils.SmoothMax(
126            self.detector_config.frame_rate,
127            tau_decay=1.0,
128            tau_grow=0.25,
129        )
130
131        self.move_depth_line = pg.InfiniteLine(pen=pg.mkPen("k", width=1.5))
132        self.move_depth_line.hide()
133        self.move_plot.addItem(self.move_depth_line)
134
135        self.present_html_format = (
136            '<div style="text-align: center">'
137            '<span style="color: #FFFFFF;font-size:15pt;">'
138            "{}</span></div>"
139        )
140        not_present_html = (
141            '<div style="text-align: center">'
142            '<span style="color: #FFFFFF;font-size:15pt;">'
143            "{}</span></div>".format("No presence detected")
144        )
145        self.present_text_item = pg.TextItem(
146            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
147            anchor=(0.5, 0),
148        )
149        self.not_present_text_item = pg.TextItem(
150            html=not_present_html,
151            fill=pg.mkColor(0x1F, 0x77, 0xB4, 180),
152            anchor=(0.5, 0),
153        )
154
155        self.move_plot.addItem(self.present_text_item)
156        self.move_plot.addItem(self.not_present_text_item)
157        self.present_text_item.hide()
158        self.not_present_text_item.hide()
159
160        # Intra presence history plot
161
162        self.intra_hist_plot = win.addPlot(
163            row=1,
164            col=0,
165            title="Intra presence history (fast motions)",
166        )
167        self.intra_hist_plot.setMenuEnabled(False)
168        self.intra_hist_plot.setMouseEnabled(x=False, y=False)
169        self.intra_hist_plot.hideButtons()
170        self.intra_hist_plot.showGrid(x=True, y=True)
171        self.intra_hist_plot.setLabel("bottom", "Time (s)")
172        self.intra_hist_plot.setLabel("left", "Score")
173        self.intra_hist_plot.setXRange(-self.history_length_s, 0)
174        self.intra_history_smooth_max = et.utils.SmoothMax(self.detector_config.frame_rate)
175        self.intra_hist_plot.setYRange(0, 10)
176        if not self.detector_config.intra_enable:
177            intra_color = et.utils.color_cycler(1)
178            intra_color = f"{intra_color}50"
179            intra_dashed_pen = pg.mkPen(intra_color, width=2.5, style=QtCore.Qt.DashLine)
180            intra_pen = pg.mkPen(intra_color, width=2)
181        else:
182            intra_dashed_pen = et.utils.pg_pen_cycler(1, width=2.5, style="--")
183            intra_pen = et.utils.pg_pen_cycler(1)
184
185        self.intra_hist_curve = self.intra_hist_plot.plot(pen=intra_pen)
186        limit_line = pg.InfiniteLine(angle=0, pen=intra_dashed_pen)
187        self.intra_hist_plot.addItem(limit_line)
188        self.intra_limit_lines.append(limit_line)
189
190        for line in self.intra_limit_lines:
191            line.setPos(self.detector_config.intra_detection_threshold)
192
193        # Inter presence history plot
194
195        self.inter_hist_plot = win.addPlot(
196            row=1,
197            col=1,
198            title="Inter presence history (slow motions)",
199        )
200        self.inter_hist_plot.setMenuEnabled(False)
201        self.inter_hist_plot.setMouseEnabled(x=False, y=False)
202        self.inter_hist_plot.hideButtons()
203        self.inter_hist_plot.showGrid(x=True, y=True)
204        self.inter_hist_plot.setLabel("bottom", "Time (s)")
205        self.inter_hist_plot.setLabel("left", "Score")
206        self.inter_hist_plot.setXRange(-self.history_length_s, 0)
207        self.inter_history_smooth_max = et.utils.SmoothMax(self.detector_config.frame_rate)
208        self.inter_hist_plot.setYRange(0, 10)
209        if not self.detector_config.inter_enable:
210            inter_color = et.utils.color_cycler(0)
211            inter_color = f"{inter_color}50"
212            inter_dashed_pen = pg.mkPen(inter_color, width=2.5, style=QtCore.Qt.DashLine)
213            inter_pen = pg.mkPen(inter_color, width=2)
214        else:
215            inter_pen = et.utils.pg_pen_cycler(0)
216            inter_dashed_pen = et.utils.pg_pen_cycler(0, width=2.5, style="--")
217
218        self.inter_hist_curve = self.inter_hist_plot.plot(pen=inter_pen)
219        limit_line = pg.InfiniteLine(angle=0, pen=inter_dashed_pen)
220        self.inter_hist_plot.addItem(limit_line)
221        self.inter_limit_lines.append(limit_line)
222
223        for line in self.inter_limit_lines:
224            line.setPos(self.detector_config.inter_detection_threshold)
225
226        sublayout = win.addLayout(row=2, col=0, colspan=2)
227        sublayout.layout.setColumnStretchFactor(0, 2)
228        sublayout.addItem(self.move_plot, row=0, col=0)
229
230        self.setup_is_done = True
231
232    def update(self, data):
233        noise = data.processor_extra_result.lp_noise
234        self.noise_curve.setData(self.distances, noise)
235        self.noise_plot.setYRange(0, self.noise_smooth_max.update(noise))
236
237        movement_x = data.presence_distance
238
239        self.inter_curve.setData(self.distances, data.inter_depthwise_scores)
240        self.intra_curve.setData(self.distances, data.intra_depthwise_scores)
241        m = self.move_smooth_max.update(
242            np.max(np.maximum(data.inter_depthwise_scores, data.intra_depthwise_scores))
243        )
244        m = max(
245            m,
246            2
247            * np.maximum(
248                self.detector_config.intra_detection_threshold,
249                self.detector_config.inter_detection_threshold,
250            ),
251        )
252        self.move_plot.setYRange(0, m)
253        self.move_depth_line.setPos(movement_x)
254        self.move_depth_line.setVisible(bool(data.presence_detected))
255
256        self.set_present_text_y_pos(m)
257
258        if data.presence_detected:
259            present_text = "Presence detected at {:.0f} cm".format(movement_x * 100)
260            present_html = self.present_html_format.format(present_text)
261            self.present_text_item.setHtml(present_html)
262
263            self.present_text_item.show()
264            self.not_present_text_item.hide()
265        else:
266            self.present_text_item.hide()
267            self.not_present_text_item.show()
268
269        # Intra presence
270        move_hist_xs = np.linspace(-self.history_length_s, 0, self.history_length_n)
271
272        self.intra_history = np.roll(self.intra_history, -1)
273        self.intra_history[-1] = data.intra_presence_score
274
275        m_hist = max(
276            float(np.max(self.intra_history)),
277            self.detector_config.intra_detection_threshold * 1.05,
278        )
279        m_hist = self.intra_history_smooth_max.update(m_hist)
280
281        self.intra_hist_plot.setYRange(0, m_hist)
282        self.intra_hist_curve.setData(move_hist_xs, self.intra_history)
283
284        # Inter presence
285
286        self.inter_history = np.roll(self.inter_history, -1)
287        self.inter_history[-1] = data.inter_presence_score
288
289        m_hist = max(
290            float(np.max(self.inter_history)),
291            self.detector_config.inter_detection_threshold * 1.05,
292        )
293        m_hist = self.inter_history_smooth_max.update(m_hist)
294
295        self.inter_hist_plot.setYRange(0, m_hist)
296        self.inter_hist_curve.setData(move_hist_xs, self.inter_history)
297
298    def set_present_text_y_pos(self, y):
299        x_pos = self.distances[0] + (self.distances[-1] - self.distances[0]) / 2
300        self.present_text_item.setPos(x_pos, 0.95 * y)
301        self.not_present_text_item.setPos(x_pos, 0.95 * y)
302
303
# Run the example only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

View this example on GitHub: acconeer/acconeer-python-exploration