examples/a121/algo/distance/detector.py
1# Copyright (c) Acconeer AB, 2022-2024
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8# Added here to force pyqtgraph to choose PySide
9import PySide6 # noqa: F401
10
11import pyqtgraph as pg
12
13import acconeer.exptool as et
14from acconeer.exptool import a121
15from acconeer.exptool.a121.algo.distance import Detector, DetectorConfig, ThresholdMethod
16
17
# ID of the single sensor used by this example: it is passed to the Detector
# and used to pick this sensor's entry out of each multi-sensor result.
SENSOR_ID = 1
19
20
def main():
    """Run the distance detector example and plot its results live.

    Parses the example command-line arguments, opens a client, then
    calibrates and starts a ``Detector`` for ``SENSOR_ID`` configured for
    0.0-2.0 m with a recorded threshold. Results are forwarded to a plotting
    process until an interrupt signal is received or the plotting process
    reports that it has died.

    The detector is stopped and the client is closed even when an exception
    is raised along the way, so a failure does not leave the session or the
    connection open.
    """
    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    client = a121.Client.open(**a121.get_client_args(args))
    try:
        detector_config = DetectorConfig(
            start_m=0.0,
            end_m=2.0,
            max_profile=a121.Profile.PROFILE_3,
            max_step_length=12,
            threshold_method=ThresholdMethod.RECORDED,
        )
        detector = Detector(
            client=client, sensor_ids=[SENSOR_ID], detector_config=detector_config
        )

        detector.calibrate_detector()

        detector.start()
        try:
            _stream_results_to_plot(detector)
        finally:
            # Always pair start() with stop(), also on errors in the loop.
            detector.stop()
    finally:
        print("Disconnecting...")
        client.close()


def _stream_results_to_plot(detector):
    """Forward results from a started detector to a plot process until told to stop."""
    # One sweep/threshold curve pair is drawn per processor spec of the detector.
    pg_updater = PGUpdater(num_curves=len(detector.processor_specs))
    pg_process = et.PGProcess(pg_updater)
    pg_process.start()

    interrupt_handler = et.utils.ExampleInterruptHandler()
    print("Press Ctrl-C to end session")

    while not interrupt_handler.got_signal:
        detector_result = detector.get_next()
        try:
            pg_process.put_data(detector_result)
        except et.PGProccessDiedException:
            # The plot process is gone; nothing left to display, so end the session.
            break
57
58
class PGUpdater:
    """Plot updater drawing the latest sweeps, thresholds and a distance history.

    ``setup`` builds two plots in the given window: an amplitude plot holding
    one sweep curve and one threshold curve per processor, and a plot of the
    most recent estimated distances. ``update`` refreshes both from a new
    detector result.
    """

    def __init__(self, num_curves):
        """Store the number of curve pairs and create an all-NaN distance history.

        :param num_curves: Number of sweep curves (and threshold curves) to create.
        """
        self.num_curves = num_curves
        # Fixed-length history of 100 entries; NaN marks "no distance".
        self.distance_history = [np.nan for _ in range(100)]

    @staticmethod
    def _curve_style(color_index, symbol_size):
        """Build plot keyword arguments for a curve using the cycled pen/brush colors."""
        line_pen = et.utils.pg_pen_cycler(color_index)
        fill_brush = et.utils.pg_brush_cycler(color_index)
        return dict(
            pen=line_pen,
            symbol="o",
            symbolSize=symbol_size,
            symbolBrush=fill_brush,
            symbolPen="k",
        )

    def setup(self, win):
        """Create the plots and curves inside ``win``.

        :param win: Object providing ``addPlot(row=..., col=...)``
            (presumably a pyqtgraph layout widget; confirm against the caller).
        """
        # Top plot: sweep amplitude together with the threshold that was used.
        self.sweep_plot = win.addPlot(row=0, col=0)
        self.sweep_plot.setMenuEnabled(False)
        self.sweep_plot.showGrid(x=True, y=True)
        self.sweep_plot.addLegend()
        self.sweep_plot.setLabel("left", "Amplitude")
        self.sweep_plot.addItem(pg.PlotDataItem())

        sweep_style = self._curve_style(0, 1)
        self.sweep_curves = []
        for _ in range(self.num_curves):
            self.sweep_curves.append(self.sweep_plot.plot(**sweep_style))

        threshold_style = self._curve_style(1, 1)
        self.threshold_curves = []
        for _ in range(self.num_curves):
            self.threshold_curves.append(self.sweep_plot.plot(**threshold_style))

        # Bottom plot: history of the estimated distance.
        self.dist_history_plot = win.addPlot(row=1, col=0)
        self.dist_history_plot.setMenuEnabled(False)
        self.dist_history_plot.showGrid(x=True, y=True)
        self.dist_history_plot.addLegend()
        self.dist_history_plot.setLabel("left", "Estimated_distance")
        self.dist_history_plot.addItem(pg.PlotDataItem())

        history_style = self._curve_style(0, 5)
        self.dist_history_curve = self.dist_history_plot.plot(**history_style)

        self.distance_hist_smooth_lim = et.utils.SmoothLimits()

    def update(self, multi_sensor_result):
        """Redraw all curves from a new detector result.

        :param multi_sensor_result: Mapping indexed by sensor ID; only the
            entry for ``SENSOR_ID`` is plotted.
        """
        # The example runs a single sensor, so look up its result directly.
        result = multi_sensor_result[SENSOR_ID]

        # Slide the history window: drop the oldest entry, then append the
        # first reported distance, or NaN when no distance was reported.
        history = self.distance_history
        history.pop(0)
        if len(result.distances) != 0:
            newest = result.distances[0]
        else:
            newest = np.nan
        history.append(newest)

        for idx, processor_result in enumerate(result.processor_results):
            extra = processor_result.extra_result
            used_threshold = extra.used_threshold
            # Only draw the threshold where it is defined (not NaN).
            valid_idx = np.nonzero(~np.isnan(used_threshold))[0]
            self.sweep_curves[idx].setData(extra.distances_m, extra.abs_sweep)
            self.threshold_curves[idx].setData(
                extra.distances_m[valid_idx], used_threshold[valid_idx]
            )

        # With nothing but NaN in the history there is nothing to draw or scale.
        if np.all(np.isnan(history)):
            self.dist_history_curve.setData([])
            return

        self.dist_history_curve.setData(history)
        y_limits = self.distance_hist_smooth_lim.update(history)
        self.dist_history_plot.setYRange(y_limits[0], y_limits[1])
124
125
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration