examples/a121/algo/waste_level/processor.py
1# Copyright (c) Acconeer AB, 2024
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8from PySide6 import QtCore
9
10import pyqtgraph as pg
11
12import acconeer.exptool as et
13from acconeer.exptool import a121
14from acconeer.exptool.a121.algo._utils import APPROX_BASE_STEP_LENGTH_M, get_distances_m
15from acconeer.exptool.a121.algo.waste_level import (
16 Processor,
17 ProcessorConfig,
18 ProcessorResult,
19 get_sensor_config,
20)
21
22
def main():
    """Run the waste level example until Ctrl-C or until the plot process dies.

    A client is opened from the command line arguments, a session is set up and
    started with the waste level sensor configuration, and every frame is run
    through the ``Processor`` and handed to a ``PGProcess`` for plotting.

    Each cleanup step is registered right after the resource it belongs to has
    been acquired, so an exception anywhere in between (for instance while
    processing a frame) still closes the plot process, stops the session and
    closes the client instead of leaving them dangling.
    """
    from contextlib import ExitStack

    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    # Callbacks run in reverse registration order when the block is left:
    # plot process first, then the session, then the client connection.
    with ExitStack() as cleanup:
        client = a121.Client.open(**a121.get_client_args(args))
        cleanup.callback(client.close)

        processor_config = ProcessorConfig()
        sensor_config = get_sensor_config()

        metadata = client.setup_session(sensor_config)
        client.start_session()
        cleanup.callback(client.stop_session)

        processor = Processor(
            sensor_config=sensor_config,
            metadata=metadata,
            processor_config=processor_config,
        )

        pg_updater = PGUpdater(
            metadata=metadata, sensor_config=sensor_config, processor_config=processor_config
        )
        pg_process = et.PGProcess(pg_updater)
        pg_process.start()
        cleanup.callback(pg_process.close)

        interrupt_handler = et.utils.ExampleInterruptHandler()
        print("Press Ctrl-C to end session")

        while not interrupt_handler.got_signal:
            result = client.get_next()
            processor_result = processor.process(result)
            try:
                pg_process.put_data(processor_result)
            except et.PGProccessDiedException:
                # The plot process is gone, so there is nothing left to draw to.
                break

        print("Disconnecting...")
63
64
class PGUpdater:
    """Plot updater for the waste level example.

    Draws three things into the window passed to :meth:`setup`:

    * the phase standard deviation over distance, with the threshold, the bin
      top/bottom and the detected fill level marked,
    * a rolling history of the estimated level,
    * a level indicator made of stacked rectangles plus a text label (only the
      text is shown when the window is narrow).
    """

    # The rectangle bar is hidden when the window is narrower than this many pixels.
    _MIN_BAR_WINDOW_WIDTH = 600

    def __init__(self, metadata, sensor_config, processor_config):
        self.metadata = metadata
        self.sensor_config = sensor_config
        self.processor_config = processor_config

    def setup(self, win) -> None:
        """Create all plot items inside ``win`` and keep a reference to it.

        ``win`` is the layout the plots are added to; its ``width()`` is read on
        every :meth:`update` to choose between the bar and the text-only view.
        """
        self._setup_phase_std_plot(win)
        self._setup_history_plot(win)
        self._setup_level_plot(win)
        self.win = win

    def update(self, processor_result: ProcessorResult) -> None:
        """Redraw every plot from one processor result."""
        self._update_phase_std_plot(processor_result)
        self._update_history_plot(processor_result)
        self._update_level_plot(processor_result)

    # ------------------------------------------------------------------ setup

    @staticmethod
    def _line_label_opts(position):
        """Return the label options shared by all labelled marker lines."""
        return {
            "position": position,
            "color": (0, 100, 0),
            "fill": (200, 200, 200, 50),
            "movable": True,
        }

    def _add_phase_std_dots(self, color_index):
        """Add one scatter item to the phase std plot, coloured by ``color_index``."""
        dots = pg.ScatterPlotItem(
            symbol="o", size=10, brush=et.utils.pg_brush_cycler(color_index), pen="k"
        )
        self.phase_std_plot.addItem(dots)
        return dots

    def _setup_phase_std_plot(self, win) -> None:
        """Create the phase standard deviation plot and its marker lines."""
        self.phase_std_plot = self._create_phase_std_plot(win)
        self.phase_std_curve = self.phase_std_plot.plot(pen=et.utils.pg_pen_cycler(0))

        # One scatter item per point category so each gets its own colour.
        self.phase_std_dots_above = self._add_phase_std_dots(0)
        self.phase_std_dots_below = self._add_phase_std_dots(1)
        self.phase_std_dots_detection = self._add_phase_std_dots(2)

        dashed_pen = pg.mkPen("k", width=2.5, style=QtCore.Qt.PenStyle.DashLine)
        threshold_line = pg.InfiniteLine(
            pos=self.processor_config.threshold, angle=0, pen=dashed_pen
        )
        self.phase_std_plot.addItem(threshold_line)

        bin_edges = (
            ("Bin top", self.processor_config.bin_start_m),
            ("Bin bottom", self.processor_config.bin_end_m),
        )
        for label, position_m in bin_edges:
            edge_line = pg.InfiniteLine(
                pos=position_m,
                pen=et.utils.pg_pen_cycler(7),
                label=label,
                labelOpts=self._line_label_opts(0.6),
            )
            self.phase_std_plot.addItem(edge_line)

        # Positioned and shown in update() once a distance has been detected.
        self.level_line = pg.InfiniteLine(
            pen=et.utils.pg_pen_cycler(2),
            label="Fill level",
            labelOpts=self._line_label_opts(0.8),
        )
        self.phase_std_plot.addItem(self.level_line)
        self.level_line.hide()

        self.distances_m = get_distances_m(self.sensor_config, self.metadata)
        self.threshold = self.processor_config.threshold
        self.sequence_ones = np.ones(self.processor_config.distance_sequence_n)

    def _setup_history_plot(self, win) -> None:
        """Create the level history plot with its bin top/bottom reference lines."""
        self.level_history_plot = self._create_history_plot(win)

        if self.sensor_config.frame_rate is not None:
            # Known frame rate: keep five seconds of history on a time axis.
            history_length_s = 5
            history_length_n = int(round(history_length_s * self.sensor_config.frame_rate))
            self.hist_xs = np.linspace(-history_length_s, 0, history_length_n)
        else:
            # Unknown frame rate: keep 100 frames and label the axis accordingly.
            history_length_n = 100
            self.hist_xs = np.linspace(-history_length_n, 0, history_length_n)
            self.level_history_plot.setLabel("bottom", "Frame")

        self.level_history = np.full(history_length_n, np.nan)

        # Upper limit: bin bottom minus the smaller of the bin top distance and
        # the approximate start distance of the first subsweep.
        first_point_m = self.sensor_config.subsweeps[0].start_point * APPROX_BASE_STEP_LENGTH_M
        self.level_history_plot.setYRange(
            0,
            self.processor_config.bin_end_m
            - np.minimum(self.processor_config.bin_start_m, first_point_m),
        )

        self.level_history_curve = self.level_history_plot.plot(
            pen=et.utils.pg_pen_cycler(0),
            symbol="o",
            symbolSize=5,
            symbolBrush=et.utils.pg_brush_cycler(0),
            symbolPen="k",
            connect="finite",
        )

        # Levels are plotted relative to the bin bottom, so the bottom sits at 0
        # and the top at the bin height.
        bin_height_m = self.processor_config.bin_end_m - self.processor_config.bin_start_m
        for label, level_m in (("Bin top", bin_height_m), ("Bin bottom", 0)):
            reference_line = pg.InfiniteLine(
                pos=level_m,
                pen=et.utils.pg_pen_cycler(7),
                angle=0,
                label=label,
                labelOpts=self._line_label_opts(0.5),
            )
            self.level_history_plot.addItem(reference_line)

    def _setup_level_plot(self, win) -> None:
        """Create the stacked-rectangle level indicator and its text labels."""
        self.num_rects = 16
        self.rect_plot = pg.PlotItem()
        self.rect_plot.setAspectLocked()
        self.rect_plot.hideAxis("left")
        self.rect_plot.hideAxis("bottom")
        self.rects = []

        no_outline = pg.mkPen(None)
        rect_width = self.num_rects / 2.0
        # Unit-height rectangles stacked from y=1 upwards; index 0 is the lowest.
        for y in range(1, self.num_rects + 1):
            rect = pg.QtWidgets.QGraphicsRectItem(0, y, rect_width, 1)
            rect.setPen(no_outline)
            self.rect_plot.addItem(rect)
            self.rects.append(rect)

        self.level_html_format = (
            '<div style="text-align: center">'
            '<span style="color: #FFFFFF;font-size:12pt;">'
            "{}</span></div>"
        )

        self.level_text_item = pg.TextItem(
            fill=pg.mkColor(0x1F, 0x77, 0xB4, 180),
            anchor=(0.5, 0),
        )
        self.no_detection_text_item = pg.TextItem(
            html=self.level_html_format.format("No detection"),
            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
            anchor=(0.5, 0),
        )

        self.rect_plot.addItem(self.level_text_item)
        self.rect_plot.addItem(self.no_detection_text_item)

        # Both labels share one position above the bar; only one is visible at a time.
        text_pos = (self.num_rects / 4.0, self.num_rects + 4.0)
        self.level_text_item.setPos(*text_pos)
        self.level_text_item.hide()
        self.no_detection_text_item.setPos(*text_pos)
        self.no_detection_text_item.show()

        win.addItem(self.rect_plot, row=0, col=0)

    # ----------------------------------------------------------------- update

    @staticmethod
    def _classify_points(below_threshold, sequence_n):
        """Split point indices into the three categories drawn in the std plot.

        Args:
            below_threshold: 1-D boolean array, True where the phase standard
                deviation is below the threshold.
            sequence_n: Number of consecutive below-threshold points that make
                up a detection.

        Returns:
            ``(above, below, detection)`` integer index arrays, each ascending:

            * all points below the threshold -> everything is ``below``,
            * no point below the threshold -> everything is ``above``,
            * otherwise ``detection`` holds every point that is part of a run of
              at least ``sequence_n`` consecutive below-threshold points,
              ``below`` the remaining below-threshold points and ``above`` the
              rest.
        """
        below_threshold = np.asarray(below_threshold, dtype=bool)
        n_points = below_threshold.shape[0]
        everything = np.arange(n_points)
        nothing = everything[:0]

        if below_threshold.all():
            return nothing, everything, nothing
        if not below_threshold.any():
            return everything, nothing, nothing

        in_run = np.zeros_like(below_threshold)
        if 1 <= sequence_n <= n_points:
            # A window sum equal to the window length means every point in that
            # window is below the threshold.
            window_sums = np.convolve(below_threshold, np.ones(sequence_n), mode="valid")
            for start in np.flatnonzero(window_sums == sequence_n):
                in_run[start : start + sequence_n] = True

        above = np.flatnonzero(~below_threshold)
        below = np.flatnonzero(below_threshold & ~in_run)
        detection = np.flatnonzero(in_run)
        return above, below, detection

    def _set_dots(self, dots, idxs, phase_std) -> None:
        """Show ``dots`` at the points selected by ``idxs``, or hide them if empty.

        Visibility is always set together with the data so that an item hidden
        by an earlier frame reappears as soon as it has points again.
        """
        if idxs.size > 0:
            dots.setData(self.distances_m[idxs], phase_std[idxs])
            dots.show()
        else:
            dots.hide()

    def _update_phase_std_plot(self, processor_result: ProcessorResult) -> None:
        """Redraw the phase std curve, the fill level line and the point markers."""
        extra_result = processor_result.extra_result
        phase_std = extra_result.phase_std

        self.phase_std_curve.setData(self.distances_m, phase_std)

        if extra_result.distance_m is not None:
            self.level_line.setPos(extra_result.distance_m)
            self.level_line.show()
        else:
            self.level_line.hide()

        above, below, detection = self._classify_points(
            phase_std < self.threshold, self.sequence_ones.shape[0]
        )
        self._set_dots(self.phase_std_dots_above, above, phase_std)
        self._set_dots(self.phase_std_dots_below, below, phase_std)
        self._set_dots(self.phase_std_dots_detection, detection, phase_std)

    def _update_history_plot(self, processor_result: ProcessorResult) -> None:
        """Append the latest level to the rolling history and redraw it."""
        self.level_history = np.roll(self.level_history, -1)
        # Frames without a level are stored as NaN, which leaves a gap in the curve.
        if processor_result.level_m is not None:
            self.level_history[-1] = processor_result.level_m
        else:
            self.level_history[-1] = np.nan

        if np.all(np.isnan(self.level_history)):
            self.level_history_curve.hide()
        else:
            self.level_history_curve.setData(self.hist_xs, self.level_history)
            self.level_history_curve.show()

    def _show_level_text(self, text) -> None:
        """Show ``text`` in the level label and hide the "No detection" label."""
        self.level_text_item.setHtml(self.level_html_format.format(text))
        self.level_text_item.show()
        self.no_detection_text_item.hide()

    def _show_no_detection(self) -> None:
        """Show the "No detection" label and hide the level label."""
        self.level_text_item.hide()
        self.no_detection_text_item.show()

    @staticmethod
    def _format_level(processor_result: ProcessorResult) -> str:
        """Return the "Level: ... m, ... %" label text for an in-bin detection."""
        assert processor_result.level_m is not None
        assert processor_result.level_percent is not None
        return "Level: {:.2f} m, {:.0f} %".format(
            processor_result.level_m,
            processor_result.level_percent,
        )

    def _fill_rects(self, n_filled) -> None:
        """Colour the lowest ``n_filled`` rectangles as filled, the rest as empty."""
        filled_brush = et.utils.pg_brush_cycler(0)
        empty_brush = et.utils.pg_brush_cycler(7)
        for i, rect in enumerate(self.rects):
            rect.setBrush(filled_brush if i < n_filled else empty_brush)

    def _update_level_plot(self, processor_result: ProcessorResult) -> None:
        """Redraw the level indicator.

        The rectangle bar is shown when the window is at least 600 pixels wide;
        in a narrower window only the text label is shown.
        """
        level_percent = processor_result.level_percent
        show_bar = self.win.width() >= self._MIN_BAR_WINDOW_WIDTH

        if show_bar:
            if level_percent is None:  # No detection
                self._fill_rects(0)
                self._show_no_detection()
            elif level_percent > 100:  # Overflow
                self._fill_rects(self.num_rects)
                self._show_level_text("Overflow")
            else:  # In bin detection
                bar_loc = int(np.around(level_percent / 100 * self.num_rects))
                # Clamp so a level below the bin bottom cannot turn into a
                # negative slice index and colour the wrong rectangles.
                self.bar_loc = min(max(bar_loc, 0), self.num_rects)
                self._fill_rects(self.bar_loc)
                self._show_level_text(self._format_level(processor_result))
        else:
            # NOTE(review): unlike the bar view, a level of 0 % or less is
            # reported as "No detection" here - confirm this is intended.
            if level_percent is None:
                self._show_no_detection()
            elif level_percent > 100:  # Overflow
                self._show_level_text("Overflow")
            elif level_percent > 0:  # In bin detection
                self._show_level_text(self._format_level(processor_result))
            else:  # No detection
                self._show_no_detection()

        for rect in self.rects:
            rect.setVisible(show_bar)

    # --------------------------------------------------------- plot factories

    @staticmethod
    def _create_phase_std_plot(parent: pg.GraphicsLayout) -> pg.PlotItem:
        """Add the phase standard deviation plot to ``parent`` (row 1, full width)."""
        phase_std_plot = parent.addPlot(row=1, col=0, colspan=3)
        phase_std_plot.setTitle("Phase standard deviation")
        phase_std_plot.setLabel(axis="bottom", text="Distance [m]")
        phase_std_plot.setLabel("left", "Phase std")
        phase_std_plot.setMenuEnabled(False)
        phase_std_plot.setMouseEnabled(x=False, y=False)
        phase_std_plot.hideButtons()
        phase_std_plot.showGrid(x=True, y=True, alpha=0.5)
        phase_std_plot.setYRange(0, 4)

        return phase_std_plot

    @staticmethod
    def _create_history_plot(parent: pg.GraphicsLayout) -> pg.PlotItem:
        """Add the level history plot to ``parent`` (row 0, columns 1-2)."""
        history_plot = parent.addPlot(row=0, col=1, colspan=2)
        history_plot.setTitle("Level history")
        history_plot.setMenuEnabled(False)
        history_plot.showGrid(x=True, y=True)
        history_plot.setLabel("left", "Estimated level (m)")
        history_plot.setLabel("bottom", "Time (s)")

        return history_plot
409
410
# Run the example only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration