Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def list_all_resources_patch(mocker):
return mocker.patch.object(oci_utils, "list_all_resources")
def delete_volume_backup(block_storage_client, module):
result = oci_utils.delete_and_wait(
resource_type="volume_backup",
client=block_storage_client,
get_fn=block_storage_client.get_volume_backup,
kwargs_get={"volume_backup_id": module.params["id"]},
delete_fn=block_storage_client.delete_volume_backup,
kwargs_delete={"volume_backup_id": module.params["id"]},
module=module,
)
return result
),
volume_group_id=dict(type="str", required=False),
)
)
module = AnsibleModule(
argument_spec=module_args,
supports_check_mode=False,
mutually_exclusive=[["compartment_id", "volume_id"]],
required_one_of=[["compartment_id", "volume_id"]],
)
if not HAS_OCI_PY_SDK:
module.fail_json(msg="oci python sdk required for this module.")
block_storage_client = oci_utils.create_service_client(module, BlockstorageClient)
volume_id = module.params["volume_id"]
try:
if volume_id is not None:
result = [
to_dict(
oci_utils.call_with_backoff(
block_storage_client.get_volume, volume_id=volume_id
).data
)
]
else:
compartment_id = module.params["compartment_id"]
availability_domain = module.params["availability_domain"]
def main():
module_args = oci_utils.get_taggable_arg_spec()
module_args.update(
dict(
hostname_label=dict(type="str", required=False),
ip_address=dict(type="str", required=False),
display_name=dict(type="str", required=False, aliases=["name"]),
vnic_id=dict(type="str", required=False),
state=dict(
type="str",
required=False,
default="present",
choices=["absent", "present"],
),
private_ip_id=dict(type="str", required=False, aliases=["id"]),
)
)
default="present",
choices=["present", "absent"],
),
)
)
module = AnsibleModule(
argument_spec=module_args,
required_if=[["state", "present", ["path_routes"]]],
mutually_exclusive=[["purge_path_routes", "delete_path_routes"]],
)
if not HAS_OCI_PY_SDK:
module.fail_json(msg="oci python sdk required for this module")
lb_client = oci_utils.create_service_client(module, LoadBalancerClient)
state = module.params["state"]
if state == "present":
result = create_or_update_path_route_set(lb_client, module)
elif state == "absent":
result = delete_path_route_set(lb_client, module)
module.exit_json(**result)
peer_id=dict(type="str", required=False),
drg_id=dict(type="str", required=False),
)
)
module = AnsibleModule(
argument_spec=module_args,
supports_check_mode=False,
required_if=[("state", "absent", ["remote_peering_connection_id"])],
required_together=[["peer_id", "peer_region_name"]],
)
if not HAS_OCI_PY_SDK:
module.fail_json(msg="oci python sdk required for this module.")
virtual_network_client = oci_utils.create_service_client(
module, VirtualNetworkClient
)
exclude_attributes = {"display_name": True}
state = module.params["state"]
if state == "absent":
result = delete_remote_peering_connection(virtual_network_client, module)
else:
remote_peering_connection_id = module.params["remote_peering_connection_id"]
if remote_peering_connection_id is not None:
result = update_remote_peering_connection(virtual_network_client, module)
# A RPC can be connected to another RPC. Perform this operation when peer_id is specified along with
# remote_peering_connection_id.
if module.params["peer_id"] is not None:
def main():
logger = oci_utils.get_logger("oci_load_balancer_path_route_set_facts")
set_logger(logger)
module_args = oci_utils.get_common_arg_spec()
module_args.update(
dict(
path_route_set_name=dict(type="str", required=False, aliases=["name"]),
load_balancer_id=dict(type="str", required=True, aliases=["id"]),
)
)
module = AnsibleModule(argument_spec=module_args)
if not HAS_OCI_PY_SDK:
module.fail_json(msg="oci python sdk required for this module")
lb_client = oci_utils.create_service_client(module, LoadBalancerClient)
result = list_load_balancer_path_route_sets(lb_client, module)
module.exit_json(**result)
def get_export_options(export_options):
if export_options is None:
return None
HashedClientOptions = oci_utils.generate_subclass(ClientOptions)
result_client_options = []
for export_option_entry in export_options:
client_options = HashedClientOptions()
if export_option_entry.get("source") is None:
raise ClientError(
"Export Options attribute source must contain a valid value"
)
client_options.source = export_option_entry.get("source")
client_options.require_privileged_source_port = export_option_entry.get(
"require_privileged_source_port", True
)
client_options.access = export_option_entry.get("access", "READ_ONLY")
client_options.identity_squash = export_option_entry.get(
"identity_squash", "ROOT"
)
client_options.anonymous_uid = export_option_entry.get("anonymous_uid", 65534)
def main():
module_args = oci_utils.get_taggable_arg_spec(supports_wait=True)
module_args.update(
dict(
compartment_id=dict(type="str", required=False, aliases=["id"]),
name=dict(type="str", required=False),
description=dict(type="str", required=False),
state=dict(
type="str",
required=False,
default="present",
choices=["present", "absent"],
),
parent_compartment_id=dict(type="str", required=False),
)
)
module = AnsibleModule(
db_backup_config=dict(type=dict, required=False),
state=dict(
type="str",
required=False,
default="update",
choices=["restore", "update"],
),
)
)
module = AnsibleModule(argument_spec=module_args)
if not HAS_OCI_PY_SDK:
module.fail_json(msg="oci python sdk required for this module")
db_client = oci_utils.create_service_client(module, DatabaseClient)
state = module.params["state"]
if state == "restore":
result = restore_database(db_client, module)
elif state == "update":
result = update_database(db_client, module)
module.exit_json(**result)