examples/a121/algo/distance/processor.py

examples/a121/algo/distance/processor.py#

  1# Copyright (c) Acconeer AB, 2022-2024
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import copy
  7
  8import numpy as np
  9
 10import pyqtgraph as pg
 11
 12import acconeer.exptool as et
 13from acconeer.exptool import a121
 14from acconeer.exptool.a121.algo.distance import (
 15    Processor,
 16    ProcessorConfig,
 17    ProcessorContext,
 18    ProcessorMode,
 19    ThresholdMethod,
 20    calculate_bg_noise_std,
 21)
 22
 23
 24def main():
 25    args = a121.ExampleArgumentParser().parse_args()
 26    et.utils.config_logging(args)
 27
 28    client = a121.Client.open(**a121.get_client_args(args))
 29
 30    # Define sensor configuration.
 31    sensor_config = a121.SensorConfig(
 32        subsweeps=[
 33            a121.SubsweepConfig(
 34                start_point=50,
 35                step_length=4,
 36                num_points=50,
 37                profile=a121.Profile.PROFILE_1,
 38                hwaas=32,
 39                phase_enhancement=True,
 40            )
 41        ],
 42        sweeps_per_frame=1,
 43    )
 44
 45    # Calibrate noise.
 46    noise_sensor_config = copy.deepcopy(sensor_config)
 47    for subsweep in noise_sensor_config.subsweeps:
 48        # Disable Tx when measuring background noise.
 49        subsweep.enable_tx = False
 50
 51    metadata = client.setup_session(noise_sensor_config)
 52    client.start_session()
 53    result = client.get_next()
 54    client.stop_session()
 55
 56    stds = [
 57        calculate_bg_noise_std(subframe, subsweep_config)
 58        for (subframe, subsweep_config) in zip(result.subframes, noise_sensor_config.subsweeps)
 59    ]
 60
 61    # Create processor for distance estimation.
 62    distance_context = ProcessorContext(
 63        bg_noise_std=stds,
 64    )
 65    distance_config = ProcessorConfig(
 66        processor_mode=ProcessorMode.DISTANCE_ESTIMATION,
 67        threshold_method=ThresholdMethod.CFAR,
 68        threshold_sensitivity=0.8,
 69    )
 70    distance_processor = Processor(
 71        sensor_config=sensor_config,
 72        metadata=metadata,
 73        processor_config=distance_config,
 74        context=distance_context,
 75    )
 76
 77    pg_updater = PGUpdater()
 78    pg_process = et.PGProcess(pg_updater)
 79    pg_process.start()
 80
 81    metadata = client.setup_session(sensor_config)
 82    client.start_session()
 83
 84    interrupt_handler = et.utils.ExampleInterruptHandler()
 85    print("Press Ctrl-C to end session")
 86
 87    while not interrupt_handler.got_signal:
 88        extended_result = client.get_next()
 89        processed_data = distance_processor.process(extended_result)
 90        try:
 91            pg_process.put_data(processed_data)
 92        except et.PGProccessDiedException:
 93            break
 94
 95    print("Disconnecting...")
 96    pg_process.close()
 97    client.close()
 98
 99
100class PGUpdater:
101    def __init__(self):
102        self.history = [np.nan] * 100
103
104    def setup(self, win):
105        self.sweep_plot = win.addPlot(row=0, col=0)
106        self.sweep_plot.setMenuEnabled(False)
107        self.sweep_plot.showGrid(x=True, y=True)
108        self.sweep_plot.addLegend()
109        self.sweep_plot.setLabel("left", "Amplitude")
110        self.sweep_plot.setLabel("bottom", "Distance(m)")
111        self.sweep_plot.addItem(pg.PlotDataItem())
112
113        legends = ["Sweep", "Threshold"]
114        self.curves = {}
115        for i, legend in enumerate(legends):
116            pen = et.utils.pg_pen_cycler(i)
117            brush = et.utils.pg_brush_cycler(i)
118            symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
119            feat_kw = dict(pen=pen, **symbol_kw)
120            self.curves[legend] = self.sweep_plot.plot(**feat_kw, name=legends[i])
121
122        self.smooth_max = et.utils.SmoothMax()
123
124        self.dist_history_plot = win.addPlot(row=1, col=0)
125        self.dist_history_plot.setMenuEnabled(False)
126        self.dist_history_plot.showGrid(x=True, y=True)
127        self.dist_history_plot.addLegend()
128        self.dist_history_plot.setLabel("left", "Estimated distance (m)")
129        self.dist_history_plot.setLabel("bottom", "History (frame)")
130        self.dist_history_plot.addItem(pg.PlotDataItem())
131        self.dist_history_plot.setXRange(0, len(self.history))
132
133        pen = et.utils.pg_pen_cycler(0)
134        brush = et.utils.pg_brush_cycler(0)
135        symbol_kw = dict(symbol="o", symbolSize=5, symbolBrush=brush, symbolPen="k")
136        feat_kw = dict(pen=pen, **symbol_kw)
137        self.dist_history_curve = self.dist_history_plot.plot(**feat_kw)
138
139    def update(self, d):
140        sweep = d.extra_result.abs_sweep
141        threshold = d.extra_result.used_threshold
142        distances = d.extra_result.distances_m
143
144        self.history.pop(0)
145        if len(d.estimated_distances) != 0:
146            self.history.append(d.estimated_distances[0])
147        else:
148            self.history.append(np.nan)
149
150        self.curves["Sweep"].setData(distances, sweep)
151        self.curves["Threshold"].setData(distances, threshold)
152        self.sweep_plot.setYRange(
153            0,
154            self.smooth_max.update(np.amax(np.concatenate((sweep, threshold)))),
155        )
156
157        if not np.all(np.isnan(self.history)):
158            self.dist_history_curve.setData(self.history)
159
160
161if __name__ == "__main__":
162    main()

View this example on GitHub: acconeer/acconeer-python-exploration