examples/a121/algo/vibration/processor.py

examples/a121/algo/vibration/processor.py#

  1# Copyright (c) Acconeer AB, 2022-2024
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8# Added here to force pyqtgraph to choose PySide
  9import PySide6  # noqa: F401
 10
 11import pyqtgraph as pg
 12
 13import acconeer.exptool as et
 14from acconeer.exptool import a121
 15from acconeer.exptool.a121.algo._utils import APPROX_BASE_STEP_LENGTH_M
 16from acconeer.exptool.a121.algo.vibration import (
 17    ExampleApp,
 18    Processor,
 19    ProcessorConfig,
 20    ProcessorContext,
 21    ProcessorResult,
 22    get_high_frequency_config,
 23)
 24from acconeer.exptool.a121.algo.vibration._processor import RANGE_SUBSWEEP
 25
 26
 27def main():
 28    args = a121.ExampleArgumentParser().parse_args()
 29    et.utils.config_logging(args)
 30
 31    sensor_config = ExampleApp._get_sensor_config(get_high_frequency_config())
 32
 33    client = a121.Client.open(**a121.get_client_args(args))
 34    metadata = client.setup_session(sensor_config)
 35
 36    processor = Processor(
 37        sensor_config=sensor_config,
 38        metadata=metadata,
 39        processor_config=ProcessorConfig(),
 40        context=ProcessorContext(),
 41    )
 42    pg_updater = PGUpdater(sensor_config)
 43    pg_process = et.PGProcess(pg_updater)
 44    pg_process.start()
 45
 46    client.start_session()
 47    interrupt_handler = et.utils.ExampleInterruptHandler()
 48    print("Press Ctrl-C to end session")
 49
 50    while not interrupt_handler.got_signal:
 51        result = client.get_next()
 52        plot_data = processor.process(result)
 53        try:
 54            pg_process.put_data(plot_data)
 55        except et.PGProccessDiedException:
 56            break
 57
 58    print("Disconnecting...")
 59    client.close()
 60
 61
 62class PGUpdater:
 63    def __init__(self, sensor_config: a121.SensorConfig):
 64        self.sensor_config = sensor_config.subsweeps[RANGE_SUBSWEEP]
 65        self.meas_dist_m = self.sensor_config.start_point * APPROX_BASE_STEP_LENGTH_M
 66
 67    def setup(self, win):
 68        pen_blue = et.utils.pg_pen_cycler(0)
 69        pen_orange = et.utils.pg_pen_cycler(1)
 70        brush = et.utils.pg_brush_cycler(0)
 71        brush_dot = et.utils.pg_brush_cycler(1)
 72        symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
 73        feat_kw_blue = dict(pen=pen_blue, **symbol_kw)
 74        feat_kw_orange = dict(pen=pen_orange)
 75        symbol_dot_kw = dict(symbol="o", symbolSize=10, symbolBrush=brush_dot, symbolPen="k")
 76
 77        # presence plot
 78        self.object_detection_plot = pg.PlotItem()
 79        self.object_detection_plot.setMenuEnabled(False)
 80        self.object_detection_plot.showGrid(x=False, y=True)
 81        self.object_detection_plot.setLabel("left", "Max amplitude")
 82        self.object_detection_plot.setLabel("bottom", "Distance (m)")
 83        self.object_detection_plot.setXRange(self.meas_dist_m - 0.001, self.meas_dist_m + 0.001)
 84        self.presence_curve = self.object_detection_plot.plot(
 85            **dict(pen=pen_blue, **symbol_dot_kw)
 86        )
 87
 88        self.presence_threshold = pg.InfiniteLine(pen=pen_blue, angle=0)
 89        self.object_detection_plot.addItem(self.presence_threshold)
 90        self.presence_threshold.show()
 91
 92        self.smooth_max_presence = et.utils.SmoothMax(tau_decay=10.0)
 93
 94        # sweep and threshold plot
 95        self.time_series_plot = pg.PlotItem()
 96        self.time_series_plot.setMenuEnabled(False)
 97        self.time_series_plot.showGrid(x=True, y=True)
 98        self.time_series_plot.setLabel("left", "Displacement (<font>&mu;</font>m)")
 99        self.time_series_plot.setLabel("bottom", "History")
100        self.time_series_curve = self.time_series_plot.plot(**feat_kw_blue)
101
102        self.time_series_plot.setYRange(-1000, 1000)
103        self.time_series_plot.setXRange(0, 1024)
104
105        self.text_item_time_series = pg.TextItem(
106            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
107            anchor=(0.5, 0),
108        )
109        self.text_item_time_series.hide()
110        self.time_series_plot.addItem(self.text_item_time_series)
111
112        sublayout = win.addLayout(row=0, col=0)
113        sublayout.layout.setColumnStretchFactor(1, 5)
114        sublayout.addItem(self.object_detection_plot, row=0, col=0)
115        sublayout.addItem(self.time_series_plot, row=0, col=1)
116
117        self.smooth_lim_time_series = et.utils.SmoothLimits(tau_decay=0.5, tau_grow=0.1)
118
119        self.fft_plot = win.addPlot(col=0, row=1)
120        self.fft_plot.setMenuEnabled(False)
121        self.fft_plot.showGrid(x=True, y=True)
122        self.fft_plot.setLabel("left", "Displacement (<font>&mu;</font>m)")
123        self.fft_plot.setLabel("bottom", "Frequency (Hz)")
124        self.fft_plot.setLogMode(False, True)
125        self.fft_plot.addItem(pg.PlotDataItem())
126        self.fft_curve = [
127            self.fft_plot.plot(**feat_kw_blue),
128            self.fft_plot.plot(**feat_kw_orange),
129            self.fft_plot.plot(**dict(pen=pen_blue, **symbol_dot_kw)),
130        ]
131
132        self.text_item_fft = pg.TextItem(
133            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
134            anchor=(0.5, 0),
135        )
136        self.text_item_fft.hide()
137        self.fft_plot.addItem(self.text_item_fft)
138
139        self.smooth_max_fft = et.utils.SmoothMax()
140
141    def update(self, processor_result: ProcessorResult) -> None:
142        # Extra result
143        time_series = processor_result.extra_result.zm_time_series
144        lp_displacements_threshold = processor_result.extra_result.lp_displacements_threshold
145        amplitude_threshold = processor_result.extra_result.amplitude_threshold
146
147        # Processor result
148        lp_displacements = processor_result.lp_displacements
149        lp_displacements_freqs = processor_result.lp_displacements_freqs
150        max_amplitude = processor_result.max_sweep_amplitude
151        max_displacement = processor_result.max_displacement
152        max_displacement_freq = processor_result.max_displacement_freq
153        time_series_rms = processor_result.time_series_std
154
155        # Plot object presence metric
156        self.presence_curve.setData([self.meas_dist_m], [max_amplitude])
157        self.presence_threshold.setValue(amplitude_threshold)
158        lim = self.smooth_max_presence.update(max_amplitude)
159        self.object_detection_plot.setYRange(0, max(1000.0, lim))
160
161        # Plot time series
162        if time_series is not None and amplitude_threshold < max_amplitude:
163            assert time_series_rms is not None
164            lim = self.smooth_lim_time_series.update(time_series)
165            self.time_series_plot.setYRange(lim[0], lim[1])
166            self.time_series_plot.setXRange(0, time_series.shape[0])
167
168            self.text_item_time_series.setPos(time_series.size / 2, lim[1] * 0.95)
169            html_format = (
170                '<div style="text-align: center">'
171                '<span style="color: #FFFFFF;font-size:15pt;">'
172                "{}</span></div>".format("RMS : " + str(int(time_series_rms)))
173            )
174            self.text_item_time_series.setHtml(html_format)
175            self.text_item_time_series.show()
176            self.time_series_curve.setData(time_series)
177
178        # Plot spectrum
179        if lp_displacements is not None:
180            assert time_series is not None
181            assert lp_displacements is not None
182
183            self.fft_curve[0].setData(lp_displacements_freqs, lp_displacements)
184            self.fft_curve[1].setData(lp_displacements_freqs, lp_displacements_threshold)
185            lim = self.smooth_max_fft.update(np.max(lp_displacements))
186            self.fft_plot.setYRange(-1, np.log10(lim))
187
188            if max_displacement_freq is not None and max_displacement is not None:
189                self.fft_curve[2].setData([max_displacement_freq], [max_displacement])
190
191                # Place text box centered at the top of the plotting window
192                self.text_item_fft.setPos(max(lp_displacements_freqs) / 2, np.log10(lim) * 0.95)
193                html_format = (
194                    '<div style="text-align: center">'
195                    '<span style="color: #FFFFFF;font-size:15pt;">'
196                    "{}</span></div>".format(
197                        "Frequency: "
198                        + str(int(max_displacement_freq))
199                        + "Hz Displacement: "
200                        + str(int(max_displacement))
201                        + "<font>&mu;</font>m"
202                    )
203                )
204                self.text_item_fft.setHtml(html_format)
205                self.text_item_fft.show()
206            else:
207                self.fft_curve[2].setData([], [])
208                self.text_item_fft.hide()
209
210
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()