examples/a121/algo/speed/processor.py
1# Copyright (c) Acconeer AB, 2023
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8# Added here to force pyqtgraph to choose PySide
9import PySide6 # noqa: F401
10
11import pyqtgraph as pg
12
13import acconeer.exptool as et
14from acconeer.exptool import a121
15from acconeer.exptool.a121.algo._utils import estimate_frame_rate
16from acconeer.exptool.a121.algo.speed import (
17 DetectorConfig,
18 Processor,
19 ProcessorConfig,
20 ProcessorResult,
21)
22from acconeer.exptool.app.new.ui.stream_tab.plugin_widget import PluginPlotArea
23
24
def main():
    """Run the A121 speed processor example with a live plot.

    Opens a client from the command-line arguments, configures a
    single-sensor session, and then feeds every frame through the speed
    ``Processor`` and on to a ``et.PGProcess`` plot. The loop ends when the
    interrupt handler has received a signal (Ctrl-C) or when putting data
    raises ``et.PGProccessDiedException``.
    """
    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    client = a121.Client.open(**a121.get_client_args(args))

    sensor_config = a121.SensorConfig(
        start_point=200,
        step_length=72,
        num_points=1,
        profile=a121.Profile.PROFILE_4,
        sweeps_per_frame=200,
        hwaas=90,
        frame_rate=None,
        sweep_rate=8880.0,
        continuous_sweep_mode=False,
        double_buffering=False,
        inter_frame_idle_state=a121.IdleState.READY,
        inter_sweep_idle_state=a121.IdleState.READY,
        receiver_gain=16,
        enable_tx=True,
        enable_loopback=False,
        phase_enhancement=False,
        prf=a121.PRF.PRF_15_6_MHz,
    )

    SENSOR_ID = 1

    session_config = a121.SessionConfig(
        {SENSOR_ID: sensor_config},
        extended=False,
    )
    estimated_frame_rate = estimate_frame_rate(client, session_config)

    # Set up the very session config that was used for the frame rate
    # estimate, so that SENSOR_ID is honoured instead of silently relying on
    # the client's default sensor id when given a bare SensorConfig.
    metadata = client.setup_session(session_config)

    processor_config = ProcessorConfig(
        threshold=100.0,
        num_segments=4,
    )

    speed_processor = Processor(
        sensor_config=sensor_config,
        metadata=metadata,
        processor_config=processor_config,
    )

    pg_updater = PGUpdater(sensor_config, processor_config, estimated_frame_rate)
    pg_process = et.PGProcess(pg_updater)
    pg_process.start()

    try:
        client.start_session()

        interrupt_handler = et.utils.ExampleInterruptHandler()
        print("Press Ctrl-C to end session")

        while not interrupt_handler.got_signal:
            result = client.get_next()
            processed_data = speed_processor.process(result)
            try:
                pg_process.put_data(processed_data)
            except et.PGProccessDiedException:
                break
    finally:
        # Always tear down the plot process and the client, also when the
        # streaming loop fails with an unexpected exception.
        print("Disconnecting...")
        pg_process.close()
        client.close()
92
93
class PGUpdater:
    """Plot updater for the speed example.

    Keeps a rolling history of speed estimates and draws two plots: the
    speed history (row 0) and the per-depth PSD together with its threshold
    (row 1).
    """

    def __init__(
        self,
        sensor_config: a121.SensorConfig,
        processor_config: ProcessorConfig,
        estimated_frame_rate: float,
    ):
        """Prepare the history buffers and the x-axis ticks.

        :param sensor_config: Sensor configuration; ``sweep_rate``,
            ``num_points`` and (in ``setup``) ``frame_rate`` are read from it.
        :param processor_config: Accepted for interface compatibility; not
            read by this class.
        :param estimated_frame_rate: Expected frame rate in Hz. The plot rate
            used for sizing the history is capped at ``PluginPlotArea._FPS``.
        """
        self.sensor_config = sensor_config

        self.max_speed = DetectorConfig._get_max_speed(sensor_config.sweep_rate)

        self.n_depths = sensor_config.num_points

        max_update_rate = PluginPlotArea._FPS

        if estimated_frame_rate > max_update_rate:
            plugin_frame_rate = float(max_update_rate)
        else:
            plugin_frame_rate = estimated_frame_rate

        self.history_length_s = 10.0
        # Keep at least one sample so that very low frame rates do not yield
        # an empty history buffer, which `update` could not write into.
        self.history_length = max(1, int(self.history_length_s * plugin_frame_rate))

        self.time_window_length_s = 3.0
        self.time_window_length_n = int(self.time_window_length_s * plugin_frame_rate)

        self.speed_history = np.zeros(self.history_length)
        # Sample positions -history_length .. -1; the newest sample is at -1.
        self.speed_history_xs = np.arange(-self.history_length, 0)

        self.display_ticks = self._make_display_ticks(
            self.speed_history_xs,
            self.history_length_s,
            n_ticks_to_display=10,
        )

        self.setup_is_done = False

    @staticmethod
    def _make_display_ticks(history_xs, history_length_s, n_ticks_to_display=10):
        """Build the tick list that labels sample positions with seconds.

        :param history_xs: Sample positions of the history buffer.
        :param history_length_s: Time span covered by the history, in seconds.
        :param n_ticks_to_display: Approximate number of ticks to keep.
        :returns: A one-element list holding ``(position, label)`` tuples, as
            passed on to the bottom axis' ``setTicks`` in ``setup``.
        """
        x_labels = np.linspace(-history_length_s, 0, len(history_xs))
        all_ticks = [(t, "{:.0f}".format(label)) for t, label in zip(history_xs, x_labels)]
        # A history shorter than the wanted tick count would give a step of
        # zero, which is not a valid slice step; fall back to every sample.
        subsample_step = max(1, len(history_xs) // n_ticks_to_display)
        return [all_ticks[::subsample_step]]

    def setup(self, win):
        """Create all plots, curves and text items inside ``win``."""
        win.setWindowTitle("Acconeer speed detection example")

        # Lower plot: PSD and threshold per depth, logarithmic y axis.
        # NOTE(review): the bottom axis is labelled "Frequency"/"Hz" while
        # `update` plots against `extra_result.velocities` - confirm the label.
        self.raw_fft_plot = win.addPlot(row=1, col=0)
        self.raw_fft_plot.setTitle("Frequency data")
        self.raw_fft_plot.setLabel(axis="left", text="Amplitude")
        self.raw_fft_plot.setLabel(axis="bottom", text="Frequency", units="Hz")
        self.raw_fft_plot.addLegend(labelTextSize="10pt")
        self.raw_fft_limits = et.utils.SmoothLimits()
        self.raw_fft_plot.setMenuEnabled(False)
        self.raw_fft_plot.setMouseEnabled(x=False, y=False)
        self.raw_fft_plot.hideButtons()
        self.raw_fft_plot.showGrid(x=True, y=True)
        self.raw_fft_plot.setLogMode(x=False, y=True)
        self.raw_fft_curves = []
        self.raw_fft_smooth_max = et.utils.SmoothMax(self.sensor_config.frame_rate)
        self.raw_thresholds_curves = []

        for i in range(self.n_depths):
            raw_fft_curve = self.raw_fft_plot.plot(pen=et.utils.pg_pen_cycler(i), name="Fft")
            threshold_curve = self.raw_fft_plot.plot(
                pen=et.utils.pg_pen_cycler(i), name="Threshold"
            )
            self.raw_fft_curves.append(raw_fft_curve)
            self.raw_thresholds_curves.append(threshold_curve)

        # Upper plot: scatter of the speed estimates over time.
        self.speed_history_plot = win.addPlot(row=0, col=0)
        self.speed_history_plot.setTitle("Speed history")
        self.speed_history_plot.setLabel(axis="left", text="Speed", units="m/s")
        self.speed_history_plot.setLabel(axis="bottom", text="Time", units="Seconds")
        self.speed_history_plot.addLegend(labelTextSize="10pt")
        self.speed_history_curve = self.speed_history_plot.plot(
            pen=None,
            name="speed",
            symbol="o",
            # pyqtgraph's option is spelled `symbolSize`; the lowercase
            # spelling is not a recognised option.
            symbolSize=3,
        )

        self.speed_history_plot.setYRange(-self.max_speed, self.max_speed)
        self.speed_history_plot.setXRange(-self.history_length, 0)

        bottom_axis = self.speed_history_plot.getAxis("bottom")
        bottom_axis.setTicks(self.display_ticks)

        self.speed_html_format = (
            '<div style="text-align: center">'
            '<span style="color: #FFFFFF;font-size:15pt;">'
            "{}</span></div>"
        )

        self.speed_text_item = pg.TextItem(
            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
            anchor=(0.5, 0.5),
        )

        self.speed_text_item.setPos(-self.history_length / 2, -self.max_speed / 2)
        brush = et.utils.pg_brush_cycler(1)
        self.speed_history_peak_plot_item = pg.PlotDataItem(
            pen=None, symbol="o", symbolSize=8, symbolBrush=brush, symbolPen="k"
        )
        self.speed_history_plot.addItem(self.speed_history_peak_plot_item)
        self.speed_history_plot.addItem(self.speed_text_item)

        # The text is only shown once a non-zero speed has been seen.
        self.speed_text_item.hide()

        self.setup_is_done = True

    def update(self, data: ProcessorResult) -> None:
        """Append the newest speed estimate and redraw both plots.

        :param data: Processor result; ``max_speed`` and the ``psd``,
            ``velocities`` and ``actual_thresholds`` fields of
            ``extra_result`` are read from it.
        """
        psd = data.extra_result.psd
        speed_guess = data.max_speed
        x_speeds = data.extra_result.velocities
        thresholds = data.extra_result.actual_thresholds

        # Shift the history one step and put the newest estimate last.
        self.speed_history = np.roll(self.speed_history, -1)
        self.speed_history[-1] = speed_guess

        if self.time_window_length_n > 0:
            # Pick the estimate with the largest magnitude in the recent
            # window; the index is relative to the start of that window.
            window = self.speed_history[-self.time_window_length_n :]
            pos_speed = np.max(window)
            pos_ind = int(np.argmax(window))
            neg_speed = np.min(window)
            neg_ind = int(np.argmin(window))

            if abs(neg_speed) > abs(pos_speed):
                max_display_speed = neg_speed
                max_display_ind = neg_ind
            else:
                max_display_speed = pos_speed
                max_display_ind = pos_ind
        else:
            max_display_speed = self.speed_history[-1]
            max_display_ind = -1

        if max_display_speed != 0.0:
            speed_text = "Max speed estimate {:.4f} m/s".format(max_display_speed)
            speed_html = self.speed_html_format.format(speed_text)

            self.speed_text_item.setHtml(speed_html)
            self.speed_text_item.show()

            sub_xs = self.speed_history_xs[-self.time_window_length_n :]
            self.speed_history_peak_plot_item.setData(
                [sub_xs[max_display_ind]], [max_display_speed]
            )
        else:
            self.speed_history_peak_plot_item.clear()
            self.speed_text_item.hide()

        # Entries that are exactly zero are not drawn.
        display_inds = np.flatnonzero(self.speed_history)
        if display_inds.size > 0:
            display_xs = self.speed_history_xs[display_inds]
            display_data = self.speed_history[display_inds]
        else:
            display_xs = []
            display_data = []
        self.speed_history_curve.setData(display_xs, display_data)

        assert psd is not None
        assert thresholds is not None

        top_max = max(np.max(psd), np.max(thresholds))

        # The y axis was put in log mode in `setup`, hence the log10 of the
        # smoothed maximum as upper range limit.
        smooth_max_val = np.log10(self.raw_fft_smooth_max.update(top_max))
        self.raw_fft_plot.setYRange(-2, smooth_max_val)
        for i in range(psd.shape[1]):
            self.raw_fft_curves[i].setData(x_speeds, psd[:, i])

            # Draw the scalar threshold as a flat line across the x range.
            threshold_line = np.full(x_speeds.shape[0], thresholds[i])
            self.raw_thresholds_curves[i].setData(x_speeds, threshold_line)
262
263
# Script entry point: run the example only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration