examples/app/new/plugins/my_plugin.py#
1# Copyright (c) Acconeer AB, 2024
2# All rights reserved
3
4from __future__ import annotations
5
6import logging
7import typing as t
8from enum import Enum, auto
9
10import attrs
11import numpy as np
12import numpy.typing as npt
13
14import pyqtgraph as pg
15
16from acconeer.exptool import a121
17from acconeer.exptool.a121.algo import AlgoParamEnum, AlgoProcessorConfigBase, ProcessorBase
18from acconeer.exptool.a121.algo._plugins import (
19 ProcessorBackendPluginBase,
20 ProcessorBackendPluginSharedState,
21 ProcessorPluginPreset,
22 ProcessorViewPluginBase,
23 SetupMessage,
24)
25from acconeer.exptool.a121.algo._utils import get_distances_m
26from acconeer.exptool.app.new import (
27 AppModel,
28 Message,
29 PgPlotPlugin,
30 PidgetFactoryMapping,
31 PluginFamily,
32 PluginGeneration,
33 PluginPresetBase,
34 PluginSpecBase,
35 backend,
36 pidgets,
37 register_plugin,
38)
39
40
41log = logging.getLogger(__name__)
42
43
44class PluginPresetId(Enum):
45 DEFAULT = auto()
46
47
48class PlotColor(AlgoParamEnum):
49 ACCONEER_BLUE = "#38bff0"
50 WHITE = "#ffffff"
51 BLACK = "#000000"
52 PINK = "#f280a1"
53
54
55@attrs.mutable(kw_only=True)
56class ProcessorConfig(AlgoProcessorConfigBase):
57 plot_color: PlotColor = attrs.field(default=PlotColor.ACCONEER_BLUE, converter=PlotColor)
58 """What color the plot graph should be."""
59
60 scale: float = attrs.field(default=1.0)
61 """Allows you to scale the incoming amplitude by a factor."""
62
63 def _collect_validation_results(
64 self, config: a121.SessionConfig
65 ) -> t.List[a121.ValidationResult]:
66 return []
67
68
69@attrs.frozen(kw_only=True)
70class ProcessorResult:
71 scaled_mean_abs: npt.NDArray = attrs.field(default=np.array([]))
72 plot_color: PlotColor = attrs.field(default=PlotColor.ACCONEER_BLUE)
73
74
75class Processor(ProcessorBase[ProcessorResult]):
76 def __init__(
77 self,
78 *,
79 sensor_config: a121.SensorConfig,
80 metadata: a121.Metadata,
81 processor_config: ProcessorConfig,
82 ) -> None:
83 self._scale = processor_config.scale
84 self._plot_color = processor_config.plot_color
85
86 def process(self, result: a121.Result) -> ProcessorResult:
87 frame = result.frame
88 mean_sweep = frame.mean(axis=0)
89 abs_mean_sweep = np.abs(mean_sweep)
90 return ProcessorResult(
91 scaled_mean_abs=self._scale * abs_mean_sweep, plot_color=self._plot_color
92 )
93
94
95class BackendPlugin(ProcessorBackendPluginBase):
96 PLUGIN_PRESETS = {
97 PluginPresetId.DEFAULT.value: lambda: ProcessorPluginPreset(
98 session_config=a121.SessionConfig(),
99 processor_config=ProcessorConfig(),
100 ),
101 }
102
103 @classmethod
104 def get_processor(cls, state: ProcessorBackendPluginSharedState[ProcessorConfig]) -> Processor:
105 if state.metadata is None:
106 msg = "metadata is None"
107 raise RuntimeError(msg)
108
109 if isinstance(state.metadata, list):
110 msg = "metadata is unexpectedly extended"
111 raise RuntimeError(msg)
112
113 return Processor(
114 sensor_config=state.session_config.sensor_config,
115 processor_config=state.processor_config,
116 metadata=state.metadata,
117 )
118
119 @classmethod
120 def get_processor_config_cls(cls) -> t.Type[ProcessorConfig]:
121 return ProcessorConfig
122
123 @classmethod
124 def get_default_sensor_config(cls) -> a121.SensorConfig:
125 return a121.SensorConfig()
126
127
128class ViewPlugin(ProcessorViewPluginBase):
129 @classmethod
130 def get_pidget_mapping(cls) -> PidgetFactoryMapping:
131 return {
132 "plot_color": pidgets.EnumPidgetFactory(
133 enum_type=PlotColor,
134 name_label_text="Plot color:",
135 name_label_tooltip="What color the plot graph should be",
136 label_mapping={
137 PlotColor.ACCONEER_BLUE: "Acconeer Blue",
138 PlotColor.WHITE: "White",
139 PlotColor.BLACK: "Black",
140 PlotColor.PINK: "Pink",
141 },
142 ),
143 "scale": pidgets.FloatSliderPidgetFactory(
144 name_label_text="Scale:",
145 name_label_tooltip="Allows you to scale the incoming amplitude by a factor",
146 suffix="",
147 limits=(0.001, 1.0),
148 decimals=3,
149 ),
150 }
151
152 @classmethod
153 def get_processor_config_cls(cls) -> t.Type[ProcessorConfig]:
154 return ProcessorConfig
155
156
157class PlotPlugin(PgPlotPlugin):
158 def __init__(self, app_model: AppModel) -> None:
159 super().__init__(app_model=app_model)
160 self._plot_job: t.Optional[ProcessorResult] = None
161 self._is_setup = False
162
163 def handle_message(self, message: backend.GeneralMessage) -> None:
164 if isinstance(message, backend.PlotMessage):
165 self._plot_job = message.result
166 elif isinstance(message, SetupMessage):
167 if isinstance(message.metadata, list):
168 msg = "Metadata is unexpectedly extended"
169 raise RuntimeError(msg)
170
171 self.setup(
172 metadata=message.metadata,
173 sensor_config=message.session_config.sensor_config,
174 )
175 self._is_setup = True
176 else:
177 log.warn(f"{self.__class__.__name__} got an unsupported command: {message.name!r}.")
178
179 def draw(self) -> None:
180 if not self._is_setup or self._plot_job is None:
181 return
182
183 try:
184 self.draw_plot_job(processor_result=self._plot_job)
185 finally:
186 self._plot_job = None
187
188 def setup(self, metadata: a121.Metadata, sensor_config: a121.SensorConfig) -> None:
189 self.plot_layout.clear()
190 self._distances_m = get_distances_m(sensor_config, metadata)
191
192 # amplitude plot
193 self.ampl_plot = pg.PlotItem()
194 self.ampl_plot.setMenuEnabled(False)
195 self.ampl_plot.showGrid(x=False, y=True)
196 self.ampl_plot.setLabel("left", "Amplitude")
197 self.ampl_plot.setLabel("bottom", "Distance (m)")
198 self.ampl_curve = self.ampl_plot.plot()
199
200 sublayout = self.plot_layout.addLayout()
201 sublayout.addItem(self.ampl_plot)
202
203 def draw_plot_job(self, processor_result: ProcessorResult) -> None:
204 self.ampl_plot.setYRange(0, processor_result.scaled_mean_abs.max())
205 self.ampl_curve.setData(
206 self._distances_m,
207 processor_result.scaled_mean_abs,
208 pen=pg.mkPen(processor_result.plot_color.value, width=2),
209 )
210
211
212class PluginSpec(PluginSpecBase):
213 def create_backend_plugin(
214 self, callback: t.Callable[[Message], None], key: str
215 ) -> BackendPlugin:
216 return BackendPlugin(callback=callback, generation=self.generation, key=key)
217
218 def create_view_plugin(self, app_model: AppModel) -> ViewPlugin:
219 return ViewPlugin(app_model=app_model)
220
221 def create_plot_plugin(self, app_model: AppModel) -> PlotPlugin:
222 return PlotPlugin(app_model=app_model)
223
224
225MY_PLUGIN = PluginSpec(
226 generation=PluginGeneration.A121,
227 key="my_plugin",
228 title="My Plugin",
229 description="My plugin.",
230 family=PluginFamily.EXTERNAL_PLUGIN,
231 presets=[
232 PluginPresetBase(name="Default", preset_id=PluginPresetId.DEFAULT),
233 ],
234 default_preset_id=PluginPresetId.DEFAULT,
235)
236
237
238def register() -> None:
239 register_plugin(MY_PLUGIN)
View this example on GitHub: acconeer/acconeer-python-exploration