examples/a121/algo/smart_presence/processor.py#
1# Copyright (c) Acconeer AB, 2023-2024
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7import numpy.typing as npt
8
9from PySide6 import QtCore
10
11import pyqtgraph as pg
12
13import acconeer.exptool as et
14from acconeer.exptool import a121
15from acconeer.exptool.a121.algo import presence, smart_presence
16
17
18def main():
19 args = a121.ExampleArgumentParser().parse_args()
20 et.utils.config_logging(args)
21
22 client = a121.Client.open(**a121.get_client_args(args))
23
24 processor_config = smart_presence.ProcessorConfig(
25 num_zones=3,
26 )
27
28 detector_config = presence.DetectorConfig(
29 start_m=1,
30 end_m=3,
31 )
32
33 presence_detector = presence.Detector(
34 client=client, sensor_id=1, detector_config=detector_config
35 )
36 presence_detector.start()
37
38 smart_presence_processor = smart_presence.Processor(
39 processor_config,
40 detector_config,
41 presence_detector.session_config,
42 presence_detector.detector_metadata,
43 )
44
45 pg_updater = PGUpdater(
46 detector_config,
47 processor_config,
48 presence_detector._get_sensor_config(detector_config),
49 presence_detector.estimated_frame_rate,
50 smart_presence_processor.zone_limits,
51 )
52 pg_process = et.PGProcess(pg_updater)
53 pg_process.start()
54
55 interrupt_handler = et.utils.ExampleInterruptHandler()
56 print("Press Ctrl-C to end session")
57
58 while not interrupt_handler.got_signal:
59 detector_result = presence_detector.get_next()
60 processor_result = smart_presence_processor.process(detector_result)
61 if detector_result.presence_detected:
62 print(f"Presence in zone {processor_result.max_presence_zone}")
63 else:
64 print("No presence")
65
66 data = {
67 "processor_result": processor_result,
68 "detector_result": detector_result,
69 }
70 try:
71 pg_process.put_data(data)
72 except et.PGProccessDiedException:
73 break
74
75 presence_detector.stop()
76
77 print("Disconnecting...")
78 client.close()
79
80
81class PGUpdater:
82 def __init__(
83 self,
84 detector_config: presence.DetectorConfig,
85 processor_config: smart_presence.ProcessorConfig,
86 sensor_config: a121.SensorConfig,
87 estimated_frame_rate: float,
88 zone_limits: npt.NDArray[np.float64],
89 ):
90 self.detector_config = detector_config
91
92 self.history_length_s = 5
93 self.estimated_frame_rate = estimated_frame_rate
94 self.history_length_n = int(round(self.history_length_s * estimated_frame_rate))
95 self.intra_history = np.zeros(self.history_length_n)
96 self.inter_history = np.zeros(self.history_length_n)
97
98 self.num_sectors = min(processor_config.num_zones, sensor_config.num_points)
99 self.sector_size = max(1, -(-sensor_config.num_points // self.num_sectors))
100
101 self.sector_offset = (self.num_sectors * self.sector_size - sensor_config.num_points) // 2
102 self.zone_limits = zone_limits
103
104 self.setup_is_done = False
105
106 def setup(self, win):
107 win.setWindowTitle("Acconeer smart presence example")
108
109 self.intra_limit_lines = []
110 self.inter_limit_lines = []
111
112 # Intra presence history plot
113
114 self.intra_hist_plot = win.addPlot(
115 row=0,
116 col=0,
117 title="Intra presence history (fast motions)",
118 )
119 self.intra_hist_plot.setMenuEnabled(False)
120 self.intra_hist_plot.setMouseEnabled(x=False, y=False)
121 self.intra_hist_plot.hideButtons()
122 self.intra_hist_plot.showGrid(x=True, y=True)
123 self.intra_hist_plot.setLabel("bottom", "Time (s)")
124 self.intra_hist_plot.setLabel("left", "Score")
125 self.intra_hist_plot.setXRange(-self.history_length_s, 0)
126 self.intra_history_smooth_max = et.utils.SmoothMax(self.estimated_frame_rate)
127 self.intra_hist_plot.setYRange(0, 10)
128 if not self.detector_config.intra_enable:
129 intra_color = et.utils.color_cycler(1)
130 intra_color = f"{intra_color}50"
131 intra_dashed_pen = pg.mkPen(intra_color, width=2.5, style=QtCore.Qt.DashLine)
132 intra_pen = pg.mkPen(intra_color, width=2)
133 else:
134 intra_dashed_pen = et.utils.pg_pen_cycler(1, width=2.5, style="--")
135 intra_pen = et.utils.pg_pen_cycler(1)
136
137 self.intra_hist_curve = self.intra_hist_plot.plot(pen=intra_pen)
138 limit_line = pg.InfiniteLine(angle=0, pen=intra_dashed_pen)
139 self.intra_hist_plot.addItem(limit_line)
140 self.intra_limit_lines.append(limit_line)
141
142 for line in self.intra_limit_lines:
143 line.setPos(self.detector_config.intra_detection_threshold)
144
145 # Inter presence history plot
146
147 self.inter_hist_plot = win.addPlot(
148 row=0,
149 col=1,
150 title="Inter presence history (slow motions)",
151 )
152 self.inter_hist_plot.setMenuEnabled(False)
153 self.inter_hist_plot.setMouseEnabled(x=False, y=False)
154 self.inter_hist_plot.hideButtons()
155 self.inter_hist_plot.showGrid(x=True, y=True)
156 self.inter_hist_plot.setLabel("bottom", "Time (s)")
157 self.inter_hist_plot.setLabel("left", "Score")
158 self.inter_hist_plot.setXRange(-self.history_length_s, 0)
159 self.inter_history_smooth_max = et.utils.SmoothMax(self.estimated_frame_rate)
160 self.inter_hist_plot.setYRange(0, 10)
161 if not self.detector_config.inter_enable:
162 inter_color = et.utils.color_cycler(0)
163 inter_color = f"{inter_color}50"
164 inter_dashed_pen = pg.mkPen(inter_color, width=2.5, style=QtCore.Qt.DashLine)
165 inter_pen = pg.mkPen(inter_color, width=2)
166 else:
167 inter_pen = et.utils.pg_pen_cycler(0)
168 inter_dashed_pen = et.utils.pg_pen_cycler(0, width=2.5, style="--")
169
170 self.inter_hist_curve = self.inter_hist_plot.plot(pen=inter_pen)
171 limit_line = pg.InfiniteLine(angle=0, pen=inter_dashed_pen)
172 self.inter_hist_plot.addItem(limit_line)
173 self.inter_limit_lines.append(limit_line)
174
175 for line in self.inter_limit_lines:
176 line.setPos(self.detector_config.inter_detection_threshold)
177
178 # Sector plot
179
180 self.sector_plot = pg.PlotItem(
181 title="Detection zone<br>Detection type: fast (orange), slow (blue), both (green)"
182 )
183 self.sector_plot.setAspectLocked()
184 self.sector_plot.hideAxis("left")
185 self.sector_plot.hideAxis("bottom")
186 self.sectors = []
187 self.limit_text = []
188
189 self.range_html = (
190 '<div style="text-align: center">'
191 '<span style="color: #000000;font-size:12pt;">'
192 "{}</span></div>"
193 )
194
195 pen = pg.mkPen("k", width=1)
196 span_deg = 25
197 for r in np.flip(np.arange(self.num_sectors) + 1):
198 sector = pg.QtWidgets.QGraphicsEllipseItem(-r, -r, r * 2, r * 2)
199 sector.setStartAngle(-16 * span_deg)
200 sector.setSpanAngle(16 * span_deg * 2)
201 sector.setPen(pen)
202 self.sector_plot.addItem(sector)
203 self.sectors.append(sector)
204
205 limit = pg.TextItem(html=self.range_html, anchor=(0.5, 0.5), angle=25)
206 x = r * np.cos(np.radians(span_deg))
207 y = r * np.sin(np.radians(span_deg))
208 limit.setPos(x, y + 0.25)
209 self.sector_plot.addItem(limit)
210 self.limit_text.append(limit)
211
212 self.sectors.reverse()
213
214 start_limit_text = pg.TextItem(html=self.range_html, anchor=(0.5, 0.5), angle=25)
215 range_html = self.range_html.format(f"{self.detector_config.start_m}")
216 start_limit_text.setHtml(range_html)
217 start_limit_text.setPos(0, 0.25)
218 self.sector_plot.addItem(start_limit_text)
219
220 unit_text = pg.TextItem(html=self.range_html, anchor=(0.5, 0.5))
221 unit_html = self.range_html.format("[m]")
222 unit_text.setHtml(unit_html)
223 unit_text.setPos(
224 self.num_sectors + 0.5, (self.num_sectors + 1) * np.sin(np.radians(span_deg))
225 )
226 self.sector_plot.addItem(unit_text)
227
228 for text_item, limit in zip(self.limit_text, np.flip(self.zone_limits)):
229 range_html = self.range_html.format(np.around(limit, 1))
230 text_item.setHtml(range_html)
231
232 sublayout = win.addLayout(row=1, col=0, colspan=2)
233 sublayout.layout.setColumnStretchFactor(0, 2)
234 sublayout.addItem(self.sector_plot, row=0, col=0)
235
236 self.setup_is_done = True
237
238 def update(self, data):
239 processor_result = data["processor_result"]
240 detector_result = data["detector_result"]
241
242 # Intra presence
243
244 move_hist_xs = np.linspace(-self.history_length_s, 0, self.history_length_n)
245
246 self.intra_history = np.roll(self.intra_history, -1)
247 self.intra_history[-1] = detector_result.intra_presence_score
248
249 m_hist = max(
250 float(np.max(self.intra_history)),
251 self.detector_config.intra_detection_threshold * 1.05,
252 )
253 m_hist = self.intra_history_smooth_max.update(m_hist)
254
255 self.intra_hist_plot.setYRange(0, m_hist)
256 self.intra_hist_curve.setData(move_hist_xs, self.intra_history)
257
258 # Inter presence
259
260 self.inter_history = np.roll(self.inter_history, -1)
261 self.inter_history[-1] = detector_result.inter_presence_score
262
263 m_hist = max(
264 float(np.max(self.inter_history)),
265 self.detector_config.inter_detection_threshold * 1.05,
266 )
267 m_hist = self.inter_history_smooth_max.update(m_hist)
268
269 self.inter_hist_plot.setYRange(0, m_hist)
270 self.inter_hist_curve.setData(move_hist_xs, self.inter_history)
271
272 # Sector
273
274 brush = et.utils.pg_brush_cycler(7)
275 for sector in self.sectors:
276 sector.setBrush(brush)
277
278 if detector_result.presence_detected:
279 assert processor_result.max_presence_zone is not None
280 if processor_result.max_presence_zone == processor_result.max_intra_zone:
281 self.sectors[processor_result.max_presence_zone].setBrush(
282 et.utils.pg_brush_cycler(1)
283 )
284 else:
285 self.sectors[processor_result.max_presence_zone].setBrush(
286 et.utils.pg_brush_cycler(0)
287 )
288
289
290if __name__ == "__main__":
291 main()
View this example on GitHub: acconeer/acconeer-python-exploration