examples/a121/algo/phase_tracking/phase_tracking.py#
1# Copyright (c) Acconeer AB, 2022-2023
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8# Added here to force pyqtgraph to choose PySide
9import PySide6 # noqa: F401
10
11import pyqtgraph as pg
12
13import acconeer.exptool as et
14from acconeer.exptool import a121
15from acconeer.exptool.a121.algo._utils import get_distances_m
16from acconeer.exptool.a121.algo.phase_tracking import (
17 Processor,
18 ProcessorConfig,
19 ProcessorContext,
20 get_sensor_config,
21)
22
23
def main():
    """Run the phase tracking example until interrupted or the plot dies.

    Parses the example command line arguments, opens a client, sets up a
    session with the sensor configuration returned by ``get_sensor_config()``,
    and then repeatedly fetches a result, runs it through the ``Processor``
    and forwards the processed data to the plotting process.

    The loop ends when the interrupt handler reports a signal (Ctrl-C) or
    when ``put_data`` raises ``et.PGProccessDiedException``. The plotting
    process and the client are closed on every exit path, including when an
    exception is raised while setting up or streaming.
    """
    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    sensor_config = get_sensor_config()

    client = a121.Client.open(**a121.get_client_args(args))
    try:
        metadata = client.setup_session(sensor_config)

        processor = Processor(
            sensor_config=sensor_config,
            metadata=metadata,
            processor_config=ProcessorConfig(),
            context=ProcessorContext(),
        )
        pg_updater = PGUpdater(sensor_config, metadata)
        pg_process = et.PGProcess(pg_updater)
        pg_process.start()

        try:
            client.start_session()
            interrupt_handler = et.utils.ExampleInterruptHandler()
            print("Press Ctrl-C to end session")

            while not interrupt_handler.got_signal:
                result = client.get_next()
                plot_data = processor.process(result)
                try:
                    pg_process.put_data(plot_data)
                except et.PGProccessDiedException:
                    # Exception name spelled as exposed by the library (sic).
                    # Nothing is left to plot to, so stop streaming.
                    break
        finally:
            print("Disconnecting...")
            # The plotting process was started above, so it must be shut
            # down here; otherwise it is left behind when main() returns.
            pg_process.close()
    finally:
        # Always release the connection, even if setup or streaming failed.
        client.close()
58
class PGUpdater:
    """Plot updater for the phase tracking example.

    Builds three stacked plots in ``setup``: the sweep amplitude together
    with its threshold, the phase per distance, and the tracked distance
    history. ``update`` refreshes all of them from one processor result.
    """

    def __init__(self, sensor_config: a121.SensorConfig, metadata: a121.Metadata):
        """Store the distance axis used as x values for the per-sweep plots.

        :param sensor_config: Sensor configuration passed to ``get_distances_m``.
        :param metadata: Session metadata passed to ``get_distances_m``.
        """
        self.distances_m = get_distances_m(sensor_config, metadata)

    def setup(self, win):
        """Create the plots, curves and smoothing helpers inside ``win``.

        :param win: Graphics layout object providing ``addPlot(row=..., col=...)``.
        """
        pens = [et.utils.pg_pen_cycler(i) for i in range(3)]
        brush = et.utils.pg_brush_cycler(0)
        symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
        feat_kws = [dict(pen=pen, **symbol_kw) for pen in pens]

        # sweep and threshold plot
        self.sweep_plot = win.addPlot(row=0, col=0)
        self.sweep_plot.setMenuEnabled(False)
        self.sweep_plot.showGrid(x=True, y=True)
        self.sweep_plot.addLegend()
        self.sweep_plot.setLabel("left", "Amplitude")
        self.sweep_plot.setLabel("bottom", "Distance (m)")
        self.sweep_plot.addItem(pg.PlotDataItem())
        # One curve per pen; only index 0 (sweep) and 1 (threshold) get data
        # in update(). The third pen is reused for the vertical peak markers.
        self.sweeps_curve = [self.sweep_plot.plot(**feat_kw) for feat_kw in feat_kws]
        self.sweep_vertical_line = pg.InfiniteLine(pen=pens[2])
        self.sweep_plot.addItem(self.sweep_vertical_line)
        sweep_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
        sweep_plot_legend.setParentItem(self.sweep_plot)
        sweep_plot_legend.addItem(self.sweeps_curve[0], "Sweep")
        sweep_plot_legend.addItem(self.sweeps_curve[1], "Threshold")

        # argument plot
        argument_plot = win.addPlot(row=1, col=0)
        argument_plot.setMenuEnabled(False)
        argument_plot.showGrid(x=True, y=True)
        argument_plot.addLegend()
        argument_plot.setLabel("bottom", "Distance (m)")
        argument_plot.setLabel("left", "Phase")
        argument_plot.getAxis("left").setTicks(et.utils.pg_phase_ticks)
        # The phase axis is fixed to [-pi, pi]; it is never rescaled in update().
        argument_plot.setYRange(-np.pi, np.pi)
        argument_plot.addItem(pg.ScatterPlotItem())
        # pen=None: draw the phase as unconnected symbols only.
        self.argument_curve = argument_plot.plot(
            pen=None, symbol="o", symbolSize=5, symbolPen="k"
        )
        self.argument_vertical_line = pg.InfiniteLine(pen=pens[2])
        argument_plot.addItem(self.argument_vertical_line)

        # history plot
        self.history_plot = win.addPlot(row=2, col=0)
        self.history_plot.setMenuEnabled(False)
        self.history_plot.showGrid(x=True, y=True)
        self.history_plot.addLegend()
        self.history_plot.setLabel("left", "Distance (mm)")
        self.history_plot.setLabel("bottom", "Time (s)")
        self.history_plot.addItem(pg.PlotDataItem())
        self.history_curve = self.history_plot.plot(**feat_kws[0])

        # Helpers used in update() to pick the y ranges of the sweep and
        # history plots from the incoming data.
        self.sweep_smooth_max = et.utils.SmoothMax()
        self.distance_hist_smooth_lim = et.utils.SmoothLimits(tau_decay=0.5, tau_grow=0.1)

    def update(self, processor_result):
        """Refresh all plots from one processor result.

        :param processor_result: Result object providing ``lp_abs_sweep``,
            ``threshold``, ``angle_sweep``, ``peak_loc_m``,
            ``distance_history`` and ``rel_time_stamps``. Must not be
            ``None`` and must have a non-``None`` ``threshold``.
        """
        assert processor_result is not None
        assert processor_result.threshold is not None

        sweep = processor_result.lp_abs_sweep
        # Expand the scalar threshold so it can be drawn as a flat line over
        # the same x axis as the sweep.
        threshold = processor_result.threshold * np.ones(sweep.size)
        angle_sweep = processor_result.angle_sweep
        peak_loc = processor_result.peak_loc_m
        history = processor_result.distance_history
        rel_time_stamps = processor_result.rel_time_stamps

        # update sweep plot
        self.sweeps_curve[0].setData(self.distances_m, sweep)
        self.sweeps_curve[1].setData(self.distances_m, threshold)
        max_val_in_sweep_plot = max(np.max(sweep), np.max(threshold))
        self.sweep_plot.setYRange(0, self.sweep_smooth_max.update(max_val_in_sweep_plot))

        # update argument plot
        self.argument_curve.setData(self.distances_m, angle_sweep)

        if peak_loc is not None:
            # update vertical lines
            self.sweep_vertical_line.setValue(peak_loc)
            self.argument_vertical_line.setValue(peak_loc)
            self.sweep_vertical_line.show()
            self.argument_vertical_line.show()
        else:
            # No peak location in this result: hide the markers instead of
            # leaving them at the previous position.
            self.sweep_vertical_line.hide()
            self.argument_vertical_line.hide()

        if history.shape[0] != 0:
            # update history plot
            self.history_curve.setData(rel_time_stamps, history)
            lims = self.distance_hist_smooth_lim.update(history)
            self.history_plot.setYRange(lims[0], lims[1])
            self.history_plot.setXRange(-Processor.TIME_HORIZON_S, 0)
153
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration