examples/app/new/plugins/my_plugin.py

examples/app/new/plugins/my_plugin.py#

  1# Copyright (c) Acconeer AB, 2024
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import logging
  7import typing as t
  8from enum import Enum, auto
  9
 10import attrs
 11import numpy as np
 12import numpy.typing as npt
 13
 14import pyqtgraph as pg
 15
 16from acconeer.exptool import a121
 17from acconeer.exptool.a121.algo import AlgoParamEnum, AlgoProcessorConfigBase, ProcessorBase
 18from acconeer.exptool.a121.algo._plugins import (
 19    ProcessorBackendPluginBase,
 20    ProcessorBackendPluginSharedState,
 21    ProcessorPluginPreset,
 22    ProcessorViewPluginBase,
 23    SetupMessage,
 24)
 25from acconeer.exptool.a121.algo._utils import get_distances_m
 26from acconeer.exptool.app.new import (
 27    AppModel,
 28    Message,
 29    PgPlotPlugin,
 30    PidgetFactoryMapping,
 31    PluginFamily,
 32    PluginGeneration,
 33    PluginPresetBase,
 34    PluginSpecBase,
 35    backend,
 36    pidgets,
 37    register_plugin,
 38)
 39
 40
 41log = logging.getLogger(__name__)
 42
 43
 44class PluginPresetId(Enum):
 45    DEFAULT = auto()
 46
 47
 48class PlotColor(AlgoParamEnum):
 49    ACCONEER_BLUE = "#38bff0"
 50    WHITE = "#ffffff"
 51    BLACK = "#000000"
 52    PINK = "#f280a1"
 53
 54
 55@attrs.mutable(kw_only=True)
 56class ProcessorConfig(AlgoProcessorConfigBase):
 57    plot_color: PlotColor = attrs.field(default=PlotColor.ACCONEER_BLUE, converter=PlotColor)
 58    """What color the plot graph should be."""
 59
 60    scale: float = attrs.field(default=1.0)
 61    """Allows you to scale the incoming amplitude by a factor."""
 62
 63    def _collect_validation_results(
 64        self, config: a121.SessionConfig
 65    ) -> t.List[a121.ValidationResult]:
 66        return []
 67
 68
 69@attrs.frozen(kw_only=True)
 70class ProcessorResult:
 71    scaled_mean_abs: npt.NDArray = attrs.field(default=np.array([]))
 72    plot_color: PlotColor = attrs.field(default=PlotColor.ACCONEER_BLUE)
 73
 74
 75class Processor(ProcessorBase[ProcessorResult]):
 76    def __init__(
 77        self,
 78        *,
 79        sensor_config: a121.SensorConfig,
 80        metadata: a121.Metadata,
 81        processor_config: ProcessorConfig,
 82    ) -> None:
 83        self._scale = processor_config.scale
 84        self._plot_color = processor_config.plot_color
 85
 86    def process(self, result: a121.Result) -> ProcessorResult:
 87        frame = result.frame
 88        mean_sweep = frame.mean(axis=0)
 89        abs_mean_sweep = np.abs(mean_sweep)
 90        return ProcessorResult(
 91            scaled_mean_abs=self._scale * abs_mean_sweep, plot_color=self._plot_color
 92        )
 93
 94
 95class BackendPlugin(ProcessorBackendPluginBase):
 96    PLUGIN_PRESETS = {
 97        PluginPresetId.DEFAULT.value: lambda: ProcessorPluginPreset(
 98            session_config=a121.SessionConfig(),
 99            processor_config=ProcessorConfig(),
100        ),
101    }
102
103    @classmethod
104    def get_processor(cls, state: ProcessorBackendPluginSharedState[ProcessorConfig]) -> Processor:
105        if state.metadata is None:
106            raise RuntimeError("metadata is None")
107
108        if isinstance(state.metadata, list):
109            raise RuntimeError("metadata is unexpectedly extended")
110
111        return Processor(
112            sensor_config=state.session_config.sensor_config,
113            processor_config=state.processor_config,
114            metadata=state.metadata,
115        )
116
117    @classmethod
118    def get_processor_config_cls(cls) -> t.Type[ProcessorConfig]:
119        return ProcessorConfig
120
121    @classmethod
122    def get_default_sensor_config(cls) -> a121.SensorConfig:
123        return a121.SensorConfig()
124
125
126class ViewPlugin(ProcessorViewPluginBase):
127    @classmethod
128    def get_pidget_mapping(cls) -> PidgetFactoryMapping:
129        return {
130            "plot_color": pidgets.EnumPidgetFactory(
131                enum_type=PlotColor,
132                name_label_text="Plot color:",
133                name_label_tooltip="What color the plot graph should be",
134                label_mapping={
135                    PlotColor.ACCONEER_BLUE: "Acconeer Blue",
136                    PlotColor.WHITE: "White",
137                    PlotColor.BLACK: "Black",
138                    PlotColor.PINK: "Pink",
139                },
140            ),
141            "scale": pidgets.FloatSliderPidgetFactory(
142                name_label_text="Scale:",
143                name_label_tooltip="Allows you to scale the incoming amplitude by a factor",
144                suffix="",
145                limits=(0.001, 1.0),
146                decimals=3,
147            ),
148        }
149
150    @classmethod
151    def get_processor_config_cls(cls) -> t.Type[ProcessorConfig]:
152        return ProcessorConfig
153
154
155class PlotPlugin(PgPlotPlugin):
156    def __init__(self, app_model: AppModel) -> None:
157        super().__init__(app_model=app_model)
158        self._plot_job: t.Optional[ProcessorResult] = None
159        self._is_setup = False
160
161    def handle_message(self, message: backend.GeneralMessage) -> None:
162        if isinstance(message, backend.PlotMessage):
163            self._plot_job = message.result
164        elif isinstance(message, SetupMessage):
165            if isinstance(message.metadata, list):
166                raise RuntimeError("Metadata is unexpectedly extended")
167
168            self.setup(
169                metadata=message.metadata,
170                sensor_config=message.session_config.sensor_config,
171            )
172            self._is_setup = True
173        else:
174            log.warn(f"{self.__class__.__name__} got an unsupported command: {message.name!r}.")
175
176    def draw(self) -> None:
177        if not self._is_setup or self._plot_job is None:
178            return
179
180        try:
181            self.draw_plot_job(processor_result=self._plot_job)
182        finally:
183            self._plot_job = None
184
185    def setup(self, metadata: a121.Metadata, sensor_config: a121.SensorConfig) -> None:
186        self.plot_layout.clear()
187        self._distances_m = get_distances_m(sensor_config, metadata)
188
189        # amplitude plot
190        self.ampl_plot = pg.PlotItem()
191        self.ampl_plot.setMenuEnabled(False)
192        self.ampl_plot.showGrid(x=False, y=True)
193        self.ampl_plot.setLabel("left", "Amplitude")
194        self.ampl_plot.setLabel("bottom", "Distance (m)")
195        self.ampl_curve = self.ampl_plot.plot()
196
197        sublayout = self.plot_layout.addLayout()
198        sublayout.addItem(self.ampl_plot)
199
200    def draw_plot_job(self, processor_result: ProcessorResult) -> None:
201        self.ampl_plot.setYRange(0, processor_result.scaled_mean_abs.max())
202        self.ampl_curve.setData(
203            self._distances_m,
204            processor_result.scaled_mean_abs,
205            pen=pg.mkPen(processor_result.plot_color.value, width=2),
206        )
207
208
209class PluginSpec(PluginSpecBase):
210    def create_backend_plugin(
211        self, callback: t.Callable[[Message], None], key: str
212    ) -> BackendPlugin:
213        return BackendPlugin(callback=callback, generation=self.generation, key=key)
214
215    def create_view_plugin(self, app_model: AppModel) -> ViewPlugin:
216        return ViewPlugin(app_model=app_model)
217
218    def create_plot_plugin(self, app_model: AppModel) -> PlotPlugin:
219        return PlotPlugin(app_model=app_model)
220
221
222MY_PLUGIN = PluginSpec(
223    generation=PluginGeneration.A121,
224    key="my_plugin",
225    title="My Plugin",
226    description="My plugin.",
227    family=PluginFamily.EXTERNAL_PLUGIN,
228    presets=[
229        PluginPresetBase(name="Default", preset_id=PluginPresetId.DEFAULT),
230    ],
231    default_preset_id=PluginPresetId.DEFAULT,
232)
233
234
235def register() -> None:
236    register_plugin(MY_PLUGIN)

View this example on GitHub: acconeer/acconeer-python-exploration