examples/a121/algo/tank_level/tank_level_with_gui.py
1# Copyright (c) Acconeer AB, 2022-2025
2# All rights reserved
3
4from __future__ import annotations
5
6import logging
7
8import attrs
9import numpy as np
10
11# Added here to force pyqtgraph to choose PySide
12import PySide6 # noqa: F401
13
14import pyqtgraph as pg
15
16import acconeer.exptool as et
17from acconeer.exptool import a121
18from acconeer.exptool.a121.algo.distance import (
19 PeakSortingMethod,
20 ReflectorShape,
21 ThresholdMethod,
22)
23from acconeer.exptool.a121.algo.distance._processors import (
24 DEFAULT_FIXED_AMPLITUDE_THRESHOLD_VALUE,
25 DEFAULT_FIXED_STRENGTH_THRESHOLD_VALUE,
26)
27from acconeer.exptool.a121.algo.tank_level import RefApp, RefAppResult
28from acconeer.exptool.a121.algo.tank_level._processor import ProcessorLevelStatus
29from acconeer.exptool.a121.algo.tank_level._ref_app import RefAppConfig, RefAppContext
30
31
# Module-level logger for this example.
log = logging.getLogger(__name__)


# Time span, in seconds, used for the x-axis of the level history plot and for centering
# the level text horizontally.
TIME_HISTORY_S = 30
36
37
@attrs.mutable(kw_only=True)
class SharedState:
    """Mutable, keyword-only bundle of a sensor id, a ref app config and a ref app context."""

    # Sensor id; defaults to 1.
    sensor_id: int = 1
    # A new RefAppConfig / RefAppContext is constructed for every instance.
    config: RefAppConfig = attrs.Factory(RefAppConfig)
    context: RefAppContext = attrs.Factory(RefAppContext)
43
44
def main():
    """Run the tank level reference application and plot its results live.

    Parses the example command-line arguments, opens a client, calibrates and starts the
    reference application, and forwards every result to a plotting process until an
    interrupt signal is received or the plotting process has died.

    Teardown (closing the plot process, stopping the reference application and closing
    the client) is done in ``finally`` clauses so that it also happens when an exception
    escapes the measurement loop.
    """
    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    # Setup the configurations
    # Detailed at https://docs.acconeer.com/en/latest/exploration_tool/algo/a121/ref_apps/tank_level.html

    # Sensor selections
    sensor = 1

    # Tank level configurations
    ref_app_config = RefAppConfig(
        start_m=0.03,
        end_m=0.5,
        max_step_length=2,
        max_profile=a121.Profile.PROFILE_2,
        close_range_leakage_cancellation=True,
        signal_quality=20,
        update_rate=None,
        median_filter_length=5,
        num_medians_to_average=5,
        threshold_method=ThresholdMethod.CFAR,
        reflector_shape=ReflectorShape.PLANAR,
        peaksorting_method=PeakSortingMethod.CLOSEST,
        num_frames_in_recorded_threshold=50,
        fixed_threshold_value=DEFAULT_FIXED_AMPLITUDE_THRESHOLD_VALUE,  # float
        fixed_strength_threshold_value=DEFAULT_FIXED_STRENGTH_THRESHOLD_VALUE,  # float
        threshold_sensitivity=0.0,  # float
        level_tracking_active=False,
        partial_tracking_range_m=0.0,
    )

    # End setup configurations

    # Preparation for client
    client = a121.Client.open(**a121.get_client_args(args))
    try:
        # Preparation for reference application processor
        ref_app = RefApp(client=client, sensor_id=sensor, config=ref_app_config)
        ref_app.calibrate()
        ref_app.start()
        try:
            pg_updater = PGUpdater(
                config=ref_app_config, num_curves=len(ref_app._detector.processor_specs)
            )
            pg_process = et.PGProcess(pg_updater, max_freq=60)
            pg_process.start()
            try:
                interrupt_handler = et.utils.ExampleInterruptHandler()
                print("Press Ctrl-C to end session")

                while not interrupt_handler.got_signal:
                    processed_data = ref_app.get_next()
                    try:
                        pg_process.put_data(processed_data)
                    except et.PGProccessDiedException:
                        # The plotting process is gone; nothing left to show results in.
                        break

                print("Disconnecting...")
            finally:
                # Shut the plotting process down explicitly instead of leaving it behind.
                pg_process.close()
        finally:
            # Only reached after start() succeeded, so there is a session to stop.
            ref_app.stop()
    finally:
        client.close()
106
107
class PGUpdater:
    """Plot updater drawing the sweep/threshold curves and the level history.

    ``setup`` creates the plots inside the window it is given and ``update`` redraws them
    from a single ``RefAppResult``.
    """

    # Text shown in the level history plot when no peak was detected, keyed by level status.
    STATUS_MSG_MAP = {
        ProcessorLevelStatus.IN_RANGE: "In range",
        ProcessorLevelStatus.NO_DETECTION: "Not available",
        ProcessorLevelStatus.OVERFLOW: "Warning: Overflow",
        ProcessorLevelStatus.OUT_OF_RANGE: "Out of range",
    }

    def __init__(
        self,
        config: RefAppConfig,
        num_curves: int,
    ) -> None:
        """Store the plot parameters.

        :param config: Reference application configuration; only ``start_m`` and ``end_m``
            are read from it.
        :param num_curves: Number of sweep curves (and of threshold curves) created by
            ``setup``.
        """
        self.num_curves = num_curves
        self.start_m = config.start_m
        self.end_m = config.end_m

    @property
    def _level_axis_max_cm(self) -> float:
        """Upper limit of the level axis in cm.

        This is the distance between tank start and tank end plus a 1 cm margin. Both the
        Y range of the level history plot and the position of the level text are derived
        from it so that the text always stays inside the visible range.
        """
        return (self.end_m - self.start_m + 0.01) * 100

    @staticmethod
    def _make_tank_marker(label: str):
        """Create the vertical marker line used for the tank start/end positions."""
        return pg.InfiniteLine(
            pen=et.utils.pg_pen_cycler(2),
            label=label,
            labelOpts={
                "position": 0.5,
                "color": (0, 100, 0),
                "fill": (200, 200, 200, 50),
                "movable": True,
            },
        )

    @staticmethod
    def _curve_style(color_idx: int, symbol_size: int) -> dict:
        """Return the plot keyword arguments for a curve with the given color and symbol size."""
        return dict(
            pen=et.utils.pg_pen_cycler(color_idx),
            symbol="o",
            symbolSize=symbol_size,
            symbolBrush=et.utils.pg_brush_cycler(color_idx),
            symbolPen="k",
        )

    def setup(self, win) -> None:
        """Create all plot items inside ``win``.

        :param win: Object providing ``addPlot``; the sweep plot is placed on row 1 and the
            level history plot on row 0.
        """
        # Sweep plot
        self.sweep_plot = win.addPlot(row=1, col=0, colspan=3)
        self.sweep_plot.setMenuEnabled(False)
        self.sweep_plot.showGrid(x=True, y=True)
        self.sweep_plot.addLegend()
        self.sweep_plot.setLabel("left", "Amplitude")
        self.sweep_plot.setLabel("bottom", "Distance (m)")
        self.sweep_plot.addItem(pg.PlotDataItem())

        self.vertical_line_start = self._make_tank_marker("Tank start")
        self.sweep_plot.addItem(self.vertical_line_start)
        self.vertical_line_end = self._make_tank_marker("Tank end")
        self.sweep_plot.addItem(self.vertical_line_end)

        sweep_kw = self._curve_style(0, 1)
        self.sweep_curves = [self.sweep_plot.plot(**sweep_kw) for _ in range(self.num_curves)]

        threshold_kw = self._curve_style(1, 1)
        self.threshold_curves = [
            self.sweep_plot.plot(**threshold_kw) for _ in range(self.num_curves)
        ]

        # Only the first curve of each kind gets a legend entry.
        sweep_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
        sweep_plot_legend.setParentItem(self.sweep_plot)
        sweep_plot_legend.addItem(self.sweep_curves[0], "Sweep")
        sweep_plot_legend.addItem(self.threshold_curves[0], "Threshold")

        # Level history plot
        self.level_history_plot = win.addPlot(row=0, col=1, colspan=2)
        self.level_history_plot.setMenuEnabled(False)
        self.level_history_plot.showGrid(x=True, y=True)
        self.level_history_plot.addLegend()
        self.level_history_plot.setLabel("left", "Estimated level (cm)")
        self.level_history_plot.setLabel("bottom", "Time (s)")
        self.level_history_plot.addItem(pg.PlotDataItem())

        self.level_history_curve = self.level_history_plot.plot(**self._curve_style(0, 5))

        self.sweep_smooth_max = et.utils.SmoothMax()
        self.distance_hist_smooth_lim = et.utils.SmoothLimits()

        # text items
        self.level_html_format = (
            '<div style="text-align: center">'
            '<span style="color: #FFFFFF;font-size:12pt;">'
            "{}</span></div>"
        )

        self.level_text_item = pg.TextItem(
            fill=pg.mkColor(0x1F, 0x77, 0xB4, 180),
            anchor=(0.5, 0),
        )
        self.level_history_plot.addItem(self.level_text_item)
        self.level_text_item.hide()

    def update(
        self,
        result: RefAppResult,
    ) -> None:
        """Redraw the sweep plot, the level history and the level text from ``result``.

        :param result: Reference application result. ``extra_result.detector_result`` must
            hold exactly one entry, otherwise the unpacking below raises ``ValueError``.
        """
        # Get the first element as the plugin only supports single sensor operation.
        (detector_result,) = result.extra_result.detector_result.values()
        assert detector_result.distances is not None

        time_and_level_dict = (
            result.extra_result.processor_extra_result.level_and_time_for_plotting
        )

        self._update_sweep_plot(detector_result)
        self._update_level_history(time_and_level_dict)
        self._update_level_text(result)

    def _update_sweep_plot(self, detector_result) -> None:
        """Redraw sweep and threshold curves and rescale the sweep plot."""
        # clear sweep curves
        for sweep_curve, threshold_curve in zip(self.sweep_curves, self.threshold_curves):
            sweep_curve.clear()
            threshold_curve.clear()

        # update sweep plot
        max_val_in_plot = 0
        for idx, processor_result in enumerate(detector_result.processor_results):
            extra = processor_result.extra_result
            assert extra.used_threshold is not None
            assert extra.distances_m is not None
            assert extra.abs_sweep is not None

            self.sweep_curves[idx].setData(extra.distances_m, extra.abs_sweep)
            self.threshold_curves[idx].setData(extra.distances_m, extra.used_threshold)

            max_val_in_subsweep = max(
                max(extra.used_threshold),
                max(extra.abs_sweep),
            )
            max_val_in_plot = max(max_val_in_plot, max_val_in_subsweep)

        self.sweep_plot.setYRange(0, self.sweep_smooth_max.update(max_val_in_plot))
        self.vertical_line_start.setValue(self.start_m)
        self.vertical_line_end.setValue(self.end_m)
        self.vertical_line_start.show()
        self.vertical_line_end.show()

    def _update_level_history(self, time_and_level_dict) -> None:
        """Redraw the level history curve when at least one level value is not NaN."""
        levels = time_and_level_dict["level"]
        if not np.any(~np.isnan(levels)):
            return

        # Levels are scaled by 100 to match the cm axis label.
        self.level_history_curve.setData(time_and_level_dict["time"], levels * 100)
        self.level_history_plot.setXRange(-TIME_HISTORY_S + 1, 0)
        self.level_history_plot.setYRange(0, self._level_axis_max_cm)

    def _update_level_text(self, result: RefAppResult) -> None:
        """Show the current level (or the status message), or hide the text if data is missing."""
        if result.level is None or result.peak_detected is None or result.peak_status is None:
            self.level_text_item.hide()
            return

        current_level = result.level
        level_text = self.STATUS_MSG_MAP[result.peak_status]
        if result.peak_detected:
            level_text = "Level: {:.1f} cm, {:.0f} %".format(
                current_level * 100,
                current_level / (self.end_m - self.start_m) * 100,
            )

        self.level_text_item.setHtml(self.level_html_format.format(level_text))

        # Center the text horizontally and place it just below the top of the level axis.
        # The Y position uses the same axis maximum as setYRange so the text cannot end up
        # outside the visible range.
        x_pos = -(TIME_HISTORY_S) / 2
        self.level_text_item.setPos(x_pos, 0.95 * self._level_axis_max_cm)
        self.level_text_item.show()
286
287
# Run the example only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration