examples/a121/algo/distance/detector.py

examples/a121/algo/distance/detector.py#

  1# Copyright (c) Acconeer AB, 2022-2024
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8# Added here to force pyqtgraph to choose PySide
  9import PySide6  # noqa: F401
 10
 11import pyqtgraph as pg
 12
 13import acconeer.exptool as et
 14from acconeer.exptool import a121
 15from acconeer.exptool.a121.algo.distance import Detector, DetectorConfig, ThresholdMethod
 16
 17
 18SENSOR_ID = 1
 19
 20
 21def main():
 22    args = a121.ExampleArgumentParser().parse_args()
 23    et.utils.config_logging(args)
 24
 25    client = a121.Client.open(**a121.get_client_args(args))
 26    detector_config = DetectorConfig(
 27        start_m=0.0,
 28        end_m=2.0,
 29        max_profile=a121.Profile.PROFILE_3,
 30        max_step_length=12,
 31        threshold_method=ThresholdMethod.RECORDED,
 32    )
 33    detector = Detector(client=client, sensor_ids=[SENSOR_ID], detector_config=detector_config)
 34
 35    detector.calibrate_detector()
 36
 37    detector.start()
 38
 39    pg_updater = PGUpdater(num_curves=len(detector.processor_specs))
 40    pg_process = et.PGProcess(pg_updater)
 41    pg_process.start()
 42
 43    interrupt_handler = et.utils.ExampleInterruptHandler()
 44    print("Press Ctrl-C to end session")
 45
 46    while not interrupt_handler.got_signal:
 47        detector_result = detector.get_next()
 48        try:
 49            pg_process.put_data(detector_result)
 50        except et.PGProccessDiedException:
 51            break
 52
 53    detector.stop()
 54
 55    print("Disconnecting...")
 56    client.close()
 57
 58
 59class PGUpdater:
 60    def __init__(self, num_curves):
 61        self.num_curves = num_curves
 62        self.distance_history = [np.nan] * 100
 63
 64    def setup(self, win):
 65        self.sweep_plot = win.addPlot(row=0, col=0)
 66        self.sweep_plot.setMenuEnabled(False)
 67        self.sweep_plot.showGrid(x=True, y=True)
 68        self.sweep_plot.addLegend()
 69        self.sweep_plot.setLabel("left", "Amplitude")
 70        self.sweep_plot.addItem(pg.PlotDataItem())
 71
 72        pen = et.utils.pg_pen_cycler(0)
 73        brush = et.utils.pg_brush_cycler(0)
 74        symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
 75        feat_kw = dict(pen=pen, **symbol_kw)
 76        self.sweep_curves = [self.sweep_plot.plot(**feat_kw) for _ in range(self.num_curves)]
 77
 78        pen = et.utils.pg_pen_cycler(1)
 79        brush = et.utils.pg_brush_cycler(1)
 80        symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
 81        feat_kw = dict(pen=pen, **symbol_kw)
 82        self.threshold_curves = [self.sweep_plot.plot(**feat_kw) for _ in range(self.num_curves)]
 83
 84        self.dist_history_plot = win.addPlot(row=1, col=0)
 85        self.dist_history_plot.setMenuEnabled(False)
 86        self.dist_history_plot.showGrid(x=True, y=True)
 87        self.dist_history_plot.addLegend()
 88        self.dist_history_plot.setLabel("left", "Estimated_distance")
 89        self.dist_history_plot.addItem(pg.PlotDataItem())
 90
 91        pen = et.utils.pg_pen_cycler(0)
 92        brush = et.utils.pg_brush_cycler(0)
 93        symbol_kw = dict(symbol="o", symbolSize=5, symbolBrush=brush, symbolPen="k")
 94        feat_kw = dict(pen=pen, **symbol_kw)
 95        self.dist_history_curve = self.dist_history_plot.plot(**feat_kw)
 96
 97        self.distance_hist_smooth_lim = et.utils.SmoothLimits()
 98
 99    def update(self, multi_sensor_result):
100        # Get the first element as the example only supports single sensor operation.
101        result = multi_sensor_result[SENSOR_ID]
102        self.distance_history.pop(0)
103        if len(result.distances) != 0:
104            self.distance_history.append(result.distances[0])
105        else:
106            self.distance_history.append(np.nan)
107
108        for idx, processor_result in enumerate(result.processor_results):
109            threshold = processor_result.extra_result.used_threshold
110            valid_threshold_idx = np.where(~np.isnan(threshold))[0]
111            threshold = threshold[valid_threshold_idx]
112            self.sweep_curves[idx].setData(
113                processor_result.extra_result.distances_m, processor_result.extra_result.abs_sweep
114            )
115            self.threshold_curves[idx].setData(
116                processor_result.extra_result.distances_m[valid_threshold_idx], threshold
117            )
118        if np.any(~np.isnan(self.distance_history)):
119            self.dist_history_curve.setData(self.distance_history)
120            lims = self.distance_hist_smooth_lim.update(self.distance_history)
121            self.dist_history_plot.setYRange(lims[0], lims[1])
122        else:
123            self.dist_history_curve.setData([])
124
125
126if __name__ == "__main__":
127    main()

View this example on GitHub: acconeer/acconeer-python-exploration