examples/a121/plot.py

examples/a121/plot.py

  1# Copyright (c) Acconeer AB, 2022-2023
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8import acconeer.exptool as et
  9from acconeer.exptool import a121
 10
 11
 12def main():
 13    args = a121.ExampleArgumentParser().parse_args()
 14    et.utils.config_logging(args)
 15
 16    client = a121.Client.open(**a121.get_client_args(args))
 17
 18    session_config = a121.SessionConfig(
 19        [
 20            {
 21                1: a121.SensorConfig(
 22                    subsweeps=[
 23                        a121.SubsweepConfig(
 24                            start_point=25,
 25                            step_length=2,
 26                            num_points=30,
 27                            profile=a121.Profile.PROFILE_1,
 28                            hwaas=10,
 29                        ),
 30                        a121.SubsweepConfig(
 31                            start_point=75,
 32                            step_length=4,
 33                            num_points=25,
 34                            profile=a121.Profile.PROFILE_3,
 35                            hwaas=20,
 36                        ),
 37                    ],
 38                )
 39            },
 40            {
 41                1: a121.SensorConfig(
 42                    receiver_gain=20,
 43                ),
 44            },
 45        ],
 46        extended=True,
 47    )
 48
 49    extended_metadata = client.setup_session(session_config)
 50
 51    pg_updater = PGUpdater(session_config, extended_metadata)
 52    pg_process = et.PGProcess(pg_updater)
 53    pg_process.start()
 54
 55    client.start_session()
 56
 57    interrupt_handler = et.utils.ExampleInterruptHandler()
 58    print("Press Ctrl-C to end session")
 59
 60    while not interrupt_handler.got_signal:
 61        extended_result = client.get_next()
 62
 63        try:
 64            pg_process.put_data(extended_result)
 65        except et.PGProccessDiedException:
 66            break
 67
 68    print("Disconnecting...")
 69    pg_process.close()
 70    client.close()
 71
 72
 73class PGUpdater:
 74    def __init__(
 75        self,
 76        session_config: a121.SessionConfig,
 77        extended_metadata: list[dict[int, a121.Metadata]],
 78    ) -> None:
 79        self.session_config = session_config
 80        self.extended_metadata = extended_metadata
 81
 82    def setup(self, win):
 83        self.all_plots = []
 84        self.all_curves = []
 85        self.all_smooth_maxs = []
 86
 87        for group_idx, group in enumerate(self.session_config.groups):
 88            group_plots = {}
 89            group_curves = {}
 90            group_smooth_maxs = {}
 91
 92            for sensor_id, sensor_config in group.items():
 93                title = f"Group {group_idx} / Sensor {sensor_id}"
 94                plot = win.addPlot(title=title)
 95                plot.setMenuEnabled(False)
 96                plot.setMouseEnabled(x=False, y=False)
 97                plot.hideButtons()
 98                plot.showGrid(x=True, y=True)
 99
100                plot.setLabel("bottom", "Depth (m)")
101                plot.setLabel("left", "Amplitude")
102
103                curves = []
104                for i in range(sensor_config.num_subsweeps):
105                    curve = plot.plot(pen=et.utils.pg_pen_cycler(i))
106                    curves.append(curve)
107
108                group_plots[sensor_id] = plot
109                group_curves[sensor_id] = curves
110
111                smooth_max = et.utils.SmoothMax(self.session_config.update_rate)
112                group_smooth_maxs[sensor_id] = smooth_max
113
114            self.all_plots.append(group_plots)
115            self.all_curves.append(group_curves)
116            self.all_smooth_maxs.append(group_smooth_maxs)
117
118    def update(self, extended_result: list[dict[int, a121.Result]]) -> None:
119        for group_idx, group in enumerate(extended_result):
120            for sensor_id, result in group.items():
121                plot = self.all_plots[group_idx][sensor_id]
122                curves = self.all_curves[group_idx][sensor_id]
123
124                max_ = 0
125
126                for sub_idx, subframe in enumerate(result.subframes):
127                    x = get_distances_m(
128                        self.session_config.groups[group_idx][sensor_id].subsweeps[sub_idx]
129                    )
130                    y = np.abs(subframe).mean(axis=0)
131                    curves[sub_idx].setData(x, y)
132
133                    max_ = max(max_, np.max(y))
134
135                smooth_max = self.all_smooth_maxs[group_idx][sensor_id]
136                plot.setYRange(0, smooth_max.update(max_))
137
138
def get_distances_m(config):
    """Return the distance, in meters, of every point described by *config*.

    The points are ``start_point + k * step_length`` for
    ``k = 0 .. num_points - 1``; each point index is then multiplied by
    2.5e-3 to obtain the value in meters.

    Args:
        config: Object exposing ``num_points``, ``step_length`` and
            ``start_point`` (a subsweep configuration in this example).

    Returns:
        A 1-D NumPy array with ``num_points`` distances.
    """
    meters_per_point = 2.5e-3

    offsets = np.arange(config.num_points) * config.step_length
    point_indices = offsets + config.start_point

    return point_indices * meters_per_point
142
143
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

View this example on GitHub: acconeer/acconeer-python-exploration