Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def delete_disk(self, disk_id):
    """Delete the disk named by ``disk_id``.

    The disk name is extracted from ``disk_id`` with
    ``azure_helpers.parse_url`` and the delete is issued against
    ``self.resource_group``. The method returns only after ``wait()``
    on the object returned by the delete request has returned.
    """
    parsed = azure_helpers.parse_url(VOLUME_RESOURCE_ID, disk_id)
    name = parsed.get(VOLUME_NAME)
    pending = self.compute_client.disks.delete(self.resource_group, name)
    pending.wait()
def deallocate_vm(self, vm_id):
    """Deallocate the virtual machine named by ``vm_id``.

    The VM name is extracted from ``vm_id`` with
    ``azure_helpers.parse_url``; the deallocate request targets
    ``self.resource_group`` and the method returns only after
    ``wait()`` on the object returned by the request has returned.
    """
    parsed = azure_helpers.parse_url(VM_RESOURCE_ID, vm_id)
    name = parsed.get(VM_NAME)
    machines = self.compute_client.virtual_machines
    pending = machines.deallocate(self.resource_group, name)
    pending.wait()
def delete_nic(self, nic_id):
    """Delete the network interface named by ``nic_id``.

    The interface name is extracted from ``nic_id`` with
    ``azure_helpers.parse_url`` and the delete is issued against
    ``self.resource_group``. The method returns only after ``wait()``
    on the object returned by the delete request has returned.
    """
    parsed = azure_helpers.parse_url(NETWORK_INTERFACE_RESOURCE_ID, nic_id)
    name = parsed.get(NETWORK_INTERFACE_NAME)
    interfaces = self.network_management_client.network_interfaces
    pending = interfaces.delete(self.resource_group, name)
    pending.wait()
def is_gallery_image(self, image_id):
    """Return True when ``image_id`` parses to a gallery image.

    ``image_id`` is parsed with ``azure_helpers.parse_url`` against
    ``IMAGE_RESOURCE_ID``; the result is checked for an ``'offer'`` key.
    """
    parsed = azure_helpers.parse_url(IMAGE_RESOURCE_ID, image_id)
    # A gallery image always carries an offer, so the presence of that
    # key is what distinguishes it.
    has_offer = 'offer' in parsed
    return has_offer
def update_image_tags(self, image_id, tags):
    """Set ``tags`` on the image named by ``image_id``.

    Gallery images (as reported by ``is_gallery_image``) are left
    untouched and ``True`` is returned for them.
    NOTE(review): presumably gallery images cannot be tagged through
    this resource group -- confirm.

    For any other image, the image name is extracted from ``image_id``
    and ``images.create_or_update`` is called with the new tags and
    ``self.region_name`` as the location; the value of ``.result()``
    on that call is returned.
    """
    if self.is_gallery_image(image_id):
        return True

    # Parse only once we know the name is needed; the gallery check
    # above already parses the ID for its own purposes.
    url_params = azure_helpers.parse_url(IMAGE_RESOURCE_ID, image_id)
    name = url_params.get(IMAGE_NAME)
    return self.compute_client.images.create_or_update(
        self.resource_group,
        name,
        {
            'tags': tags,
            'location': self.region_name,
        },
    ).result()
def update_snapshot_tags(self, snapshot_id, tags):
    """Send a tag update for the snapshot named by ``snapshot_id``.

    The snapshot name is extracted from ``snapshot_id`` with
    ``azure_helpers.parse_url``. ``snapshots.update`` is then called
    with ``{'tags': tags}`` and ``raw=True``, and whatever that call
    returns is handed back unchanged (``result()``/``wait()`` is not
    called here).
    """
    parsed = azure_helpers.parse_url(SNAPSHOT_RESOURCE_ID, snapshot_id)
    name = parsed.get(SNAPSHOT_NAME)
    snapshots = self.compute_client.snapshots
    response = snapshots.update(
        self.resource_group, name, {'tags': tags}, raw=True)
    return response
def get_floating_ip(self, public_ip_id):
    """Fetch the public IP address named by ``public_ip_id``.

    The name is extracted from ``public_ip_id`` with
    ``azure_helpers.parse_url`` and looked up in
    ``self.resource_group``; the value returned by
    ``public_ip_addresses.get`` is passed back unchanged.
    """
    parsed = azure_helpers.parse_url(PUBLIC_IP_RESOURCE_ID, public_ip_id)
    name = parsed.get(PUBLIC_IP_NAME)
    addresses = self.network_management_client.public_ip_addresses
    return addresses.get(self.resource_group, name)
def get_network(self, network_id):
    """Fetch the virtual network named by ``network_id``.

    The name is extracted from ``network_id`` with
    ``azure_helpers.parse_url`` and looked up in
    ``self.resource_group``; the value returned by
    ``virtual_networks.get`` is passed back unchanged.
    """
    parsed = azure_helpers.parse_url(NETWORK_RESOURCE_ID, network_id)
    name = parsed.get(NETWORK_NAME)
    networks = self.network_management_client.virtual_networks
    return networks.get(self.resource_group, name)
def create_vm_firewall_rule(self, fw_id, rule_name, parameters):
    """Create or update rule ``rule_name`` on the firewall ``fw_id``.

    The firewall name is extracted from ``fw_id`` with
    ``azure_helpers.parse_url``. ``security_rules.create_or_update``
    is called with that name, ``rule_name`` and ``parameters`` in
    ``self.resource_group``, and the value of ``.result()`` on the
    returned object is handed back.
    """
    parsed = azure_helpers.parse_url(VM_FIREWALL_RESOURCE_ID, fw_id)
    firewall = parsed.get(VM_FIREWALL_NAME)
    rules = self.network_management_client.security_rules
    pending = rules.create_or_update(
        self.resource_group, firewall, rule_name, parameters)
    return pending.result()
def update_disk_tags(self, disk_id, tags):
    """Send a tag update for the disk named by ``disk_id``.

    The disk name is extracted from ``disk_id`` with
    ``azure_helpers.parse_url``. ``disks.update`` is then called with
    ``{'tags': tags}`` and ``raw=True``, and whatever that call returns
    is handed back unchanged (``result()``/``wait()`` is not called
    here).
    """
    parsed = azure_helpers.parse_url(VOLUME_RESOURCE_ID, disk_id)
    name = parsed.get(VOLUME_NAME)
    disks = self.compute_client.disks
    response = disks.update(
        self.resource_group, name, {'tags': tags}, raw=True)
    return response