examples/app/new/plugins/my_plugin.py#
1# Copyright (c) Acconeer AB, 2024
2# All rights reserved
3
4from __future__ import annotations
5
6import logging
7import typing as t
8from enum import Enum, auto
9
10import attrs
11import numpy as np
12import numpy.typing as npt
13
14import pyqtgraph as pg
15
16from acconeer.exptool import a121
17from acconeer.exptool.a121.algo import AlgoParamEnum, AlgoProcessorConfigBase, ProcessorBase
18from acconeer.exptool.a121.algo._plugins import (
19 ProcessorBackendPluginBase,
20 ProcessorBackendPluginSharedState,
21 ProcessorPluginPreset,
22 ProcessorViewPluginBase,
23 SetupMessage,
24)
25from acconeer.exptool.a121.algo._utils import get_distances_m
26from acconeer.exptool.app.new import (
27 AppModel,
28 Message,
29 PgPlotPlugin,
30 PidgetFactoryMapping,
31 PluginFamily,
32 PluginGeneration,
33 PluginPresetBase,
34 PluginSpecBase,
35 backend,
36 pidgets,
37 register_plugin,
38)
39
40
41log = logging.getLogger(__name__)
42
43
44class PluginPresetId(Enum):
45 DEFAULT = auto()
46
47
48class PlotColor(AlgoParamEnum):
49 ACCONEER_BLUE = "#38bff0"
50 WHITE = "#ffffff"
51 BLACK = "#000000"
52 PINK = "#f280a1"
53
54
55@attrs.mutable(kw_only=True)
56class ProcessorConfig(AlgoProcessorConfigBase):
57 plot_color: PlotColor = attrs.field(default=PlotColor.ACCONEER_BLUE, converter=PlotColor)
58 """What color the plot graph should be."""
59
60 scale: float = attrs.field(default=1.0)
61 """Allows you to scale the incoming amplitude by a factor."""
62
63 def _collect_validation_results(
64 self, config: a121.SessionConfig
65 ) -> t.List[a121.ValidationResult]:
66 return []
67
68
69@attrs.frozen(kw_only=True)
70class ProcessorResult:
71 scaled_mean_abs: npt.NDArray = attrs.field(default=np.array([]))
72 plot_color: PlotColor = attrs.field(default=PlotColor.ACCONEER_BLUE)
73
74
75class Processor(ProcessorBase[ProcessorResult]):
76 def __init__(
77 self,
78 *,
79 sensor_config: a121.SensorConfig,
80 metadata: a121.Metadata,
81 processor_config: ProcessorConfig,
82 ) -> None:
83 self._scale = processor_config.scale
84 self._plot_color = processor_config.plot_color
85
86 def process(self, result: a121.Result) -> ProcessorResult:
87 frame = result.frame
88 mean_sweep = frame.mean(axis=0)
89 abs_mean_sweep = np.abs(mean_sweep)
90 return ProcessorResult(
91 scaled_mean_abs=self._scale * abs_mean_sweep, plot_color=self._plot_color
92 )
93
94
95class BackendPlugin(ProcessorBackendPluginBase):
96 PLUGIN_PRESETS = {
97 PluginPresetId.DEFAULT.value: lambda: ProcessorPluginPreset(
98 session_config=a121.SessionConfig(),
99 processor_config=ProcessorConfig(),
100 ),
101 }
102
103 @classmethod
104 def get_processor(cls, state: ProcessorBackendPluginSharedState[ProcessorConfig]) -> Processor:
105 if state.metadata is None:
106 raise RuntimeError("metadata is None")
107
108 if isinstance(state.metadata, list):
109 raise RuntimeError("metadata is unexpectedly extended")
110
111 return Processor(
112 sensor_config=state.session_config.sensor_config,
113 processor_config=state.processor_config,
114 metadata=state.metadata,
115 )
116
117 @classmethod
118 def get_processor_config_cls(cls) -> t.Type[ProcessorConfig]:
119 return ProcessorConfig
120
121 @classmethod
122 def get_default_sensor_config(cls) -> a121.SensorConfig:
123 return a121.SensorConfig()
124
125
126class ViewPlugin(ProcessorViewPluginBase):
127 @classmethod
128 def get_pidget_mapping(cls) -> PidgetFactoryMapping:
129 return {
130 "plot_color": pidgets.EnumPidgetFactory(
131 enum_type=PlotColor,
132 name_label_text="Plot color:",
133 name_label_tooltip="What color the plot graph should be",
134 label_mapping={
135 PlotColor.ACCONEER_BLUE: "Acconeer Blue",
136 PlotColor.WHITE: "White",
137 PlotColor.BLACK: "Black",
138 PlotColor.PINK: "Pink",
139 },
140 ),
141 "scale": pidgets.FloatSliderPidgetFactory(
142 name_label_text="Scale:",
143 name_label_tooltip="Allows you to scale the incoming amplitude by a factor",
144 suffix="",
145 limits=(0.001, 1.0),
146 decimals=3,
147 ),
148 }
149
150 @classmethod
151 def get_processor_config_cls(cls) -> t.Type[ProcessorConfig]:
152 return ProcessorConfig
153
154
155class PlotPlugin(PgPlotPlugin):
156 def __init__(self, app_model: AppModel) -> None:
157 super().__init__(app_model=app_model)
158 self._plot_job: t.Optional[ProcessorResult] = None
159 self._is_setup = False
160
161 def handle_message(self, message: backend.GeneralMessage) -> None:
162 if isinstance(message, backend.PlotMessage):
163 self._plot_job = message.result
164 elif isinstance(message, SetupMessage):
165 if isinstance(message.metadata, list):
166 raise RuntimeError("Metadata is unexpectedly extended")
167
168 self.setup(
169 metadata=message.metadata,
170 sensor_config=message.session_config.sensor_config,
171 )
172 self._is_setup = True
173 else:
174 log.warn(f"{self.__class__.__name__} got an unsupported command: {message.name!r}.")
175
176 def draw(self) -> None:
177 if not self._is_setup or self._plot_job is None:
178 return
179
180 try:
181 self.draw_plot_job(processor_result=self._plot_job)
182 finally:
183 self._plot_job = None
184
185 def setup(self, metadata: a121.Metadata, sensor_config: a121.SensorConfig) -> None:
186 self.plot_layout.clear()
187 self._distances_m = get_distances_m(sensor_config, metadata)
188
189 # amplitude plot
190 self.ampl_plot = pg.PlotItem()
191 self.ampl_plot.setMenuEnabled(False)
192 self.ampl_plot.showGrid(x=False, y=True)
193 self.ampl_plot.setLabel("left", "Amplitude")
194 self.ampl_plot.setLabel("bottom", "Distance (m)")
195 self.ampl_curve = self.ampl_plot.plot()
196
197 sublayout = self.plot_layout.addLayout()
198 sublayout.addItem(self.ampl_plot)
199
200 def draw_plot_job(self, processor_result: ProcessorResult) -> None:
201 self.ampl_plot.setYRange(0, processor_result.scaled_mean_abs.max())
202 self.ampl_curve.setData(
203 self._distances_m,
204 processor_result.scaled_mean_abs,
205 pen=pg.mkPen(processor_result.plot_color.value, width=2),
206 )
207
208
209class PluginSpec(PluginSpecBase):
210 def create_backend_plugin(
211 self, callback: t.Callable[[Message], None], key: str
212 ) -> BackendPlugin:
213 return BackendPlugin(callback=callback, generation=self.generation, key=key)
214
215 def create_view_plugin(self, app_model: AppModel) -> ViewPlugin:
216 return ViewPlugin(app_model=app_model)
217
218 def create_plot_plugin(self, app_model: AppModel) -> PlotPlugin:
219 return PlotPlugin(app_model=app_model)
220
221
222MY_PLUGIN = PluginSpec(
223 generation=PluginGeneration.A121,
224 key="my_plugin",
225 title="My Plugin",
226 description="My plugin.",
227 family=PluginFamily.EXTERNAL_PLUGIN,
228 presets=[
229 PluginPresetBase(name="Default", preset_id=PluginPresetId.DEFAULT),
230 ],
231 default_preset_id=PluginPresetId.DEFAULT,
232)
233
234
235def register() -> None:
236 register_plugin(MY_PLUGIN)
View this example on GitHub: acconeer/acconeer-python-exploration