examples/a121/algo/touchless_button/processor.py#
1# Copyright (c) Acconeer AB, 2022-2024
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8import pyqtgraph as pg
9
10import acconeer.exptool as et
11from acconeer.exptool import a121
12from acconeer.exptool.a121.algo.touchless_button import (
13 MeasurementType,
14 Processor,
15 ProcessorConfig,
16 ProcessorResult,
17 get_close_sensor_config,
18)
19
20
21def main():
22 args = a121.ExampleArgumentParser().parse_args()
23 et.utils.config_logging(args)
24
25 client = a121.Client.open(**a121.get_client_args(args))
26
27 processor_config = ProcessorConfig()
28
29 sensor_config = get_close_sensor_config()
30
31 metadata = client.setup_session(sensor_config)
32 client.start_session()
33
34 processor = Processor(
35 sensor_config=sensor_config,
36 metadata=metadata,
37 processor_config=processor_config,
38 )
39
40 pg_updater = PGUpdater(sensor_config=sensor_config, processor_config=processor_config)
41 pg_process = et.PGProcess(pg_updater)
42 pg_process.start()
43
44 interrupt_handler = et.utils.ExampleInterruptHandler()
45 print("Press Ctrl-C to end session")
46
47 while not interrupt_handler.got_signal:
48 result = client.get_next()
49 processor_result = processor.process(result)
50 try:
51 pg_process.put_data(processor_result)
52 except et.PGProccessDiedException:
53 break
54
55 print("Disconnecting...")
56 pg_process.close()
57 client.stop_session()
58 client.close()
59
60
61class PGUpdater:
62 def __init__(self, sensor_config, processor_config):
63 self.detection_history = np.full((2, 100), np.nan)
64 self.sensor_config = sensor_config
65 self.processor_config = processor_config
66
67 def setup(self, win):
68 self.detection_history_plot = self._create_detection_plot(win)
69
70 self.detection_history_curve_close = self.detection_history_plot.plot(
71 pen=et.utils.pg_pen_cycler(1, width=5)
72 )
73 self.detection_history_curve_far = self.detection_history_plot.plot(
74 pen=et.utils.pg_pen_cycler(0, width=5)
75 )
76
77 close_html = (
78 '<div style="text-align: center">'
79 '<span style="color: #FFFFFF;font-size:15pt;">'
80 "{}</span></div>".format("Close detection")
81 )
82 far_html = (
83 '<div style="text-align: center">'
84 '<span style="color: #FFFFFF;font-size:15pt;">'
85 "{}</span></div>".format("Far detection")
86 )
87 self.close_text_item = pg.TextItem(
88 html=close_html,
89 fill=pg.mkColor(0xFF, 0x7F, 0x0E),
90 anchor=(0.5, 0),
91 )
92 self.far_text_item = pg.TextItem(
93 html=far_html,
94 fill=pg.mkColor(0x1F, 0x77, 0xB4),
95 anchor=(0.5, 0),
96 )
97 pos_left = (100 / 3, 1.8)
98 pos_right = (2 * 100 / 3, 1.8)
99 self.close_text_item.setPos(*pos_left)
100 self.far_text_item.setPos(*pos_right)
101 self.detection_history_plot.addItem(self.close_text_item)
102 self.detection_history_plot.addItem(self.far_text_item)
103 self.close_text_item.hide()
104 self.far_text_item.hide()
105
106 self.detection_history = np.full((2, 100), np.nan)
107
108 self.score_history_plot = self._create_score_plot(win)
109 score_plot_legend = self.score_history_plot.legend
110 self.score_smooth_max = et.utils.SmoothMax()
111
112 self.threshold_history_curve_close = self.score_history_plot.plot(
113 pen=et.utils.pg_pen_cycler(1, width=2.5, style="--"),
114 )
115 self.threshold_history_curve_far = self.score_history_plot.plot(
116 pen=et.utils.pg_pen_cycler(0, width=2.5, style="--"),
117 )
118
119 self.threshold_history = np.full((2, 100), np.nan)
120
121 cycle_index = 2 # To not have same colors as thresholds
122 if self.processor_config.measurement_type == MeasurementType.CLOSE_RANGE:
123 measurement_type = "Close"
124 score_plot_legend.addItem(self.threshold_history_curve_close, "Close range threshold")
125 elif self.processor_config.measurement_type == MeasurementType.FAR_RANGE:
126 measurement_type = "Far"
127 score_plot_legend.addItem(self.threshold_history_curve_far, "Far range threshold")
128
129 if self.processor_config.measurement_type != MeasurementType.CLOSE_AND_FAR_RANGE:
130 score_history = np.full((self.sensor_config.subsweep.num_points, 100), np.nan)
131 score_history_curves = np.empty(
132 (self.sensor_config.subsweep.num_points,), dtype=object
133 )
134 for i in range(self.sensor_config.subsweep.num_points):
135 score_history_curves[i] = pg.ScatterPlotItem(
136 brush=et.utils.pg_brush_cycler(cycle_index),
137 name=f"{measurement_type} range, point {i}",
138 )
139 self.score_history_plot.addItem(score_history_curves[i])
140 cycle_index += 1
141
142 if self.processor_config.measurement_type == MeasurementType.CLOSE_RANGE:
143 self.score_history_close = score_history
144 self.score_history_curves_close = score_history_curves
145 self.score_history_far = None
146 self.score_history_curves_far = None
147 elif self.processor_config.measurement_type == MeasurementType.FAR_RANGE:
148 self.score_history_close = None
149 self.score_history_curves_close = None
150 self.score_history_far = score_history
151 self.score_history_curves_far = score_history_curves
152
153 elif self.processor_config.measurement_type == MeasurementType.CLOSE_AND_FAR_RANGE:
154 score_plot_legend.addItem(self.threshold_history_curve_close, "Close range threshold")
155 score_plot_legend.addItem(self.threshold_history_curve_far, "Far range threshold")
156 self.score_history_close = np.full(
157 (self.sensor_config.subsweeps[0].num_points, 100), np.nan
158 )
159 self.score_history_far = np.full(
160 (self.sensor_config.subsweeps[1].num_points, 100), np.nan
161 )
162 self.score_history_curves_close = np.empty(
163 (self.sensor_config.subsweeps[0].num_points,), dtype=object
164 )
165 self.score_history_curves_far = np.empty(
166 (self.sensor_config.subsweeps[1].num_points,), dtype=object
167 )
168
169 range_labels = ["Close", "Far"]
170 for n, subsweep in enumerate(self.sensor_config.subsweeps):
171 measurement_type = range_labels[n]
172 score_history_curve_list = [
173 self.score_history_curves_close,
174 self.score_history_curves_far,
175 ]
176 for i in range(subsweep.num_points):
177 score_history_curve_list[n][i] = pg.ScatterPlotItem(
178 brush=et.utils.pg_brush_cycler(cycle_index),
179 name=f"{measurement_type} range, point {i}",
180 )
181 self.score_history_plot.addItem(score_history_curve_list[n][i])
182 cycle_index += 1
183 self.score_history_curves_close = score_history_curve_list[0]
184 self.score_history_curves_far = score_history_curve_list[1]
185
186 def update(self, processor_result: ProcessorResult):
187 def is_none_or_detection(x):
188 return x.detection if x is not None else None
189
190 detection = np.array(
191 [
192 is_none_or_detection(processor_result.close),
193 is_none_or_detection(processor_result.far),
194 ]
195 )
196 self.detection_history = np.roll(self.detection_history, -1, axis=1)
197 self.detection_history[:, -1] = detection
198
199 self.detection_history_curve_close.setData(self.detection_history[0])
200 self.detection_history_curve_far.setData(self.detection_history[1])
201
202 if processor_result.close is not None:
203 if processor_result.close.detection:
204 self.close_text_item.show()
205 else:
206 self.close_text_item.hide()
207
208 if processor_result.far is not None:
209 if processor_result.far.detection:
210 self.far_text_item.show()
211 else:
212 self.far_text_item.hide()
213
214 max_val = 0.0
215
216 def is_none_or_threshold(x):
217 return x.threshold if x is not None else None
218
219 threshold = np.array(
220 [
221 is_none_or_threshold(processor_result.close),
222 is_none_or_threshold(processor_result.far),
223 ]
224 )
225 if np.nanmax(np.array(threshold, dtype=float)) > max_val:
226 max_val = np.nanmax(np.array(threshold, dtype=float))
227 self.threshold_history = np.roll(self.threshold_history, -1, axis=1)
228 self.threshold_history[:, -1] = threshold
229
230 self.threshold_history_curve_close.setData(self.threshold_history[0])
231 self.threshold_history_curve_far.setData(self.threshold_history[1])
232
233 if self.score_history_close is not None:
234 self.score_history_close = np.roll(self.score_history_close, -1, axis=1)
235 assert processor_result.close is not None
236 # Plot the second highest score
237 self.score_history_close[:, -1] = np.sort(processor_result.close.score, axis=0)[-2, :]
238
239 assert self.score_history_curves_close is not None
240 for i, curve in enumerate(self.score_history_curves_close):
241 # Assign x-values so that setData() doesn't give error when y-values are NaN
242 curve.setData(np.arange(0, 100), self.score_history_close[i, :].flatten())
243
244 if np.max(processor_result.close.score) > max_val:
245 max_val = np.max(processor_result.close.score)
246
247 if self.score_history_far is not None:
248 self.score_history_far = np.roll(self.score_history_far, -1, axis=1)
249 assert processor_result.far is not None
250 # Plot the second highest score
251 self.score_history_far[:, -1] = np.sort(processor_result.far.score, axis=0)[-2, :]
252
253 assert self.score_history_curves_far is not None
254 for i, curve in enumerate(self.score_history_curves_far):
255 # Assign x-values so that setData() doesn't give error when y-values are NaN
256 curve.setData(np.arange(0, 100), self.score_history_far[i, :].flatten())
257
258 if np.max(processor_result.far.score) > max_val:
259 max_val = np.max(processor_result.far.score)
260
261 if max_val != 0.0:
262 self.score_history_plot.setYRange(0.0, self.score_smooth_max.update(max_val))
263
264 @staticmethod
265 def _create_detection_plot(parent: pg.GraphicsLayout) -> pg.PlotItem:
266 detection_history_plot = parent.addPlot(row=0, col=0)
267 detection_history_plot.setTitle("Detection")
268 detection_history_plot.setLabel(axis="bottom", text="Frames")
269 detection_history_plot.setMenuEnabled(False)
270 detection_history_plot.setMouseEnabled(x=False, y=False)
271 detection_history_plot.hideButtons()
272 detection_history_plot.showGrid(x=True, y=True, alpha=0.5)
273 detection_history_plot.setYRange(-0.1, 1.8)
274 return detection_history_plot
275
276 @staticmethod
277 def _create_score_plot(parent: pg.GraphicsLayout) -> pg.PlotItem:
278 score_history_plot = parent.addPlot(row=1, col=0)
279 score_history_plot.setTitle("Detection score")
280 score_history_plot.setLabel(axis="bottom", text="Frames")
281 score_history_plot.addLegend()
282 score_history_plot.setMenuEnabled(False)
283 score_history_plot.setMouseEnabled(x=False, y=False)
284 score_history_plot.hideButtons()
285 score_history_plot.showGrid(x=True, y=True, alpha=0.5)
286 return score_history_plot
287
288
289if __name__ == "__main__":
290 main()
View this example on GitHub: acconeer/acconeer-python-exploration