examples/a121/algo/vibration/processor.py

examples/a121/algo/vibration/processor.py#

 1# Copyright (c) Acconeer AB, 2022-2026
 2# All rights reserved
 3
 4from __future__ import annotations
 5
 6import pyqtgraph as pg
 7
 8import acconeer.exptool as et
 9from acconeer.exptool import a121
10from acconeer.exptool.a121.algo._utils import APPROX_BASE_STEP_LENGTH_M
11from acconeer.exptool.a121.algo.vibration import (
12    ExampleApp,
13    Processor,
14    ProcessorConfig,
15    ProcessorContext,
16    ProcessorResult,
17    get_high_frequency_config,
18)
19from acconeer.exptool.a121.algo.vibration._processor import RANGE_SUBSWEEP
20from acconeer.exptool.a121.algo.vibration.plot import VibrationPlot
21
22
23def main():
24    args = a121.ExampleArgumentParser().parse_args()
25    et.utils.config_logging(args)
26
27    sensor_config = ExampleApp._get_sensor_config(get_high_frequency_config())
28
29    client = a121.Client.open(**a121.get_client_args(args))
30    metadata = client.setup_session(sensor_config)
31
32    processor = Processor(
33        sensor_config=sensor_config,
34        metadata=metadata,
35        processor_config=ProcessorConfig(),
36        context=ProcessorContext(),
37    )
38    pg_updater = PGUpdater(sensor_config)
39    pg_process = et.PGProcess(pg_updater)
40    pg_process.start()
41
42    client.start_session()
43    interrupt_handler = et.utils.ExampleInterruptHandler()
44    print("Press Ctrl-C to end session")
45
46    while not interrupt_handler.got_signal:
47        result = client.get_next()
48        plot_data = processor.process(result)
49        try:
50            pg_process.put_data(plot_data)
51        except et.PGProccessDiedException:
52            break
53
54    print("Disconnecting...")
55    client.close()
56
57
58class PGUpdater:
59    def __init__(self, sensor_config: a121.SensorConfig) -> None:
60        self._sensor_config = sensor_config.subsweeps[RANGE_SUBSWEEP]
61        self._meas_dist_m = self._sensor_config.start_point * APPROX_BASE_STEP_LENGTH_M
62        self._vibration_plot = VibrationPlot()
63
64    def setup(self, win: pg.GraphicsLayoutWidget) -> None:
65        self._vibration_plot.setup_plot(win, self._meas_dist_m)
66
67    def update(self, processor_result: ProcessorResult) -> None:
68        self._vibration_plot.update_plot(
69            result=processor_result,
70            extra_result=processor_result.extra_result,
71            show_time_series_std=True,
72        )
73
74
75if __name__ == "__main__":
76    main()