examples/a121/algo/presence/detector.py#
1# Copyright (c) Acconeer AB, 2022-2023
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8# Added here to force pyqtgraph to choose PySide
9import PySide6 # noqa: F401
10from PySide6 import QtCore
11
12import pyqtgraph as pg
13
14import acconeer.exptool as et
15from acconeer.exptool import a121
16from acconeer.exptool.a121._core.entities.configs.sensor_config import SensorConfig
17from acconeer.exptool.a121.algo.presence import Detector, DetectorConfig
18
19
20def main():
21 args = a121.ExampleArgumentParser().parse_args()
22 et.utils.config_logging(args)
23
24 client = a121.Client.open(**a121.get_client_args(args))
25
26 detector_config = DetectorConfig(
27 start_m=1.0,
28 end_m=3.0,
29 )
30
31 detector = Detector(client=client, sensor_id=1, detector_config=detector_config)
32 detector.start()
33
34 pg_updater = PGUpdater(
35 detector_config,
36 detector._get_sensor_config(detector_config),
37 detector.estimated_frame_rate,
38 )
39 pg_process = et.PGProcess(pg_updater)
40 pg_process.start()
41
42 interrupt_handler = et.utils.ExampleInterruptHandler()
43 print("Press Ctrl-C to end session")
44
45 while not interrupt_handler.got_signal:
46 detector_result = detector.get_next()
47 s = "Presence! " if detector_result.presence_detected else "No presence. "
48 s += (
49 f"Intra presence score {detector_result.intra_presence_score:.3f}, "
50 f"inter presence score {detector_result.inter_presence_score:.3f}, "
51 f"presence at {detector_result.presence_distance:.3f} m"
52 )
53 print(s)
54 try:
55 pg_process.put_data(detector_result)
56 except et.PGProccessDiedException:
57 break
58
59 detector.stop()
60
61 print("Disconnecting...")
62 client.close()
63
64
65class PGUpdater:
66 def __init__(
67 self,
68 detector_config: DetectorConfig,
69 sensor_config: SensorConfig,
70 estimated_frame_rate: float,
71 ):
72 self.detector_config = detector_config
73 self.distances = np.linspace(
74 detector_config.start_m, detector_config.end_m, sensor_config.num_points
75 )
76
77 self.history_length_s = 5
78 self.history_length_n = int(round(self.history_length_s * estimated_frame_rate))
79 self.intra_history = np.zeros(self.history_length_n)
80 self.inter_history = np.zeros(self.history_length_n)
81
82 self.setup_is_done = False
83
84 def setup(self, win):
85 win.setWindowTitle("Acconeer presence detection example")
86
87 self.intra_limit_lines = []
88 self.inter_limit_lines = []
89
90 # Noise estimation plot
91
92 self.noise_plot = win.addPlot(
93 row=0,
94 col=0,
95 title="Noise",
96 )
97 self.noise_plot.setMenuEnabled(False)
98 self.noise_plot.setMouseEnabled(x=False, y=False)
99 self.noise_plot.hideButtons()
100 self.noise_plot.showGrid(x=True, y=True)
101 self.noise_plot.setLabel("bottom", "Distance (m)")
102 self.noise_plot.setLabel("left", "Amplitude")
103 self.noise_plot.setVisible(False)
104 self.noise_curve = self.noise_plot.plot(pen=et.utils.pg_pen_cycler())
105 self.noise_smooth_max = et.utils.SmoothMax(self.detector_config.frame_rate)
106
107 # Depthwise presence plot
108
109 self.move_plot = pg.PlotItem(title="Depthwise presence")
110 self.move_plot.setMenuEnabled(False)
111 self.move_plot.setMouseEnabled(x=False, y=False)
112 self.move_plot.hideButtons()
113 self.move_plot.showGrid(x=True, y=True)
114 self.move_plot.setLabel("bottom", "Distance (m)")
115 self.move_plot.setLabel("left", "Norm. ampl.")
116 self.move_plot.setXRange(self.distances[0], self.distances[-1])
117 self.intra_curve = self.move_plot.plot(pen=et.utils.pg_pen_cycler(1))
118 if not self.detector_config.intra_enable:
119 self.intra_curve.hide()
120
121 self.inter_curve = self.move_plot.plot(pen=et.utils.pg_pen_cycler(0))
122 if not self.detector_config.inter_enable:
123 self.inter_curve.hide()
124
125 self.move_smooth_max = et.utils.SmoothMax(
126 self.detector_config.frame_rate,
127 tau_decay=1.0,
128 tau_grow=0.25,
129 )
130
131 self.move_depth_line = pg.InfiniteLine(pen=pg.mkPen("k", width=1.5))
132 self.move_depth_line.hide()
133 self.move_plot.addItem(self.move_depth_line)
134
135 self.present_html_format = (
136 '<div style="text-align: center">'
137 '<span style="color: #FFFFFF;font-size:15pt;">'
138 "{}</span></div>"
139 )
140 not_present_html = (
141 '<div style="text-align: center">'
142 '<span style="color: #FFFFFF;font-size:15pt;">'
143 "{}</span></div>".format("No presence detected")
144 )
145 self.present_text_item = pg.TextItem(
146 fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
147 anchor=(0.5, 0),
148 )
149 self.not_present_text_item = pg.TextItem(
150 html=not_present_html,
151 fill=pg.mkColor(0x1F, 0x77, 0xB4, 180),
152 anchor=(0.5, 0),
153 )
154
155 self.move_plot.addItem(self.present_text_item)
156 self.move_plot.addItem(self.not_present_text_item)
157 self.present_text_item.hide()
158 self.not_present_text_item.hide()
159
160 # Intra presence history plot
161
162 self.intra_hist_plot = win.addPlot(
163 row=1,
164 col=0,
165 title="Intra presence history (fast motions)",
166 )
167 self.intra_hist_plot.setMenuEnabled(False)
168 self.intra_hist_plot.setMouseEnabled(x=False, y=False)
169 self.intra_hist_plot.hideButtons()
170 self.intra_hist_plot.showGrid(x=True, y=True)
171 self.intra_hist_plot.setLabel("bottom", "Time (s)")
172 self.intra_hist_plot.setLabel("left", "Score")
173 self.intra_hist_plot.setXRange(-self.history_length_s, 0)
174 self.intra_history_smooth_max = et.utils.SmoothMax(self.detector_config.frame_rate)
175 self.intra_hist_plot.setYRange(0, 10)
176 if not self.detector_config.intra_enable:
177 intra_color = et.utils.color_cycler(1)
178 intra_color = f"{intra_color}50"
179 intra_dashed_pen = pg.mkPen(intra_color, width=2.5, style=QtCore.Qt.DashLine)
180 intra_pen = pg.mkPen(intra_color, width=2)
181 else:
182 intra_dashed_pen = et.utils.pg_pen_cycler(1, width=2.5, style="--")
183 intra_pen = et.utils.pg_pen_cycler(1)
184
185 self.intra_hist_curve = self.intra_hist_plot.plot(pen=intra_pen)
186 limit_line = pg.InfiniteLine(angle=0, pen=intra_dashed_pen)
187 self.intra_hist_plot.addItem(limit_line)
188 self.intra_limit_lines.append(limit_line)
189
190 for line in self.intra_limit_lines:
191 line.setPos(self.detector_config.intra_detection_threshold)
192
193 # Inter presence history plot
194
195 self.inter_hist_plot = win.addPlot(
196 row=1,
197 col=1,
198 title="Inter presence history (slow motions)",
199 )
200 self.inter_hist_plot.setMenuEnabled(False)
201 self.inter_hist_plot.setMouseEnabled(x=False, y=False)
202 self.inter_hist_plot.hideButtons()
203 self.inter_hist_plot.showGrid(x=True, y=True)
204 self.inter_hist_plot.setLabel("bottom", "Time (s)")
205 self.inter_hist_plot.setLabel("left", "Score")
206 self.inter_hist_plot.setXRange(-self.history_length_s, 0)
207 self.inter_history_smooth_max = et.utils.SmoothMax(self.detector_config.frame_rate)
208 self.inter_hist_plot.setYRange(0, 10)
209 if not self.detector_config.inter_enable:
210 inter_color = et.utils.color_cycler(0)
211 inter_color = f"{inter_color}50"
212 inter_dashed_pen = pg.mkPen(inter_color, width=2.5, style=QtCore.Qt.DashLine)
213 inter_pen = pg.mkPen(inter_color, width=2)
214 else:
215 inter_pen = et.utils.pg_pen_cycler(0)
216 inter_dashed_pen = et.utils.pg_pen_cycler(0, width=2.5, style="--")
217
218 self.inter_hist_curve = self.inter_hist_plot.plot(pen=inter_pen)
219 limit_line = pg.InfiniteLine(angle=0, pen=inter_dashed_pen)
220 self.inter_hist_plot.addItem(limit_line)
221 self.inter_limit_lines.append(limit_line)
222
223 for line in self.inter_limit_lines:
224 line.setPos(self.detector_config.inter_detection_threshold)
225
226 sublayout = win.addLayout(row=2, col=0, colspan=2)
227 sublayout.layout.setColumnStretchFactor(0, 2)
228 sublayout.addItem(self.move_plot, row=0, col=0)
229
230 self.setup_is_done = True
231
232 def update(self, data):
233 noise = data.processor_extra_result.lp_noise
234 self.noise_curve.setData(self.distances, noise)
235 self.noise_plot.setYRange(0, self.noise_smooth_max.update(noise))
236
237 movement_x = data.presence_distance
238
239 self.inter_curve.setData(self.distances, data.inter_depthwise_scores)
240 self.intra_curve.setData(self.distances, data.intra_depthwise_scores)
241 m = self.move_smooth_max.update(
242 np.max(np.maximum(data.inter_depthwise_scores, data.intra_depthwise_scores))
243 )
244 m = max(
245 m,
246 2
247 * np.maximum(
248 self.detector_config.intra_detection_threshold,
249 self.detector_config.inter_detection_threshold,
250 ),
251 )
252 self.move_plot.setYRange(0, m)
253 self.move_depth_line.setPos(movement_x)
254 self.move_depth_line.setVisible(bool(data.presence_detected))
255
256 self.set_present_text_y_pos(m)
257
258 if data.presence_detected:
259 present_text = "Presence detected at {:.0f} cm".format(movement_x * 100)
260 present_html = self.present_html_format.format(present_text)
261 self.present_text_item.setHtml(present_html)
262
263 self.present_text_item.show()
264 self.not_present_text_item.hide()
265 else:
266 self.present_text_item.hide()
267 self.not_present_text_item.show()
268
269 # Intra presence
270 move_hist_xs = np.linspace(-self.history_length_s, 0, self.history_length_n)
271
272 self.intra_history = np.roll(self.intra_history, -1)
273 self.intra_history[-1] = data.intra_presence_score
274
275 m_hist = max(
276 float(np.max(self.intra_history)),
277 self.detector_config.intra_detection_threshold * 1.05,
278 )
279 m_hist = self.intra_history_smooth_max.update(m_hist)
280
281 self.intra_hist_plot.setYRange(0, m_hist)
282 self.intra_hist_curve.setData(move_hist_xs, self.intra_history)
283
284 # Inter presence
285
286 self.inter_history = np.roll(self.inter_history, -1)
287 self.inter_history[-1] = data.inter_presence_score
288
289 m_hist = max(
290 float(np.max(self.inter_history)),
291 self.detector_config.inter_detection_threshold * 1.05,
292 )
293 m_hist = self.inter_history_smooth_max.update(m_hist)
294
295 self.inter_hist_plot.setYRange(0, m_hist)
296 self.inter_hist_curve.setData(move_hist_xs, self.inter_history)
297
298 def set_present_text_y_pos(self, y):
299 x_pos = self.distances[0] + (self.distances[-1] - self.distances[0]) / 2
300 self.present_text_item.setPos(x_pos, 0.95 * y)
301 self.not_present_text_item.setPos(x_pos, 0.95 * y)
302
303
304if __name__ == "__main__":
305 main()
View this example on GitHub: acconeer/acconeer-python-exploration