examples/a121/stress.py

examples/a121/stress.py

  1# Copyright (c) Acconeer AB, 2022-2023
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import time
  7import typing as t
  8
  9import acconeer.exptool as et
 10from acconeer.exptool import a121
 11
 12
 13def main() -> None:
 14    parser = a121.ExampleArgumentParser()
 15
 16    parser.add_argument(
 17        "program",
 18        choices=[
 19            "throughput",
 20            "rate",
 21            "short_pauses",
 22            "long_pauses",
 23            "flow",
 24        ],
 25    )
 26
 27    args = parser.parse_args()
 28    et.utils.config_logging(args)
 29
 30    client = a121.Client.open(**a121.get_client_args(args))
 31    print(client.client_info)
 32    print()
 33    print(client.server_info)
 34    print()
 35
 36    if args.program == "throughput":
 37        func = throughput
 38    elif args.program == "rate":
 39        func = rate
 40    elif args.program == "short_pauses":
 41        func = short_pauses
 42    elif args.program == "long_pauses":
 43        func = long_pauses
 44    elif args.program == "flow":
 45        func = flow
 46    else:
 47        raise RuntimeError
 48
 49    try:
 50        func(client)
 51    except KeyboardInterrupt:
 52        print()
 53        try:
 54            client.close()
 55        except Exception:
 56            pass
 57
 58
 59def flow(client: a121.Client.open) -> None:
 60    client.close()
 61
 62    n = 0
 63
 64    while True:
 65        for _ in range(5):
 66            client.setup_session(a121.SensorConfig())
 67            client.start_session()
 68
 69            for _ in range(5):
 70                client.get_next()
 71
 72            client.stop_session()
 73
 74        client.close()
 75
 76        n += 1
 77
 78        print_status(f"{n:8}")
 79
 80
 81def throughput(client: a121.Client.open) -> None:
 82    config = a121.SensorConfig(
 83        hwaas=1,
 84        profile=a121.Profile.PROFILE_5,
 85        prf=a121.PRF.PRF_13_0_MHz,
 86        sweeps_per_frame=1,
 87        start_point=0,
 88        num_points=2047,
 89        step_length=1,
 90        double_buffering=True,
 91    )
 92    _stress_session(client, config)
 93
 94
 95def rate(client: a121.Client.open) -> None:
 96    config = a121.SensorConfig(
 97        hwaas=1,
 98        profile=a121.Profile.PROFILE_5,
 99        prf=a121.PRF.PRF_13_0_MHz,
100        sweeps_per_frame=1,
101        start_point=0,
102        num_points=1,
103        step_length=1,
104        double_buffering=True,
105    )
106    _stress_session(client, config)
107
108
def short_pauses(client: a121.Client) -> None:
    """Run the paused stress program with 0.25 second pauses.

    Args:
        client: An opened client on which the session is run.
    """
    _pauses(client, pause_time=0.25)
111
112
def long_pauses(client: a121.Client) -> None:
    """Run the paused stress program with 1.5 second pauses.

    Args:
        client: An opened client on which the session is run.
    """
    _pauses(client, pause_time=1.5)
115
116
def _pauses(client: a121.Client, *, pause_time: float) -> None:
    """Run the stress loop at a 100 Hz frame rate with periodic host pauses.

    The configuration uses 500 points per sweep, one sweep per frame, HWAAS 1,
    profile 5 and the 13.0 MHz PRF. The pausing itself is done by
    ``_stress_session``, which does not return.

    Args:
        client: An opened client on which the session is run.
        pause_time: Length in seconds of each pause, forwarded unchanged.
    """
    config = a121.SensorConfig(
        hwaas=1,
        profile=a121.Profile.PROFILE_5,
        prf=a121.PRF.PRF_13_0_MHz,
        sweeps_per_frame=1,
        start_point=0,
        num_points=500,
        step_length=1,
        frame_rate=100.0,
    )
    _stress_session(client, config, pause_time=pause_time)
129
130
def _stress_session(
    client: a121.Client,
    config: a121.SensorConfig,
    *,
    pause_time: t.Optional[float] = None,
) -> None:
    """Run a session forever, reporting frame count, rate, jitter and bitrate.

    The session is set up and started once; after that every iteration fetches
    one result with ``get_next`` and passes a status line to ``print_status``.
    The function only exits through an exception.

    Args:
        client: Client on which the session is set up and started.
        config: Sensor configuration for the session. Its ``num_points`` and
            ``sweeps_per_frame`` are also used to estimate the bitrate.
        pause_time: If given, the loop sleeps for this many seconds whenever
            more than twice that time has passed since the previous pause
            began. ``None`` disables pausing.
    """
    # Frame size does not change during the session, so compute it once.
    bits_per_point = 32
    points_per_frame = config.num_points * config.sweeps_per_frame

    client.setup_session(config)
    client.start_session()

    n = 0
    last_pause = time.monotonic()

    while True:
        client.get_next()

        n += 1

        if pause_time is not None:
            now = time.monotonic()
            # The interval is measured from the start of the previous pause,
            # so twice the pause length gives roughly equal time running and
            # sleeping.
            if (now - last_pause) > pause_time * 2:
                last_pause = now
                time.sleep(pause_time)

        # NOTE(review): reads a private attribute of the client; there may be
        # a public accessor for these statistics -- confirm.
        stats = client._rate_stats

        effective_bitrate = stats.rate * points_per_frame * bits_per_point

        print_status(
            " | ".join(
                [
                    f"{n:8}",
                    f"{stats.rate:7.1f} Hz",
                    f"{stats.jitter * 1e6:7.1f} us jitter",
                    f"{effective_bitrate * 1e-6:7.1f} Mbit/s",
                ]
            )
        )
175
176
def print_status(s: str) -> None:
    """Print ``s`` as a status line, at most once every 0.25 seconds.

    The time of the most recent print is stored as the ``last_time`` attribute
    on this function. A call arriving less than 0.25 s after it is dropped
    without printing. The text is terminated by a carriage return rather than
    a newline and is flushed immediately.

    Args:
        s: The status text to print.
    """
    timestamp = time.monotonic()
    previous = getattr(print_status, "last_time", -1)

    if timestamp - previous >= 0.25:
        # setattr rather than plain attribute assignment, as in the rest of
        # this function's handling of its own state.
        setattr(print_status, "last_time", timestamp)
        print(s, end="\r", flush=True)
187
188
# Run the stress tool only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

View this example on GitHub: acconeer/acconeer-python-exploration