examples/a121/algo/tank_level/tank_level_with_gui.py

examples/a121/algo/tank_level/tank_level_with_gui.py

  1# Copyright (c) Acconeer AB, 2022-2025
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import logging
  7
  8import attrs
  9import numpy as np
 10
 11# Added here to force pyqtgraph to choose PySide
 12import PySide6  # noqa: F401
 13
 14import pyqtgraph as pg
 15
 16import acconeer.exptool as et
 17from acconeer.exptool import a121
 18from acconeer.exptool.a121.algo.distance import (
 19    PeakSortingMethod,
 20    ReflectorShape,
 21    ThresholdMethod,
 22)
 23from acconeer.exptool.a121.algo.distance._processors import (
 24    DEFAULT_FIXED_AMPLITUDE_THRESHOLD_VALUE,
 25    DEFAULT_FIXED_STRENGTH_THRESHOLD_VALUE,
 26)
 27from acconeer.exptool.a121.algo.tank_level import RefApp, RefAppResult
 28from acconeer.exptool.a121.algo.tank_level._processor import ProcessorLevelStatus
 29from acconeer.exptool.a121.algo.tank_level._ref_app import RefAppConfig, RefAppContext
 30
 31
 32log = logging.getLogger(__name__)
 33
 34
 35TIME_HISTORY_S = 30
 36
 37
 38@attrs.mutable(kw_only=True)
 39class SharedState:
 40    sensor_id: int = attrs.field(default=1)
 41    config: RefAppConfig = attrs.field(factory=RefAppConfig)
 42    context: RefAppContext = attrs.field(factory=RefAppContext)
 43
 44
 45def main():
 46    args = a121.ExampleArgumentParser().parse_args()
 47    et.utils.config_logging(args)
 48
 49    # Setup the configurations
 50    # Detailed at https://docs.acconeer.com/en/latest/exploration_tool/algo/a121/ref_apps/tank_level.html
 51
 52    # Sensor selections
 53    sensor = 1
 54
 55    # Tank level configurations
 56    ref_app_config = RefAppConfig(
 57        start_m=0.03,
 58        end_m=0.5,
 59        max_step_length=2,
 60        max_profile=a121.Profile.PROFILE_2,
 61        close_range_leakage_cancellation=True,
 62        signal_quality=20,
 63        update_rate=None,
 64        median_filter_length=5,
 65        num_medians_to_average=5,
 66        threshold_method=ThresholdMethod.CFAR,
 67        reflector_shape=ReflectorShape.PLANAR,
 68        peaksorting_method=PeakSortingMethod.CLOSEST,
 69        num_frames_in_recorded_threshold=50,
 70        fixed_threshold_value=DEFAULT_FIXED_AMPLITUDE_THRESHOLD_VALUE,  # float
 71        fixed_strength_threshold_value=DEFAULT_FIXED_STRENGTH_THRESHOLD_VALUE,  # float
 72        threshold_sensitivity=0.0,  # float
 73        level_tracking_active=False,
 74        partial_tracking_range_m=0.0,
 75    )
 76
 77    # End setup configurations
 78
 79    # Preparation for client
 80    client = a121.Client.open(**a121.get_client_args(args))
 81
 82    # Preparation for reference application processor
 83    ref_app = RefApp(client=client, sensor_id=sensor, config=ref_app_config)
 84    ref_app.calibrate()
 85    ref_app.start()
 86
 87    pg_updater = PGUpdater(
 88        config=ref_app_config, num_curves=len(ref_app._detector.processor_specs)
 89    )
 90    pg_process = et.PGProcess(pg_updater, max_freq=60)
 91    pg_process.start()
 92
 93    interrupt_handler = et.utils.ExampleInterruptHandler()
 94    print("Press Ctrl-C to end session")
 95
 96    while not interrupt_handler.got_signal:
 97        processed_data = ref_app.get_next()
 98        try:
 99            pg_process.put_data(processed_data)
100        except et.PGProccessDiedException:
101            break
102
103    ref_app.stop()
104    client.close()
105    print("Disconnecting...")
106
107
class PGUpdater:
    """Draw tank level reference application results with pyqtgraph.

    ``setup`` builds two plots: a sweep plot (sweep amplitude and threshold
    versus distance, with vertical markers at the configured tank start and
    end) and a level history plot that also carries a text label for the most
    recent level. ``update`` pushes one ``RefAppResult`` into those plots.
    """

    # Human-readable text per level status.
    # NOTE(review): update() hides the level label when no peak is detected
    # and never displays these messages -- confirm whether they should be shown.
    STATUS_MSG_MAP = {
        ProcessorLevelStatus.IN_RANGE: "In range",
        ProcessorLevelStatus.NO_DETECTION: "Not available",
        ProcessorLevelStatus.OVERFLOW: "Warning: Overflow",
        ProcessorLevelStatus.OUT_OF_RANGE: "Out of range",
    }

    def __init__(
        self,
        config: RefAppConfig,
        num_curves: int,
    ) -> None:
        """Store what the plots need from the configuration.

        :param config: Reference application config; only ``start_m`` and
            ``end_m`` are read.
        :param num_curves: Number of sweep curves (and, equally, threshold
            curves) to create in the sweep plot.
        """
        self.num_curves = num_curves
        self.start_m = config.start_m
        self.end_m = config.end_m

    @staticmethod
    def _curve_style(color_index, symbol_size):
        """Return plot keyword arguments for a curve in the given cycler color."""
        return dict(
            pen=et.utils.pg_pen_cycler(color_index),
            symbol="o",
            symbolSize=symbol_size,
            symbolBrush=et.utils.pg_brush_cycler(color_index),
            symbolPen="k",
        )

    @staticmethod
    def _make_boundary_line(label):
        """Return a labelled vertical marker line for a tank boundary."""
        return pg.InfiniteLine(
            pen=et.utils.pg_pen_cycler(2),
            label=label,
            labelOpts={
                "position": 0.5,
                "color": (0, 100, 0),
                "fill": (200, 200, 200, 50),
                "movable": True,
            },
        )

    def _level_axis_max_cm(self):
        """Return the upper limit of the level history y axis, in cm."""
        return (self.end_m - self.start_m + 0.01) * 100

    def setup(self, win):
        """Create all plots, curves and text items inside *win*.

        :param win: Layout object providing ``addPlot``; the sweep plot goes
            in row 1 and the level history plot in row 0.
        """
        # Sweep plot
        self.sweep_plot = win.addPlot(row=1, col=0, colspan=3)
        self.sweep_plot.setMenuEnabled(False)
        self.sweep_plot.showGrid(x=True, y=True)
        self.sweep_plot.addLegend()
        self.sweep_plot.setLabel("left", "Amplitude")
        self.sweep_plot.setLabel("bottom", "Distance (m)")
        self.sweep_plot.addItem(pg.PlotDataItem())

        self.vertical_line_start = self._make_boundary_line("Tank start")
        self.sweep_plot.addItem(self.vertical_line_start)
        self.vertical_line_end = self._make_boundary_line("Tank end")
        self.sweep_plot.addItem(self.vertical_line_end)

        sweep_style = self._curve_style(0, symbol_size=1)
        self.sweep_curves = [
            self.sweep_plot.plot(**sweep_style) for _ in range(self.num_curves)
        ]

        threshold_style = self._curve_style(1, symbol_size=1)
        self.threshold_curves = [
            self.sweep_plot.plot(**threshold_style) for _ in range(self.num_curves)
        ]

        # Only the first curve of each kind gets a legend entry; all curves of
        # a kind share the same style.
        sweep_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
        sweep_plot_legend.setParentItem(self.sweep_plot)
        sweep_plot_legend.addItem(self.sweep_curves[0], "Sweep")
        sweep_plot_legend.addItem(self.threshold_curves[0], "Threshold")

        # Level history plot
        self.level_history_plot = win.addPlot(row=0, col=1, colspan=2)
        self.level_history_plot.setMenuEnabled(False)
        self.level_history_plot.showGrid(x=True, y=True)
        self.level_history_plot.addLegend()
        self.level_history_plot.setLabel("left", "Estimated level (cm)")
        self.level_history_plot.setLabel("bottom", "Time (s)")
        self.level_history_plot.addItem(pg.PlotDataItem())

        self.level_history_curve = self.level_history_plot.plot(
            **self._curve_style(0, symbol_size=5)
        )

        self.sweep_smooth_max = et.utils.SmoothMax()
        self.distance_hist_smooth_lim = et.utils.SmoothLimits()

        # text items
        self.level_html_format = (
            '<div style="text-align: center">'
            '<span style="color: #FFFFFF;font-size:12pt;">'
            "{}</span></div>"
        )

        self.level_text_item = pg.TextItem(
            fill=pg.mkColor(0x1F, 0x77, 0xB4, 180),
            anchor=(0.5, 0),
        )
        self.level_history_plot.addItem(self.level_text_item)
        # Hidden until update() has a detected level to show.
        self.level_text_item.hide()

    def update(
        self,
        result: RefAppResult,
    ) -> None:
        """Redraw both plots and the level label from one result.

        :param result: Reference application result. Its extra result must
            hold exactly one detector result, otherwise the unpacking below
            raises ``ValueError``.
        """
        # Single sensor operation only: exactly one detector result is expected.
        (detector_result,) = result.extra_result.detector_result.values()
        assert detector_result.distances is not None

        time_and_level_dict = (
            result.extra_result.processor_extra_result.level_and_time_for_plotting
        )

        self._update_sweep_plot(detector_result.processor_results)
        self._update_level_history(time_and_level_dict)
        self._update_level_text(result)

    def _update_sweep_plot(self, processor_results):
        """Redraw sweep and threshold curves and rescale the amplitude axis."""
        # Clear everything first so curves without a matching processor result
        # do not keep stale data.
        for sweep_curve, threshold_curve in zip(self.sweep_curves, self.threshold_curves):
            sweep_curve.clear()
            threshold_curve.clear()

        max_val_in_plot = 0
        for idx, processor_result in enumerate(processor_results):
            extra = processor_result.extra_result
            assert extra.used_threshold is not None
            assert extra.distances_m is not None
            assert extra.abs_sweep is not None

            self.sweep_curves[idx].setData(extra.distances_m, extra.abs_sweep)
            self.threshold_curves[idx].setData(extra.distances_m, extra.used_threshold)

            max_val_in_subsweep = max(max(extra.used_threshold), max(extra.abs_sweep))
            max_val_in_plot = max(max_val_in_plot, max_val_in_subsweep)

        self.sweep_plot.setYRange(0, self.sweep_smooth_max.update(max_val_in_plot))
        self.vertical_line_start.setValue(self.start_m)
        self.vertical_line_end.setValue(self.end_m)
        self.vertical_line_start.show()
        self.vertical_line_end.show()

    def _update_level_history(self, time_and_level_dict):
        """Redraw the level history curve (in cm) and fix its axis ranges."""
        levels = time_and_level_dict["level"]
        # Keep the previously drawn curve while every level sample is NaN.
        if np.any(~np.isnan(levels)):
            self.level_history_curve.setData(time_and_level_dict["time"], levels * 100)
        self.level_history_plot.setXRange(-TIME_HISTORY_S + 1, 0)
        self.level_history_plot.setYRange(0, self._level_axis_max_cm())

    def _update_level_text(self, result):
        """Show the current level as text, or hide the label without a peak."""
        if result.level is None or result.peak_detected is None or result.peak_status is None:
            # Incomplete result: leave the label in its current state.
            return

        if not result.peak_detected:
            self.level_text_item.hide()
            return

        level_cm = result.level * 100
        fill_percent = result.level / (self.end_m - self.start_m) * 100
        level_text = f"Level: {level_cm:.1f} cm, {fill_percent:.0f} %"
        self.level_text_item.setHtml(self.level_html_format.format(level_text))

        # Place the label relative to the y axis maximum rather than end_m:
        # the axis only spans the tank height (end_m - start_m), so a position
        # derived from end_m alone leaves the visible range once start_m is
        # not negligible.
        x_pos = -(TIME_HISTORY_S) / 2
        self.level_text_item.setPos(x_pos, 0.95 * self._level_axis_max_cm())
        self.level_text_item.show()
287
# Start the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

View this example on GitHub: acconeer/acconeer-python-exploration