Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
else:
if opts["action"] not in ACTION_SUPPORTING_STREAMING:
die("Source YAML is multi-document, "
"which doesn't support any other action than %s"
% ", ".join(ACTION_SUPPORTING_STREAMING))
if opts["dump"] is yaml_dump:
print("---\n", end="")
else:
print("\0", end="")
if opts.get("loader") is LineLoader:
sys.stdout.flush()
print(output, end="")
if opts.get("loader") is LineLoader:
sys.stdout.flush()
except (InvalidPath, ActionTypeError) as e:
if quiet:
exit(1)
else:
die(str(e))
except InvalidAction as e:
die("'%s' is not a valid action.\n%s"
% (e.args[0], USAGE))
pass
_E.__name__ = str(node.tag)
_E._node = node
return _E(data)
def represent_encapsulated_node(s, o):
    """Represent an encapsulated value while keeping its original tag.

    The value is first converted back to the first base class of its own
    class, then handed to ``s.represent_data``. The resulting node's
    ``tag`` is overwritten with the name of the value's class.

    :param s: object providing ``represent_data`` (presumably the YAML
        dumper/representer this function is registered on -- confirm
        against the registration site)
    :param o: instance of a class whose first base class can be called
        with ``o`` to produce the plain underlying value
    :return: the node returned by ``s.represent_data`` with its ``tag``
        replaced by the class name of ``o``
    """
    wrapper_cls = o.__class__
    # Strip the wrapper by rebuilding the value with its first base type.
    plain_value = wrapper_cls.__bases__[0](o)
    node = s.represent_data(plain_value)
    # The class name is used as the tag for the emitted node.
    node.tag = wrapper_cls.__name__
    return node
# Register the encapsulated-node hooks on the shyaml dumper and loader.
# Dumping: EncapsulatedNode values are routed through
# represent_encapsulated_node (presumably this also covers subclasses, as
# PyYAML multi-representers do -- confirm ShyamlSafeDumper is PyYAML-based).
ShyamlSafeDumper.add_multi_representer(EncapsulatedNode,
represent_encapsulated_node)
# Loading: mk_encapsulated_node is registered under the tag ``None``
# (in PyYAML a ``None`` tag acts as the fallback constructor for tags
# without a dedicated constructor -- confirm for ShyamlSafeLoader).
ShyamlSafeLoader.add_constructor(None, mk_encapsulated_node)
##
## Key specifier
##
def tokenize(s):
r"""Returns an iterable through all subparts of string splitted by '.'
So:
>>> list(tokenize('foo.bar.wiz'))
['foo', 'bar', 'wiz']
Contrary to traditional ``.split()`` method, this function has to
deal with any type of data in the string. So it actually
if opts["dump"] is yaml_dump:
print("---\n", end="")
else:
print("\0", end="")
if opts.get("loader") is LineLoader:
sys.stdout.flush()
print(output, end="")
if opts.get("loader") is LineLoader:
sys.stdout.flush()
except (InvalidPath, ActionTypeError) as e:
if quiet:
exit(1)
else:
die(str(e))
except InvalidAction as e:
die("'%s' is not a valid action.\n%s"
% (e.args[0], USAGE))
if arg in args:
args.remove(arg)
opts["dump"] = yaml_dump
opts["quiet"] = False
for arg in ["-q", "--quiet"]:
if arg in args:
args.remove(arg)
opts["quiet"] = True
for arg in ["-L", "--line-buffer"]:
if arg not in args:
continue
args.remove(arg)
opts["loader"] = LineLoader
if len(args) == 0:
stderr("Error: Bad number of arguments.\n")
die(USAGE, errlvl=1, prefix="")
if len(args) == 1 and args[0] in ("-h", "--help"):
stdout(HELP)
exit(0)
if len(args) == 1 and args[0] in ("-V", "--version"):
version_info = get_version_info()
print("version: %s\nPyYAML: %s\nlibyaml available: %s\nlibyaml used: %s\nPython: %s"
% version_info)
exit(0)
opts["action"] = args[0]
def build_pluginlist( plugin_type='all' ):
"""
Return a list of dicts with a dict for each plugin of the requested type
The dict contains the plugin name, type and description
"""
result = []
plugin_type = plugin_type.lower()
for metaplugin in plugins_git:
metafile = metaplugin + '/plugin.yaml'
plg_dict = {}
if metaplugin in plugins_git: #pluginsyaml_git
if os.path.isfile(metafile):
plugin_yaml = shyaml.yaml_load(metafile, ordered=True)
else:
plugin_yaml = ''
if plugin_yaml != '':
section_dict = plugin_yaml.get('plugin')
if section_dict != None:
if section_dict.get('type') != None:
if section_dict.get('type').lower() in plugin_types:
plgtype = section_dict.get('type').lower()
plg_dict['name'] = metaplugin.lower()
plg_dict['type'] = plgtype
plg_dict['desc'] = get_description(section_dict, 85, language)
plg_dict['maint'] = get_maintainer(section_dict, 15)
plg_dict['test'] = get_tester(section_dict, 15)
plg_dict['doc'] = html_escape(section_dict.get('documentation', ''))
plg_dict['sup'] = html_escape(section_dict.get('support', ''))
else:
def build_pluginlist( plugin_type='all' ):
"""
Return a list of dicts with a dict for each plugin of the requested type
The dict contains the plugin name, type and description
"""
result = []
plugin_type = plugin_type.lower()
for metaplugin in plugins_git:
metafile = metaplugin + '/plugin.yaml'
plg_dict = {}
if metaplugin in plugins_git: #pluginsyaml_git
if os.path.isfile(metafile):
plugin_yaml = shyaml.yaml_load(metafile)
else:
plugin_yaml = ''
if plugin_yaml != '':
section_dict = plugin_yaml.get('plugin')
if section_dict != None:
if section_dict.get('type') != None:
if section_dict.get('type').lower() in plugin_types:
plgtype = section_dict.get('type').lower()
plg_dict['name'] = metaplugin.lower()
plg_dict['type'] = plgtype
plg_dict['desc'] = get_description(section_dict, 85, language)
plg_dict['maint'] = get_maintainer(section_dict, 15)
plg_dict['test'] = get_tester(section_dict, 15)
plg_dict['doc'] = html_escape(section_dict.get('documentation', ''))
plg_dict['sup'] = html_escape(section_dict.get('support', ''))
else:
def write_configfile(plg, configfile_dir, language='de'):
"""
Create a .rst file with configuration information for the passed plugin
"""
plgname = plg['name']
# ---------------------------------
# read metadata for plugin
# ---------------------------------
metafile = plgname + '/plugin.yaml'
if os.path.isfile(metafile):
meta_yaml = shyaml.yaml_load(metafile, ordered=True)
plugin_yaml = meta_yaml.get('plugin', {})
parameter_yaml = meta_yaml.get('parameters', {})
iattributes_yaml = meta_yaml.get('item_attributes', {})
lparameter_yaml = meta_yaml.get('logic_parameter', {})
functions_yaml = meta_yaml.get('plugin_functions', {})
no_parameters = (parameter_yaml == 'NONE')
if no_parameters or parameter_yaml is None:
parameter_yaml = {}
no_attributes = (iattributes_yaml == 'NONE')
if no_attributes or iattributes_yaml is None:
iattributes_yaml = {}
no_lparameters = (lparameter_yaml == 'NONE')
if no_lparameters or lparameter_yaml is None:
def readMetadata(metaplugin, plugins_local):
    """Load the ``plugin.yaml`` metadata of a locally available plugin.

    :param metaplugin: plugin name; also used as the directory that is
        expected to contain ``plugin.yaml``
    :param plugins_local: collection of known local plugin names
    :return: ``None`` (after printing a message) when ``metaplugin`` is
        not in ``plugins_local``; an empty dict when the plugin is known
        but has no ``plugin.yaml`` file; otherwise whatever
        ``shyaml.yaml_load(metafile, ordered=True)`` returns
    """
    # Guard clause: unknown plugin names are reported and rejected early.
    if metaplugin not in plugins_local:
        print("There is no plugin named '" + metaplugin + "'")
        print()
        return None

    metafile = metaplugin + '/plugin.yaml'
    # A known plugin without a metadata file yields an empty mapping.
    if not os.path.isfile(metafile):
        return {}
    return shyaml.yaml_load(metafile, ordered=True)
## Note: ``\n`` will be translated by the ``universal_newlines`` mechanism
## on any platform
termination = "\0" if action.endswith("-0") else "\n"
if action == "get-value":
return str(dump(value))
elif action in ("get-values", "get-values-0"):
if isinstance(value, dict):
return "".join("".join((dump(k), termination,
dump(v), termination))
for k, v in value.items())
elif isinstance(value, list):
return "".join("".join((dump(l), termination))
for l in value)
else:
raise ActionTypeError(
action, provided=tvalue, expected=["sequence", "struct"])
elif action == "get-type":
return tvalue
elif action == "get-length":
if isinstance(value, (dict, list)):
return len(value)
else:
raise ActionTypeError(
action, provided=tvalue, expected=["sequence", "struct"])
elif action in ("keys", "keys-0",
"values", "values-0",
"key-values", "key-values-0"):
if isinstance(value, dict):
method = value.keys if action.startswith("keys") else \
value.items if action.startswith("key-values") else \
value.values
return "".join("".join((dump(k), termination,
dump(v), termination))
for k, v in value.items())
elif isinstance(value, list):
return "".join("".join((dump(l), termination))
for l in value)
else:
raise ActionTypeError(
action, provided=tvalue, expected=["sequence", "struct"])
elif action == "get-type":
return tvalue
elif action == "get-length":
if isinstance(value, (dict, list)):
return len(value)
else:
raise ActionTypeError(
action, provided=tvalue, expected=["sequence", "struct"])
elif action in ("keys", "keys-0",
"values", "values-0",
"key-values", "key-values-0"):
if isinstance(value, dict):
method = value.keys if action.startswith("keys") else \
value.items if action.startswith("key-values") else \
value.values
output = (lambda x: termination.join(str(dump(e)) for e in x)) \
if action.startswith("key-values") else \
dump
return "".join("".join((str(output(k)), termination)) for k in method())
else:
raise ActionTypeError(
action=action, provided=tvalue, expected=["struct"])
else: