Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run_operation():
for vmname, image in images.iteritems():
print "\n{} - {}: {} \n".format(operation, vmname, image)
uid = rstr.xeger(r'[0-9a-fA-F]{8}')
vmname = vmname.lower() + '-' + uid
dnsname = vmname
if operation == 'Create VM and Add Extension':
create_vm(rGroup, vmname, image, username, password, location, dnsname, size, nsg)
remove_extension(extension, vmname, rGroup)
copy_to_vm(dnsname, username, password, location)
run_command(rGroup, vmname, 'RunShellScript', 'python /tmp/omsStatusCheck.py -preinstall')
add_extension(extension, publisher, vmname, rGroup, private_settings, public_settings)
run_command(rGroup, vmname, 'RunShellScript', 'python /tmp/omsStatusCheck.py -postinstall')
copy_from_vm(dnsname, username, password, location)
elif operation == 'Create VM':
create_vm(rGroup, vmname, image, username, password, location, dnsname, size, nsg)
elif operation == 'Add Extension':
add_extension(extension, publisher, vmname, rGroup, private_settings, public_settings)
run_command(rGroup, vmname, 'RunShellScript', 'python /tmp/omsStatusCheck.py -status')
copy_from_vm(dnsname, username, password, location)
def _get_string(self, low=None, high=None, pattern=None):
"""Return string from `pattern` or a universal pattern
[0-9a-zA-Z].
"""
low, high = self._render_range(low, high)
if pattern is None or is_match_all(pattern):
pattern = r'[0-9a-zA-Z]'
return rstr.xeger(pattern).rstrip("\\\"")
def create_vm_and_install_old_extension():
"""Create vm and install a specific version of the extension, returning HTML results."""
message = ""
update_option = '--version {0} --no-auto-upgrade'.format(old_version)
install_times.clear()
for distname, image in images.iteritems():
uid = rstr.xeger(r'[0-9a-f]{8}')
vmname = get_vm_name(distname, uid)
vmnames.append(vmname)
dnsname = vmname
vm_log_file = distname.lower() + "result.log"
vm_html_file = distname.lower() + "result.html"
log_open = open(vm_log_file, 'a+')
html_open = open(vm_html_file, 'a+')
print("\nCreate VM and Install Extension {0} v-{1} - {2}: {3} \n".format(extension, old_version, vmname, image))
create_vm(resource_group, vmname, image, username, ssh_public, location, dnsname, size, nsg_uri)
copy_to_vm(dnsname, username, ssh_private, location)
delete_extension(extension, vmname, resource_group)
run_az_command(resource_group, vmname, 'RunShellScript', 'python -u /tmp/oms_extension_run_script.py -preinstall')
add_extension(extension, publisher, vmname, resource_group, private_settings, public_settings, update_option)
run_az_command(resource_group, vmname, 'RunShellScript', 'python -u /home/scratch/oms_extension_run_script.py -postinstall')
install_times.update({vmname: datetime.now()})
run_az_command(resource_group, vmname, 'RunShellScript', 'python -u /home/scratch/oms_extension_run_script.py -injectlogs')
Creates stub/plus status url based on context
:param ctx: {} of current parsing context
:param server_preferred: bool - use server_name instead of listen
:return: [] of urls
"""
location = ctx.get('location', '/')
# remove all modifiers
location_parts = location.split(' ')
final_location_part = location_parts[-1]
# generate a random sting that will fit regex location
if location.startswith('~'):
try:
exact_location = rstr.xeger(final_location_part)
# check that regex location has / and add it
if not exact_location.startswith('/'):
exact_location = '/%s' % exact_location
except:
context.log.debug('bad regex location: %s' % final_location_part)
exact_location = None
else:
exact_location = final_location_part
# if an exact location doesn't have / that's not a working location, we should not use it
if not exact_location.startswith('/'):
context.log.debug('bad exact location: %s' % final_location_part)
exact_location = None
if exact_location:
def gen_regex(regex):
return rstr.xeger(regex)
example = []
samples = random.randint(3, 5)
if 'type' not in rule['schema']:
specification = {'type': 'string'}
if 'regex' in rule['schema']:
if rule['schema']['regex'] == '\w+\.\w+\_\w+':
# special on for permissions
p = Permission.objects.all()
for _ in range(samples):
c = random.randint(0, p.count())
example.append("{1}.{0}".format(p[c].codename, p[c].content_type.app_label))
else:
specification['pattern'] = rule['schema']['regex']
for _ in range(samples):
example.append(rstr.xeger(rule['schema']['regex']))
elif rule['schema']['type'] in ['email', 'date', 'uuid']:
specification = {'type': 'string', 'format': rule['schema']['type']}
if rule['schema']['type'] == 'email':
for _ in range(samples):
example.append(fake.email())
if rule['schema']['type'] == 'uuid':
for _ in range(samples):
example.append(str(uuid.uuid4()))
else:
specification = {'type': rule['schema']['type']}
converted = {'type': 'array', 'items': specification}
def generate_value(self, avoided_value):
if not self.REGEX:
return ["Example String Value"]
return [rstr.xeger(self.REGEX)]
def random_value(self, override=None):
if self.validate_value(override):
return override
return rstr.xeger(self.pattern)
from contextlib import contextmanager
import errno
import os
import re
from random import SystemRandom
import tempfile
from rstr import Rstr
from ._compat import which
rstr = Rstr(SystemRandom())
import_module = __import__
def genpass(pattern=r'[\w]{32}'):
"""generates a password with random chararcters
"""
try:
return rstr.xeger(pattern)
except re.error as e:
raise ValueError(str(e))
@contextmanager
def mkdir_open(path, mode="r"):