examples/a121/algo/waste_level/processor.py

examples/a121/algo/waste_level/processor.py

  1# Copyright (c) Acconeer AB, 2024
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8from PySide6 import QtCore
  9
 10import pyqtgraph as pg
 11
 12import acconeer.exptool as et
 13from acconeer.exptool import a121
 14from acconeer.exptool.a121.algo._utils import APPROX_BASE_STEP_LENGTH_M, get_distances_m
 15from acconeer.exptool.a121.algo.waste_level import (
 16    Processor,
 17    ProcessorConfig,
 18    ProcessorResult,
 19    get_sensor_config,
 20)
 21
 22
 23def main():
 24    args = a121.ExampleArgumentParser().parse_args()
 25    et.utils.config_logging(args)
 26
 27    client = a121.Client.open(**a121.get_client_args(args))
 28
 29    processor_config = ProcessorConfig()
 30
 31    sensor_config = get_sensor_config()
 32
 33    metadata = client.setup_session(sensor_config)
 34    client.start_session()
 35
 36    processor = Processor(
 37        sensor_config=sensor_config,
 38        metadata=metadata,
 39        processor_config=processor_config,
 40    )
 41
 42    pg_updater = PGUpdater(
 43        metadata=metadata, sensor_config=sensor_config, processor_config=processor_config
 44    )
 45    pg_process = et.PGProcess(pg_updater)
 46    pg_process.start()
 47
 48    interrupt_handler = et.utils.ExampleInterruptHandler()
 49    print("Press Ctrl-C to end session")
 50
 51    while not interrupt_handler.got_signal:
 52        result = client.get_next()
 53        processor_result = processor.process(result)
 54        try:
 55            pg_process.put_data(processor_result)
 56        except et.PGProccessDiedException:
 57            break
 58
 59    print("Disconnecting...")
 60    pg_process.close()
 61    client.stop_session()
 62    client.close()
 63
 64
 65class PGUpdater:
 66    def __init__(self, metadata, sensor_config, processor_config):
 67        self.metadata = metadata
 68        self.sensor_config = sensor_config
 69        self.processor_config = processor_config
 70
 71    def setup(self, win) -> None:
 72        # Phase standard deviation plot
 73        self.phase_std_plot = self._create_phase_std_plot(win)
 74        pen = et.utils.pg_pen_cycler(0)
 75        brush = et.utils.pg_brush_cycler(0)
 76        self.phase_std_curve = self.phase_std_plot.plot(pen=pen)
 77        self.phase_std_dots_above = pg.ScatterPlotItem(symbol="o", size=10, brush=brush, pen="k")
 78        self.phase_std_plot.addItem(self.phase_std_dots_above)
 79        brush = et.utils.pg_brush_cycler(1)
 80        self.phase_std_dots_below = pg.ScatterPlotItem(symbol="o", size=10, brush=brush, pen="k")
 81        self.phase_std_plot.addItem(self.phase_std_dots_below)
 82        brush = et.utils.pg_brush_cycler(2)
 83        self.phase_std_dots_detection = pg.ScatterPlotItem(
 84            symbol="o", size=10, brush=brush, pen="k"
 85        )
 86        self.phase_std_plot.addItem(self.phase_std_dots_detection)
 87
 88        dashed_pen = pg.mkPen("k", width=2.5, style=QtCore.Qt.PenStyle.DashLine)
 89        threshold_line = pg.InfiniteLine(
 90            pos=self.processor_config.threshold, angle=0, pen=dashed_pen
 91        )
 92        self.phase_std_plot.addItem(threshold_line)
 93
 94        vertical_line_start = pg.InfiniteLine(
 95            pos=self.processor_config.bin_start_m,
 96            pen=et.utils.pg_pen_cycler(7),
 97            label="Bin top",
 98            labelOpts={
 99                "position": 0.6,
100                "color": (0, 100, 0),
101                "fill": (200, 200, 200, 50),
102                "movable": True,
103            },
104        )
105        self.phase_std_plot.addItem(vertical_line_start)
106
107        vertical_line_end = pg.InfiniteLine(
108            pos=self.processor_config.bin_end_m,
109            pen=et.utils.pg_pen_cycler(7),
110            label="Bin bottom",
111            labelOpts={
112                "position": 0.6,
113                "color": (0, 100, 0),
114                "fill": (200, 200, 200, 50),
115                "movable": True,
116            },
117        )
118        self.phase_std_plot.addItem(vertical_line_end)
119
120        self.level_line = pg.InfiniteLine(
121            pen=et.utils.pg_pen_cycler(2),
122            label="Fill level",
123            labelOpts={
124                "position": 0.8,
125                "color": (0, 100, 0),
126                "fill": (200, 200, 200, 50),
127                "movable": True,
128            },
129        )
130        self.phase_std_plot.addItem(self.level_line)
131        self.level_line.hide()
132
133        self.distances_m = get_distances_m(self.sensor_config, self.metadata)
134        self.threshold = self.processor_config.threshold
135        self.sequence_ones = np.ones(self.processor_config.distance_sequence_n)
136
137        # Level history plot
138        self.level_history_plot = self._create_history_plot(win)
139
140        if self.sensor_config.frame_rate is not None:
141            history_length_s = 5
142            history_length_n = int(round(history_length_s * self.sensor_config.frame_rate))
143            self.hist_xs = np.linspace(-history_length_s, 0, history_length_n)
144        else:
145            history_length_n = 100
146            self.hist_xs = np.linspace(-history_length_n, 0, history_length_n)
147            self.level_history_plot.setLabel("bottom", "Frame")
148
149        self.level_history = np.full(history_length_n, np.nan)
150        self.level_history_plot.setYRange(
151            0,
152            self.processor_config.bin_end_m
153            - np.minimum(
154                self.processor_config.bin_start_m,
155                self.sensor_config.subsweeps[0].start_point * APPROX_BASE_STEP_LENGTH_M,
156            ),
157        )
158
159        pen = et.utils.pg_pen_cycler(0)
160        brush = et.utils.pg_brush_cycler(0)
161        symbol_kw = dict(symbol="o", symbolSize=5, symbolBrush=brush, symbolPen="k")
162        feat_kw = dict(pen=pen, **symbol_kw)
163        self.level_history_curve = self.level_history_plot.plot(**feat_kw, connect="finite")
164
165        top_bin_horizontal_line = pg.InfiniteLine(
166            pos=self.processor_config.bin_end_m - self.processor_config.bin_start_m,
167            pen=et.utils.pg_pen_cycler(7),
168            angle=0,
169            label="Bin top",
170            labelOpts={
171                "position": 0.5,
172                "color": (0, 100, 0),
173                "fill": (200, 200, 200, 50),
174                "movable": True,
175            },
176        )
177        self.level_history_plot.addItem(top_bin_horizontal_line)
178
179        bottom_bin_horizontal_line = pg.InfiniteLine(
180            pos=0,
181            pen=et.utils.pg_pen_cycler(7),
182            angle=0,
183            label="Bin bottom",
184            labelOpts={
185                "position": 0.5,
186                "color": (0, 100, 0),
187                "fill": (200, 200, 200, 50),
188                "movable": True,
189            },
190        )
191        self.level_history_plot.addItem(bottom_bin_horizontal_line)
192
193        # Level plot
194        self.num_rects = 16
195        self.rect_plot = pg.PlotItem()
196        self.rect_plot.setAspectLocked()
197        self.rect_plot.hideAxis("left")
198        self.rect_plot.hideAxis("bottom")
199        self.rects = []
200
201        pen = pg.mkPen(None)
202        rect_width = self.num_rects / 2.0
203        for r in np.arange(self.num_rects) + 1:
204            rect = pg.QtWidgets.QGraphicsRectItem(0, r, rect_width, 1)
205            rect.setPen(pen)
206            self.rect_plot.addItem(rect)
207            self.rects.append(rect)
208
209        self.level_html_format = (
210            '<div style="text-align: center">'
211            '<span style="color: #FFFFFF;font-size:12pt;">'
212            "{}</span></div>"
213        )
214
215        self.level_text_item = pg.TextItem(
216            fill=pg.mkColor(0x1F, 0x77, 0xB4, 180),
217            anchor=(0.5, 0),
218        )
219
220        no_detection_html = (
221            '<div style="text-align: center">'
222            '<span style="color: #FFFFFF;font-size:12pt;">'
223            "{}</span></div>".format("No detection")
224        )
225
226        self.no_detection_text_item = pg.TextItem(
227            html=no_detection_html,
228            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
229            anchor=(0.5, 0),
230        )
231
232        self.rect_plot.addItem(self.level_text_item)
233        self.rect_plot.addItem(self.no_detection_text_item)
234        self.level_text_item.setPos(self.num_rects / 4.0, self.num_rects + 4.0)
235        self.level_text_item.hide()
236        self.no_detection_text_item.setPos(self.num_rects / 4.0, self.num_rects + 4.0)
237        self.no_detection_text_item.show()
238
239        win.addItem(self.rect_plot, row=0, col=0)
240        self.win = win
241
242    def update(self, processor_result: ProcessorResult) -> None:
243        # Phase standard deviation plot
244        self.phase_std_curve.setData(self.distances_m, processor_result.extra_result.phase_std)
245        if processor_result.extra_result.distance_m is not None:
246            self.level_line.setPos(processor_result.extra_result.distance_m)
247            self.level_line.show()
248        else:
249            self.level_line.hide()
250
251        detection_array = processor_result.extra_result.phase_std < self.threshold
252        if np.all(detection_array):
253            self.phase_std_dots_below.setData(
254                self.distances_m, processor_result.extra_result.phase_std
255            )
256            self.phase_std_dots_above.hide()
257            self.phase_std_dots_detection.hide()
258        elif np.any(detection_array):
259            above_idxs = np.argwhere(~detection_array)
260            above_idxs = above_idxs.reshape(above_idxs.shape[0])
261            below_idxs = np.argwhere(detection_array)
262            below_idxs = below_idxs.reshape(below_idxs.shape[0])
263            consecutive_true_indices = np.where(
264                np.convolve(detection_array, self.sequence_ones, mode="valid")
265                == self.sequence_ones.shape[0]
266            )[0]
267            detection_idxs = []
268            for i in consecutive_true_indices:
269                for j in np.arange(i, i + self.sequence_ones.shape[0]):
270                    if j not in detection_idxs and j < detection_array.shape[0]:
271                        detection_idxs.append(j)
272
273            remove = np.isin(below_idxs, detection_idxs)
274            below_idxs = below_idxs[~remove]
275
276            self.phase_std_dots_above.setData(
277                self.distances_m[above_idxs], processor_result.extra_result.phase_std[above_idxs]
278            )
279            self.phase_std_dots_below.setData(
280                self.distances_m[below_idxs], processor_result.extra_result.phase_std[below_idxs]
281            )
282            if len(detection_idxs) > 0:
283                self.phase_std_dots_detection.setData(
284                    self.distances_m[detection_idxs],
285                    processor_result.extra_result.phase_std[detection_idxs],
286                )
287                self.phase_std_dots_detection.show()
288            else:
289                self.phase_std_dots_detection.hide()
290
291            self.phase_std_dots_above.show()
292            self.phase_std_dots_below.show()
293        else:
294            self.phase_std_dots_above.setData(
295                self.distances_m, processor_result.extra_result.phase_std
296            )
297            self.phase_std_dots_below.hide()
298            self.phase_std_dots_detection.hide()
299
300        # History plot
301
302        self.level_history = np.roll(self.level_history, -1)
303        if processor_result.level_m is not None:
304            self.level_history[-1] = processor_result.level_m
305        else:
306            self.level_history[-1] = np.nan
307
308        if np.all(np.isnan(self.level_history)):
309            self.level_history_curve.hide()
310        else:
311            self.level_history_curve.setData(self.hist_xs, self.level_history)
312            self.level_history_curve.show()
313
314        # Level plot
315
316        # Show the percentage level plot if the plot width is greater than 600 pixels,
317        # otherwise display the level as text.
318        if self.win.width() < 600:
319            if processor_result.level_percent is None:
320                self.level_text_item.hide()
321                self.no_detection_text_item.show()
322            elif processor_result.level_percent > 100:  # Overflow
323                level_text = "Overflow"
324                level_html = self.level_html_format.format(level_text)
325                self.level_text_item.setHtml(level_html)
326                self.level_text_item.show()
327                self.no_detection_text_item.hide()
328            elif processor_result.level_percent > 0:  # In bin detection
329                assert processor_result.level_m is not None
330                assert processor_result.level_percent is not None
331                level_text = "Level: {:.2f} m, {:.0f} %".format(
332                    processor_result.level_m,
333                    processor_result.level_percent,
334                )
335                level_html = self.level_html_format.format(level_text)
336                self.level_text_item.setHtml(level_html)
337                self.level_text_item.show()
338                self.no_detection_text_item.hide()
339            else:  # No detection
340                self.level_text_item.hide()
341                self.no_detection_text_item.show()
342
343            for rect in self.rects:
344                rect.setVisible(False)
345        else:
346            if processor_result.level_percent is None:  # No detection
347                level_text = "No detection"
348                for rect in self.rects:
349                    rect.setBrush(et.utils.pg_brush_cycler(7))
350                self.level_text_item.hide()
351                self.no_detection_text_item.show()
352            elif processor_result.level_percent > 100:  # Overflow
353                for rect in self.rects:
354                    rect.setBrush(et.utils.pg_brush_cycler(0))
355
356                level_text = "Overflow"
357                level_html = self.level_html_format.format(level_text)
358                self.level_text_item.setHtml(level_html)
359                self.level_text_item.show()
360                self.no_detection_text_item.hide()
361            else:  # In bin detection
362                self.bar_loc = int(
363                    np.around(processor_result.level_percent / 100 * self.num_rects)
364                )
365                for rect in self.rects[: self.bar_loc]:
366                    rect.setBrush(et.utils.pg_brush_cycler(0))
367
368                for rect in self.rects[self.bar_loc :]:
369                    rect.setBrush(et.utils.pg_brush_cycler(7))
370
371                assert processor_result.level_m is not None
372                assert processor_result.level_percent is not None
373                level_text = "Level: {:.2f} m, {:.0f} %".format(
374                    processor_result.level_m,
375                    processor_result.level_percent,
376                )
377                level_html = self.level_html_format.format(level_text)
378                self.level_text_item.setHtml(level_html)
379                self.level_text_item.show()
380                self.no_detection_text_item.hide()
381
382            for rect in self.rects:
383                rect.setVisible(True)
384
385    @staticmethod
386    def _create_phase_std_plot(parent: pg.GraphicsLayout) -> pg.PlotItem:
387        phase_std_plot = parent.addPlot(row=1, col=0, colspan=3)
388        phase_std_plot.setTitle("Phase standard deviation")
389        phase_std_plot.setLabel(axis="bottom", text="Distance [m]")
390        phase_std_plot.setLabel("left", "Phase std")
391        phase_std_plot.setMenuEnabled(False)
392        phase_std_plot.setMouseEnabled(x=False, y=False)
393        phase_std_plot.hideButtons()
394        phase_std_plot.showGrid(x=True, y=True, alpha=0.5)
395        phase_std_plot.setYRange(0, 4)
396
397        return phase_std_plot
398
399    @staticmethod
400    def _create_history_plot(parent: pg.GraphicsLayout) -> pg.PlotItem:
401        history_plot = parent.addPlot(row=0, col=1, colspan=2)
402        history_plot.setTitle("Level history")
403        history_plot.setMenuEnabled(False)
404        history_plot.showGrid(x=True, y=True)
405        history_plot.setLabel("left", "Estimated level (m)")
406        history_plot.setLabel("bottom", "Time (s)")
407
408        return history_plot
409
410
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

View this example on GitHub: acconeer/acconeer-python-exploration