examples/a121/algo/distance/processor.py
1# Copyright (c) Acconeer AB, 2022-2025
2# All rights reserved
3
4from __future__ import annotations
5
6import copy
7
8import numpy as np
9
10import pyqtgraph as pg
11
12import acconeer.exptool as et
13from acconeer.exptool import a121
14from acconeer.exptool.a121.algo.distance import (
15 Processor,
16 ProcessorConfig,
17 ProcessorContext,
18 ProcessorMode,
19 ThresholdMethod,
20 calculate_bg_noise_std,
21)
22
23
def main():
    """Run the distance-processor example and plot its output live.

    The function parses the example command-line arguments, opens a client, records one
    frame with the transmitter disabled to obtain background-noise values, builds a
    distance ``Processor`` from them and then feeds every measured frame through the
    processor into a plotting process. The loop ends when the interrupt handler reports
    a signal or the plotting process has died.

    The plotting process and the client are closed in ``finally`` blocks so they are
    released even if an exception is raised while measuring or processing.
    """
    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    client = a121.Client.open(**a121.get_client_args(args))
    try:
        # Define sensor configuration.
        sensor_config = a121.SensorConfig(
            subsweeps=[
                a121.SubsweepConfig(
                    start_point=50,
                    step_length=4,
                    num_points=50,
                    profile=a121.Profile.PROFILE_1,
                    hwaas=32,
                    phase_enhancement=True,
                    iq_imbalance_compensation=True,
                )
            ],
            sweeps_per_frame=1,
        )

        # Calibrate noise on a copy so the measurement configuration stays untouched.
        noise_sensor_config = copy.deepcopy(sensor_config)
        for subsweep in noise_sensor_config.subsweeps:
            # Disable Tx when measuring background noise.
            subsweep.enable_tx = False

        metadata = client.setup_session(noise_sensor_config)
        client.start_session()
        result = client.get_next()
        client.stop_session()

        # One noise value per (subframe, subsweep config) pair.
        stds = [
            calculate_bg_noise_std(subframe, subsweep_config)
            for (subframe, subsweep_config) in zip(
                result.subframes, noise_sensor_config.subsweeps
            )
        ]

        # Create processor for distance estimation.
        distance_context = ProcessorContext(
            bg_noise_std=stds,
        )
        distance_config = ProcessorConfig(
            processor_mode=ProcessorMode.DISTANCE_ESTIMATION,
            threshold_method=ThresholdMethod.CFAR,
            threshold_sensitivity=0.8,
        )
        distance_processor = Processor(
            sensor_config=sensor_config,
            metadata=metadata,
            processor_config=distance_config,
            context=distance_context,
        )

        pg_updater = PGUpdater()
        pg_process = et.PGProcess(pg_updater)
        pg_process.start()
        try:
            # The returned metadata is not needed again; the processor already holds
            # the metadata obtained during the noise session.
            client.setup_session(sensor_config)
            client.start_session()

            interrupt_handler = et.utils.ExampleInterruptHandler()
            print("Press Ctrl-C to end session")

            while not interrupt_handler.got_signal:
                extended_result = client.get_next()
                processed_data = distance_processor.process(extended_result)
                try:
                    pg_process.put_data(processed_data)
                except et.PGProccessDiedException:
                    # The plot process is gone, so there is nothing left to draw to.
                    break
        finally:
            # Runs on normal exit and on errors, so the plot process is not left running.
            print("Disconnecting...")
            pg_process.close()
    finally:
        # Always release the connection, whatever happened above.
        client.close()
99
100
class PGUpdater:
    """Plot updater with a sweep/threshold plot and a distance-history plot."""

    def __init__(self):
        # Fixed-length buffer of the latest estimates; NaN marks "no estimate".
        self.history = [np.nan for _ in range(100)]

    @staticmethod
    def _new_plot(win, row, left_label, bottom_label):
        """Add a plot at (row, 0) in ``win`` with grid, legend and axis labels."""
        plot = win.addPlot(row=row, col=0)
        plot.setMenuEnabled(False)
        plot.showGrid(x=True, y=True)
        plot.addLegend()
        plot.setLabel("left", left_label)
        plot.setLabel("bottom", bottom_label)
        plot.addItem(pg.PlotDataItem())
        return plot

    @staticmethod
    def _curve_style(index, symbol_size):
        """Return the keyword arguments for a curve using cycler slot ``index``."""
        line_pen = et.utils.pg_pen_cycler(index)
        fill = et.utils.pg_brush_cycler(index)
        return dict(
            pen=line_pen,
            symbol="o",
            symbolSize=symbol_size,
            symbolBrush=fill,
            symbolPen="k",
        )

    def setup(self, win):
        """Create both plots and their curves inside the window ``win``."""
        self.sweep_plot = self._new_plot(win, 0, "Amplitude", "Distance(m)")

        # One curve per legend entry, keyed by the legend text.
        self.curves = {}
        for index, name in enumerate(("Sweep", "Threshold")):
            style = self._curve_style(index, symbol_size=1)
            self.curves[name] = self.sweep_plot.plot(**style, name=name)

        self.smooth_max = et.utils.SmoothMax()

        self.dist_history_plot = self._new_plot(
            win, 1, "Estimated distance (m)", "History (frame)"
        )
        self.dist_history_plot.setXRange(0, len(self.history))

        history_style = self._curve_style(0, symbol_size=5)
        self.dist_history_curve = self.dist_history_plot.plot(**history_style)

    def update(self, d):
        """Redraw the plots from one processor result ``d``."""
        extra = d.extra_result
        sweep = extra.abs_sweep
        threshold = extra.used_threshold
        distances = extra.distances_m

        # Drop the oldest entry, then append the first estimate (NaN when there is none).
        del self.history[0]
        estimates = d.estimated_distances
        self.history.append(estimates[0] if len(estimates) != 0 else np.nan)

        self.curves["Sweep"].setData(distances, sweep)
        self.curves["Threshold"].setData(distances, threshold)

        # The upper y-limit follows the largest value of either curve, passed through smooth_max.
        peak = np.amax(np.concatenate((sweep, threshold)))
        self.sweep_plot.setYRange(0, self.smooth_max.update(peak))

        # The history curve is left untouched while the buffer holds only NaN.
        if np.all(np.isnan(self.history)):
            return
        self.dist_history_curve.setData(self.history)
160
161
# Run the example only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration