examples/a121/algo/tank_level/tank_level_with_gui.py#
1# Copyright (c) Acconeer AB, 2022-2024
2# All rights reserved
3
4from __future__ import annotations
5
6import logging
7
8import attrs
9import numpy as np
10
11# Added here to force pyqtgraph to choose PySide
12import PySide6 # noqa: F401
13
14import pyqtgraph as pg
15
16import acconeer.exptool as et
17from acconeer.exptool import a121
18from acconeer.exptool.a121.algo.distance import (
19 PeakSortingMethod,
20 ReflectorShape,
21 ThresholdMethod,
22)
23from acconeer.exptool.a121.algo.distance._processors import (
24 DEFAULT_FIXED_AMPLITUDE_THRESHOLD_VALUE,
25 DEFAULT_FIXED_STRENGTH_THRESHOLD_VALUE,
26)
27from acconeer.exptool.a121.algo.tank_level import RefApp, RefAppResult
28from acconeer.exptool.a121.algo.tank_level._processor import ProcessorLevelStatus
29from acconeer.exptool.a121.algo.tank_level._ref_app import RefAppConfig, RefAppContext
30
31
32log = logging.getLogger(__name__)
33
34
35TIME_HISTORY_S = 30
36
37
38@attrs.mutable(kw_only=True)
39class SharedState:
40 sensor_id: int = attrs.field(default=1)
41 config: RefAppConfig = attrs.field(factory=RefAppConfig)
42 context: RefAppContext = attrs.field(factory=RefAppContext)
43
44
45def main():
46 args = a121.ExampleArgumentParser().parse_args()
47 et.utils.config_logging(args)
48
49 # Setup the configurations
50 # Detailed at https://docs.acconeer.com/en/latest/exploration_tool/algo/a121/ref_apps/tank_level.html
51
52 # Sensor selections
53 sensor = 1
54
55 # Tank level configurations
56 ref_app_config = RefAppConfig(
57 start_m=0.03,
58 end_m=0.5,
59 max_step_length=2,
60 max_profile=a121.Profile.PROFILE_2,
61 close_range_leakage_cancellation=True,
62 signal_quality=20,
63 update_rate=None,
64 median_filter_length=5,
65 num_medians_to_average=5,
66 threshold_method=ThresholdMethod.CFAR,
67 reflector_shape=ReflectorShape.PLANAR,
68 peaksorting_method=PeakSortingMethod.CLOSEST,
69 num_frames_in_recorded_threshold=50,
70 fixed_threshold_value=DEFAULT_FIXED_AMPLITUDE_THRESHOLD_VALUE, # float
71 fixed_strength_threshold_value=DEFAULT_FIXED_STRENGTH_THRESHOLD_VALUE, # float
72 threshold_sensitivity=0.0, # float
73 )
74
75 # End setup configurations
76
77 # Preparation for client
78 client = a121.Client.open(**a121.get_client_args(args))
79
80 # Preparation for reference application processor
81 ref_app = RefApp(client=client, sensor_id=sensor, config=ref_app_config)
82 ref_app.calibrate()
83 ref_app.start()
84
85 pg_updater = PGUpdater(
86 config=ref_app_config, num_curves=len(ref_app._detector.processor_specs)
87 )
88 pg_process = et.PGProcess(pg_updater, max_freq=60)
89 pg_process.start()
90
91 interrupt_handler = et.utils.ExampleInterruptHandler()
92 print("Press Ctrl-C to end session")
93
94 while not interrupt_handler.got_signal:
95 processed_data = ref_app.get_next()
96 try:
97 pg_process.put_data(processed_data)
98 except et.PGProccessDiedException:
99 break
100
101 ref_app.stop()
102 client.close()
103 print("Disconnecting...")
104
105
106class PGUpdater:
107 STATUS_MSG_MAP = {
108 ProcessorLevelStatus.IN_RANGE: "In range",
109 ProcessorLevelStatus.NO_DETECTION: "Not available",
110 ProcessorLevelStatus.OVERFLOW: "Warning: Overflow",
111 ProcessorLevelStatus.OUT_OF_RANGE: "Out of range",
112 }
113
114 def __init__(
115 self,
116 config: RefAppConfig,
117 num_curves: int,
118 ) -> None:
119 self.num_curves = num_curves
120 self.start_m = config.start_m
121 self.end_m = config.end_m
122
123 def setup(self, win):
124 # Sweep plot
125 self.sweep_plot = win.addPlot(row=1, col=0, colspan=3)
126 self.sweep_plot.setMenuEnabled(False)
127 self.sweep_plot.showGrid(x=True, y=True)
128 self.sweep_plot.addLegend()
129 self.sweep_plot.setLabel("left", "Amplitude")
130 self.sweep_plot.setLabel("bottom", "Distance (m)")
131 self.sweep_plot.addItem(pg.PlotDataItem())
132
133 self.vertical_line_start = pg.InfiniteLine(
134 pen=et.utils.pg_pen_cycler(2),
135 label="Tank start",
136 labelOpts={
137 "position": 0.5,
138 "color": (0, 100, 0),
139 "fill": (200, 200, 200, 50),
140 "movable": True,
141 },
142 )
143 self.sweep_plot.addItem(self.vertical_line_start)
144 self.vertical_line_end = pg.InfiniteLine(
145 pen=et.utils.pg_pen_cycler(2),
146 label="Tank end",
147 labelOpts={
148 "position": 0.5,
149 "color": (0, 100, 0),
150 "fill": (200, 200, 200, 50),
151 "movable": True,
152 },
153 )
154 self.sweep_plot.addItem(self.vertical_line_end)
155
156 pen = et.utils.pg_pen_cycler(0)
157 brush = et.utils.pg_brush_cycler(0)
158 symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
159 feat_kw = dict(pen=pen, **symbol_kw)
160 self.sweep_curves = [self.sweep_plot.plot(**feat_kw) for _ in range(self.num_curves)]
161
162 pen = et.utils.pg_pen_cycler(1)
163 brush = et.utils.pg_brush_cycler(1)
164 symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
165 feat_kw = dict(pen=pen, **symbol_kw)
166 self.threshold_curves = [self.sweep_plot.plot(**feat_kw) for _ in range(self.num_curves)]
167
168 sweep_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
169 sweep_plot_legend.setParentItem(self.sweep_plot)
170 sweep_plot_legend.addItem(self.sweep_curves[0], "Sweep")
171 sweep_plot_legend.addItem(self.threshold_curves[0], "Threshold")
172
173 # Level history plot
174 self.level_history_plot = win.addPlot(row=0, col=1, colspan=2)
175 self.level_history_plot.setMenuEnabled(False)
176 self.level_history_plot.showGrid(x=True, y=True)
177 self.level_history_plot.addLegend()
178 self.level_history_plot.setLabel("left", "Estimated level (cm)")
179 self.level_history_plot.setLabel("bottom", "Time (s)")
180 self.level_history_plot.addItem(pg.PlotDataItem())
181
182 pen = et.utils.pg_pen_cycler(0)
183 brush = et.utils.pg_brush_cycler(0)
184 symbol_kw = dict(symbol="o", symbolSize=5, symbolBrush=brush, symbolPen="k")
185 feat_kw = dict(pen=pen, **symbol_kw)
186 self.level_history_curve = self.level_history_plot.plot(**feat_kw)
187
188 self.sweep_smooth_max = et.utils.SmoothMax()
189 self.distance_hist_smooth_lim = et.utils.SmoothLimits()
190
191 # text items
192 self.level_html_format = (
193 '<div style="text-align: center">'
194 '<span style="color: #FFFFFF;font-size:12pt;">'
195 "{}</span></div>"
196 )
197
198 self.level_text_item = pg.TextItem(
199 fill=pg.mkColor(0x1F, 0x77, 0xB4, 180),
200 anchor=(0.5, 0),
201 )
202 self.level_history_plot.addItem(self.level_text_item)
203 self.level_text_item.hide()
204
205 def update(
206 self,
207 result: RefAppResult,
208 ) -> None:
209 # Get the first element as the plugin only supports single sensor operation.
210 (detector_result,) = list(result.extra_result.detector_result.values())
211 assert detector_result.distances is not None
212
213 time_and_level_dict = (
214 result.extra_result.processor_extra_result.level_and_time_for_plotting
215 )
216
217 # update sweep plot
218 max_val_in_plot = 0
219 for idx, processor_result in enumerate(detector_result.processor_results):
220 assert processor_result.extra_result.used_threshold is not None
221 assert processor_result.extra_result.distances_m is not None
222 assert processor_result.extra_result.abs_sweep is not None
223
224 self.sweep_curves[idx].setData(
225 processor_result.extra_result.distances_m, processor_result.extra_result.abs_sweep
226 )
227
228 self.threshold_curves[idx].setData(
229 processor_result.extra_result.distances_m,
230 processor_result.extra_result.used_threshold,
231 )
232
233 max_val_in_subsweep = max(
234 max(processor_result.extra_result.used_threshold),
235 max(processor_result.extra_result.abs_sweep),
236 )
237
238 max_val_in_plot = max(max_val_in_plot, max_val_in_subsweep)
239
240 self.sweep_plot.setYRange(0, self.sweep_smooth_max.update(max_val_in_plot))
241 self.vertical_line_start.setValue(self.start_m)
242 self.vertical_line_end.setValue(self.end_m)
243 self.vertical_line_start.show()
244 self.vertical_line_end.show()
245
246 # update level history plot
247 if any(~np.isnan(time_and_level_dict["level"])):
248 self.level_history_curve.setData(
249 time_and_level_dict["time"], time_and_level_dict["level"] * 100
250 )
251 self.level_history_plot.setXRange(-TIME_HISTORY_S + 1, 0)
252 self.level_history_plot.setYRange(0, (self.end_m - self.start_m + 0.01) * 100)
253
254 # update level plot
255 if (
256 result.level is not None
257 and result.peak_detected is not None
258 and result.peak_status is not None
259 ):
260 current_level = result.level
261 peak_detected = result.peak_detected
262 peak_status = result.peak_status
263 level_text = self.STATUS_MSG_MAP[peak_status]
264 if peak_detected:
265 level_text = "Level: {:.1f} cm, {:.0f} %".format(
266 current_level * 100,
267 current_level / (self.end_m - self.start_m) * 100,
268 )
269
270 level_html = self.level_html_format.format(level_text)
271 self.level_text_item.setHtml(level_html)
272
273 x_pos = -(TIME_HISTORY_S) / 2
274 y_max_cm = self.end_m * 100
275 self.level_text_item.setPos(x_pos, 0.95 * y_max_cm)
276 self.level_text_item.show()
277
278 else:
279 self.level_text_item.hide()
280
281
282if __name__ == "__main__":
283 main()
View this example on GitHub: acconeer/acconeer-python-exploration