examples/a121/algo/phase_tracking/phase_tracking.py

examples/a121/algo/phase_tracking/phase_tracking.py#

  1# Copyright (c) Acconeer AB, 2022-2023
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8# Added here to force pyqtgraph to choose PySide
  9import PySide6  # noqa: F401
 10
 11import pyqtgraph as pg
 12
 13import acconeer.exptool as et
 14from acconeer.exptool import a121
 15from acconeer.exptool.a121.algo._utils import get_distances_m
 16from acconeer.exptool.a121.algo.phase_tracking import (
 17    Processor,
 18    ProcessorConfig,
 19    ProcessorContext,
 20    get_sensor_config,
 21)
 22
 23
 24def main():
 25    args = a121.ExampleArgumentParser().parse_args()
 26    et.utils.config_logging(args)
 27
 28    sensor_config = get_sensor_config()
 29
 30    client = a121.Client.open(**a121.get_client_args(args))
 31    metadata = client.setup_session(sensor_config)
 32
 33    processor = Processor(
 34        sensor_config=sensor_config,
 35        metadata=metadata,
 36        processor_config=ProcessorConfig(),
 37        context=ProcessorContext(),
 38    )
 39    pg_updater = PGUpdater(sensor_config, metadata)
 40    pg_process = et.PGProcess(pg_updater)
 41    pg_process.start()
 42
 43    client.start_session()
 44    interrupt_handler = et.utils.ExampleInterruptHandler()
 45    print("Press Ctrl-C to end session")
 46
 47    while not interrupt_handler.got_signal:
 48        result = client.get_next()
 49        plot_data = processor.process(result)
 50        try:
 51            pg_process.put_data(plot_data)
 52        except et.PGProccessDiedException:
 53            break
 54
 55    print("Disconnecting...")
 56    client.close()
 57
 58
 59class PGUpdater:
 60    def __init__(self, sensor_config: a121.SensorConfig, metadata: a121.Metadata):
 61        self.distances_m = get_distances_m(sensor_config, metadata)
 62
 63    def setup(self, win):
 64        pens = [et.utils.pg_pen_cycler(i) for i in range(3)]
 65        brush = et.utils.pg_brush_cycler(0)
 66        symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
 67        feat_kws = [dict(pen=pen, **symbol_kw) for pen in pens]
 68
 69        # sweep and threshold plot
 70        self.sweep_plot = win.addPlot(row=0, col=0)
 71        self.sweep_plot.setMenuEnabled(False)
 72        self.sweep_plot.showGrid(x=True, y=True)
 73        self.sweep_plot.addLegend()
 74        self.sweep_plot.setLabel("left", "Amplitude")
 75        self.sweep_plot.setLabel("bottom", "Distance (m)")
 76        self.sweep_plot.addItem(pg.PlotDataItem())
 77        self.sweeps_curve = [self.sweep_plot.plot(**feat_kw) for feat_kw in feat_kws]
 78        self.sweep_vertical_line = pg.InfiniteLine(pen=pens[2])
 79        self.sweep_plot.addItem(self.sweep_vertical_line)
 80        self.sweep_smooth_max = et.utils.SmoothMax()
 81        sweep_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
 82        sweep_plot_legend.setParentItem(self.sweep_plot)
 83        sweep_plot_legend.addItem(self.sweeps_curve[0], "Sweep")
 84        sweep_plot_legend.addItem(self.sweeps_curve[1], "Threshold")
 85
 86        # argument plot
 87        argument_plot = win.addPlot(row=1, col=0)
 88        argument_plot.setMenuEnabled(False)
 89        argument_plot.showGrid(x=True, y=True)
 90        argument_plot.addLegend()
 91        argument_plot.setLabel("bottom", "Distance (m)")
 92        argument_plot.setLabel("left", "Phase")
 93        argument_plot.setYRange(-np.pi, np.pi)
 94        argument_plot.getAxis("left").setTicks(et.utils.pg_phase_ticks)
 95        argument_plot.setYRange(-np.pi, np.pi)
 96        argument_plot.addItem(pg.ScatterPlotItem())
 97        self.argument_curve = argument_plot.plot(
 98            **dict(pen=None, symbol="o", symbolSize=5, symbolPen="k")
 99        )
100        self.argument_vertical_line = pg.InfiniteLine(pen=pens[2])
101        argument_plot.addItem(self.argument_vertical_line)
102
103        # history plot
104        self.history_plot = win.addPlot(row=2, col=0)
105        self.history_plot.setMenuEnabled(False)
106        self.history_plot.showGrid(x=True, y=True)
107        self.history_plot.addLegend()
108        self.history_plot.setLabel("left", "Distance (mm)")
109        self.history_plot.setLabel("bottom", "Time (s)")
110        self.history_plot.addItem(pg.PlotDataItem())
111        self.history_curve = self.history_plot.plot(**feat_kws[0])
112
113        self.sweep_smooth_max = et.utils.SmoothMax()
114        self.distance_hist_smooth_lim = et.utils.SmoothLimits(tau_decay=0.5, tau_grow=0.1)
115
116    def update(self, processor_result):
117        assert processor_result is not None
118        assert processor_result.threshold is not None
119
120        sweep = processor_result.lp_abs_sweep
121        threshold = processor_result.threshold * np.ones(sweep.size)
122        angle_sweep = processor_result.angle_sweep
123        peak_loc = processor_result.peak_loc_m
124        history = processor_result.distance_history
125        rel_time_stamps = processor_result.rel_time_stamps
126
127        # update sweep plot
128        self.sweeps_curve[0].setData(self.distances_m, sweep)
129        self.sweeps_curve[1].setData(self.distances_m, threshold)
130        max_val_in_sweep_plot = max(np.max(sweep), np.max(threshold))
131        self.sweep_plot.setYRange(0, self.sweep_smooth_max.update(max_val_in_sweep_plot))
132
133        # update argument plot
134        self.argument_curve.setData(self.distances_m, angle_sweep)
135
136        if peak_loc is not None:
137            # update vertical lines
138            self.sweep_vertical_line.setValue(peak_loc)
139            self.argument_vertical_line.setValue(peak_loc)
140            self.sweep_vertical_line.show()
141            self.argument_vertical_line.show()
142        else:
143            self.sweep_vertical_line.hide()
144            self.argument_vertical_line.hide()
145
146        if history.shape[0] != 0:
147            # update history plot
148            self.history_curve.setData(rel_time_stamps, history)
149            lims = self.distance_hist_smooth_lim.update(history)
150            self.history_plot.setYRange(lims[0], lims[1])
151        self.history_plot.setXRange(-Processor.TIME_HORIZON_S, 0)
152
153
154if __name__ == "__main__":
155    main()

View this example on GitHub: acconeer/acconeer-python-exploration