examples/a121/algo/speed/detector.py

examples/a121/algo/speed/detector.py

  1# Copyright (c) Acconeer AB, 2023
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8# Added here to force pyqtgraph to choose PySide
  9import PySide6  # noqa: F401
 10
 11import pyqtgraph as pg
 12
 13import acconeer.exptool as et
 14from acconeer.exptool import a121
 15from acconeer.exptool.a121.algo._utils import estimate_frame_rate
 16from acconeer.exptool.a121.algo.speed import (
 17    Detector,
 18    DetectorConfig,
 19    DetectorMetadata,
 20    DetectorResult,
 21)
 22from acconeer.exptool.app.new.ui.stream_tab.plugin_widget import PluginPlotArea
 23
 24
 25def main():
 26    args = a121.ExampleArgumentParser().parse_args()
 27    et.utils.config_logging(args)
 28
 29    client = a121.Client.open(**a121.get_client_args(args))
 30
 31    # Detector config set to default values for clarity.
 32    detector_config = DetectorConfig(
 33        start_point=200,
 34        num_points=1,
 35        step_length=None,
 36        profile=None,
 37        frame_rate=None,
 38        sweep_rate=None,
 39        hwaas=None,
 40        num_bins=50,
 41        max_speed=10.0,
 42        threshold=100.0,
 43    )
 44
 45    SENSOR_ID = 1
 46
 47    detector = Detector(client=client, sensor_id=SENSOR_ID, detector_config=detector_config)
 48    sensor_config = detector._get_sensor_config(detector_config)
 49    session_config = a121.SessionConfig(
 50        {SENSOR_ID: sensor_config},
 51        extended=False,
 52    )
 53
 54    estimated_frame_rate = estimate_frame_rate(client, session_config)
 55
 56    detector.start()
 57
 58    pg_updater = PGUpdater(
 59        detector_config,
 60        sensor_config,
 61        estimated_frame_rate,
 62    )
 63    pg_process = et.PGProcess(pg_updater)
 64    pg_process.start()
 65
 66    interrupt_handler = et.utils.ExampleInterruptHandler()
 67    print("Press Ctrl-C to end session")
 68
 69    while not interrupt_handler.got_signal:
 70        detector_result = detector.get_next()
 71        try:
 72            pg_process.put_data(detector_result)
 73        except et.PGProccessDiedException:
 74            break
 75
 76    detector.stop()
 77
 78    print("Disconnecting...")
 79    client.close()
 80
 81
 82class PGUpdater:
 83    def __init__(
 84        self,
 85        detector_config: DetectorConfig,
 86        detector_metadata: DetectorMetadata,
 87        estimated_frame_rate: float,
 88    ):
 89        self.detector_config = detector_config
 90
 91        self.n_depths = detector_metadata.num_points
 92
 93        max_update_rate = PluginPlotArea._FPS
 94
 95        if estimated_frame_rate > max_update_rate:
 96            plugin_frame_rate = float(max_update_rate)
 97        else:
 98            plugin_frame_rate = estimated_frame_rate
 99
100        self.history_length_s = 10.0
101        self.history_length = int(self.history_length_s * plugin_frame_rate)
102
103        self.time_window_length_s = 3.0
104        self.time_window_length_n = int(self.time_window_length_s * plugin_frame_rate)
105
106        self.speed_history = np.zeros(self.history_length)
107        self.speed_history_xs = np.array([i for i in range(-self.history_length, 0)])
108
109        n_ticks_to_display = 10
110        x_labels = np.linspace(-self.history_length_s, 0, self.history_length)
111        all_ticks = [
112            (t, "{:.0f}".format(label)) for t, label in zip(self.speed_history_xs, x_labels)
113        ]
114        subsample_step = self.history_length // n_ticks_to_display
115        self.display_ticks = [all_ticks[::subsample_step]]
116
117        self.setup_is_done = False
118
119    def setup(self, win):
120        win.setWindowTitle("Acconeer speed detection example")
121
122        self.raw_fft_plot = win.addPlot(row=1, col=0)
123        self.raw_fft_plot.setTitle("Frequency data")
124        self.raw_fft_plot.setLabel(axis="left", text="Amplitude")
125        self.raw_fft_plot.setLabel(axis="bottom", text="Frequency", units="Hz")
126        self.raw_fft_plot.addLegend(labelTextSize="10pt")
127        self.raw_fft_limits = et.utils.SmoothLimits()
128        self.raw_fft_plot.setMenuEnabled(False)
129        self.raw_fft_plot.setMouseEnabled(x=False, y=False)
130        self.raw_fft_plot.hideButtons()
131        self.raw_fft_plot.showGrid(x=True, y=True)
132        self.raw_fft_plot.setLogMode(x=False, y=True)
133        self.raw_fft_curves = []
134        self.raw_fft_smooth_max = et.utils.SmoothMax(self.detector_config.frame_rate)
135        self.raw_thresholds_curves = []
136
137        for i in range(self.n_depths):
138            raw_fft_curve = self.raw_fft_plot.plot(pen=et.utils.pg_pen_cycler(i), name="Fft")
139            threshold_curve = self.raw_fft_plot.plot(
140                pen=et.utils.pg_pen_cycler(i), name="Threshold"
141            )
142            self.raw_fft_curves.append(raw_fft_curve)
143            self.raw_thresholds_curves.append(threshold_curve)
144
145        self.speed_history_plot = win.addPlot(row=0, col=0)
146        self.speed_history_plot.setTitle("Speed history")
147        self.speed_history_plot.setLabel(axis="left", text="Speed", units="m/s")
148        self.speed_history_plot.setLabel(axis="bottom", text="Time", units="Seconds")
149        self.speed_history_plot.addLegend(labelTextSize="10pt")
150        self.speed_history_curve = self.speed_history_plot.plot(
151            pen=None,
152            name="speed",
153            symbol="o",
154            symbolsize=3,
155        )
156
157        if self.detector_config.sweep_rate is not None:
158            actual_max_speed = DetectorConfig._get_max_speed(self.detector_config.sweep_rate)
159            self.speed_history_plot.setYRange(-actual_max_speed, actual_max_speed)
160        else:
161            self.speed_history_plot.setYRange(
162                -self.detector_config.max_speed, self.detector_config.max_speed
163            )
164        self.speed_history_plot.setXRange(-self.history_length, 0)
165        ay = self.speed_history_plot.getAxis("bottom")
166        ay.setTicks(self.display_ticks)
167
168        self.speed_html_format = (
169            '<div style="text-align: center">'
170            '<span style="color: #FFFFFF;font-size:15pt;">'
171            "{}</span></div>"
172        )
173
174        self.speed_text_item = pg.TextItem(
175            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
176            anchor=(0.5, 0.5),
177        )
178
179        self.speed_text_item.setPos(-self.history_length / 2, -self.detector_config.max_speed / 2)
180        brush = et.utils.pg_brush_cycler(1)
181        self.speed_history_peak_plot_item = pg.PlotDataItem(
182            pen=None, symbol="o", symbolSize=8, symbolBrush=brush, symbolPen="k"
183        )
184        self.speed_history_plot.addItem(self.speed_history_peak_plot_item)
185        self.speed_history_plot.addItem(self.speed_text_item)
186
187        self.speed_text_item.hide()
188
189        self.setup_is_done = True
190
191    def update(self, data: DetectorResult) -> None:
192        psd = data.extra_result.psd
193        speed_guess = data.max_speed
194        x_speeds = data.extra_result.velocities
195        thresholds = data.extra_result.actual_thresholds
196
197        self.speed_history = np.roll(self.speed_history, -1)
198
199        self.speed_history[-1] = speed_guess
200
201        if self.time_window_length_n > 0:
202            pos_speed = np.max(self.speed_history[-self.time_window_length_n :])
203            pos_ind = int(np.argmax(self.speed_history[-self.time_window_length_n :]))
204            neg_speed = np.min(self.speed_history[-self.time_window_length_n :])
205            neg_ind = int(np.argmin(self.speed_history[-self.time_window_length_n :]))
206
207            if abs(neg_speed) > abs(pos_speed):
208                max_display_speed = neg_speed
209                max_display_ind = neg_ind
210            else:
211                max_display_speed = pos_speed
212                max_display_ind = pos_ind
213        else:
214            max_display_speed = self.speed_history[-1]
215            max_display_ind = -1
216
217        if max_display_speed != 0.0:
218            speed_text = "Max speed estimate {:.4f} m/s".format(max_display_speed)
219            speed_html = self.speed_html_format.format(speed_text)
220
221            self.speed_text_item.setHtml(speed_html)
222            self.speed_text_item.show()
223
224            sub_xs = self.speed_history_xs[-self.time_window_length_n :]
225            self.speed_history_peak_plot_item.setData(
226                [sub_xs[max_display_ind]], [max_display_speed]
227            )
228        else:
229            self.speed_history_peak_plot_item.clear()
230            self.speed_text_item.hide()
231
232        display_inds = np.array([i for i, x in enumerate(self.speed_history) if x != 0.0])
233        if len(display_inds) > 0:
234            display_xs = self.speed_history_xs[display_inds]
235            display_data = self.speed_history[display_inds]
236        else:
237            display_xs = []
238            display_data = []
239        self.speed_history_curve.setData(display_xs, display_data)
240
241        assert psd is not None
242        assert thresholds is not None
243
244        top_max = max(np.max(psd), np.max(thresholds))
245
246        smooth_max_val = np.log10(self.raw_fft_smooth_max.update(top_max))
247        self.raw_fft_plot.setYRange(-2, smooth_max_val)
248        for i in range(psd.shape[1]):
249            self.raw_fft_curves[i].setData(x_speeds, psd[:, i])
250
251            threshold_line = np.full(x_speeds.shape[0], thresholds[i])
252            self.raw_thresholds_curves[i].setData(x_speeds, threshold_line)
253
254
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()

View this example on GitHub: acconeer/acconeer-python-exploration