examples/a121/algo/breathing/breathing_with_gui.py

examples/a121/algo/breathing/breathing_with_gui.py

  1# Copyright (c) Acconeer AB, 2022-2023
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import numpy as np
  7
  8# Added here to force pyqtgraph to choose PySide
  9import PySide6  # noqa: F401
 10from PySide6.QtGui import QFont
 11
 12import pyqtgraph as pg
 13
 14import acconeer.exptool as et
 15from acconeer.exptool import a121
 16from acconeer.exptool.a121.algo._utils import get_distances_m
 17from acconeer.exptool.a121.algo.breathing import AppState, RefApp
 18from acconeer.exptool.a121.algo.breathing._ref_app import (
 19    BreathingProcessorConfig,
 20    RefAppConfig,
 21    get_sensor_config,
 22)
 23from acconeer.exptool.a121.algo.presence import ProcessorConfig as PresenceProcessorConfig
 24
 25
 26def main():
 27    args = a121.ExampleArgumentParser().parse_args()
 28    et.utils.config_logging(args)
 29
 30    # Setup the configurations
 31    # Detailed at https://docs.acconeer.com/en/latest/exploration_tool/algo/a121/ref_apps/breathing.html
 32
 33    # Sensor selections
 34    sensor = 1
 35
 36    # Ref App Configurations
 37    breathing_processor_config = BreathingProcessorConfig(
 38        lowest_breathing_rate=6,
 39        highest_breathing_rate=60,
 40        time_series_length_s=20,
 41    )
 42
 43    # Presence Configurations
 44    presence_config = PresenceProcessorConfig(
 45        intra_detection_threshold=4,
 46        intra_frame_time_const=0.15,
 47        inter_frame_fast_cutoff=20,
 48        inter_frame_slow_cutoff=0.2,
 49        inter_frame_deviation_time_const=0.5,
 50    )
 51
 52    # Breathing Configurations
 53    ref_app_config = RefAppConfig(
 54        use_presence_processor=True,
 55        num_distances_to_analyze=3,
 56        distance_determination_duration=5,
 57        breathing_config=breathing_processor_config,
 58        presence_config=presence_config,
 59    )
 60
 61    # End setup configurations
 62
 63    # Preparation for client
 64    sensor_config = get_sensor_config(ref_app_config=ref_app_config)
 65    client = a121.Client.open(**a121.get_client_args(args))
 66    metadata = client.setup_session(sensor_config)
 67
 68    # Preparation for reference application processor
 69    ref_app = RefApp(client=client, sensor_id=sensor, ref_app_config=ref_app_config)
 70    ref_app.start()
 71
 72    pg_updater = PGUpdater(sensor_config, ref_app_config, metadata)
 73    pg_process = et.PGProcess(pg_updater)
 74    pg_process.start()
 75
 76    interrupt_handler = et.utils.ExampleInterruptHandler()
 77    print("Press Ctrl-C to end session")
 78
 79    while not interrupt_handler.got_signal:
 80        processed_data = ref_app.get_next()
 81        try:
 82            pg_process.put_data(processed_data)
 83        except et.PGProccessDiedException:
 84            break
 85
 86    ref_app.stop()
 87    print("Disconnecting...")
 88    client.close()
 89
 90
 91class PGUpdater:
 92    def __init__(
 93        self,
 94        sensor_config: a121.SensorConfig,
 95        ref_app_config: RefAppConfig,
 96        metadata: a121.Metadata,
 97    ):
 98        self.distances = get_distances_m(sensor_config, metadata)
 99        self.use_presence_processor = ref_app_config.use_presence_processor
100
101    def setup(self, win):
102        # Define pens and font.
103        blue_color = et.utils.color_cycler(0)
104        orange_color = et.utils.color_cycler(1)
105        brush = et.utils.pg_brush_cycler(0)
106        self.blue = dict(
107            pen=pg.mkPen(blue_color, width=2),
108            symbol="o",
109            symbolSize=1,
110            symbolBrush=brush,
111            symbolPen="k",
112        )
113        self.orange = dict(
114            pen=pg.mkPen(orange_color, width=2),
115            symbol="o",
116            symbolSize=1,
117            symbolBrush=brush,
118            symbolPen="k",
119        )
120        self.blue_transparent_pen = pg.mkPen(f"{blue_color}50", width=2)
121        self.orange_transparent_pen = pg.mkPen(f"{orange_color}50", width=2)
122
123        brush_dot = et.utils.pg_brush_cycler(1)
124        symbol_dot_kw = dict(symbol="o", symbolSize=10, symbolBrush=brush_dot, symbolPen="k")
125
126        font = QFont()
127        font.setPixelSize(16)
128
129        # Presence plot.
130        self.presence_plot = win.addPlot(row=0, col=0)
131        self.presence_plot.setMenuEnabled(False)
132        self.presence_plot.showGrid(x=True, y=True)
133        self.presence_plot.addLegend()
134        self.presence_plot.setLabel("left", "Presence score")
135        self.presence_plot.setLabel("bottom", "Distance (m)")
136        self.presence_plot.addItem(pg.PlotDataItem())
137        self.presence_plot_curve = []
138        self.presence_plot_curve.append(self.presence_plot.plot(**self.blue))
139        self.presence_plot_curve.append(self.presence_plot.plot(**self.orange))
140        self.presence_plot_curve.append(self.presence_plot.plot(**self.blue))
141        self.presence_plot_curve.append(self.presence_plot.plot(**self.orange))
142
143        self.presence_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
144        self.presence_plot_legend.setParentItem(self.presence_plot)
145        self.presence_plot_legend.addItem(self.presence_plot_curve[2], "Slow motion")
146        self.presence_plot_legend.addItem(self.presence_plot_curve[3], "Fast motion")
147        self.presence_plot_legend.show()
148
149        self.presence_smoot_max = et.utils.SmoothMax()
150
151        self.presence_text_item = pg.TextItem(
152            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
153            anchor=(0.5, 0),
154            color=pg.mkColor(0xFF, 0xFF, 0xFF, 200),
155        )
156        self.presence_text_item.setFont(font)
157        self.presence_text_item.show()
158        self.presence_plot.addItem(self.presence_text_item)
159
160        # Time series plot.
161        self.time_series_plot = win.addPlot(row=1, col=0)
162        self.time_series_plot.setMenuEnabled(False)
163        self.time_series_plot.showGrid(x=True, y=True)
164        self.time_series_plot.addLegend()
165        self.time_series_plot.setLabel("left", "Displacement")
166        self.time_series_plot.setLabel("bottom", "Time (s)")
167        self.time_series_plot.addItem(pg.PlotDataItem())
168        self.time_series_curve = self.time_series_plot.plot(**self.blue)
169
170        self.time_series_text_item = pg.TextItem(
171            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
172            anchor=(0.5, 0),
173            color=pg.mkColor(0xFF, 0xFF, 0xFF, 200),
174        )
175        self.time_series_text_item.setFont(font)
176        self.time_series_text_item.show()
177        self.time_series_plot.addItem(self.time_series_text_item)
178
179        # Breathing psd plot.
180        self.breathing_psd_plot = win.addPlot(row=2, col=0)
181        self.breathing_psd_plot.setMenuEnabled(False)
182        self.breathing_psd_plot.showGrid(x=True, y=True)
183        self.breathing_psd_plot.addLegend()
184        self.breathing_psd_plot.setLabel("left", "PSD")
185        self.breathing_psd_plot.setLabel("bottom", "Breathing rate (Hz)")
186        self.breathing_psd_plot.addItem(pg.PlotDataItem())
187        self.breathing_psd_curve = self.breathing_psd_plot.plot(**self.blue)
188
189        self.psd_smoothing = et.utils.SmoothMax()
190
191        # Breathing rate plot.
192        self.breathing_rate_plot = win.addPlot(row=3, col=0)
193        self.breathing_rate_plot.setMenuEnabled(False)
194        self.breathing_rate_plot.showGrid(x=True, y=True)
195        self.breathing_rate_plot.addLegend()
196        self.breathing_rate_plot.setLabel("left", "Breaths per minute")
197        self.breathing_rate_plot.setLabel("bottom", "Time (s)")
198        self.breathing_rate_plot.addItem(pg.PlotDataItem())
199        self.breathing_rate_curves = []
200        self.breathing_rate_curves.append(self.breathing_rate_plot.plot(**self.blue))
201        self.breathing_rate_curves.append(
202            self.breathing_rate_plot.plot(**dict(pen=None, **symbol_dot_kw))
203        )
204        self.smooth_breathing_rate = et.utils.SmoothLimits()
205
206        self.breathing_rate_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
207        self.breathing_rate_plot_legend.setParentItem(self.breathing_rate_plot)
208        self.breathing_rate_plot_legend.addItem(self.breathing_rate_curves[0], "Breathing rate")
209        self.breathing_rate_plot_legend.addItem(
210            self.breathing_rate_curves[1], "Breathing rate(embedded output)"
211        )
212        self.breathing_rate_plot_legend.hide()
213
214        self.breathing_rate_text_item = pg.TextItem(
215            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
216            anchor=(0.5, 0),
217            color=pg.mkColor(0xFF, 0xFF, 0xFF, 200),
218        )
219        self.breathing_rate_text_item.setFont(font)
220        self.breathing_rate_text_item.hide()
221        self.breathing_rate_plot.addItem(self.breathing_rate_text_item)
222
223    def update(self, ref_app_result):
224        app_state = ref_app_result.app_state
225
226        max_ampl = max(
227            np.max(ref_app_result.presence_result.inter),
228            np.max(ref_app_result.presence_result.intra),
229        )
230        lim = self.presence_smoot_max.update(max_ampl)
231        self.presence_plot.setYRange(0, lim)
232
233        if ref_app_result.distances_being_analyzed is None:
234            self.presence_plot_curve[0].setData(
235                self.distances, ref_app_result.presence_result.inter, **self.blue
236            )
237            self.presence_plot_curve[1].setData(
238                self.distances, ref_app_result.presence_result.intra, **self.orange
239            )
240            self.presence_plot_curve[2].setData([], [])
241            self.presence_plot_curve[3].setData([], [])
242        else:
243            start = ref_app_result.distances_being_analyzed[0]
244            end = ref_app_result.distances_being_analyzed[1]
245            s = slice(start, end)
246            distance_slice = self.distances[s]
247            self.presence_plot_curve[0].setData(
248                self.distances,
249                ref_app_result.presence_result.inter,
250                pen=self.blue_transparent_pen,
251            )
252            self.presence_plot_curve[1].setData(
253                self.distances,
254                ref_app_result.presence_result.intra,
255                pen=self.orange_transparent_pen,
256            )
257            self.presence_plot_curve[2].setData(
258                distance_slice, ref_app_result.presence_result.inter[s]
259            )
260            self.presence_plot_curve[3].setData(
261                distance_slice, ref_app_result.presence_result.intra[s]
262            )
263
264        if ref_app_result.breathing_result is not None:
265            breathing_result = ref_app_result.breathing_result.extra_result
266            breathing_motion = breathing_result.breathing_motion
267            psd = breathing_result.psd
268            frequencies = breathing_result.frequencies
269            time_vector = breathing_result.time_vector
270            all_breathing_rate_history = breathing_result.all_breathing_rate_history
271            breathing_rate_history = breathing_result.breathing_rate_history
272
273            self.time_series_curve.setData(
274                time_vector[-breathing_motion.shape[0] :], breathing_motion
275            )
276            y = np.max(np.abs(breathing_motion)) * 1.05
277            self.time_series_plot.setYRange(-y, y)
278            self.time_series_plot.setXRange(
279                time_vector[-breathing_motion.shape[0]], max(time_vector)
280            )
281
282            if not np.all(np.isnan(all_breathing_rate_history)):
283                ylim = self.psd_smoothing.update(psd)
284                self.breathing_psd_curve.setData(frequencies, psd)
285                self.breathing_psd_plot.setYRange(0, ylim)
286                self.breathing_psd_plot.setXRange(0, 2)
287
288                self.breathing_rate_curves[0].setData(time_vector, all_breathing_rate_history)
289                lims = self.smooth_breathing_rate.update(all_breathing_rate_history)
290                self.breathing_rate_plot.setYRange(lims[0] - 3, lims[1] + 3)
291
292                self.breathing_rate_plot_legend.show()
293
294            if not np.all(np.isnan(breathing_rate_history)):
295                self.breathing_rate_curves[1].setData(time_vector, breathing_rate_history)
296
297            if not np.isnan(breathing_rate_history[-1]):
298                self.displayed_breathing_rate = "{:.1f}".format(breathing_rate_history[-1])
299                self.breathing_rate_text_item.show()
300
301        else:
302            self.time_series_plot.setYRange(0, 1)
303            self.time_series_plot.setXRange(0, 1)
304            self.time_series_curve.setData([], [])
305            self.breathing_psd_curve.setData([], [])
306            self.breathing_rate_curves[0].setData([], [])
307            self.breathing_rate_curves[1].setData([], [])
308            self.displayed_breathing_rate = None
309            self.breathing_rate_text_item.hide()
310
311        # Set text in text boxes according to app state.
312
313        # Presence text
314        if app_state == AppState.NO_PRESENCE_DETECTED:
315            presence_text = "No presence detected"
316        elif app_state == AppState.DETERMINE_DISTANCE_ESTIMATE:
317            presence_text = "Determining distance with presence"
318        elif app_state == AppState.ESTIMATE_BREATHING_RATE:
319            start_m = "{:.2f}".format(distance_slice[0])
320            end_m = "{:.2f}".format(distance_slice[-1])
321            if self.use_presence_processor:
322                presence_text = (
323                    "Presence detected in the range " + start_m + " - " + end_m + " (m)"
324                )
325            else:
326                presence_text = "Presence distance detection disabled"
327        elif app_state == AppState.INTRA_PRESENCE_DETECTED:
328            presence_text = "Large motion detected"
329        else:
330            presence_text = ""
331
332        text_y_pos = self.presence_plot.getAxis("left").range[1] * 0.95
333        text_x_pos = (
334            self.presence_plot.getAxis("bottom").range[1]
335            + self.presence_plot.getAxis("bottom").range[0]
336        ) / 2.0
337        self.presence_text_item.setPos(text_x_pos, text_y_pos)
338        self.presence_text_item.setHtml(presence_text)
339
340        # Breathing text
341        if app_state == AppState.ESTIMATE_BREATHING_RATE:
342            if (
343                ref_app_result.breathing_result is not None
344                and ref_app_result.breathing_result.breathing_rate is None
345            ):
346                time_series_text = "Initializing breathing detection"
347            elif self.displayed_breathing_rate is not None:
348                time_series_text = "Breathing rate: " + self.displayed_breathing_rate + " bpm"
349        else:
350            time_series_text = "Waiting for distance"
351
352        text_y_pos = self.time_series_plot.getAxis("left").range[1] * 0.95
353        text_x_pos = (
354            self.time_series_plot.getAxis("bottom").range[1]
355            + self.time_series_plot.getAxis("bottom").range[0]
356        ) / 2.0
357        self.time_series_text_item.setPos(text_x_pos, text_y_pos)
358        self.time_series_text_item.setHtml(time_series_text)
359
360        if self.displayed_breathing_rate is not None:
361            text_y_pos = self.breathing_rate_plot.getAxis("left").range[1] * 0.95
362            text_x_pos = time_vector[0]
363
364            self.breathing_rate_text_item.setPos(text_x_pos, text_y_pos)
365            self.breathing_rate_text_item.setHtml(self.displayed_breathing_rate + " bpm")
366
367
# Run the example only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

View this example on GitHub: acconeer/acconeer-python-exploration