Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import pytest
import simplejson as json
from qtpy.QtCore import QRect
from qtpy.QtGui import QPaintEvent
from qtpy.QtWidgets import QWidget
import typhos
from ophyd import Component as Cpt
from ophyd import Device
from typhos.utils import (TyphosBase, clean_name, load_suite,
no_device_lazy_load, saved_template, use_stylesheet)
class NestedDevice(Device):
phi = Cpt(Device)
class LayeredDevice(Device):
radial = Cpt(NestedDevice)
def test_clean_name():
device = LayeredDevice(name='test')
assert clean_name(device.radial, strip_parent=False) == 'test radial'
assert clean_name(device.radial, strip_parent=True) == 'radial'
assert clean_name(device.radial.phi,
strip_parent=False) == 'test radial phi'
assert clean_name(device.radial.phi, strip_parent=True) == 'phi'
assert clean_name(device.radial.phi, strip_parent=device) == 'radial phi'
from textwrap import dedent
import caproto
from caproto.server import PVGroup, ioc_arg_parser, pvproperty, run
import ophyd
from ophyd import Component as Cpt
from ophyd import EpicsSignal
from pcdsdevices.variety import set_metadata
class Variants(ophyd.Device):
soft_delta = Cpt(ophyd.Signal, value=1)
tweakable_delta_source_by_name = Cpt(EpicsSignal, 'tweakable')
set_metadata(
tweakable_delta_source_by_name,
{'variety': 'scalar-tweakable',
'delta.signal': 'soft_delta',
'delta.source': 'signal',
'range.source': 'value',
'range.value': [-10, 10],
}
)
tweakable_delta_source_by_component = Cpt(EpicsSignal, 'tweakable')
set_metadata(
tweakable_delta_source_by_component,
from qtpy.QtCore import QRect
from qtpy.QtGui import QPaintEvent
from qtpy.QtWidgets import QWidget
import typhos
from ophyd import Component as Cpt
from ophyd import Device
from typhos.utils import (TyphosBase, clean_name, load_suite,
no_device_lazy_load, saved_template, use_stylesheet)
class NestedDevice(Device):
phi = Cpt(Device)
class LayeredDevice(Device):
radial = Cpt(NestedDevice)
def test_clean_name():
device = LayeredDevice(name='test')
assert clean_name(device.radial, strip_parent=False) == 'test radial'
assert clean_name(device.radial, strip_parent=True) == 'radial'
assert clean_name(device.radial.phi,
strip_parent=False) == 'test radial phi'
assert clean_name(device.radial.phi, strip_parent=True) == 'phi'
assert clean_name(device.radial.phi, strip_parent=device) == 'radial phi'
def test_stylesheet(qtbot):
widget = QWidget()
qtbot.addWidget(widget)
"""Example to create a Panel of Ophyd Signals from an object"""
import sys
import numpy as np
from qtpy.QtWidgets import QApplication
import typhos
from ophyd import Component as Cpt
from ophyd import Device, Signal
from typhos.utils import SignalRO
class Sample(Device):
"""Simulated Device"""
readback = Cpt(SignalRO, value=1)
setpoint = Cpt(Signal, value=2)
waveform = Cpt(SignalRO, value=np.random.randn(100, ))
image = Cpt(SignalRO, value=np.abs(np.random.randn(100, 100)) * 455)
# Create my device without a prefix
sample = Sample('', name='sample')
if __name__ == '__main__':
# Create my application
app = QApplication(sys.argv)
typhos.use_stylesheet()
# Create my panel
panel = typhos.TyphosSignalPanel.from_device(sample)
class TransformPlugin_V31(PluginBase_V31, TransformPlugin_V26, version=(3, 1), version_of=TransformPlugin):
...
class TransformPlugin_V33(PluginBase_V33, TransformPlugin_V31, version=(3, 3), version_of=TransformPlugin):
...
class TransformPlugin_V34(PluginBase_V34, TransformPlugin_V33, version=(3, 4), version_of=TransformPlugin):
...
# --- NDPva ---
class PvaPlugin(Device, version_type='ADCore'):
"Serves as a base class for other versions"
...
class PvaPlugin_V25(PluginBase_V22, PvaPlugin, version=(2, 5), version_of=PvaPlugin):
pv_name = Cpt(EpicsSignalRO, "PvName_RBV")
class PvaPlugin_V26(PluginBase_V26, PvaPlugin_V25, version=(2, 6), version_of=PvaPlugin):
...
class PvaPlugin_V31(PluginBase_V31, PvaPlugin_V26, version=(3, 1), version_of=PvaPlugin):
...
class FFTPlugin_V31(PluginBase_V31, FFTPlugin_V26, version=(3, 1), version_of=FFTPlugin):
...
class FFTPlugin_V33(PluginBase_V33, FFTPlugin_V31, version=(3, 3), version_of=FFTPlugin):
...
class FFTPlugin_V34(PluginBase_V34, FFTPlugin_V33, version=(3, 4), version_of=FFTPlugin):
...
# --- NDScatter ---
class ScatterPlugin(Device, version_type='ADCore'):
"Serves as a base class for other versions"
...
class ScatterPlugin_V31(PluginBase_V31, ScatterPlugin, version=(3, 1), version_of=ScatterPlugin):
scatter_method = Cpt(SignalWithRBV, "ScatterMethod", string=True, doc="0='Round robin'")
class ScatterPlugin_V32(ScatterPlugin_V31, version=(3, 2), version_of=ScatterPlugin):
...
class ScatterPlugin_V33(PluginBase_V33, ScatterPlugin_V32, version=(3, 3), version_of=ScatterPlugin):
...
def add_device(self, device):
"""Typhos hook for adding a new device."""
# TODO: note that this does not call super
# super().add_device(device)
self._devices.append(device)
logger.debug('%s signals from device: %s', self.__class__.__name__,
device.name)
for attr, component in utils._get_top_level_components(type(device)):
dotted_name = f'{device.name}.{attr}'
if issubclass(component.cls, ophyd.Device):
sub_device = getattr(device, attr)
self.add_sub_device(sub_device, name=dotted_name)
else:
self._maybe_add_signal(device, attr, attr, component)
return create_device_from_components(
name=clsname,
base_class=bases + (CommonGatherPlugin, ),
gather_1=Cpt(cpt_cls, '', index=1),
gather_2=Cpt(cpt_cls, '', index=2),
gather_3=Cpt(cpt_cls, '', index=3),
gather_4=Cpt(cpt_cls, '', index=4),
gather_5=Cpt(cpt_cls, '', index=5),
gather_6=Cpt(cpt_cls, '', index=6),
gather_7=Cpt(cpt_cls, '', index=7),
gather_8=Cpt(cpt_cls, '', index=8),
)
class CommonOverlayPlugin(Device):
...
class CommonAttributePlugin(Device):
...
class CommonROIStatPlugin(Device):
...
class CommonGatherPlugin(Device):
...
class PluginVersions_V191(PluginVersions, version=(1, 9, 1), version_of=PluginVersions):
class ScatterPlugin_V32(ScatterPlugin_V31, version=(3, 2), version_of=ScatterPlugin):
...
class ScatterPlugin_V33(PluginBase_V33, ScatterPlugin_V32, version=(3, 3), version_of=ScatterPlugin):
...
class ScatterPlugin_V34(PluginBase_V34, ScatterPlugin_V33, version=(3, 4), version_of=ScatterPlugin):
...
# --- NDPosPlugin ---
class PosPlugin(Device, version_type='ADCore'):
"Serves as a base class for other versions"
...
class PosPluginPlugin_V25(PluginBase_V22, PosPlugin, version=(2, 5), version_of=PosPlugin):
delete = Cpt(EpicsSignal, "Delete", string=True, doc="")
duplicate = Cpt(SignalWithRBV, "Duplicate")
expected_id = Cpt(EpicsSignalRO, "ExpectedID_RBV")
file_valid = Cpt(EpicsSignalRO, "FileValid_RBV", string=True, doc="0='No' 1='Yes'")
filename = Cpt(SignalWithRBV, "Filename")
id_difference = Cpt(SignalWithRBV, "IDDifference")
id_name = Cpt(SignalWithRBV, "IDName", string=True)
id_start = Cpt(SignalWithRBV, "IDStart")
index = Cpt(EpicsSignalRO, "Index_RBV")
missing = Cpt(SignalWithRBV, "Missing")
mode = Cpt(SignalWithRBV, "Mode", string=True, doc="0='Discard' 1='Keep'")
process = Cpt(EpicsSignal, 'Process',
doc="Parameter Process")
def call(self, low: int = 100, high: int = 1000):
'A configurable random number'
self.low.put(low, wait=True)
self.high.put(high, wait=True)
self.process.put(1, wait=True)
status = self.status.get(use_monitor=False)
retval = self.retval.get(use_monitor=False)
if status != 'Success':
raise RuntimeError(f'RPC function failed: {status}')
return retval
class GroupDevice(ophyd.Device):
get_random = Cpt(get_randomDevice, 'get_random:',
doc='A configurable random number')
exit = Cpt(EpicsSignal, 'exit', doc='Poke me to exit')
random1 = Cpt(EpicsSignal, 'random1',
doc='Random integer between 1 and 100')
random2 = Cpt(EpicsSignal, 'random2',
doc='A nice random integer between 1000 and 2000')
# -------end autogenerated Devices---
if __name__ == '__main__':
ioc_options, run_options = ioc_arg_parser(
default_prefix='integration:',
desc='Run an IOC.')
ioc = Group(**ioc_options)