examples/a121/algo/presence/processor.py
1# Copyright (c) Acconeer AB, 2022-2023
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8# Added here to force pyqtgraph to choose PySide
9import PySide6 # noqa: F401
10from PySide6 import QtCore
11
12import pyqtgraph as pg
13
14import acconeer.exptool as et
15from acconeer.exptool import a121
16from acconeer.exptool.a121.algo._utils import get_distances_m
17from acconeer.exptool.a121.algo.presence import Processor, ProcessorConfig
18
19
def main():
    """Run the presence detection example until interrupted.

    Opens a client from the command line arguments, sets up a session with a
    fixed sensor configuration and passes every result from the client
    through the presence ``Processor``. The processed data is handed to a
    plotting process driven by ``PGUpdater``. The loop ends when the
    interrupt handler reports a signal (Ctrl-C) or when putting data to the
    plotting process raises ``et.PGProccessDiedException``.

    The plotting process and the client are closed on every exit path,
    including when setup or processing raises.
    """
    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    client = a121.Client.open(**a121.get_client_args(args))

    # From here on the client is open, so make sure it is always closed.
    try:
        sensor_config = a121.SensorConfig(
            start_point=600,
            step_length=120,
            num_points=5,
            profile=a121.Profile.PROFILE_5,
            sweeps_per_frame=32,
            hwaas=32,
            frame_rate=20,
        )

        metadata = client.setup_session(sensor_config)

        presence_config = ProcessorConfig(
            intra_detection_threshold=1.3,
            inter_detection_threshold=1,
        )

        presence_processor = Processor(
            sensor_config=sensor_config,
            metadata=metadata,
            processor_config=presence_config,
        )

        pg_updater = PGUpdater(sensor_config, presence_config, metadata)
        pg_process = et.PGProcess(pg_updater)
        pg_process.start()

        # The plotting process is running now; close it before the client,
        # in the same order as a normal shutdown.
        try:
            client.start_session()

            interrupt_handler = et.utils.ExampleInterruptHandler()
            print("Press Ctrl-C to end session")

            while not interrupt_handler.got_signal:
                result = client.get_next()
                processed_data = presence_processor.process(result)
                try:
                    pg_process.put_data(processed_data)
                except et.PGProccessDiedException:
                    # Nothing left to plot to, so stop measuring.
                    break
        finally:
            print("Disconnecting...")
            pg_process.close()
    finally:
        client.close()
69
70
class PGUpdater:
    """Plot updater for the presence detection example.

    An instance is handed to ``et.PGProcess``. ``setup`` builds the plots in
    the given window and ``update`` refreshes them from one processed result.
    """

    def __init__(self, sensor_config, processor_config, metadata):
        """Store the configs and allocate the score history buffers.

        Args:
            sensor_config: Sensor configuration; ``frame_rate`` is read to
                size the history buffers.
            processor_config: Presence processor configuration; thresholds
                and enable flags are read when plotting.
            metadata: Session metadata, passed on to ``get_distances_m``.
        """
        self.sensor_config = sensor_config
        self.processor_config = processor_config

        # The history plots show the last `history_length_s` seconds.
        self.history_length_s = 5
        frames_in_history = self.history_length_s * sensor_config.frame_rate
        self.history_length_n = int(round(frames_in_history))
        self.intra_history = np.zeros(self.history_length_n)
        self.inter_history = np.zeros(self.history_length_n)

        self.distances = get_distances_m(self.sensor_config, metadata)

        self.setup_is_done = False

    @staticmethod
    def _lock_interaction(plot):
        """Disable menu, mouse interaction and buttons; show the grid."""
        plot.setMenuEnabled(False)
        plot.setMouseEnabled(x=False, y=False)
        plot.hideButtons()
        plot.showGrid(x=True, y=True)

    def setup(self, win):
        """Create all plots, curves and text items.

        Args:
            win: Window/layout object the plots are added to.
        """
        win.setWindowTitle("Acconeer presence detection example")

        self.intra_limit_lines = []
        self.inter_limit_lines = []

        self._setup_noise_plot(win)
        self._setup_move_plot()

        # Intra presence history plot
        (
            self.intra_hist_plot,
            self.intra_history_smooth_max,
            self.intra_hist_curve,
        ) = self._add_history_plot(
            win,
            col=0,
            title="Intra presence history (fast motions)",
            enabled=self.processor_config.intra_enable,
            color_index=1,
            threshold=self.processor_config.intra_detection_threshold,
            limit_lines=self.intra_limit_lines,
        )

        # Inter presence history plot
        (
            self.inter_hist_plot,
            self.inter_history_smooth_max,
            self.inter_hist_curve,
        ) = self._add_history_plot(
            win,
            col=1,
            title="Inter presence history (slow motions)",
            enabled=self.processor_config.inter_enable,
            color_index=0,
            threshold=self.processor_config.inter_detection_threshold,
            limit_lines=self.inter_limit_lines,
        )

        # The depthwise plot goes in its own layout spanning both columns.
        sublayout = win.addLayout(row=2, col=0, colspan=2)
        sublayout.layout.setColumnStretchFactor(0, 2)
        sublayout.addItem(self.move_plot, row=0, col=0)

        self.setup_is_done = True

    def _setup_noise_plot(self, win):
        """Create the noise estimation plot (added to ``win`` but hidden)."""
        noise_plot = win.addPlot(row=0, col=0, title="Noise")
        self._lock_interaction(noise_plot)
        noise_plot.setLabel("bottom", "Distance (m)")
        noise_plot.setLabel("left", "Amplitude")
        noise_plot.setVisible(False)

        self.noise_plot = noise_plot
        self.noise_curve = noise_plot.plot(pen=et.utils.pg_pen_cycler())
        self.noise_smooth_max = et.utils.SmoothMax(self.sensor_config.frame_rate)

    def _setup_move_plot(self):
        """Create the depthwise presence plot with its curves and labels."""
        move_plot = pg.PlotItem(title="Depthwise presence")
        self._lock_interaction(move_plot)
        move_plot.setLabel("bottom", "Distance (m)")
        move_plot.setLabel("left", "Norm. ampl.")
        move_plot.setXRange(self.distances[0], self.distances[-1])
        self.move_plot = move_plot

        # A curve is still created when its detection type is disabled; it
        # is only hidden.
        self.intra_curve = move_plot.plot(pen=et.utils.pg_pen_cycler(1))
        if not self.processor_config.intra_enable:
            self.intra_curve.hide()

        self.inter_curve = move_plot.plot(pen=et.utils.pg_pen_cycler(0))
        if not self.processor_config.inter_enable:
            self.inter_curve.hide()

        self.move_smooth_max = et.utils.SmoothMax(
            self.sensor_config.frame_rate,
            tau_decay=1.0,
            tau_grow=0.25,
        )

        # Vertical marker, positioned at the presence distance in `update`.
        depth_line = pg.InfiniteLine(pen=pg.mkPen("k", width=1.5))
        depth_line.hide()
        move_plot.addItem(depth_line)
        self.move_depth_line = depth_line

        self.present_html_format = (
            '<div style="text-align: center">'
            '<span style="color: #FFFFFF;font-size:15pt;">'
            "{}</span></div>"
        )
        self.present_text_item = pg.TextItem(
            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
            anchor=(0.5, 0),
        )
        self.not_present_text_item = pg.TextItem(
            html=self.present_html_format.format("No presence detected"),
            fill=pg.mkColor(0x1F, 0x77, 0xB4, 180),
            anchor=(0.5, 0),
        )

        # Both labels start hidden; `update` shows exactly one of them.
        for text_item in (self.present_text_item, self.not_present_text_item):
            move_plot.addItem(text_item)
        for text_item in (self.present_text_item, self.not_present_text_item):
            text_item.hide()

    def _add_history_plot(
        self, win, col, title, enabled, color_index, threshold, limit_lines
    ):
        """Add one score history plot on row 1 of ``win``.

        Args:
            win: Window/layout object the plot is added to.
            col: Column of the plot.
            title: Plot title.
            enabled: Whether the corresponding detection type is enabled;
                selects which pens are used.
            color_index: Index passed to the exptool color/pen cyclers.
            threshold: Detection threshold at which the limit line is placed.
            limit_lines: List that the created limit line is appended to.

        Returns:
            Tuple ``(plot, smooth_max, curve)``.
        """
        plot = win.addPlot(row=1, col=col, title=title)
        self._lock_interaction(plot)
        plot.setLabel("bottom", "Time (s)")
        plot.setLabel("left", "Score")
        plot.setXRange(-self.history_length_s, 0)
        smooth_max = et.utils.SmoothMax(self.sensor_config.frame_rate)
        plot.setYRange(0, 10)

        if enabled:
            dashed_pen = et.utils.pg_pen_cycler(color_index, width=2.5, style="--")
            solid_pen = et.utils.pg_pen_cycler(color_index)
        else:
            # "50" is appended to the color string; presumably a hex alpha
            # suffix that fades the color -- TODO confirm.
            faded_color = f"{et.utils.color_cycler(color_index)}50"
            dashed_pen = pg.mkPen(faded_color, width=2.5, style=QtCore.Qt.DashLine)
            solid_pen = pg.mkPen(faded_color, width=2)

        curve = plot.plot(pen=solid_pen)

        # Horizontal line marking the detection threshold.
        threshold_line = pg.InfiniteLine(angle=0, pen=dashed_pen)
        plot.addItem(threshold_line)
        limit_lines.append(threshold_line)
        for line in limit_lines:
            line.setPos(threshold)

        return plot, smooth_max, curve

    def update(self, data):
        """Refresh all plots from one processed result.

        Args:
            data: Processed result; ``extra_result.lp_noise``, ``inter``,
                ``intra``, ``presence_distance``, ``presence_detected``,
                ``intra_presence_score`` and ``inter_presence_score`` are
                read.
        """
        config = self.processor_config

        lp_noise = data.extra_result.lp_noise
        self.noise_curve.setData(self.distances, lp_noise)
        self.noise_plot.setYRange(0, self.noise_smooth_max.update(lp_noise))

        presence_distance = data.presence_distance

        self.inter_curve.setData(self.distances, data.inter)
        self.intra_curve.setData(self.distances, data.intra)

        # The y range never goes below twice the larger detection threshold.
        depthwise_peak = np.max(np.maximum(data.inter, data.intra))
        smoothed_peak = self.move_smooth_max.update(depthwise_peak)
        range_floor = 2 * np.maximum(
            config.intra_detection_threshold,
            config.inter_detection_threshold,
        )
        y_top = max(smoothed_peak, range_floor)

        self.move_plot.setYRange(0, y_top)
        self.move_depth_line.setPos(presence_distance)
        self.move_depth_line.setVisible(bool(data.presence_detected))

        self.set_present_text_y_pos(y_top)

        if data.presence_detected:
            message = f"Presence detected at {presence_distance * 100:.0f} cm"
            self.present_text_item.setHtml(self.present_html_format.format(message))

            self.present_text_item.show()
            self.not_present_text_item.hide()
        else:
            self.present_text_item.hide()
            self.not_present_text_item.show()

        history_xs = np.linspace(-self.history_length_s, 0, self.history_length_n)

        # Intra presence
        self.intra_history = self._refresh_history(
            history=self.intra_history,
            score=data.intra_presence_score,
            threshold=config.intra_detection_threshold,
            smooth_max=self.intra_history_smooth_max,
            plot=self.intra_hist_plot,
            curve=self.intra_hist_curve,
            xs=history_xs,
        )

        # Inter presence
        self.inter_history = self._refresh_history(
            history=self.inter_history,
            score=data.inter_presence_score,
            threshold=config.inter_detection_threshold,
            smooth_max=self.inter_history_smooth_max,
            plot=self.inter_hist_plot,
            curve=self.inter_hist_curve,
            xs=history_xs,
        )

    @staticmethod
    def _refresh_history(history, score, threshold, smooth_max, plot, curve, xs):
        """Shift ``score`` into ``history`` and redraw one history plot.

        Returns:
            The new history array, with the oldest sample dropped and
            ``score`` as the last element.
        """
        shifted = np.roll(history, -1)
        shifted[-1] = score

        # The upper limit is at least 5 % above the threshold.
        upper = max(float(np.max(shifted)), threshold * 1.05)
        upper = smooth_max.update(upper)

        plot.setYRange(0, upper)
        curve.setData(xs, shifted)
        return shifted

    def set_present_text_y_pos(self, y):
        """Place both presence labels at the middle distance, at 0.95 * ``y``."""
        first = self.distances[0]
        last = self.distances[-1]
        x_mid = first + (last - first) / 2
        for text_item in (self.present_text_item, self.not_present_text_item):
            text_item.setPos(x_mid, 0.95 * y)
302
303
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration