examples/a121/algo/tank_level/tank_level_with_gui.py

examples/a121/algo/tank_level/tank_level_with_gui.py

  1# Copyright (c) Acconeer AB, 2022-2024
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import logging
  7
  8import attrs
  9import numpy as np
 10
 11# Added here to force pyqtgraph to choose PySide
 12import PySide6  # noqa: F401
 13
 14import pyqtgraph as pg
 15
 16import acconeer.exptool as et
 17from acconeer.exptool import a121
 18from acconeer.exptool.a121.algo.distance import (
 19    PeakSortingMethod,
 20    ReflectorShape,
 21    ThresholdMethod,
 22)
 23from acconeer.exptool.a121.algo.distance._processors import (
 24    DEFAULT_FIXED_AMPLITUDE_THRESHOLD_VALUE,
 25    DEFAULT_FIXED_STRENGTH_THRESHOLD_VALUE,
 26)
 27from acconeer.exptool.a121.algo.tank_level import RefApp, RefAppResult
 28from acconeer.exptool.a121.algo.tank_level._processor import ProcessorLevelStatus
 29from acconeer.exptool.a121.algo.tank_level._ref_app import RefAppConfig, RefAppContext
 30
 31
# Module-level logger named after this module.
log = logging.getLogger(__name__)


# Width, in seconds, of the level history plot: its x-axis is set to
# (-TIME_HISTORY_S + 1, 0) and the level text is centered at -TIME_HISTORY_S / 2.
TIME_HISTORY_S = 30
 36
 37
 38@attrs.mutable(kw_only=True)
 39class SharedState:
 40    sensor_id: int = attrs.field(default=1)
 41    config: RefAppConfig = attrs.field(factory=RefAppConfig)
 42    context: RefAppContext = attrs.field(factory=RefAppContext)
 43
 44
 45def main():
 46    args = a121.ExampleArgumentParser().parse_args()
 47    et.utils.config_logging(args)
 48
 49    # Setup the configurations
 50    # Detailed at https://docs.acconeer.com/en/latest/exploration_tool/algo/a121/ref_apps/tank_level.html
 51
 52    # Sensor selections
 53    sensor = 1
 54
 55    # Tank level configurations
 56    ref_app_config = RefAppConfig(
 57        start_m=0.03,
 58        end_m=0.5,
 59        max_step_length=2,
 60        max_profile=a121.Profile.PROFILE_2,
 61        close_range_leakage_cancellation=True,
 62        signal_quality=20,
 63        update_rate=None,
 64        median_filter_length=5,
 65        num_medians_to_average=5,
 66        threshold_method=ThresholdMethod.CFAR,
 67        reflector_shape=ReflectorShape.PLANAR,
 68        peaksorting_method=PeakSortingMethod.CLOSEST,
 69        num_frames_in_recorded_threshold=50,
 70        fixed_threshold_value=DEFAULT_FIXED_AMPLITUDE_THRESHOLD_VALUE,  # float
 71        fixed_strength_threshold_value=DEFAULT_FIXED_STRENGTH_THRESHOLD_VALUE,  # float
 72        threshold_sensitivity=0.0,  # float
 73    )
 74
 75    # End setup configurations
 76
 77    # Preparation for client
 78    client = a121.Client.open(**a121.get_client_args(args))
 79
 80    # Preparation for reference application processor
 81    ref_app = RefApp(client=client, sensor_id=sensor, config=ref_app_config)
 82    ref_app.calibrate()
 83    ref_app.start()
 84
 85    pg_updater = PGUpdater(
 86        config=ref_app_config, num_curves=len(ref_app._detector.processor_specs)
 87    )
 88    pg_process = et.PGProcess(pg_updater, max_freq=60)
 89    pg_process.start()
 90
 91    interrupt_handler = et.utils.ExampleInterruptHandler()
 92    print("Press Ctrl-C to end session")
 93
 94    while not interrupt_handler.got_signal:
 95        processed_data = ref_app.get_next()
 96        try:
 97            pg_process.put_data(processed_data)
 98        except et.PGProccessDiedException:
 99            break
100
101    ref_app.stop()
102    client.close()
103    print("Disconnecting...")
104
105
class PGUpdater:
    """Plot updater for the tank level example.

    ``setup`` builds a sweep plot (sweep and threshold curves plus the tank
    start/end markers) and a level history plot inside the window it is given.
    ``update`` refreshes both plots from one ``RefAppResult``.
    """

    # Text shown in the level history plot when no peak was detected,
    # keyed by the level status reported in the result.
    STATUS_MSG_MAP = {
        ProcessorLevelStatus.IN_RANGE: "In range",
        ProcessorLevelStatus.NO_DETECTION: "Not available",
        ProcessorLevelStatus.OVERFLOW: "Warning: Overflow",
        ProcessorLevelStatus.OUT_OF_RANGE: "Out of range",
    }

    def __init__(
        self,
        config: RefAppConfig,
        num_curves: int,
    ) -> None:
        """Store the values the plots need.

        :param config: Reference application configuration; only ``start_m``
            and ``end_m`` are read.
        :param num_curves: Number of sweep curves (and of threshold curves)
            created in ``setup``.
        """
        self.num_curves = num_curves
        self.start_m = config.start_m
        self.end_m = config.end_m

    @staticmethod
    def _curve_kwargs(color_index: int, symbol_size: int) -> dict:
        """Return plot keyword arguments for a dotted line in the given cycler color."""
        return dict(
            pen=et.utils.pg_pen_cycler(color_index),
            symbol="o",
            symbolSize=symbol_size,
            symbolBrush=et.utils.pg_brush_cycler(color_index),
            symbolPen="k",
        )

    @staticmethod
    def _tank_boundary_line(label: str) -> pg.InfiniteLine:
        """Return a labelled line used to mark one end of the tank in the sweep plot."""
        return pg.InfiniteLine(
            pen=et.utils.pg_pen_cycler(2),
            label=label,
            labelOpts={
                "position": 0.5,
                "color": (0, 100, 0),
                "fill": (200, 200, 200, 50),
                "movable": True,
            },
        )

    def setup(self, win) -> None:
        """Create all plot items inside ``win``.

        :param win: Container providing ``addPlot(row=..., col=..., colspan=...)``
            -- presumably a pyqtgraph graphics layout; confirm against the caller.
        """
        # Sweep plot
        self.sweep_plot = win.addPlot(row=1, col=0, colspan=3)
        self.sweep_plot.setMenuEnabled(False)
        self.sweep_plot.showGrid(x=True, y=True)
        self.sweep_plot.addLegend()
        self.sweep_plot.setLabel("left", "Amplitude")
        self.sweep_plot.setLabel("bottom", "Distance (m)")
        self.sweep_plot.addItem(pg.PlotDataItem())

        self.vertical_line_start = self._tank_boundary_line("Tank start")
        self.sweep_plot.addItem(self.vertical_line_start)
        self.vertical_line_end = self._tank_boundary_line("Tank end")
        self.sweep_plot.addItem(self.vertical_line_end)

        # One sweep curve and one threshold curve per expected processor result.
        sweep_kw = self._curve_kwargs(0, symbol_size=1)
        self.sweep_curves = [self.sweep_plot.plot(**sweep_kw) for _ in range(self.num_curves)]

        threshold_kw = self._curve_kwargs(1, symbol_size=1)
        self.threshold_curves = [
            self.sweep_plot.plot(**threshold_kw) for _ in range(self.num_curves)
        ]

        # A single legend entry per kind is enough, as all curves of a kind look the same.
        sweep_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
        sweep_plot_legend.setParentItem(self.sweep_plot)
        sweep_plot_legend.addItem(self.sweep_curves[0], "Sweep")
        sweep_plot_legend.addItem(self.threshold_curves[0], "Threshold")

        # Level history plot
        self.level_history_plot = win.addPlot(row=0, col=1, colspan=2)
        self.level_history_plot.setMenuEnabled(False)
        self.level_history_plot.showGrid(x=True, y=True)
        self.level_history_plot.addLegend()
        self.level_history_plot.setLabel("left", "Estimated level (cm)")
        self.level_history_plot.setLabel("bottom", "Time (s)")
        self.level_history_plot.addItem(pg.PlotDataItem())

        self.level_history_curve = self.level_history_plot.plot(
            **self._curve_kwargs(0, symbol_size=5)
        )

        self.sweep_smooth_max = et.utils.SmoothMax()
        # NOTE(review): not read anywhere in this class; kept for compatibility.
        self.distance_hist_smooth_lim = et.utils.SmoothLimits()

        # text items
        self.level_html_format = (
            '<div style="text-align: center">'
            '<span style="color: #FFFFFF;font-size:12pt;">'
            "{}</span></div>"
        )

        self.level_text_item = pg.TextItem(
            fill=pg.mkColor(0x1F, 0x77, 0xB4, 180),
            anchor=(0.5, 0),
        )
        self.level_history_plot.addItem(self.level_text_item)
        # Hidden until the first result carrying level information arrives.
        self.level_text_item.hide()

    def update(
        self,
        result: RefAppResult,
    ) -> None:
        """Refresh the sweep plot, the level history and the level text from ``result``.

        The level text shows the level when a peak was detected and the status
        message from ``STATUS_MSG_MAP`` otherwise. Results without level
        information leave the text untouched.

        :param result: One result from the reference application.
        """
        # Get the first element as the plugin only supports single sensor operation.
        (detector_result,) = result.extra_result.detector_result.values()
        assert detector_result.distances is not None

        time_and_level_dict = (
            result.extra_result.processor_extra_result.level_and_time_for_plotting
        )

        # update sweep plot
        max_val_in_plot = 0
        for idx, processor_result in enumerate(detector_result.processor_results):
            extra = processor_result.extra_result
            assert extra.used_threshold is not None
            assert extra.distances_m is not None
            assert extra.abs_sweep is not None

            self.sweep_curves[idx].setData(extra.distances_m, extra.abs_sweep)
            self.threshold_curves[idx].setData(extra.distances_m, extra.used_threshold)

            max_val_in_subsweep = max(
                max(extra.used_threshold),
                max(extra.abs_sweep),
            )
            max_val_in_plot = max(max_val_in_plot, max_val_in_subsweep)

        self.sweep_plot.setYRange(0, self.sweep_smooth_max.update(max_val_in_plot))
        self.vertical_line_start.setValue(self.start_m)
        self.vertical_line_end.setValue(self.end_m)
        self.vertical_line_start.show()
        self.vertical_line_end.show()

        # update level history plot
        levels = time_and_level_dict["level"]
        if np.any(~np.isnan(levels)):
            # Scaled by 100 to match the axis labelled in cm
            # (presumably the levels are in meters -- confirm against the processor).
            self.level_history_curve.setData(time_and_level_dict["time"], levels * 100)

        # Upper limit of the level axis; also used to place the level text so
        # that the text always ends up inside the visible range.
        y_max_cm = (self.end_m - self.start_m + 0.01) * 100
        self.level_history_plot.setXRange(-TIME_HISTORY_S + 1, 0)
        self.level_history_plot.setYRange(0, y_max_cm)

        # update level text
        if (
            result.level is None
            or result.peak_detected is None
            or result.peak_status is None
        ):
            # No level information in this result: keep whatever is shown.
            return

        if result.peak_detected:
            level_cm = result.level * 100
            level_percent = result.level / (self.end_m - self.start_m) * 100
            level_text = f"Level: {level_cm:.1f} cm, {level_percent:.0f} %"
        else:
            # Tell the user why there is no level instead of hiding the text.
            level_text = self.STATUS_MSG_MAP[result.peak_status]

        self.level_text_item.setHtml(self.level_html_format.format(level_text))
        self.level_text_item.setPos(-TIME_HISTORY_S / 2, 0.95 * y_max_cm)
        self.level_text_item.show()
280
281
# Run the example only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()

View this example on GitHub: acconeer/acconeer-python-exploration