examples/a121/algo/touchless_button/processor.py

examples/a121/algo/touchless_button/processor.py#

  1# Copyright (c) Acconeer AB, 2022-2023
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8import pyqtgraph as pg
  9
 10import acconeer.exptool as et
 11from acconeer.exptool import a121
 12from acconeer.exptool.a121.algo.touchless_button import (
 13    MeasurementType,
 14    Processor,
 15    ProcessorConfig,
 16    ProcessorResult,
 17    get_close_sensor_config,
 18)
 19
 20
 21def main():
 22    args = a121.ExampleArgumentParser().parse_args()
 23    et.utils.config_logging(args)
 24
 25    client = a121.Client.open(**a121.get_client_args(args))
 26
 27    processor_config = ProcessorConfig()
 28
 29    sensor_config = get_close_sensor_config()
 30
 31    metadata = client.setup_session(sensor_config)
 32    client.start_session()
 33
 34    processor = Processor(
 35        sensor_config=sensor_config,
 36        metadata=metadata,
 37        processor_config=processor_config,
 38    )
 39
 40    pg_updater = PGUpdater(sensor_config=sensor_config, processor_config=processor_config)
 41    pg_process = et.PGProcess(pg_updater)
 42    pg_process.start()
 43
 44    interrupt_handler = et.utils.ExampleInterruptHandler()
 45    print("Press Ctrl-C to end session")
 46
 47    while not interrupt_handler.got_signal:
 48        result = client.get_next()
 49        processor_result = processor.process(result)
 50        try:
 51            pg_process.put_data(processor_result)
 52        except et.PGProccessDiedException:
 53            break
 54
 55    print("Disconnecting...")
 56    pg_process.close()
 57    client.stop_session()
 58    client.close()
 59
 60
 61class PGUpdater:
 62    def __init__(self, sensor_config, processor_config):
 63        self.detection_history = np.full((2, 100), np.NaN)
 64        self.sensor_config = sensor_config
 65        self.processor_config = processor_config
 66
 67    def setup(self, win):
 68        self.detection_history_plot = self._create_detection_plot(win)
 69
 70        self.detection_history_curve_close = self.detection_history_plot.plot(
 71            pen=et.utils.pg_pen_cycler(1, width=5)
 72        )
 73        self.detection_history_curve_far = self.detection_history_plot.plot(
 74            pen=et.utils.pg_pen_cycler(0, width=5)
 75        )
 76
 77        close_html = (
 78            '<div style="text-align: center">'
 79            '<span style="color: #FFFFFF;font-size:15pt;">'
 80            "{}</span></div>".format("Close detection")
 81        )
 82        far_html = (
 83            '<div style="text-align: center">'
 84            '<span style="color: #FFFFFF;font-size:15pt;">'
 85            "{}</span></div>".format("Far detection")
 86        )
 87        self.close_text_item = pg.TextItem(
 88            html=close_html,
 89            fill=pg.mkColor(0xFF, 0x7F, 0x0E),
 90            anchor=(0.5, 0),
 91        )
 92        self.far_text_item = pg.TextItem(
 93            html=far_html,
 94            fill=pg.mkColor(0x1F, 0x77, 0xB4),
 95            anchor=(0.5, 0),
 96        )
 97        pos_left = (100 / 3, 1.8)
 98        pos_right = (2 * 100 / 3, 1.8)
 99        self.close_text_item.setPos(*pos_left)
100        self.far_text_item.setPos(*pos_right)
101        self.detection_history_plot.addItem(self.close_text_item)
102        self.detection_history_plot.addItem(self.far_text_item)
103        self.close_text_item.hide()
104        self.far_text_item.hide()
105
106        self.detection_history = np.full((2, 100), np.NaN)
107
108        self.score_history_plot = self._create_score_plot(win)
109        score_plot_legend = self.score_history_plot.legend
110        self.score_smooth_max = et.utils.SmoothMax()
111
112        self.threshold_history_curve_close = self.score_history_plot.plot(
113            pen=et.utils.pg_pen_cycler(1, width=2.5, style="--"),
114        )
115        self.threshold_history_curve_far = self.score_history_plot.plot(
116            pen=et.utils.pg_pen_cycler(0, width=2.5, style="--"),
117        )
118
119        self.threshold_history = np.full((2, 100), np.NaN)
120
121        cycle_index = 2  # To not have same colors as thresholds
122        if self.processor_config.measurement_type == MeasurementType.CLOSE_RANGE:
123            measurement_type = "Close"
124            score_plot_legend.addItem(self.threshold_history_curve_close, "Close range threshold")
125        elif self.processor_config.measurement_type == MeasurementType.FAR_RANGE:
126            measurement_type = "Far"
127            score_plot_legend.addItem(self.threshold_history_curve_far, "Far range threshold")
128
129        if self.processor_config.measurement_type != MeasurementType.CLOSE_AND_FAR_RANGE:
130            score_history = np.full((self.sensor_config.subsweep.num_points, 100), np.NaN)
131            score_history_curves = np.empty(
132                (self.sensor_config.subsweep.num_points,), dtype=object
133            )
134            for i in range(self.sensor_config.subsweep.num_points):
135                score_history_curves[i] = pg.ScatterPlotItem(
136                    brush=et.utils.pg_brush_cycler(cycle_index),
137                    name=f"{measurement_type} range, point {i}",
138                )
139                self.score_history_plot.addItem(score_history_curves[i])
140                cycle_index += 1
141
142            if self.processor_config.measurement_type == MeasurementType.CLOSE_RANGE:
143                self.score_history_close = score_history
144                self.score_history_curves_close = score_history_curves
145                self.score_history_far = None
146                self.score_history_curves_far = None
147            elif self.processor_config.measurement_type == MeasurementType.FAR_RANGE:
148                self.score_history_close = None
149                self.score_history_curves_close = None
150                self.score_history_far = score_history
151                self.score_history_curves_far = score_history_curves
152
153        elif self.processor_config.measurement_type == MeasurementType.CLOSE_AND_FAR_RANGE:
154            score_plot_legend.addItem(self.threshold_history_curve_close, "Close range threshold")
155            score_plot_legend.addItem(self.threshold_history_curve_far, "Far range threshold")
156            self.score_history_close = np.full(
157                (self.sensor_config.subsweeps[0].num_points, 100), np.NaN
158            )
159            self.score_history_far = np.full(
160                (self.sensor_config.subsweeps[1].num_points, 100), np.NaN
161            )
162            self.score_history_curves_close = np.empty(
163                (self.sensor_config.subsweeps[0].num_points,), dtype=object
164            )
165            self.score_history_curves_far = np.empty(
166                (self.sensor_config.subsweeps[1].num_points,), dtype=object
167            )
168
169            range_labels = ["Close", "Far"]
170            for n, subsweep in enumerate(self.sensor_config.subsweeps):
171                measurement_type = range_labels[n]
172                score_history_curve_list = [
173                    self.score_history_curves_close,
174                    self.score_history_curves_far,
175                ]
176                for i in range(subsweep.num_points):
177                    score_history_curve_list[n][i] = pg.ScatterPlotItem(
178                        brush=et.utils.pg_brush_cycler(cycle_index),
179                        name=f"{measurement_type} range, point {i}",
180                    )
181                    self.score_history_plot.addItem(score_history_curve_list[n][i])
182                    cycle_index += 1
183            self.score_history_curves_close = score_history_curve_list[0]
184            self.score_history_curves_far = score_history_curve_list[1]
185
186    def update(self, processor_result: ProcessorResult):
187        def is_none_or_detection(x):
188            return x.detection if x is not None else None
189
190        detection = np.array(
191            [
192                is_none_or_detection(processor_result.close),
193                is_none_or_detection(processor_result.far),
194            ]
195        )
196        self.detection_history = np.roll(self.detection_history, -1, axis=1)
197        self.detection_history[:, -1] = detection
198
199        self.detection_history_curve_close.setData(self.detection_history[0])
200        self.detection_history_curve_far.setData(self.detection_history[1])
201
202        if processor_result.close is not None:
203            if processor_result.close.detection:
204                self.close_text_item.show()
205            else:
206                self.close_text_item.hide()
207
208        if processor_result.far is not None:
209            if processor_result.far.detection:
210                self.far_text_item.show()
211            else:
212                self.far_text_item.hide()
213
214        max_val = 0.0
215
216        def is_none_or_threshold(x):
217            return x.threshold if x is not None else None
218
219        threshold = np.array(
220            [
221                is_none_or_threshold(processor_result.close),
222                is_none_or_threshold(processor_result.far),
223            ]
224        )
225        if np.nanmax(np.array(threshold, dtype=float)) > max_val:
226            max_val = np.nanmax(np.array(threshold, dtype=float))
227        self.threshold_history = np.roll(self.threshold_history, -1, axis=1)
228        self.threshold_history[:, -1] = threshold
229
230        self.threshold_history_curve_close.setData(self.threshold_history[0])
231        self.threshold_history_curve_far.setData(self.threshold_history[1])
232
233        if self.score_history_close is not None:
234            self.score_history_close = np.roll(self.score_history_close, -1, axis=1)
235            assert processor_result.close is not None
236            # Plot the second highest score
237            self.score_history_close[:, -1] = np.sort(processor_result.close.score, axis=0)[-2, :]
238
239            assert self.score_history_curves_close is not None
240            for i, curve in enumerate(self.score_history_curves_close):
241                # Assign x-values so that setData() doesn't give error when y-values are NaN
242                curve.setData(np.arange(0, 100), self.score_history_close[i, :].flatten())
243
244            if np.max(processor_result.close.score) > max_val:
245                max_val = np.max(processor_result.close.score)
246
247        if self.score_history_far is not None:
248            self.score_history_far = np.roll(self.score_history_far, -1, axis=1)
249            assert processor_result.far is not None
250            # Plot the second highest score
251            self.score_history_far[:, -1] = np.sort(processor_result.far.score, axis=0)[-2, :]
252
253            assert self.score_history_curves_far is not None
254            for i, curve in enumerate(self.score_history_curves_far):
255                # Assign x-values so that setData() doesn't give error when y-values are NaN
256                curve.setData(np.arange(0, 100), self.score_history_far[i, :].flatten())
257
258            if np.max(processor_result.far.score) > max_val:
259                max_val = np.max(processor_result.far.score)
260
261        if max_val != 0.0:
262            self.score_history_plot.setYRange(0.0, self.score_smooth_max.update(max_val))
263
264    @staticmethod
265    def _create_detection_plot(parent: pg.GraphicsLayout) -> pg.PlotItem:
266        detection_history_plot = parent.addPlot(row=0, col=0)
267        detection_history_plot.setTitle("Detection")
268        detection_history_plot.setLabel(axis="bottom", text="Frames")
269        detection_history_plot.setMenuEnabled(False)
270        detection_history_plot.setMouseEnabled(x=False, y=False)
271        detection_history_plot.hideButtons()
272        detection_history_plot.showGrid(x=True, y=True, alpha=0.5)
273        detection_history_plot.setYRange(-0.1, 1.8)
274        return detection_history_plot
275
276    @staticmethod
277    def _create_score_plot(parent: pg.GraphicsLayout) -> pg.PlotItem:
278        score_history_plot = parent.addPlot(row=1, col=0)
279        score_history_plot.setTitle("Detection score")
280        score_history_plot.setLabel(axis="bottom", text="Frames")
281        score_history_plot.addLegend()
282        score_history_plot.setMenuEnabled(False)
283        score_history_plot.setMouseEnabled(x=False, y=False)
284        score_history_plot.hideButtons()
285        score_history_plot.showGrid(x=True, y=True, alpha=0.5)
286        return score_history_plot
287
288
289if __name__ == "__main__":
290    main()

View this example on GitHub: acconeer/acconeer-python-exploration