Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_azure_credentials(all_credentials):
"""Return the subscription_id and credentials for Azure.
Takes a dict where key is the cloud name, expected to be formatted like
cloud-city's credentials.
"""
azure_dict = all_credentials['azure']['credentials']
subscription_id = azure_dict['subscription-id']
return subscription_id, ServicePrincipalCredentials(
client_id=azure_dict['application-id'],
secret=azure_dict['application-password'],
tenant=azure_dict['tenant-id'],
subscription_id=azure_dict['subscription-id'],
)
def __init__(self, config):
self._config = config
self.subscription_id = str(config.get('azure_subscription_id'))
self._credentials = ServicePrincipalCredentials(
client_id=config.get('azure_client_id'),
secret=config.get('azure_secret'),
tenant=config.get('azure_tenant')
)
self._access_token = config.get('azure_access_token')
self._resource_client = None
self._storage_client = None
self._network_management_client = None
self._subscription_client = None
self._compute_client = None
self._access_key_result = None
self._block_blob_service = None
self._table_service = None
self._storage_account = None
graphrbac_credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id,
resource='https://graph.windows.net'
)
elif file_auth:
data = json.loads(file_auth.read())
subscription_id = data.get('subscriptionId')
tenant_id = data.get('tenantId')
client_id = data.get('clientId')
client_secret = data.get('clientSecret')
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
graphrbac_credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id,
resource='https://graph.windows.net'
)
elif msi:
credentials = MSIAuthentication()
graphrbac_credentials = MSIAuthentication(resource='https://graph.windows.net')
def auth_callback(server, resource, scope):
if self.credentials['client_id'] is None or self.credentials['secret'] is None:
self.fail('Please specify client_id, secret and tenant to access azure Key Vault.')
tenant = self.credentials.get('tenant')
if not self.credentials['tenant']:
tenant = "common"
authcredential = ServicePrincipalCredentials(
client_id=self.credentials['client_id'],
secret=self.credentials['secret'],
tenant=tenant,
cloud_environment=self._cloud_environment,
resource="https://vault.azure.net")
token = authcredential.token
return token['token_type'], token['access_token']
base_url = secrets.shared_key.batch_service_url
credentials = batch_auth.SharedKeyCredentials(secrets.shared_key.batch_account_name,
secrets.shared_key.batch_account_key)
else:
# Set up ServicePrincipalCredentials
arm_credentials = ServicePrincipalCredentials(
client_id=secrets.service_principal.client_id,
secret=secrets.service_principal.credential,
tenant=secrets.service_principal.tenant_id,
resource="https://management.core.windows.net/",
)
m = RESOURCE_ID_PATTERN.match(secrets.service_principal.batch_account_resource_id)
arm_batch_client = BatchManagementClient(arm_credentials, m.group("subscription"))
account = arm_batch_client.batch_account.get(m.group("resourcegroup"), m.group("account"))
base_url = "https://{0}/".format(account.account_endpoint)
credentials = ServicePrincipalCredentials(
client_id=secrets.service_principal.client_id,
secret=secrets.service_principal.credential,
tenant=secrets.service_principal.tenant_id,
resource="https://batch.core.windows.net/",
)
# Set up Batch Client
batch_client = batch.BatchServiceClient(credentials, base_url=base_url)
# Set retry policy
batch_client.config.retry_policy.retries = 5
batch_client.config.add_user_agent("aztk/{}".format(__version__))
return batch_client
"""
import settings
from common.methods import is_version_newer
set_progress("Connecting To Azure...")
cb_version = settings.VERSION_INFO["VERSION"]
if is_version_newer(cb_version, "9.2"):
from resourcehandlers.azure_arm.azure_wrapper import configure_arm_client
wrapper = handler.get_api_wrapper()
web_client = configure_arm_client(wrapper, WebSiteManagementClient)
resource_client = wrapper.resource_client
else:
# TODO: Remove once versions <= 9.2 are no longer supported.
credentials = ServicePrincipalCredentials(
client_id=handler.client_id,
secret=handler.secret,
tenant=handler.tenant_id,
)
web_client = WebSiteManagementClient(credentials, handler.serviceaccount)
resource_client = ResourceManagementClient(credentials, handler.serviceaccount)
set_progress("Connection to Azure established")
return web_client, resource_client
"""
Get the clients using newer methods from the CloudBolt main repo if this CB is running
a version greater than 9.2.2. These internal methods implicitly take care of much of the other
features in CloudBolt such as proxy and ssl verification.
Otherwise, manually instantiate clients without support for those other CloudBolt settings.
"""
import settings
from common.methods import is_version_newer
cb_version = settings.VERSION_INFO["VERSION"]
if is_version_newer(cb_version, "9.2.2"):
wrapper = handler.get_api_wrapper()
storage_client = wrapper.storage_client
else:
# TODO: Remove once versions <= 9.2.2 are no longer supported.
credentials = ServicePrincipalCredentials(
client_id=handler.client_id, secret=handler.secret, tenant=handler.tenant_id
)
storage_client = storage.StorageManagementClient(
credentials, handler.serviceaccount
)
set_progress("Connection to Azure established")
return storage_client
def create_service_credentials(self, request_id, dnary):
self.write_message(
request_id,
"Creating credentials {}".format(double_line_break)
)
credentials = ServicePrincipalCredentials(
client_id = dnary["client_id"],
secret = dnary["secret"],
tenant = dnary["tenant_id"],
)
return credentials
def auth_callback(server, resource, scope):
if self.credentials['client_id'] is None or self.credentials['secret'] is None:
self.fail('Please specify client_id, secret and tenant to access azure Key Vault.')
tenant = self.credentials.get('tenant')
if not self.credentials['tenant']:
tenant = "common"
authcredential = ServicePrincipalCredentials(
client_id=self.credentials['client_id'],
secret=self.credentials['secret'],
tenant=tenant,
cloud_environment=self._cloud_environment,
resource="https://vault.azure.net")
token = authcredential.token
return token['token_type'], token['access_token']
def get_credentials(self, auth_method, **kwargs):
if auth_method == 'Service Principal':
credentials = ServicePrincipalCredentials(client_id=kwargs['client_id'], secret=kwargs['secret'], tenant=kwargs['tenant_id'])
elif auth_method == 'User ID Password':
credentials = UserPassCredentials(username=kwargs['user_id'], password=kwargs['password'])
else:
credentials = None
return credentials