Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test(self, resource):
""" Test that the argument this asserts for is present in the
resource. """
try:
resource.inner[self.name].resolve()
return True
except errors.NoMatching:
return False
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from yay import ast
class GraphExternalAction(ast.PythonClass):
def test(self):
pass
def destroy(self):
pass
def _resolve(self):
# FIXME: There is a nicer way to do this without resolve, but more yay
# refactoring required
root = self.root
if self not in root.actors:
root.actors.append(self)
return super(GraphExternalAction, self)._resolve()
elif self.type == 'boolean':
if isinstance(value, type(True)):
# might already be boolean
return value
if value.lower() in ('no', '0', 'off', 'false'):
return False
elif value.lower() in ('yes', '1', 'on', 'true'):
return True
raise ArgParseError(
"Cannot parse boolean from %r for argument %r" % (value, self.name))
else:
raise ArgParseError(
"Don't understand %r as a type for argument %r" % (self.type, self.name))
class YaybuArgv(ast.PythonClass):
def __init__(self, **args):
super(YaybuArgv, self).__init__(ast.PythonDict({}))
self.schema = {}
self.args = args
self.anchor = None
@classmethod
def from_argv(cls, argv):
arguments = {}
for arg in argv:
name, value = arg.split("=", 1)
if name in arguments:
raise ArgParseError(
"Duplicate argument %r specified" % (name,))
arguments[name] = value
self.lastcmd = ''
if cmd == '':
return self.default(line)
else:
try:
func = getattr(self, 'do_' + cmd)
except AttributeError:
return self.default(line)
parser = self.parser()
optparse_func = getattr(self, 'opts_' + cmd, lambda x: x)
optparse_func(parser)
opts, args = parser.parse_args(arg.split())
try:
return func(opts, args)
except yay.errors.Error as e:
if getattr(self, "debug", False):
import pdb
pdb.post_mortem()
print e.get_string()
return getattr(e, "returncode", 128)
except KeyboardInterrupt:
print "^C"
if isinstance(self.command, list):
command = []
for c in self.command:
if isinstance(c, AST):
command.append(c.as_string())
else:
command.append(c)
logas = []
for c in self.command:
if isinstance(c, AST):
logas.append(c.as_safe_string())
else:
logas.append(c)
elif isinstance(self.command, basestring):
logas = command = shlex.split(self.command.encode("UTF-8"))
elif isinstance(self.command, AST):
command = shlex.split(self.command.as_string().encode("UTF-8"))
logas = shlex.split(self.command.as_safe_string().encode("UTF-8"))
command = self._tounicode(command)
logas = self._tounicode(logas)
renderer.command(logas)
env = {
"PATH":
"/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
}
if self.env:
for key, item in self.env.iteritems():
env[key] = item
def apply(self, ctx, renderer):
transport = ctx.transport
if isinstance(self.command, list):
command = []
for c in self.command:
if isinstance(c, AST):
command.append(c.as_string())
else:
command.append(c)
logas = []
for c in self.command:
if isinstance(c, AST):
logas.append(c.as_safe_string())
else:
logas.append(c)
elif isinstance(self.command, basestring):
logas = command = shlex.split(self.command.encode("UTF-8"))
elif isinstance(self.command, AST):
command = shlex.split(self.command.as_string().encode("UTF-8"))
logas = shlex.split(self.command.as_safe_string().encode("UTF-8"))
command = self._tounicode(command)
def _get_source_container(self):
try:
return self._get_source_container_from_string()
except errors.TypeError:
driver = get_driver_from_expression(
self.params.source, get_driver, Provider, self.extra_drivers, self.root, ignore=("container", ))
container = driver.get_container(
self.params.source.container.as_string())
return container
def clean_allowed(self):
from boto.ec2 import connect_to_region
c = connect_to_region('eu-west-1')
allowed = []
try:
groups = self.params.allowed.get_iterable()
except errors.NoMatching:
groups = []
for group in groups:
try:
name = group.as_string()
except errors.TypeError:
name = group.get_key("name").as_string()
try:
groups = c.get_all_security_groups(groupnames=[name])
g = groups[0]
except EC2ResponseError:
# FIXME: Don't raise if simulating
raise TypeError("No such EC2 SecurityGroup '%s'" % name)
allowed.append((g.name, g.owner_id))
def _get_zone_by_domain(self, domain):
zones = [z for z in self.driver.list_zones() if z.domain == domain]
if len(zones) > 1:
raise errors.Error(
"Found multiple zones that match domain name '%s'" % domain)
elif len(zones) == 1:
return zones[0]
raise error.ParseError(
"'%s' cannot be defined multiple times" % resource.id, anchor=instance.anchor)
self[resource.id] = resource
# Create implicit File[] nodes for any watched files
try:
for watched in resource.watch:
res = bind({
"name": watched,
"policy": "watched",
})
res.parent = instance
w = self.add("File", res)
w._original_hash = None
except errors.NoMatching:
pass
return resource