examples/a121/algo/distance/processor.py

examples/a121/algo/distance/processor.py#

  1# Copyright (c) Acconeer AB, 2022-2025
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import copy
  7
  8import numpy as np
  9
 10import pyqtgraph as pg
 11
 12import acconeer.exptool as et
 13from acconeer.exptool import a121
 14from acconeer.exptool.a121.algo.distance import (
 15    Processor,
 16    ProcessorConfig,
 17    ProcessorContext,
 18    ProcessorMode,
 19    ThresholdMethod,
 20    calculate_bg_noise_std,
 21)
 22
 23
 24def main():
 25    args = a121.ExampleArgumentParser().parse_args()
 26    et.utils.config_logging(args)
 27
 28    client = a121.Client.open(**a121.get_client_args(args))
 29
 30    # Define sensor configuration.
 31    sensor_config = a121.SensorConfig(
 32        subsweeps=[
 33            a121.SubsweepConfig(
 34                start_point=50,
 35                step_length=4,
 36                num_points=50,
 37                profile=a121.Profile.PROFILE_1,
 38                hwaas=32,
 39                phase_enhancement=True,
 40                iq_imbalance_compensation=True,
 41            )
 42        ],
 43        sweeps_per_frame=1,
 44    )
 45
 46    # Calibrate noise.
 47    noise_sensor_config = copy.deepcopy(sensor_config)
 48    for subsweep in noise_sensor_config.subsweeps:
 49        # Disable Tx when measuring background noise.
 50        subsweep.enable_tx = False
 51
 52    metadata = client.setup_session(noise_sensor_config)
 53    client.start_session()
 54    result = client.get_next()
 55    client.stop_session()
 56
 57    stds = [
 58        calculate_bg_noise_std(subframe, subsweep_config)
 59        for (subframe, subsweep_config) in zip(result.subframes, noise_sensor_config.subsweeps)
 60    ]
 61
 62    # Create processor for distance estimation.
 63    distance_context = ProcessorContext(
 64        bg_noise_std=stds,
 65    )
 66    distance_config = ProcessorConfig(
 67        processor_mode=ProcessorMode.DISTANCE_ESTIMATION,
 68        threshold_method=ThresholdMethod.CFAR,
 69        threshold_sensitivity=0.8,
 70    )
 71    distance_processor = Processor(
 72        sensor_config=sensor_config,
 73        metadata=metadata,
 74        processor_config=distance_config,
 75        context=distance_context,
 76    )
 77
 78    pg_updater = PGUpdater()
 79    pg_process = et.PGProcess(pg_updater)
 80    pg_process.start()
 81
 82    metadata = client.setup_session(sensor_config)
 83    client.start_session()
 84
 85    interrupt_handler = et.utils.ExampleInterruptHandler()
 86    print("Press Ctrl-C to end session")
 87
 88    while not interrupt_handler.got_signal:
 89        extended_result = client.get_next()
 90        processed_data = distance_processor.process(extended_result)
 91        try:
 92            pg_process.put_data(processed_data)
 93        except et.PGProccessDiedException:
 94            break
 95
 96    print("Disconnecting...")
 97    pg_process.close()
 98    client.close()
 99
100
101class PGUpdater:
102    def __init__(self):
103        self.history = [np.nan] * 100
104
105    def setup(self, win):
106        self.sweep_plot = win.addPlot(row=0, col=0)
107        self.sweep_plot.setMenuEnabled(False)
108        self.sweep_plot.showGrid(x=True, y=True)
109        self.sweep_plot.addLegend()
110        self.sweep_plot.setLabel("left", "Amplitude")
111        self.sweep_plot.setLabel("bottom", "Distance(m)")
112        self.sweep_plot.addItem(pg.PlotDataItem())
113
114        legends = ["Sweep", "Threshold"]
115        self.curves = {}
116        for i, legend in enumerate(legends):
117            pen = et.utils.pg_pen_cycler(i)
118            brush = et.utils.pg_brush_cycler(i)
119            symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
120            feat_kw = dict(pen=pen, **symbol_kw)
121            self.curves[legend] = self.sweep_plot.plot(**feat_kw, name=legends[i])
122
123        self.smooth_max = et.utils.SmoothMax()
124
125        self.dist_history_plot = win.addPlot(row=1, col=0)
126        self.dist_history_plot.setMenuEnabled(False)
127        self.dist_history_plot.showGrid(x=True, y=True)
128        self.dist_history_plot.addLegend()
129        self.dist_history_plot.setLabel("left", "Estimated distance (m)")
130        self.dist_history_plot.setLabel("bottom", "History (frame)")
131        self.dist_history_plot.addItem(pg.PlotDataItem())
132        self.dist_history_plot.setXRange(0, len(self.history))
133
134        pen = et.utils.pg_pen_cycler(0)
135        brush = et.utils.pg_brush_cycler(0)
136        symbol_kw = dict(symbol="o", symbolSize=5, symbolBrush=brush, symbolPen="k")
137        feat_kw = dict(pen=pen, **symbol_kw)
138        self.dist_history_curve = self.dist_history_plot.plot(**feat_kw)
139
140    def update(self, d):
141        sweep = d.extra_result.abs_sweep
142        threshold = d.extra_result.used_threshold
143        distances = d.extra_result.distances_m
144
145        self.history.pop(0)
146        if len(d.estimated_distances) != 0:
147            self.history.append(d.estimated_distances[0])
148        else:
149            self.history.append(np.nan)
150
151        self.curves["Sweep"].setData(distances, sweep)
152        self.curves["Threshold"].setData(distances, threshold)
153        self.sweep_plot.setYRange(
154            0,
155            self.smooth_max.update(np.amax(np.concatenate((sweep, threshold)))),
156        )
157
158        if not np.all(np.isnan(self.history)):
159            self.dist_history_curve.setData(self.history)
160
161
162if __name__ == "__main__":
163    main()

View this example on GitHub: acconeer/acconeer-python-exploration