examples/a121/algo/vibration/example_app.py

examples/a121/algo/vibration/example_app.py#

 1# Copyright (c) Acconeer AB, 2024-2026
 2# All rights reserved
 3
 4from __future__ import annotations
 5
 6import pyqtgraph as pg
 7
 8import acconeer.exptool as et
 9from acconeer.exptool import a121
10from acconeer.exptool.a121.algo._utils import APPROX_BASE_STEP_LENGTH_M
11from acconeer.exptool.a121.algo.vibration import (
12    ExampleApp,
13    ExampleAppConfig,
14    ExampleAppResult,
15    get_high_frequency_config,
16)
17from acconeer.exptool.a121.algo.vibration.plot import VibrationPlot
18
19
20SENSOR_ID = 1
21
22
23def main():
24    args = a121.ExampleArgumentParser().parse_args()
25    et.utils.config_logging(args)
26
27    example_app_config = get_high_frequency_config()
28
29    client = a121.Client.open(**a121.get_client_args(args))
30
31    example_app = ExampleApp(
32        client=client,
33        sensor_id=SENSOR_ID,
34        example_app_config=example_app_config,
35    )
36
37    pg_updater = PGUpdater(example_app_config)
38    pg_process = et.PGProcess(pg_updater)
39
40    example_app.start()
41    pg_process.start()
42
43    interrupt_handler = et.utils.ExampleInterruptHandler()
44    print("Press Ctrl-C to end session")
45
46    while not interrupt_handler.got_signal:
47        example_app_result = example_app.get_next()
48
49        try:
50            pg_process.put_data(example_app_result)
51        except et.PGProccessDiedException:
52            break
53
54    print("Disconnecting...")
55    pg_process.close()
56    example_app.stop()
57
58
59class PGUpdater:
60    def __init__(self, example_app_config: ExampleAppConfig) -> None:
61        self._sensor_config = ExampleApp._get_sensor_config(example_app_config)
62        self._meas_dist_m = example_app_config.measured_point * APPROX_BASE_STEP_LENGTH_M
63        self._vibration_plot = VibrationPlot()
64
65    def setup(self, win: pg.GraphicsLayoutWidget) -> None:
66        self._vibration_plot.setup_plot(win, self._meas_dist_m)
67
68    def update(self, example_app_result: ExampleAppResult) -> None:
69        self._vibration_plot.update_plot(
70            result=example_app_result,
71            extra_result=example_app_result.processor_extra_result,
72            show_time_series_std=True,
73        )
74
75
76if __name__ == "__main__":
77    main()