examples/a121/algo/distance/processor.py
1# Copyright (c) Acconeer AB, 2022-2024
2# All rights reserved
3
4from __future__ import annotations
5
6import copy
7
8import numpy as np
9
10import pyqtgraph as pg
11
12import acconeer.exptool as et
13from acconeer.exptool import a121
14from acconeer.exptool.a121.algo.distance import (
15 Processor,
16 ProcessorConfig,
17 ProcessorContext,
18 ProcessorMode,
19 ThresholdMethod,
20 calculate_bg_noise_std,
21)
22
23
def main():
    """Run the distance-processor example end to end.

    Steps, in order:

    1. Parse the example command-line arguments and open a client.
    2. Record a single frame with Tx disabled and compute one background
       noise estimate per subsweep from it.
    3. Create a ``Processor`` in distance-estimation mode using that estimate.
    4. Stream frames, passing every processed result to the plot process,
       until the interrupt handler reports a signal or the plot process dies.

    The plot process and the client are closed on the way out even when an
    exception is raised along the way.
    """
    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    client = a121.Client.open(**a121.get_client_args(args))

    # Everything after a successful open is guarded so that the connection is
    # always released, also when calibration or processing fails.
    try:
        # Define sensor configuration.
        sensor_config = a121.SensorConfig(
            subsweeps=[
                a121.SubsweepConfig(
                    start_point=50,
                    step_length=4,
                    num_points=50,
                    profile=a121.Profile.PROFILE_1,
                    hwaas=32,
                    phase_enhancement=True,
                )
            ],
            sweeps_per_frame=1,
        )

        # Calibrate noise. Work on a copy so the measurement configuration
        # itself is left untouched.
        noise_sensor_config = copy.deepcopy(sensor_config)
        for subsweep in noise_sensor_config.subsweeps:
            # Disable Tx when measuring background noise.
            subsweep.enable_tx = False

        # NOTE(review): the processor below is built with the metadata returned
        # for this noise session, not for the measurement session set up later.
        # Confirm that the two are interchangeable for the processor.
        metadata = client.setup_session(noise_sensor_config)
        client.start_session()
        result = client.get_next()
        client.stop_session()

        # One noise estimate per (subframe, subsweep config) pair.
        stds = [
            calculate_bg_noise_std(subframe, subsweep_config)
            for (subframe, subsweep_config) in zip(
                result.subframes, noise_sensor_config.subsweeps
            )
        ]

        # Create processor for distance estimation.
        distance_context = ProcessorContext(
            bg_noise_std=stds,
        )
        distance_config = ProcessorConfig(
            processor_mode=ProcessorMode.DISTANCE_ESTIMATION,
            threshold_method=ThresholdMethod.CFAR,
            threshold_sensitivity=0.8,
        )
        distance_processor = Processor(
            sensor_config=sensor_config,
            metadata=metadata,
            processor_config=distance_config,
            context=distance_context,
        )

        pg_updater = PGUpdater()
        pg_process = et.PGProcess(pg_updater)
        pg_process.start()

        # From here on the plot process is running and must be closed again.
        try:
            # The returned metadata is not needed after this point, so it is
            # not bound to a name.
            client.setup_session(sensor_config)
            client.start_session()

            interrupt_handler = et.utils.ExampleInterruptHandler()
            print("Press Ctrl-C to end session")

            while not interrupt_handler.got_signal:
                extended_result = client.get_next()
                processed_data = distance_processor.process(extended_result)
                try:
                    pg_process.put_data(processed_data)
                except et.PGProccessDiedException:
                    # The plot process is gone; stop streaming.
                    break
        finally:
            print("Disconnecting...")
            pg_process.close()
    finally:
        client.close()
98
99
class PGUpdater:
    """Plot updater showing the latest sweep and a rolling distance history."""

    def __init__(self):
        # Fixed-length buffer of distance estimates; starts out as all NaN.
        self.history = [np.nan for _ in range(100)]

    @staticmethod
    def _new_plot(win, row, left_label, bottom_label):
        """Add a plot with grid, legend and axis labels to ``win`` at ``row``."""
        plot = win.addPlot(row=row, col=0)
        plot.setMenuEnabled(False)
        plot.showGrid(x=True, y=True)
        plot.addLegend()
        plot.setLabel("left", left_label)
        plot.setLabel("bottom", bottom_label)
        plot.addItem(pg.PlotDataItem())
        return plot

    @staticmethod
    def _curve_style(index, symbol_size):
        """Build the pen/symbol keyword arguments for curve number ``index``."""
        line_pen = et.utils.pg_pen_cycler(index)
        fill_brush = et.utils.pg_brush_cycler(index)
        return dict(
            pen=line_pen,
            symbol="o",
            symbolSize=symbol_size,
            symbolBrush=fill_brush,
            symbolPen="k",
        )

    def setup(self, win):
        """Create the sweep plot (row 0) and the distance history plot (row 1)."""
        self.sweep_plot = self._new_plot(win, 0, "Amplitude", "Distance(m)")

        # One named curve per legend entry, keyed by that same legend text.
        self.curves = {}
        for index, label in enumerate(("Sweep", "Threshold")):
            style = self._curve_style(index, 1)
            self.curves[label] = self.sweep_plot.plot(**style, name=label)

        self.smooth_max = et.utils.SmoothMax()

        self.dist_history_plot = self._new_plot(
            win, 1, "Estimated distance (m)", "History (frame)"
        )
        self.dist_history_plot.setXRange(0, len(self.history))

        self.dist_history_curve = self.dist_history_plot.plot(**self._curve_style(0, 5))

    def update(self, d):
        """Redraw both plots from one processed result ``d``.

        ``d`` is read through ``d.extra_result`` (``abs_sweep``,
        ``used_threshold``, ``distances_m``) and ``d.estimated_distances``.
        """
        extra = d.extra_result
        amplitudes = extra.abs_sweep
        limits = extra.used_threshold
        x_axis = extra.distances_m

        # Drop the oldest entry, then append the first estimate of this frame,
        # or NaN when the frame produced no estimate.
        self.history.pop(0)
        found = d.estimated_distances
        newest = found[0] if len(found) != 0 else np.nan
        self.history.append(newest)

        self.curves["Sweep"].setData(x_axis, amplitudes)
        self.curves["Threshold"].setData(x_axis, limits)

        # Scale the y axis to the largest value among sweep and threshold,
        # passed through the smoothing helper.
        peak = np.amax(np.concatenate((amplitudes, limits)))
        self.sweep_plot.setYRange(0, self.smooth_max.update(peak))

        # Leave the history curve alone while there is nothing but NaN in it.
        if np.all(np.isnan(self.history)):
            return
        self.dist_history_curve.setData(self.history)
159
160
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration