examples/a121/algo/speed/processor.py

examples/a121/algo/speed/processor.py#

  1# Copyright (c) Acconeer AB, 2023
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8# Added here to force pyqtgraph to choose PySide
  9import PySide6  # noqa: F401
 10
 11import pyqtgraph as pg
 12
 13import acconeer.exptool as et
 14from acconeer.exptool import a121
 15from acconeer.exptool.a121.algo._utils import estimate_frame_rate
 16from acconeer.exptool.a121.algo.speed import (
 17    DetectorConfig,
 18    Processor,
 19    ProcessorConfig,
 20    ProcessorResult,
 21)
 22from acconeer.exptool.app.new.ui.stream_tab.plugin_widget import PluginPlotArea
 23
 24
 25def main():
 26    args = a121.ExampleArgumentParser().parse_args()
 27    et.utils.config_logging(args)
 28
 29    client = a121.Client.open(**a121.get_client_args(args))
 30
 31    sensor_config = a121.SensorConfig(
 32        start_point=200,
 33        step_length=72,
 34        num_points=1,
 35        profile=a121.Profile.PROFILE_4,
 36        sweeps_per_frame=200,
 37        hwaas=90,
 38        frame_rate=None,
 39        sweep_rate=8880.0,
 40        continuous_sweep_mode=False,
 41        double_buffering=False,
 42        inter_frame_idle_state=a121.IdleState.READY,
 43        inter_sweep_idle_state=a121.IdleState.READY,
 44        receiver_gain=16,
 45        enable_tx=True,
 46        enable_loopback=False,
 47        phase_enhancement=False,
 48        prf=a121.PRF.PRF_15_6_MHz,
 49    )
 50
 51    SENSOR_ID = 1
 52
 53    session_config = a121.SessionConfig(
 54        {SENSOR_ID: sensor_config},
 55        extended=False,
 56    )
 57    estimated_frame_rate = estimate_frame_rate(client, session_config)
 58
 59    metadata = client.setup_session(sensor_config)
 60
 61    processor_config = ProcessorConfig(
 62        threshold=100.0,
 63        num_segments=4,
 64    )
 65
 66    speed_processor = Processor(
 67        sensor_config=sensor_config,
 68        metadata=metadata,
 69        processor_config=processor_config,
 70    )
 71
 72    pg_updater = PGUpdater(sensor_config, processor_config, estimated_frame_rate)
 73    pg_process = et.PGProcess(pg_updater)
 74    pg_process.start()
 75
 76    client.start_session()
 77
 78    interrupt_handler = et.utils.ExampleInterruptHandler()
 79    print("Press Ctrl-C to end session")
 80
 81    while not interrupt_handler.got_signal:
 82        result = client.get_next()
 83        processed_data = speed_processor.process(result)
 84        try:
 85            pg_process.put_data(processed_data)
 86        except et.PGProccessDiedException:
 87            break
 88
 89    print("Disconnecting...")
 90    pg_process.close()
 91    client.close()
 92
 93
 94class PGUpdater:
 95    def __init__(
 96        self,
 97        sensor_config: a121.SensorConfig,
 98        processor_config: ProcessorConfig,
 99        estimated_frame_rate: float,
100    ):
101        self.sensor_config = sensor_config
102
103        self.max_speed = DetectorConfig._get_max_speed(sensor_config.sweep_rate)
104
105        self.n_depths = sensor_config.num_points
106
107        max_update_rate = PluginPlotArea._FPS
108
109        if estimated_frame_rate > max_update_rate:
110            plugin_frame_rate = float(max_update_rate)
111        else:
112            plugin_frame_rate = estimated_frame_rate
113
114        self.history_length_s = 10.0
115        self.history_length = int(self.history_length_s * plugin_frame_rate)
116
117        self.time_window_length_s = 3.0
118        self.time_window_length_n = int(self.time_window_length_s * plugin_frame_rate)
119
120        self.speed_history = np.zeros(self.history_length)
121        self.speed_history_xs = np.array([i for i in range(-self.history_length, 0)])
122
123        n_ticks_to_display = 10
124        x_labels = np.linspace(-self.history_length_s, 0, self.history_length)
125        all_ticks = [
126            (t, "{:.0f}".format(label)) for t, label in zip(self.speed_history_xs, x_labels)
127        ]
128        subsample_step = self.history_length // n_ticks_to_display
129        self.display_ticks = [all_ticks[::subsample_step]]
130
131        self.setup_is_done = False
132
133    def setup(self, win):
134        win.setWindowTitle("Acconeer speed detection example")
135
136        self.raw_fft_plot = win.addPlot(row=1, col=0)
137        self.raw_fft_plot.setTitle("Frequency data")
138        self.raw_fft_plot.setLabel(axis="left", text="Amplitude")
139        self.raw_fft_plot.setLabel(axis="bottom", text="Frequency", units="Hz")
140        self.raw_fft_plot.addLegend(labelTextSize="10pt")
141        self.raw_fft_limits = et.utils.SmoothLimits()
142        self.raw_fft_plot.setMenuEnabled(False)
143        self.raw_fft_plot.setMouseEnabled(x=False, y=False)
144        self.raw_fft_plot.hideButtons()
145        self.raw_fft_plot.showGrid(x=True, y=True)
146        self.raw_fft_plot.setLogMode(x=False, y=True)
147        self.raw_fft_curves = []
148        self.raw_fft_smooth_max = et.utils.SmoothMax(self.sensor_config.frame_rate)
149        self.raw_thresholds_curves = []
150
151        for i in range(self.n_depths):
152            raw_fft_curve = self.raw_fft_plot.plot(pen=et.utils.pg_pen_cycler(i), name="Fft")
153            threshold_curve = self.raw_fft_plot.plot(
154                pen=et.utils.pg_pen_cycler(i), name="Threshold"
155            )
156            self.raw_fft_curves.append(raw_fft_curve)
157            self.raw_thresholds_curves.append(threshold_curve)
158
159        self.speed_history_plot = win.addPlot(row=0, col=0)
160        self.speed_history_plot.setTitle("Speed history")
161        self.speed_history_plot.setLabel(axis="left", text="Speed", units="m/s")
162        self.speed_history_plot.setLabel(axis="bottom", text="Time", units="Seconds")
163        self.speed_history_plot.addLegend(labelTextSize="10pt")
164        self.speed_history_curve = self.speed_history_plot.plot(
165            pen=None,
166            name="speed",
167            symbol="o",
168            symbolsize=3,
169        )
170
171        self.speed_history_plot.setYRange(-self.max_speed, self.max_speed)
172        self.speed_history_plot.setXRange(-self.history_length, 0)
173
174        ay = self.speed_history_plot.getAxis("bottom")
175        ay.setTicks(self.display_ticks)
176
177        self.speed_html_format = (
178            '<div style="text-align: center">'
179            '<span style="color: #FFFFFF;font-size:15pt;">'
180            "{}</span></div>"
181        )
182
183        self.speed_text_item = pg.TextItem(
184            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
185            anchor=(0.5, 0.5),
186        )
187
188        self.speed_text_item.setPos(-self.history_length / 2, -self.max_speed / 2)
189        brush = et.utils.pg_brush_cycler(1)
190        self.speed_history_peak_plot_item = pg.PlotDataItem(
191            pen=None, symbol="o", symbolSize=8, symbolBrush=brush, symbolPen="k"
192        )
193        self.speed_history_plot.addItem(self.speed_history_peak_plot_item)
194        self.speed_history_plot.addItem(self.speed_text_item)
195
196        self.speed_text_item.hide()
197
198        self.setup_is_done = True
199
200    def update(self, data: ProcessorResult) -> None:
201        psd = data.extra_result.psd
202        speed_guess = data.max_speed
203        x_speeds = data.extra_result.velocities
204        thresholds = data.extra_result.actual_thresholds
205
206        self.speed_history = np.roll(self.speed_history, -1)
207
208        self.speed_history[-1] = speed_guess
209
210        if self.time_window_length_n > 0:
211            pos_speed = np.max(self.speed_history[-self.time_window_length_n :])
212            pos_ind = int(np.argmax(self.speed_history[-self.time_window_length_n :]))
213            neg_speed = np.min(self.speed_history[-self.time_window_length_n :])
214            neg_ind = int(np.argmin(self.speed_history[-self.time_window_length_n :]))
215
216            if abs(neg_speed) > abs(pos_speed):
217                max_display_speed = neg_speed
218                max_display_ind = neg_ind
219            else:
220                max_display_speed = pos_speed
221                max_display_ind = pos_ind
222        else:
223            max_display_speed = self.speed_history[-1]
224            max_display_ind = -1
225
226        if max_display_speed != 0.0:
227            speed_text = "Max speed estimate {:.4f} m/s".format(max_display_speed)
228            speed_html = self.speed_html_format.format(speed_text)
229
230            self.speed_text_item.setHtml(speed_html)
231            self.speed_text_item.show()
232
233            sub_xs = self.speed_history_xs[-self.time_window_length_n :]
234            self.speed_history_peak_plot_item.setData(
235                [sub_xs[max_display_ind]], [max_display_speed]
236            )
237        else:
238            self.speed_history_peak_plot_item.clear()
239            self.speed_text_item.hide()
240
241        display_inds = np.array([i for i, x in enumerate(self.speed_history) if x != 0.0])
242        if len(display_inds) > 0:
243            display_xs = self.speed_history_xs[display_inds]
244            display_data = self.speed_history[display_inds]
245        else:
246            display_xs = []
247            display_data = []
248        self.speed_history_curve.setData(display_xs, display_data)
249
250        assert psd is not None
251        assert thresholds is not None
252
253        top_max = max(np.max(psd), np.max(thresholds))
254
255        smooth_max_val = np.log10(self.raw_fft_smooth_max.update(top_max))
256        self.raw_fft_plot.setYRange(-2, smooth_max_val)
257        for i in range(psd.shape[1]):
258            self.raw_fft_curves[i].setData(x_speeds, psd[:, i])
259
260            threshold_line = np.full(x_speeds.shape[0], thresholds[i])
261            self.raw_thresholds_curves[i].setData(x_speeds, threshold_line)
262
263
264if __name__ == "__main__":
265    main()

View this example on GitHub: acconeer/acconeer-python-exploration