Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_add_meta_with_defaults(self, registry, mock_meta):
from pyrsistent import freeze
standard_meta = freeze(mock_meta.return_value)\
.transform(['states', 'draft', 'acm', 'principals'], ['role:moderator'])
registry.content.workflows_meta['standard'] = standard_meta
mock_meta.return_value = {'defaults': 'standard'}
self.call_fut(registry, 'package:dummy.yaml', 'dummy')
meta = registry.content.workflows_meta['dummy']
assert meta['states'] == standard_meta['states']
assert meta['transitions'] == standard_meta['transitions']
assert meta['initial_state'] == standard_meta['initial_state']
assert meta['defaults'] == 'standard'
def test_new_elements_created_when_missing():
m = freeze({})
assert m.transform(['foo', 'bar', 'baz'], 7) == {'foo': {'bar': {'baz': 7}}}
def test_multiple_transformations():
v = freeze([1, 2])
assert v.transform([2, 'foo'], 3, [2, 'foo'], inc) == freeze([1, 2, {'foo': 4}])
def test_add_meta_with_defaults_state_add_permission(self, registry,
mock_meta):
from pyrsistent import freeze
standard_meta = freeze(mock_meta.return_value)\
.transform(['states', 'draft', 'acm', 'principals'], ['role:moderator'])
registry.content.workflows_meta['standard'] = standard_meta
mock_meta.return_value =\
{'defaults': 'standard',
'states': {'draft': {'acm': {'permissions':[['new', 'Added']]}}
}}
self.call_fut(registry, 'package:dummy.yaml', 'dummy')
meta = registry.content.workflows_meta['dummy']
assert meta['states']['draft']['acm']['permissions'] ==\
[['view', 'Deny'],
['new', 'Added']]
def _unserialize_packer_dict(serialized_packer_dict):
"""
Parse a packer serialized dictionary.
:param unicode serialized_packer_dict: The serialized form.
:return: A ``dict`` of the keys and values found.
"""
packer_dict = {}
for item in serialized_packer_dict.split("%!(PACKER_COMMA)"):
key, value = item.split(":")
packer_dict[key] = value
return freeze(packer_dict)
def get_desired_server_group_state(group_id, launch_config, desired):
"""
Create a :obj:`DesiredServerGroupState` from a group details.
:param str group_id: The group ID
:param dict launch_config: Group's launch config as per
:obj:`otter.json_schema.group_schemas.launch_config`
:param int desired: Group's desired capacity
"""
lbs = freeze(launch_config['args'].get('loadBalancers', []))
lbs = json_to_LBConfigs(lbs)
server_lc = prepare_server_launch_config(
group_id,
freeze({'server': launch_config['args']['server']}),
lbs)
draining = float(launch_config["args"].get("draining_timeout", 0.0))
desired_state = DesiredServerGroupState(
server_config=server_lc,
capacity=desired, desired_lbs=lbs,
draining_timeout=draining)
return desired_state
def prepare_launch_config(scaling_group_uuid, launch_config):
"""
Prepare a launch_config for the specified scaling_group.
This is responsible for returning a copy of the launch config that
has metadata and unique server names added.
:param IScalingGroup scaling_group: The scaling group this server is
getting launched for.
:param dict launch_config: The complete launch_config args we want to build
servers from.
:return dict: The prepared launch config.
"""
launch_config = freeze(launch_config)
lb_descriptions = json_to_LBConfigs(launch_config.get('loadBalancers', []))
launch_config = prepare_server_launch_config(
scaling_group_uuid, launch_config, lb_descriptions)
suffix = generate_server_name()
launch_config = set_server_name(launch_config, suffix)
return thaw(launch_config)
metadata={"description": "Expected string output from the program."}
)
@marshmallow.post_load()
def make(self, validated, partial, many): # pylint: disable=unused-argument
return CommandTest(**validated)
@attr.dataclass(frozen=True)
class CommandSpec:
name: str
short_help: t.Optional[str]
help: t.Optional[str]
arguments: t.List[click.Argument] = attr.ib(converter=pyrsistent.freeze)
options: t.List[click.Option] = attr.ib(converter=pyrsistent.freeze)
stages: t.List[CommandStage] = attr.ib(converter=pyrsistent.freeze)
inject_values: t.List[str] = attr.ib(converter=pyrsistent.freeze)
tests: t.List[CommandTest] = attr.ib(converter=pyrsistent.freeze)
section: str
hidden: bool
class CommandSpecSchema(marshmallow.Schema):
"""A new command."""
name = fields.String(metadata={"description": "Name of the new command."})
help = fields.String(
default=None,
missing=None,
metadata={
"description": "Long-form documentation of the command. Will be interpreted as ReStructuredText markup."
},
def _parse_line_ARTIFACT(self, parts):
"""
Parse line parts containing information about an artifact.
:param list parts: The parts of resulting from splitting a comma
separated packer output line.
"""
artifact_type = parts[1]
if parts[4] == 'end':
self._current_artifact['type'] = artifact_type
self.artifacts.append(freeze(self._current_artifact))
self._current_artifact = {}
return
key = parts[4]
value = parts[5:]
if len(value) == 1:
value = value[0]
self._current_artifact[key] = value
from it. The returned definition will be used to construct the Python
type. By default, no transformation is performed.
:ivar dict _pclasses: A cache of the `PClass`\ es that have been
constructed for definitions from this specification already. This
allows multiple requests for the same definition to be satisfied with
the same type object (rather than a new type object which has all the
same attributes and behavior). This plays better with Python's type
system than the alternative.
"""
info = field(mandatory=True, initial=None, factory=freeze)
paths = field(mandatory=True, initial=None, factory=freeze)
definitions = field(mandatory=True, initial=None, factory=freeze)
securityDefinitions = field(mandatory=True, initial=None, factory=freeze)
security = field(mandatory=True, initial=None, factory=freeze)
swagger = field(mandatory=True, initial=None, factory=freeze)
# lambda lambda red pajamda
# https://github.com/tobgu/pyrsistent/issues/109
transform_definition = field(
mandatory=True,
initial=lambda: lambda name, definition: definition)
_behaviors = field(mandatory=True, type=dict)
_pclasses = field(mandatory=True, type=dict)
def __hash__(self):
return hash((
self.info,
self.paths,
self.definitions,
self.securityDefinitions,