examples/app/new/plugins/my_plugin.py

examples/app/new/plugins/my_plugin.py#

  1# Copyright (c) Acconeer AB, 2024
  2# All rights reserved
  3
  4from __future__ import annotations
  5
  6import logging
  7import typing as t
  8from enum import Enum, auto
  9
 10import attrs
 11import numpy as np
 12import numpy.typing as npt
 13
 14import pyqtgraph as pg
 15
 16from acconeer.exptool import a121
 17from acconeer.exptool.a121.algo import AlgoParamEnum, AlgoProcessorConfigBase, ProcessorBase
 18from acconeer.exptool.a121.algo._plugins import (
 19    ProcessorBackendPluginBase,
 20    ProcessorBackendPluginSharedState,
 21    ProcessorPluginPreset,
 22    ProcessorViewPluginBase,
 23    SetupMessage,
 24)
 25from acconeer.exptool.a121.algo._utils import get_distances_m
 26from acconeer.exptool.app.new import (
 27    AppModel,
 28    Message,
 29    PgPlotPlugin,
 30    PidgetFactoryMapping,
 31    PluginFamily,
 32    PluginGeneration,
 33    PluginPresetBase,
 34    PluginSpecBase,
 35    backend,
 36    pidgets,
 37    register_plugin,
 38)
 39
 40
 41log = logging.getLogger(__name__)
 42
 43
 44class PluginPresetId(Enum):
 45    DEFAULT = auto()
 46
 47
 48class PlotColor(AlgoParamEnum):
 49    ACCONEER_BLUE = "#38bff0"
 50    WHITE = "#ffffff"
 51    BLACK = "#000000"
 52    PINK = "#f280a1"
 53
 54
 55@attrs.mutable(kw_only=True)
 56class ProcessorConfig(AlgoProcessorConfigBase):
 57    plot_color: PlotColor = attrs.field(default=PlotColor.ACCONEER_BLUE, converter=PlotColor)
 58    """What color the plot graph should be."""
 59
 60    scale: float = attrs.field(default=1.0)
 61    """Allows you to scale the incoming amplitude by a factor."""
 62
 63    def _collect_validation_results(
 64        self, config: a121.SessionConfig
 65    ) -> t.List[a121.ValidationResult]:
 66        return []
 67
 68
 69@attrs.frozen(kw_only=True)
 70class ProcessorResult:
 71    scaled_mean_abs: npt.NDArray = attrs.field(default=np.array([]))
 72    plot_color: PlotColor = attrs.field(default=PlotColor.ACCONEER_BLUE)
 73
 74
 75class Processor(ProcessorBase[ProcessorResult]):
 76    def __init__(
 77        self,
 78        *,
 79        sensor_config: a121.SensorConfig,
 80        metadata: a121.Metadata,
 81        processor_config: ProcessorConfig,
 82    ) -> None:
 83        self._scale = processor_config.scale
 84        self._plot_color = processor_config.plot_color
 85
 86    def process(self, result: a121.Result) -> ProcessorResult:
 87        frame = result.frame
 88        mean_sweep = frame.mean(axis=0)
 89        abs_mean_sweep = np.abs(mean_sweep)
 90        return ProcessorResult(
 91            scaled_mean_abs=self._scale * abs_mean_sweep, plot_color=self._plot_color
 92        )
 93
 94
 95class BackendPlugin(ProcessorBackendPluginBase):
 96    PLUGIN_PRESETS = {
 97        PluginPresetId.DEFAULT.value: lambda: ProcessorPluginPreset(
 98            session_config=a121.SessionConfig(),
 99            processor_config=ProcessorConfig(),
100        ),
101    }
102
103    @classmethod
104    def get_processor(cls, state: ProcessorBackendPluginSharedState[ProcessorConfig]) -> Processor:
105        if state.metadata is None:
106            msg = "metadata is None"
107            raise RuntimeError(msg)
108
109        if isinstance(state.metadata, list):
110            msg = "metadata is unexpectedly extended"
111            raise RuntimeError(msg)
112
113        return Processor(
114            sensor_config=state.session_config.sensor_config,
115            processor_config=state.processor_config,
116            metadata=state.metadata,
117        )
118
119    @classmethod
120    def get_processor_config_cls(cls) -> t.Type[ProcessorConfig]:
121        return ProcessorConfig
122
123    @classmethod
124    def get_default_sensor_config(cls) -> a121.SensorConfig:
125        return a121.SensorConfig()
126
127
128class ViewPlugin(ProcessorViewPluginBase):
129    @classmethod
130    def get_pidget_mapping(cls) -> PidgetFactoryMapping:
131        return {
132            "plot_color": pidgets.EnumPidgetFactory(
133                enum_type=PlotColor,
134                name_label_text="Plot color:",
135                name_label_tooltip="What color the plot graph should be",
136                label_mapping={
137                    PlotColor.ACCONEER_BLUE: "Acconeer Blue",
138                    PlotColor.WHITE: "White",
139                    PlotColor.BLACK: "Black",
140                    PlotColor.PINK: "Pink",
141                },
142            ),
143            "scale": pidgets.FloatSliderPidgetFactory(
144                name_label_text="Scale:",
145                name_label_tooltip="Allows you to scale the incoming amplitude by a factor",
146                suffix="",
147                limits=(0.001, 1.0),
148                decimals=3,
149            ),
150        }
151
152    @classmethod
153    def get_processor_config_cls(cls) -> t.Type[ProcessorConfig]:
154        return ProcessorConfig
155
156
157class PlotPlugin(PgPlotPlugin):
158    def __init__(self, app_model: AppModel) -> None:
159        super().__init__(app_model=app_model)
160        self._plot_job: t.Optional[ProcessorResult] = None
161        self._is_setup = False
162
163    def handle_message(self, message: backend.GeneralMessage) -> None:
164        if isinstance(message, backend.PlotMessage):
165            self._plot_job = message.result
166        elif isinstance(message, SetupMessage):
167            if isinstance(message.metadata, list):
168                msg = "Metadata is unexpectedly extended"
169                raise RuntimeError(msg)
170
171            self.setup(
172                metadata=message.metadata,
173                sensor_config=message.session_config.sensor_config,
174            )
175            self._is_setup = True
176        else:
177            log.warn(f"{self.__class__.__name__} got an unsupported command: {message.name!r}.")
178
179    def draw(self) -> None:
180        if not self._is_setup or self._plot_job is None:
181            return
182
183        try:
184            self.draw_plot_job(processor_result=self._plot_job)
185        finally:
186            self._plot_job = None
187
188    def setup(self, metadata: a121.Metadata, sensor_config: a121.SensorConfig) -> None:
189        self.plot_layout.clear()
190        self._distances_m = get_distances_m(sensor_config, metadata)
191
192        # amplitude plot
193        self.ampl_plot = pg.PlotItem()
194        self.ampl_plot.setMenuEnabled(False)
195        self.ampl_plot.showGrid(x=False, y=True)
196        self.ampl_plot.setLabel("left", "Amplitude")
197        self.ampl_plot.setLabel("bottom", "Distance (m)")
198        self.ampl_curve = self.ampl_plot.plot()
199
200        sublayout = self.plot_layout.addLayout()
201        sublayout.addItem(self.ampl_plot)
202
203    def draw_plot_job(self, processor_result: ProcessorResult) -> None:
204        self.ampl_plot.setYRange(0, processor_result.scaled_mean_abs.max())
205        self.ampl_curve.setData(
206            self._distances_m,
207            processor_result.scaled_mean_abs,
208            pen=pg.mkPen(processor_result.plot_color.value, width=2),
209        )
210
211
212class PluginSpec(PluginSpecBase):
213    def create_backend_plugin(
214        self, callback: t.Callable[[Message], None], key: str
215    ) -> BackendPlugin:
216        return BackendPlugin(callback=callback, generation=self.generation, key=key)
217
218    def create_view_plugin(self, app_model: AppModel) -> ViewPlugin:
219        return ViewPlugin(app_model=app_model)
220
221    def create_plot_plugin(self, app_model: AppModel) -> PlotPlugin:
222        return PlotPlugin(app_model=app_model)
223
224
225MY_PLUGIN = PluginSpec(
226    generation=PluginGeneration.A121,
227    key="my_plugin",
228    title="My Plugin",
229    description="My plugin.",
230    family=PluginFamily.EXTERNAL_PLUGIN,
231    presets=[
232        PluginPresetBase(name="Default", preset_id=PluginPresetId.DEFAULT),
233    ],
234    default_preset_id=PluginPresetId.DEFAULT,
235)
236
237
238def register() -> None:
239    register_plugin(MY_PLUGIN)

View this example on GitHub: acconeer/acconeer-python-exploration