examples/a121/stress.py
1# Copyright (c) Acconeer AB, 2022-2023
2# All rights reserved
3
4from __future__ import annotations
5
6import time
7import typing as t
8
9import acconeer.exptool as et
10from acconeer.exptool import a121
11
12
def main() -> None:
    """Parse the command line, open a client and run the selected stress program.

    The positional ``program`` argument selects one of the stress functions
    defined in this file. The selected function is called with the opened
    client and runs until it raises; a ``KeyboardInterrupt`` (Ctrl-C) is
    treated as a normal stop request. The client is closed on every exit
    path, including unexpected errors, which are re-raised after the close.
    """
    # Single source of truth for both the accepted CLI choices and the dispatch.
    programs: t.Dict[str, t.Callable[[a121.Client], None]] = {
        "throughput": throughput,
        "rate": rate,
        "short_pauses": short_pauses,
        "long_pauses": long_pauses,
        "flow": flow,
    }

    parser = a121.ExampleArgumentParser()
    parser.add_argument("program", choices=list(programs))

    args = parser.parse_args()
    et.utils.config_logging(args)

    client = a121.Client.open(**a121.get_client_args(args))
    try:
        print(client.client_info)
        print()
        print(client.server_info)
        print()

        programs[args.program](client)
    except KeyboardInterrupt:
        # Move past the "\r"-terminated status line written by print_status.
        print()
    finally:
        try:
            client.close()
        except Exception:
            # Deliberate best effort: "flow" closes the client itself, so this
            # close may be a second one; a failure here must not hide the
            # original error.
            pass
57
58
def flow(client: a121.Client) -> None:
    """Cycle through short sessions forever to stress session setup and teardown.

    Each outer iteration runs five sessions with a default ``SensorConfig``
    (set up, start, read five results, stop), then closes the client,
    increments the iteration counter and reports it via ``print_status``.

    Never returns normally; interrupt it to stop.
    """
    # NOTE(review): the client is closed here and again at the end of every
    # iteration, but nothing visible in this file re-opens it before the next
    # ``setup_session`` call -- confirm that this is intended.
    client.close()

    n = 0

    while True:
        for _ in range(5):
            client.setup_session(a121.SensorConfig())
            client.start_session()

            for _ in range(5):
                client.get_next()

            client.stop_session()

        client.close()

        n += 1

        print_status(f"{n:8}")
79
80
def throughput(client: a121.Client) -> None:
    """Run the stress loop with a large frame.

    The configuration uses 2047 points, one sweep per frame, double
    buffering enabled and no explicit frame rate. Never returns normally
    (see ``_stress_session``).
    """
    config = a121.SensorConfig(
        hwaas=1,
        profile=a121.Profile.PROFILE_5,
        prf=a121.PRF.PRF_13_0_MHz,
        sweeps_per_frame=1,
        start_point=0,
        num_points=2047,
        step_length=1,
        double_buffering=True,
    )
    _stress_session(client, config)
93
94
def rate(client: a121.Client) -> None:
    """Run the stress loop with a minimal frame.

    The configuration uses a single point, one sweep per frame, double
    buffering enabled and no explicit frame rate. Never returns normally
    (see ``_stress_session``).
    """
    config = a121.SensorConfig(
        hwaas=1,
        profile=a121.Profile.PROFILE_5,
        prf=a121.PRF.PRF_13_0_MHz,
        sweeps_per_frame=1,
        start_point=0,
        num_points=1,
        step_length=1,
        double_buffering=True,
    )
    _stress_session(client, config)
107
108
def short_pauses(client: a121.Client) -> None:
    """Run the paused stress loop with 0.25 s pauses on the reading side."""
    _pauses(client, pause_time=0.25)
111
112
def long_pauses(client: a121.Client) -> None:
    """Run the paused stress loop with 1.5 s pauses on the reading side."""
    _pauses(client, pause_time=1.5)
115
116
def _pauses(client: a121.Client, *, pause_time: float) -> None:
    """Run the stress loop at a fixed frame rate while periodically pausing.

    The configuration uses 500 points, one sweep per frame and a frame rate
    of 100 Hz. ``pause_time`` (seconds) is forwarded to ``_stress_session``,
    which sleeps for that long between reads at regular intervals.
    """
    config = a121.SensorConfig(
        hwaas=1,
        profile=a121.Profile.PROFILE_5,
        prf=a121.PRF.PRF_13_0_MHz,
        sweeps_per_frame=1,
        start_point=0,
        num_points=500,
        step_length=1,
        frame_rate=100.0,
    )
    _stress_session(client, config, pause_time=pause_time)
129
130
def _stress_session(
    client: a121.Client,
    config: a121.SensorConfig,
    *,
    pause_time: t.Optional[float] = None,
) -> None:
    """Start a session with ``config`` and read results forever, reporting stats.

    After every result the frame count, the client's measured rate and
    jitter, and a bitrate estimate derived from them are passed to
    ``print_status``.

    Args:
        client: Client used to set up the session and fetch results.
        config: Sensor configuration for the session; its ``num_points`` and
            ``sweeps_per_frame`` also feed the bitrate estimate.
        pause_time: If given, the loop sleeps for this many seconds whenever
            more than ``2 * pause_time`` seconds have passed since the last
            pause started. ``None`` disables pausing.

    Never returns normally; interrupt it to stop.
    """
    client.setup_session(config)
    client.start_session()

    # Loop-invariant inputs to the bitrate estimate.
    bits_per_point = 32  # assumes each point is transferred as 32 bits -- TODO confirm
    points_per_frame = config.num_points * config.sweeps_per_frame

    n = 0
    last_pause = time.monotonic()

    while True:
        client.get_next()

        n += 1

        if pause_time is not None:
            now = time.monotonic()
            if (now - last_pause) > pause_time * 2:
                # The interval is measured from the start of the pause, so
                # reading resumes for about ``pause_time`` before the next one.
                last_pause = now
                time.sleep(pause_time)

        stats = client._rate_stats
        effective_bitrate = stats.rate * points_per_frame * bits_per_point

        print_status(
            " | ".join(
                [
                    f"{n:8}",
                    f"{stats.rate:7.1f} Hz",
                    f"{stats.jitter * 1e6:7.1f} us jitter",
                    f"{effective_bitrate * 1e-6:7.1f} Mbit/s",
                ]
            )
        )
175
176
def print_status(s: str) -> None:
    """Write ``s`` as a self-overwriting status line, at most every 0.25 s.

    The time of the last write is kept as the ``last_time`` attribute on
    this function. Calls arriving less than 0.25 s after that write are
    dropped. The text is printed with a trailing carriage return instead of
    a newline and flushed immediately.
    """
    timestamp = time.monotonic()
    previous = getattr(print_status, "last_time", -1)

    if timestamp - previous >= 0.25:
        setattr(print_status, "last_time", timestamp)
        print(s, end="\r", flush=True)
187
188
# Run the stress tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
View this example on GitHub: acconeer/acconeer-python-exploration