Skip to content

Commit

Permalink
Make StandardReadable into generic PluggableDevice
Browse files Browse the repository at this point in the history
  • Loading branch information
coretl committed Aug 4, 2023
1 parent 5c413cc commit d2dbba9
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 67 deletions.
154 changes: 97 additions & 57 deletions ophyd/v2/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from enum import Enum
from typing import (
Any,
AsyncGenerator,
AsyncIterator,
Awaitable,
Callable,
Coroutine,
Expand All @@ -37,6 +37,7 @@

import numpy as np
from bluesky.protocols import (
Asset,
Configurable,
Descriptor,
Dtype,
Expand All @@ -47,6 +48,8 @@
Stageable,
Status,
Subscribable,
Triggerable,
WritesExternalAssets,
)
from bluesky.run_engine import call_in_bluesky_event_loop

Expand Down Expand Up @@ -782,7 +785,7 @@ async def execute(self, wait=True, timeout=None):
await self._backend.put(None, wait=wait, timeout=timeout or self._timeout)


async def observe_value(signal: SignalR[T]) -> AsyncGenerator[T, None]:
async def observe_value(signal: SignalR[T]) -> AsyncIterator[T]:
"""Subscribe to the value of a signal so it can be iterated from.
Parameters
Expand Down Expand Up @@ -898,83 +901,120 @@ async def set_and_wait_for_value(
return status


async def merge_gathered_dicts(
coros: Iterable[Awaitable[Dict[str, T]]]
async def _merge_gathered_dicts(
callables: Iterable[Callable[[], Awaitable[Dict[str, T]]]]
) -> Dict[str, T]:
"""Merge dictionaries produced by a sequence of coroutines.
Can be used for merging ``read()`` or ``describe``. For instance::
combined_read = await merge_gathered_dicts(s.read() for s in signals)
"""
"""Merge dictionaries produced by a sequence of coroutines"""
ret: Dict[str, T] = {}
coros = [f() for f in callables]
for result in await asyncio.gather(*coros):
ret.update(result)
return ret


class StandardReadable(Device, Readable, Configurable, Stageable):
"""Device that owns its children and provides useful default behavior.
- When its name is set it renames child Devices
- Signals can be registered for read() and read_configuration()
- These signals will be subscribed for read() between stage() and unstage()
"""
DescribePlug = Callable[[], Awaitable[Dict[str, Descriptor]]]
ReadPlug = Callable[[], Awaitable[Dict[str, Reading]]]
StatusPlug = Callable[[], AsyncStatus]
AssetPlug = Callable[[], AsyncIterator[Asset]]

_read_signals: Tuple[SignalR, ...] = ()
_configuration_signals: Tuple[SignalR, ...] = ()
_read_uncached_signals: Tuple[SignalR, ...] = ()

def set_readable_signals(
class PluggableDevice(
Device, Stageable, Configurable, Triggerable, Readable, WritesExternalAssets
):
_stage_plugs: Tuple[StatusPlug, ...] = ()
_describe_config_plugs: Tuple[DescribePlug, ...] = ()
_read_config_plugs: Tuple[ReadPlug, ...] = ()
_describe_plugs: Tuple[DescribePlug, ...] = ()
_trigger_plugs: Tuple[StatusPlug, ...] = ()
_read_plugs: Tuple[ReadPlug, ...] = ()
_collect_asset_docs_plugs: Tuple[AssetPlug, ...] = ()
_unstage_plugs: Tuple[StatusPlug, ...] = ()

def add_plug(
self,
read: Sequence[SignalR] = (),
config: Sequence[SignalR] = (),
read_uncached: Sequence[SignalR] = (),
*,
stage: Optional[StatusPlug] = None,
describe_config: Optional[DescribePlug] = None,
read_config: Optional[ReadPlug] = None,
describe: Optional[DescribePlug] = None,
trigger: Optional[StatusPlug] = None,
read: Optional[ReadPlug] = None,
collect_asset_docs: Optional[AssetPlug] = None,
unstage: Optional[StatusPlug] = None,
):
"""
Parameters
----------
read:
Signals to make up `read()`
conf:
Signals to make up `read_configuration()`
read_uncached:
Signals to make up `read()` that won't be cached
"""
self._read_signals = tuple(read)
self._configuration_signals = tuple(config)
self._read_uncached_signals = tuple(read_uncached)
if stage:
self._stage_plugs += (stage,)
if describe_config:
self._describe_config_plugs += (describe_config,)
if read_config:
self._read_config_plugs += (read_config,)
if describe:
self._describe_plugs += (describe,)
if trigger:
self._trigger_plugs += (trigger,)
if read:
self._read_plugs += (read,)
if collect_asset_docs:
self._collect_asset_docs_plugs += (collect_asset_docs,)
if unstage:
self._unstage_plugs += (unstage,)

@AsyncStatus.wrap
async def stage(self) -> None:
for sig in self._read_signals + self._configuration_signals:
await sig.stage().task

@AsyncStatus.wrap
async def unstage(self) -> None:
for sig in self._read_signals + self._configuration_signals:
await sig.unstage().task
await asyncio.wait([plug().task for plug in self._stage_plugs])

async def describe_configuration(self) -> Dict[str, Descriptor]:
return await merge_gathered_dicts(
[sig.describe() for sig in self._configuration_signals]
)
return await _merge_gathered_dicts(self._describe_config_plugs)

async def read_configuration(self) -> Dict[str, Reading]:
return await merge_gathered_dicts(
[sig.read() for sig in self._configuration_signals]
)
return await _merge_gathered_dicts(self._read_config_plugs)

async def describe(self) -> Dict[str, Descriptor]:
return await merge_gathered_dicts(
[sig.describe() for sig in self._read_signals + self._read_uncached_signals]
)
return await _merge_gathered_dicts(self._describe_plugs)

@AsyncStatus.wrap
async def trigger(self) -> None:
await asyncio.wait(plug().task for plug in self._trigger_plugs)

async def read(self) -> Dict[str, Reading]:
return await merge_gathered_dicts(
[sig.read() for sig in self._read_signals]
+ [sig.read(cached=False) for sig in self._read_uncached_signals]
)
return await _merge_gathered_dicts(self._read_plugs)

async def collect_asset_docs(self) -> AsyncIterator[Asset]:
for plug in self._collect_asset_docs_plugs:
async for asset in plug():
yield asset

@AsyncStatus.wrap
async def unstage(self) -> None:
await asyncio.wait([plug().task for plug in self._unstage_plugs])


def add_readable_signals(
device: PluggableDevice,
read: Sequence[SignalR] = (),
config: Sequence[SignalR] = (),
read_uncached: Sequence[SignalR] = (),
):
"""
Parameters
----------
read:
Signals to make up `read()`
conf:
Signals to make up `read_configuration()`
read_uncached:
Signals to make up `read()` that won't be cached
"""
read, config, read_uncached = tuple(read), tuple(config), tuple(read_uncached)
for sig in read + config:
# Start monitoring them so read is fast
device.add_plug(stage=sig.stage, unstage=sig.unstage)
for sig in config:
device.add_plug(describe_config=sig.describe, read_config=sig.read)
for sig in read:
device.add_plug(describe=sig.describe, read=sig.read)
for sig in read_uncached:
device.add_plug(describe=sig.describe, read=lambda: sig.read(cached=False))


VT = TypeVar("VT", bound=Device)
Expand Down
22 changes: 12 additions & 10 deletions ophyd/v2/epicsdemo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@
import numpy as np
from bluesky.protocols import Movable, Stoppable

from ophyd.v2.core import AsyncStatus, Device, StandardReadable, observe_value
from ophyd.v2.core import (
AsyncStatus,
Device,
PluggableDevice,
add_readable_signals,
observe_value,
)
from ophyd.v2.epics import epics_signal_r, epics_signal_rw, epics_signal_x


Expand All @@ -21,22 +27,19 @@ class EnergyMode(Enum):
high = "High Energy"


class Sensor(StandardReadable):
class Sensor(PluggableDevice):
"""A demo sensor that produces a scalar value based on X and Y Movers"""

def __init__(self, prefix: str, name="") -> None:
# Define some signals
self.value = epics_signal_r(float, prefix + "Value")
self.mode = epics_signal_rw(EnergyMode, prefix + "Mode")
# Set name and signals for read() and read_configuration()
self.set_readable_signals(
read=[self.value],
config=[self.mode],
)
add_readable_signals(self, read=[self.value], config=[self.mode])
super().__init__(name=name)


class Mover(StandardReadable, Movable, Stoppable):
class Mover(PluggableDevice, Movable, Stoppable):
"""A demo movable that moves based on velocity"""

def __init__(self, prefix: str, name="") -> None:
Expand All @@ -51,9 +54,8 @@ def __init__(self, prefix: str, name="") -> None:
# Whether set() should complete successfully or not
self._set_success = True
# Set name and signals for read() and read_configuration()
self.set_readable_signals(
read=[self.readback],
config=[self.velocity, self.units],
add_readable_signals(
self, read=[self.readback], config=[self.velocity, self.units]
)
super().__init__(name=name)

Expand Down

0 comments on commit d2dbba9

Please sign in to comment.