examples/a121/algo/breathing/breathing_with_gui.py
1# Copyright (c) Acconeer AB, 2022-2023
2# All rights reserved
3
4from __future__ import annotations
5
6import numpy as np
7
8# Added here to force pyqtgraph to choose PySide
9import PySide6 # noqa: F401
10from PySide6.QtGui import QFont
11
12import pyqtgraph as pg
13
14import acconeer.exptool as et
15from acconeer.exptool import a121
16from acconeer.exptool.a121.algo._utils import get_distances_m
17from acconeer.exptool.a121.algo.breathing import AppState, RefApp
18from acconeer.exptool.a121.algo.breathing._ref_app import (
19 BreathingProcessorConfig,
20 RefAppConfig,
21 get_sensor_config,
22)
23from acconeer.exptool.a121.algo.presence import ProcessorConfig as PresenceProcessorConfig
24
25
def main():
    """Run the breathing reference application and plot its results live.

    Parses the example command-line arguments, builds the reference
    application configuration, opens a client and then forwards every result
    from the reference application to a plotting process until the interrupt
    handler reports a signal (Ctrl-C) or the plotting process is gone.

    The reference application is stopped and the client is closed even when
    an error interrupts the session.
    """
    args = a121.ExampleArgumentParser().parse_args()
    et.utils.config_logging(args)

    # Setup the configurations
    # Detailed at https://docs.acconeer.com/en/latest/exploration_tool/algo/a121/ref_apps/breathing.html

    # Sensor selections
    sensor = 1

    # Ref App Configurations
    breathing_processor_config = BreathingProcessorConfig(
        lowest_breathing_rate=6,
        highest_breathing_rate=60,
        time_series_length_s=20,
    )

    # Presence Configurations
    presence_config = PresenceProcessorConfig(
        intra_detection_threshold=4,
        intra_frame_time_const=0.15,
        inter_frame_fast_cutoff=20,
        inter_frame_slow_cutoff=0.2,
        inter_frame_deviation_time_const=0.5,
    )

    # Breathing Configurations
    ref_app_config = RefAppConfig(
        use_presence_processor=True,
        num_distances_to_analyze=3,
        distance_determination_duration=5,
        breathing_config=breathing_processor_config,
        presence_config=presence_config,
    )

    # End setup configurations

    # Preparation for client
    sensor_config = get_sensor_config(ref_app_config=ref_app_config)
    client = a121.Client.open(**a121.get_client_args(args))

    try:
        # The metadata is needed by the plot updater to compute the distance axis.
        metadata = client.setup_session(sensor_config)

        # Preparation for reference application processor
        ref_app = RefApp(client=client, sensor_id=sensor, ref_app_config=ref_app_config)
        ref_app.start()

        try:
            pg_updater = PGUpdater(sensor_config, ref_app_config, metadata)
            pg_process = et.PGProcess(pg_updater)
            pg_process.start()

            interrupt_handler = et.utils.ExampleInterruptHandler()
            print("Press Ctrl-C to end session")

            while not interrupt_handler.got_signal:
                processed_data = ref_app.get_next()
                try:
                    pg_process.put_data(processed_data)
                except et.PGProccessDiedException:
                    # The plotting process can no longer receive data
                    # (presumably its window was closed); end the session.
                    break
        finally:
            # Stop the application whether the loop ended normally or by an error.
            ref_app.stop()
    finally:
        print("Disconnecting...")
        client.close()
89
90
class PGUpdater:
    """Draw the results of the breathing reference application with pyqtgraph.

    ``setup`` builds four stacked plots (presence score, breathing time
    series, breathing PSD and breathing rate) and ``update`` refreshes them
    from a single reference-application result.
    """

    def __init__(
        self,
        sensor_config: a121.SensorConfig,
        ref_app_config: RefAppConfig,
        metadata: a121.Metadata,
    ):
        """Keep the values the plots need from the configuration.

        :param sensor_config: Sensor configuration, passed to ``get_distances_m``.
        :param ref_app_config: Reference application configuration; only
            ``use_presence_processor`` is read.
        :param metadata: Session metadata, passed to ``get_distances_m``.
        """
        self.distances = get_distances_m(sensor_config, metadata)
        self.use_presence_processor = ref_app_config.use_presence_processor
        # Defined up front because update() reads it on every frame, including
        # frames where no breathing rate has been formatted yet.
        self.displayed_breathing_rate = None

    @staticmethod
    def _add_plot(win, row, left_label, bottom_label):
        """Add a plot with grid, legend and axis labels at ``row`` of ``win``."""
        plot = win.addPlot(row=row, col=0)
        plot.setMenuEnabled(False)
        plot.showGrid(x=True, y=True)
        plot.addLegend()
        plot.setLabel("left", left_label)
        plot.setLabel("bottom", bottom_label)
        plot.addItem(pg.PlotDataItem())
        return plot

    @staticmethod
    def _add_text_item(plot, font, visible):
        """Create the orange text box used for status messages and add it to ``plot``."""
        text_item = pg.TextItem(
            fill=pg.mkColor(0xFF, 0x7F, 0x0E, 200),
            anchor=(0.5, 0),
            color=pg.mkColor(0xFF, 0xFF, 0xFF, 200),
        )
        text_item.setFont(font)
        if visible:
            text_item.show()
        else:
            text_item.hide()
        plot.addItem(text_item)
        return text_item

    def setup(self, win):
        """Create all plots, curves, legends and text boxes.

        :param win: Object providing ``addPlot(row=..., col=...)``
            (presumably a pyqtgraph graphics layout -- confirm with the caller).
        """
        # Define pens and font.
        blue_color = et.utils.color_cycler(0)
        orange_color = et.utils.color_cycler(1)
        brush = et.utils.pg_brush_cycler(0)
        line_symbol_kw = dict(symbol="o", symbolSize=1, symbolBrush=brush, symbolPen="k")
        self.blue = dict(pen=pg.mkPen(blue_color, width=2), **line_symbol_kw)
        self.orange = dict(pen=pg.mkPen(orange_color, width=2), **line_symbol_kw)
        # Same colors with "50" appended to the color string; used for the
        # full-range curves while a sub-range is highlighted.
        self.blue_transparent_pen = pg.mkPen(f"{blue_color}50", width=2)
        self.orange_transparent_pen = pg.mkPen(f"{orange_color}50", width=2)

        brush_dot = et.utils.pg_brush_cycler(1)
        symbol_dot_kw = dict(symbol="o", symbolSize=10, symbolBrush=brush_dot, symbolPen="k")

        font = QFont()
        font.setPixelSize(16)

        # Presence plot.
        self.presence_plot = self._add_plot(win, 0, "Presence score", "Distance (m)")
        # Curves 0/1 span the full distance range, curves 2/3 the analyzed range.
        self.presence_plot_curve = [
            self.presence_plot.plot(**style)
            for style in (self.blue, self.orange, self.blue, self.orange)
        ]

        self.presence_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
        self.presence_plot_legend.setParentItem(self.presence_plot)
        self.presence_plot_legend.addItem(self.presence_plot_curve[2], "Slow motion")
        self.presence_plot_legend.addItem(self.presence_plot_curve[3], "Fast motion")
        self.presence_plot_legend.show()

        self.presence_smoot_max = et.utils.SmoothMax()

        self.presence_text_item = self._add_text_item(self.presence_plot, font, visible=True)

        # Time series plot.
        self.time_series_plot = self._add_plot(win, 1, "Displacement", "Time (s)")
        self.time_series_curve = self.time_series_plot.plot(**self.blue)

        self.time_series_text_item = self._add_text_item(
            self.time_series_plot, font, visible=True
        )

        # Breathing psd plot.
        self.breathing_psd_plot = self._add_plot(win, 2, "PSD", "Breathing rate (Hz)")
        self.breathing_psd_curve = self.breathing_psd_plot.plot(**self.blue)

        self.psd_smoothing = et.utils.SmoothMax()

        # Breathing rate plot.
        self.breathing_rate_plot = self._add_plot(win, 3, "Breaths per minute", "Time (s)")
        self.breathing_rate_curves = [
            self.breathing_rate_plot.plot(**self.blue),
            self.breathing_rate_plot.plot(pen=None, **symbol_dot_kw),
        ]
        self.smooth_breathing_rate = et.utils.SmoothLimits()

        self.breathing_rate_plot_legend = pg.LegendItem(offset=(0.0, 0.5))
        self.breathing_rate_plot_legend.setParentItem(self.breathing_rate_plot)
        self.breathing_rate_plot_legend.addItem(self.breathing_rate_curves[0], "Breathing rate")
        self.breathing_rate_plot_legend.addItem(
            self.breathing_rate_curves[1], "Breathing rate(embedded output)"
        )
        # Hidden until update() has rate history to show.
        self.breathing_rate_plot_legend.hide()

        self.breathing_rate_text_item = self._add_text_item(
            self.breathing_rate_plot, font, visible=False
        )

    def update(self, ref_app_result):
        """Refresh every plot and text box from one reference-application result.

        :param ref_app_result: Result object; ``app_state``, ``presence_result``,
            ``distances_being_analyzed`` and ``breathing_result`` are read.
        """
        app_state = ref_app_result.app_state
        breathing_result = ref_app_result.breathing_result

        distance_slice = self._update_presence_plot(ref_app_result)
        time_vector = self._update_breathing_plots(breathing_result)

        # Set text in text boxes according to app state.

        # Presence text
        self._show_text_top_center(
            self.presence_plot,
            self.presence_text_item,
            self._presence_text(app_state, distance_slice),
        )

        # Breathing text
        self._show_text_top_center(
            self.time_series_plot,
            self.time_series_text_item,
            self._time_series_text(app_state, breathing_result),
        )

        if self.displayed_breathing_rate is not None and time_vector is not None:
            text_y_pos = self.breathing_rate_plot.getAxis("left").range[1] * 0.95
            text_x_pos = time_vector[0]

            self.breathing_rate_text_item.setPos(text_x_pos, text_y_pos)
            self.breathing_rate_text_item.setHtml(self.displayed_breathing_rate + " bpm")

    def _update_presence_plot(self, ref_app_result):
        """Redraw the presence curves.

        :returns: The distances of the range being analyzed, or ``None`` when
            ``distances_being_analyzed`` is ``None``.
        """
        presence_result = ref_app_result.presence_result

        max_ampl = max(np.max(presence_result.inter), np.max(presence_result.intra))
        lim = self.presence_smoot_max.update(max_ampl)
        self.presence_plot.setYRange(0, lim)

        analyzed = ref_app_result.distances_being_analyzed
        if analyzed is None:
            # No highlighted range: full-range curves in their normal style,
            # highlight curves emptied.
            self.presence_plot_curve[0].setData(
                self.distances, presence_result.inter, **self.blue
            )
            self.presence_plot_curve[1].setData(
                self.distances, presence_result.intra, **self.orange
            )
            self.presence_plot_curve[2].setData([], [])
            self.presence_plot_curve[3].setData([], [])
            return None

        # The first two entries are used as the start/stop indices of a slice.
        s = slice(analyzed[0], analyzed[1])
        distance_slice = self.distances[s]
        self.presence_plot_curve[0].setData(
            self.distances,
            presence_result.inter,
            pen=self.blue_transparent_pen,
        )
        self.presence_plot_curve[1].setData(
            self.distances,
            presence_result.intra,
            pen=self.orange_transparent_pen,
        )
        self.presence_plot_curve[2].setData(distance_slice, presence_result.inter[s])
        self.presence_plot_curve[3].setData(distance_slice, presence_result.intra[s])
        return distance_slice

    def _update_breathing_plots(self, breathing_result):
        """Redraw the time series, PSD and breathing rate plots.

        :returns: The time vector of the result, or ``None`` when there is no
            breathing result (in which case the plots are cleared).
        """
        if breathing_result is None:
            self.time_series_plot.setYRange(0, 1)
            self.time_series_plot.setXRange(0, 1)
            self.time_series_curve.setData([], [])
            self.breathing_psd_curve.setData([], [])
            self.breathing_rate_curves[0].setData([], [])
            self.breathing_rate_curves[1].setData([], [])
            self.displayed_breathing_rate = None
            self.breathing_rate_text_item.hide()
            return None

        extra_result = breathing_result.extra_result
        breathing_motion = extra_result.breathing_motion
        psd = extra_result.psd
        frequencies = extra_result.frequencies
        time_vector = extra_result.time_vector
        all_breathing_rate_history = extra_result.all_breathing_rate_history
        breathing_rate_history = extra_result.breathing_rate_history

        # The motion series is aligned with the tail of the time vector.
        num_samples = breathing_motion.shape[0]
        self.time_series_curve.setData(time_vector[-num_samples:], breathing_motion)
        y = np.max(np.abs(breathing_motion)) * 1.05
        self.time_series_plot.setYRange(-y, y)
        self.time_series_plot.setXRange(time_vector[-num_samples], max(time_vector))

        if np.all(np.isnan(all_breathing_rate_history)):
            # Nothing but NaN so far: leave the PSD and rate plots as they are.
            return time_vector

        ylim = self.psd_smoothing.update(psd)
        self.breathing_psd_curve.setData(frequencies, psd)
        self.breathing_psd_plot.setYRange(0, ylim)
        self.breathing_psd_plot.setXRange(0, 2)

        self.breathing_rate_curves[0].setData(time_vector, all_breathing_rate_history)
        lims = self.smooth_breathing_rate.update(all_breathing_rate_history)
        self.breathing_rate_plot.setYRange(lims[0] - 3, lims[1] + 3)

        self.breathing_rate_plot_legend.show()

        if not np.all(np.isnan(breathing_rate_history)):
            self.breathing_rate_curves[1].setData(time_vector, breathing_rate_history)

        latest_rate = breathing_rate_history[-1]
        if not np.isnan(latest_rate):
            # When the latest value is NaN the previously shown rate is kept.
            self.displayed_breathing_rate = f"{latest_rate:.1f}"
            self.breathing_rate_text_item.show()

        return time_vector

    def _presence_text(self, app_state, distance_slice):
        """Return the message for the presence plot text box."""
        if app_state == AppState.NO_PRESENCE_DETECTED:
            return "No presence detected"
        if app_state == AppState.DETERMINE_DISTANCE_ESTIMATE:
            return "Determining distance with presence"
        if app_state == AppState.INTRA_PRESENCE_DETECTED:
            return "Large motion detected"
        if app_state != AppState.ESTIMATE_BREATHING_RATE:
            return ""

        if not self.use_presence_processor:
            return "Presence distance detection disabled"
        if distance_slice is None or len(distance_slice) == 0:
            # No analyzed range to report; avoid indexing a missing/empty slice.
            return "Presence detected"
        return (
            f"Presence detected in the range {distance_slice[0]:.2f}"
            f" - {distance_slice[-1]:.2f} (m)"
        )

    def _time_series_text(self, app_state, breathing_result):
        """Return the message for the time series plot text box."""
        if app_state != AppState.ESTIMATE_BREATHING_RATE:
            return "Waiting for distance"
        if breathing_result is not None and breathing_result.breathing_rate is None:
            return "Initializing breathing detection"
        if self.displayed_breathing_rate is not None:
            return "Breathing rate: " + self.displayed_breathing_rate + " bpm"
        # Estimating, but there is no rate to display yet; always return a
        # message so the text box update cannot hit an unbound variable.
        return "Initializing breathing detection"

    @staticmethod
    def _show_text_top_center(plot, text_item, text):
        """Place ``text_item`` at the horizontal center, near the top of ``plot``."""
        text_y_pos = plot.getAxis("left").range[1] * 0.95
        bottom_range = plot.getAxis("bottom").range
        text_x_pos = (bottom_range[1] + bottom_range[0]) / 2.0
        text_item.setPos(text_x_pos, text_y_pos)
        text_item.setHtml(text)
366
367
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration