examples/a121/algo/speed/detector.py
1# Copyright (c) Acconeer AB, 2023
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8# Added here to force pyqtgraph to choose PySide
9import PySide6 # noqa: F401
10
11import pyqtgraph as pg
12
13import acconeer.exptool as et
14from acconeer.exptool import a121
15from acconeer.exptool.a121.algo._utils import estimate_frame_rate
16from acconeer.exptool.a121.algo.speed import (
17 Detector,
18 DetectorConfig,
19 DetectorMetadata,
20 DetectorResult,
21)
22from acconeer.exptool.app.new.ui.stream_tab.plugin_widget import PluginPlotArea
23
24
def main():
    """Run the speed detector example and plot its results until interrupted.

    Opens a client from the command line arguments, starts a ``Detector`` on
    ``SENSOR_ID`` and forwards every detector result to a separate plotting
    process. The loop ends when the interrupt handler has received a signal
    (Ctrl-C) or when the plotting process has died. The detector, the plotting
    process and the client are shut down even if the loop raises.
    """
    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    client = a121.Client.open(**a121.get_client_args(args))

    # Detector config set to default values for clarity.
    detector_config = DetectorConfig(
        start_point=200,
        num_points=1,
        step_length=None,
        profile=None,
        frame_rate=None,
        sweep_rate=None,
        hwaas=None,
        num_bins=50,
        max_speed=10.0,
        threshold=100.0,
    )

    SENSOR_ID = 1

    detector = Detector(client=client, sensor_id=SENSOR_ID, detector_config=detector_config)

    # The sensor config derived from the detector config is used both to estimate
    # the frame rate and to tell the plot how many points are measured.
    sensor_config = detector._get_sensor_config(detector_config)
    session_config = a121.SessionConfig(
        {SENSOR_ID: sensor_config},
        extended=False,
    )

    estimated_frame_rate = estimate_frame_rate(client, session_config)

    detector.start()

    pg_updater = PGUpdater(
        detector_config,
        sensor_config,
        estimated_frame_rate,
    )
    pg_process = et.PGProcess(pg_updater)
    pg_process.start()

    interrupt_handler = et.utils.ExampleInterruptHandler()
    print("Press Ctrl-C to end session")

    try:
        while not interrupt_handler.got_signal:
            detector_result = detector.get_next()
            try:
                pg_process.put_data(detector_result)
            except et.PGProccessDiedException:
                # The plotting process is gone; nothing left to show results in.
                break
    finally:
        # Always release the sensor session, the plotting process and the
        # connection, also when the loop above fails.
        detector.stop()

        print("Disconnecting...")
        pg_process.close()
        client.close()
80
81
class PGUpdater:
    """Plot updater for the speed detector example.

    ``setup`` creates two plots in the given window: a scatter plot of the speed
    estimate history (row 0) and a plot of the PSD per measured point together
    with its threshold (row 1). ``update`` refreshes both from a detector result.
    """

    def __init__(
        self,
        detector_config: DetectorConfig,
        detector_metadata: DetectorMetadata,
        estimated_frame_rate: float,
    ):
        """Pre-compute the history buffers and the time axis ticks.

        :param detector_config: Detector configuration. Its ``frame_rate``,
            ``sweep_rate`` and ``max_speed`` are read when the plots are set up.
        :param detector_metadata: Object providing ``num_points``, the number of
            PSD/threshold curve pairs to create.
        :param estimated_frame_rate: Estimated number of results per second, used
            to convert the history and time window lengths from seconds to samples.
        """
        self.detector_config = detector_config

        self.n_depths = detector_metadata.num_points

        # Cap the rate used for sizing the history at the plot area's update rate.
        max_update_rate = PluginPlotArea._FPS
        plugin_frame_rate = min(estimated_frame_rate, float(max_update_rate))

        self.history_length_s = 10.0
        self.history_length = int(self.history_length_s * plugin_frame_rate)

        # Window (counted from the newest sample) in which the displayed
        # "max speed" is searched.
        self.time_window_length_s = 3.0
        self.time_window_length_n = int(self.time_window_length_s * plugin_frame_rate)

        # A zero entry in the history means "nothing to show" (see ``update``).
        self.speed_history = np.zeros(self.history_length)
        self.speed_history_xs = np.arange(-self.history_length, 0)

        self.display_ticks = self._build_display_ticks(
            self.speed_history_xs, self.history_length_s
        )

        self.setup_is_done = False

    @staticmethod
    def _build_display_ticks(history_xs, history_length_s, n_ticks_to_display=10):
        """Return axis ticks labelling sample positions with time in seconds.

        :param history_xs: Sample positions on the x axis, oldest first.
        :param history_length_s: Duration in seconds spanned by ``history_xs``.
        :param n_ticks_to_display: Approximate number of ticks to keep.
        :returns: A list holding one list of ``(position, label)`` tuples, which
            is the form passed to the axis ``setTicks`` call in ``setup``.
        """
        history_length = len(history_xs)
        x_labels = np.linspace(-history_length_s, 0, history_length)
        all_ticks = [(t, "{:.0f}".format(label)) for t, label in zip(history_xs, x_labels)]

        # With fewer samples than requested ticks the integer division gives 0,
        # which is not a valid slice step; show every sample in that case.
        subsample_step = max(1, history_length // n_ticks_to_display)
        return [all_ticks[::subsample_step]]

    def setup(self, win):
        """Create all plots and plot items in ``win``.

        :param win: Window/layout object providing ``setWindowTitle`` and ``addPlot``.
        """
        win.setWindowTitle("Acconeer speed detection example")

        # Lower plot: PSD and threshold, one pair of curves per measured point.
        self.raw_fft_plot = win.addPlot(row=1, col=0)
        self.raw_fft_plot.setTitle("Frequency data")
        self.raw_fft_plot.setLabel(axis="left", text="Amplitude")
        self.raw_fft_plot.setLabel(axis="bottom", text="Frequency", units="Hz")
        self.raw_fft_plot.addLegend(labelTextSize="10pt")
        self.raw_fft_limits = et.utils.SmoothLimits()
        self.raw_fft_plot.setMenuEnabled(False)
        self.raw_fft_plot.setMouseEnabled(x=False, y=False)
        self.raw_fft_plot.hideButtons()
        self.raw_fft_plot.showGrid(x=True, y=True)
        self.raw_fft_plot.setLogMode(x=False, y=True)
        self.raw_fft_curves = []
        self.raw_fft_smooth_max = et.utils.SmoothMax(self.detector_config.frame_rate)
        self.raw_thresholds_curves = []

        for i in range(self.n_depths):
            raw_fft_curve = self.raw_fft_plot.plot(pen=et.utils.pg_pen_cycler(i), name="Fft")
            threshold_curve = self.raw_fft_plot.plot(
                pen=et.utils.pg_pen_cycler(i), name="Threshold"
            )
            self.raw_fft_curves.append(raw_fft_curve)
            self.raw_thresholds_curves.append(threshold_curve)

        # Upper plot: history of speed estimates drawn as symbols only.
        self.speed_history_plot = win.addPlot(row=0, col=0)
        self.speed_history_plot.setTitle("Speed history")
        self.speed_history_plot.setLabel(axis="left", text="Speed", units="m/s")
        self.speed_history_plot.setLabel(axis="bottom", text="Time", units="Seconds")
        self.speed_history_plot.addLegend(labelTextSize="10pt")
        self.speed_history_curve = self.speed_history_plot.plot(
            pen=None,
            name="speed",
            symbol="o",
            # pyqtgraph's keyword is "symbolSize"; the lower-case spelling is ignored.
            symbolSize=3,
        )

        # An explicitly configured sweep rate decides the y range; otherwise the
        # configured max speed is used.
        if self.detector_config.sweep_rate is not None:
            actual_max_speed = DetectorConfig._get_max_speed(self.detector_config.sweep_rate)
            self.speed_history_plot.setYRange(-actual_max_speed, actual_max_speed)
        else:
            self.speed_history_plot.setYRange(
                -self.detector_config.max_speed, self.detector_config.max_speed
            )
        self.speed_history_plot.setXRange(-self.history_length, 0)
        bottom_axis = self.speed_history_plot.getAxis("bottom")
        bottom_axis.setTicks(self.display_ticks)

        self.speed_html_format = (
            '<div style="text-align: center">'
            '<span style="color: #FFFFFF;font-size:15pt;">'
            "{}</span></div>"
        )

        self.speed_text_item = pg.TextItem(
            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
            anchor=(0.5, 0.5),
        )

        self.speed_text_item.setPos(-self.history_length / 2, -self.detector_config.max_speed / 2)
        brush = et.utils.pg_brush_cycler(1)
        # Single highlighted marker on the sample reported in the text box.
        self.speed_history_peak_plot_item = pg.PlotDataItem(
            pen=None, symbol="o", symbolSize=8, symbolBrush=brush, symbolPen="k"
        )
        self.speed_history_plot.addItem(self.speed_history_peak_plot_item)
        self.speed_history_plot.addItem(self.speed_text_item)

        # Hidden until ``update`` has a non-zero speed to report.
        self.speed_text_item.hide()

        self.setup_is_done = True

    def update(self, data: DetectorResult) -> None:
        """Append the newest speed estimate and redraw both plots.

        :param data: Detector result. ``max_speed`` is appended to the history;
            ``extra_result.psd`` (indexed ``[:, point]``),
            ``extra_result.velocities`` and ``extra_result.actual_thresholds``
            (indexed ``[point]``) feed the lower plot.
        """
        psd = data.extra_result.psd
        speed_guess = data.max_speed
        x_speeds = data.extra_result.velocities
        thresholds = data.extra_result.actual_thresholds

        # Shift the history one step and store the newest estimate last.
        self.speed_history = np.roll(self.speed_history, -1)
        self.speed_history[-1] = speed_guess

        if self.time_window_length_n > 0:
            # Pick the sample with the largest magnitude in the recent window,
            # keeping its sign. A tie goes to the positive extreme.
            window = self.speed_history[-self.time_window_length_n :]
            pos_ind = int(np.argmax(window))
            neg_ind = int(np.argmin(window))
            pos_speed = window[pos_ind]
            neg_speed = window[neg_ind]

            if abs(neg_speed) > abs(pos_speed):
                max_display_speed = neg_speed
                max_display_ind = neg_ind
            else:
                max_display_speed = pos_speed
                max_display_ind = pos_ind
        else:
            # Empty window: fall back to the newest sample.
            max_display_speed = self.speed_history[-1]
            max_display_ind = -1

        if max_display_speed != 0.0:
            speed_text = "Max speed estimate {:.4f} m/s".format(max_display_speed)
            speed_html = self.speed_html_format.format(speed_text)

            self.speed_text_item.setHtml(speed_html)
            self.speed_text_item.show()

            sub_xs = self.speed_history_xs[-self.time_window_length_n :]
            self.speed_history_peak_plot_item.setData(
                [sub_xs[max_display_ind]], [max_display_speed]
            )
        else:
            self.speed_history_peak_plot_item.clear()
            self.speed_text_item.hide()

        # Only non-zero history entries are drawn.
        display_inds = np.flatnonzero(self.speed_history)
        self.speed_history_curve.setData(
            self.speed_history_xs[display_inds], self.speed_history[display_inds]
        )

        assert psd is not None
        assert thresholds is not None

        top_max = max(np.max(psd), np.max(thresholds))

        # The y axis is in log mode (see ``setup``), so the upper limit is
        # passed as log10 of the smoothed maximum.
        smooth_max_val = np.log10(self.raw_fft_smooth_max.update(top_max))
        self.raw_fft_plot.setYRange(-2, smooth_max_val)
        for i in range(psd.shape[1]):
            self.raw_fft_curves[i].setData(x_speeds, psd[:, i])

            # The threshold for a point is one value, drawn as a flat line.
            threshold_line = np.full(x_speeds.shape[0], thresholds[i])
            self.raw_thresholds_curves[i].setData(x_speeds, threshold_line)
253
254
# Run the example only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration