examples/a121/algo/presence/processor.py

examples/a121/algo/presence/processor.py

  1# Copyright (c) Acconeer AB, 2022-2023
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8# Added here to force pyqtgraph to choose PySide
  9import PySide6  # noqa: F401
 10from PySide6 import QtCore
 11
 12import pyqtgraph as pg
 13
 14import acconeer.exptool as et
 15from acconeer.exptool import a121
 16from acconeer.exptool.a121.algo._utils import get_distances_m
 17from acconeer.exptool.a121.algo.presence import Processor, ProcessorConfig
 18
 19
 20def main():
 21    args = a121.ExampleArgumentParser().parse_args()
 22    et.utils.config_logging(args)
 23
 24    client = a121.Client.open(**a121.get_client_args(args))
 25
 26    sensor_config = a121.SensorConfig(
 27        start_point=600,
 28        step_length=120,
 29        num_points=5,
 30        profile=a121.Profile.PROFILE_5,
 31        sweeps_per_frame=32,
 32        hwaas=32,
 33        frame_rate=20,
 34    )
 35
 36    metadata = client.setup_session(sensor_config)
 37
 38    presence_config = ProcessorConfig(
 39        intra_detection_threshold=1.3,
 40        inter_detection_threshold=1,
 41    )
 42
 43    presence_processor = Processor(
 44        sensor_config=sensor_config,
 45        metadata=metadata,
 46        processor_config=presence_config,
 47    )
 48
 49    pg_updater = PGUpdater(sensor_config, presence_config, metadata)
 50    pg_process = et.PGProcess(pg_updater)
 51    pg_process.start()
 52
 53    client.start_session()
 54
 55    interrupt_handler = et.utils.ExampleInterruptHandler()
 56    print("Press Ctrl-C to end session")
 57
 58    while not interrupt_handler.got_signal:
 59        result = client.get_next()
 60        processed_data = presence_processor.process(result)
 61        try:
 62            pg_process.put_data(processed_data)
 63        except et.PGProccessDiedException:
 64            break
 65
 66    print("Disconnecting...")
 67    pg_process.close()
 68    client.close()
 69
 70
 71class PGUpdater:
 72    def __init__(self, sensor_config, processor_config, metadata):
 73        self.sensor_config = sensor_config
 74        self.processor_config = processor_config
 75
 76        self.history_length_s = 5
 77        self.history_length_n = int(round(self.history_length_s * sensor_config.frame_rate))
 78        self.intra_history = np.zeros(self.history_length_n)
 79        self.inter_history = np.zeros(self.history_length_n)
 80
 81        self.distances = get_distances_m(self.sensor_config, metadata)
 82
 83        self.setup_is_done = False
 84
 85    def setup(self, win):
 86        win.setWindowTitle("Acconeer presence detection example")
 87
 88        self.intra_limit_lines = []
 89        self.inter_limit_lines = []
 90
 91        # Noise estimation plot
 92
 93        self.noise_plot = win.addPlot(
 94            row=0,
 95            col=0,
 96            title="Noise",
 97        )
 98        self.noise_plot.setMenuEnabled(False)
 99        self.noise_plot.setMouseEnabled(x=False, y=False)
100        self.noise_plot.hideButtons()
101        self.noise_plot.showGrid(x=True, y=True)
102        self.noise_plot.setLabel("bottom", "Distance (m)")
103        self.noise_plot.setLabel("left", "Amplitude")
104        self.noise_plot.setVisible(False)
105        self.noise_curve = self.noise_plot.plot(pen=et.utils.pg_pen_cycler())
106        self.noise_smooth_max = et.utils.SmoothMax(self.sensor_config.frame_rate)
107
108        # Depthwise presence plot
109
110        self.move_plot = pg.PlotItem(title="Depthwise presence")
111        self.move_plot.setMenuEnabled(False)
112        self.move_plot.setMouseEnabled(x=False, y=False)
113        self.move_plot.hideButtons()
114        self.move_plot.showGrid(x=True, y=True)
115        self.move_plot.setLabel("bottom", "Distance (m)")
116        self.move_plot.setLabel("left", "Norm. ampl.")
117        self.move_plot.setXRange(self.distances[0], self.distances[-1])
118        self.intra_curve = self.move_plot.plot(pen=et.utils.pg_pen_cycler(1))
119        if not self.processor_config.intra_enable:
120            self.intra_curve.hide()
121
122        self.inter_curve = self.move_plot.plot(pen=et.utils.pg_pen_cycler(0))
123        if not self.processor_config.inter_enable:
124            self.inter_curve.hide()
125
126        self.move_smooth_max = et.utils.SmoothMax(
127            self.sensor_config.frame_rate,
128            tau_decay=1.0,
129            tau_grow=0.25,
130        )
131
132        self.move_depth_line = pg.InfiniteLine(pen=pg.mkPen("k", width=1.5))
133        self.move_depth_line.hide()
134        self.move_plot.addItem(self.move_depth_line)
135
136        self.present_html_format = (
137            '<div style="text-align: center">'
138            '<span style="color: #FFFFFF;font-size:15pt;">'
139            "{}</span></div>"
140        )
141        not_present_html = (
142            '<div style="text-align: center">'
143            '<span style="color: #FFFFFF;font-size:15pt;">'
144            "{}</span></div>".format("No presence detected")
145        )
146        self.present_text_item = pg.TextItem(
147            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
148            anchor=(0.5, 0),
149        )
150        self.not_present_text_item = pg.TextItem(
151            html=not_present_html,
152            fill=pg.mkColor(0x1F, 0x77, 0xB4, 180),
153            anchor=(0.5, 0),
154        )
155
156        self.move_plot.addItem(self.present_text_item)
157        self.move_plot.addItem(self.not_present_text_item)
158        self.present_text_item.hide()
159        self.not_present_text_item.hide()
160
161        # Intra presence history plot
162
163        self.intra_hist_plot = win.addPlot(
164            row=1,
165            col=0,
166            title="Intra presence history (fast motions)",
167        )
168        self.intra_hist_plot.setMenuEnabled(False)
169        self.intra_hist_plot.setMouseEnabled(x=False, y=False)
170        self.intra_hist_plot.hideButtons()
171        self.intra_hist_plot.showGrid(x=True, y=True)
172        self.intra_hist_plot.setLabel("bottom", "Time (s)")
173        self.intra_hist_plot.setLabel("left", "Score")
174        self.intra_hist_plot.setXRange(-self.history_length_s, 0)
175        self.intra_history_smooth_max = et.utils.SmoothMax(self.sensor_config.frame_rate)
176        self.intra_hist_plot.setYRange(0, 10)
177        if not self.processor_config.intra_enable:
178            intra_color = et.utils.color_cycler(1)
179            intra_color = f"{intra_color}50"
180            intra_dashed_pen = pg.mkPen(intra_color, width=2.5, style=QtCore.Qt.DashLine)
181            intra_pen = pg.mkPen(intra_color, width=2)
182        else:
183            intra_dashed_pen = et.utils.pg_pen_cycler(1, width=2.5, style="--")
184            intra_pen = et.utils.pg_pen_cycler(1)
185
186        self.intra_hist_curve = self.intra_hist_plot.plot(pen=intra_pen)
187        limit_line = pg.InfiniteLine(angle=0, pen=intra_dashed_pen)
188        self.intra_hist_plot.addItem(limit_line)
189        self.intra_limit_lines.append(limit_line)
190
191        for line in self.intra_limit_lines:
192            line.setPos(self.processor_config.intra_detection_threshold)
193
194        # Inter presence history plot
195
196        self.inter_hist_plot = win.addPlot(
197            row=1,
198            col=1,
199            title="Inter presence history (slow motions)",
200        )
201        self.inter_hist_plot.setMenuEnabled(False)
202        self.inter_hist_plot.setMouseEnabled(x=False, y=False)
203        self.inter_hist_plot.hideButtons()
204        self.inter_hist_plot.showGrid(x=True, y=True)
205        self.inter_hist_plot.setLabel("bottom", "Time (s)")
206        self.inter_hist_plot.setLabel("left", "Score")
207        self.inter_hist_plot.setXRange(-self.history_length_s, 0)
208        self.inter_history_smooth_max = et.utils.SmoothMax(self.sensor_config.frame_rate)
209        self.inter_hist_plot.setYRange(0, 10)
210        if not self.processor_config.inter_enable:
211            inter_color = et.utils.color_cycler(0)
212            inter_color = f"{inter_color}50"
213            inter_dashed_pen = pg.mkPen(inter_color, width=2.5, style=QtCore.Qt.DashLine)
214            inter_pen = pg.mkPen(inter_color, width=2)
215        else:
216            inter_pen = et.utils.pg_pen_cycler(0)
217            inter_dashed_pen = et.utils.pg_pen_cycler(0, width=2.5, style="--")
218
219        self.inter_hist_curve = self.inter_hist_plot.plot(pen=inter_pen)
220        limit_line = pg.InfiniteLine(angle=0, pen=inter_dashed_pen)
221        self.inter_hist_plot.addItem(limit_line)
222        self.inter_limit_lines.append(limit_line)
223
224        for line in self.inter_limit_lines:
225            line.setPos(self.processor_config.inter_detection_threshold)
226
227        sublayout = win.addLayout(row=2, col=0, colspan=2)
228        sublayout.layout.setColumnStretchFactor(0, 2)
229        sublayout.addItem(self.move_plot, row=0, col=0)
230
231        self.setup_is_done = True
232
233    def update(self, data):
234        noise = data.extra_result.lp_noise
235        self.noise_curve.setData(self.distances, noise)
236        self.noise_plot.setYRange(0, self.noise_smooth_max.update(noise))
237
238        movement_x = data.presence_distance
239
240        self.inter_curve.setData(self.distances, data.inter)
241        self.intra_curve.setData(self.distances, data.intra)
242        m = self.move_smooth_max.update(np.max(np.maximum(data.inter, data.intra)))
243        m = max(
244            m,
245            2
246            * np.maximum(
247                self.processor_config.intra_detection_threshold,
248                self.processor_config.inter_detection_threshold,
249            ),
250        )
251        self.move_plot.setYRange(0, m)
252        self.move_depth_line.setPos(movement_x)
253        self.move_depth_line.setVisible(bool(data.presence_detected))
254
255        self.set_present_text_y_pos(m)
256
257        if data.presence_detected:
258            present_text = "Presence detected at {:.0f} cm".format(movement_x * 100)
259            present_html = self.present_html_format.format(present_text)
260            self.present_text_item.setHtml(present_html)
261
262            self.present_text_item.show()
263            self.not_present_text_item.hide()
264        else:
265            self.present_text_item.hide()
266            self.not_present_text_item.show()
267
268        # Intra presence
269
270        move_hist_xs = np.linspace(-self.history_length_s, 0, self.history_length_n)
271
272        self.intra_history = np.roll(self.intra_history, -1)
273        self.intra_history[-1] = data.intra_presence_score
274
275        m_hist = max(
276            float(np.max(self.intra_history)),
277            self.processor_config.intra_detection_threshold * 1.05,
278        )
279        m_hist = self.intra_history_smooth_max.update(m_hist)
280
281        self.intra_hist_plot.setYRange(0, m_hist)
282        self.intra_hist_curve.setData(move_hist_xs, self.intra_history)
283
284        # Inter presence
285
286        self.inter_history = np.roll(self.inter_history, -1)
287        self.inter_history[-1] = data.inter_presence_score
288
289        m_hist = max(
290            float(np.max(self.inter_history)),
291            self.processor_config.inter_detection_threshold * 1.05,
292        )
293        m_hist = self.inter_history_smooth_max.update(m_hist)
294
295        self.inter_hist_plot.setYRange(0, m_hist)
296        self.inter_hist_curve.setData(move_hist_xs, self.inter_history)
297
298    def set_present_text_y_pos(self, y):
299        x_pos = self.distances[0] + (self.distances[-1] - self.distances[0]) / 2
300        self.present_text_item.setPos(x_pos, 0.95 * y)
301        self.not_present_text_item.setPos(x_pos, 0.95 * y)
302
303
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

View this example on GitHub: acconeer/acconeer-python-exploration