Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Connection string fields:
tenant_id
client_id
client_secret
apiRoot
apiVersion
"""
if connection_str:
self.current_connection = connection_str
cs_dict = self._parse_connection_str(connection_str)
elif kwargs:
cs_dict = kwargs
# Allow user to specify location of connection variables in config file.
if "app_name" in cs_dict:
app_config = config.settings.get(cs_dict["app_name"])
if not app_config:
raise MsticpyException(
f"No configuration settings found for {cs_dict['app_name']}."
)
cs_dict = app_config["Args"]
else:
raise MsticpyException("No connection details provided.")
# self.oauth_url and self.req_body are correctly set in concrete
# instances __init__
req_url = self.oauth_url.format(tenantId=cs_dict["tenant_id"]) # type: ignore
req_body = dict(self.req_body) # type: ignore
req_body["client_id"] = cs_dict["client_id"]
req_body["client_secret"] = cs_dict["client_secret"]
# Authenticate and obtain AAD Token for future calls
def _read_pkg_config_values(self, workspace_name: str = None):
as_settings = pkg_config.settings.get("AzureSentinel")
if not as_settings:
return {}
ws_settings = as_settings.get("Workspaces") # type: ignore
if not ws_settings:
return {}
if workspace_name and workspace_name in ws_settings:
selected_workspace = ws_settings[workspace_name]
elif "Default" in ws_settings:
selected_workspace = ws_settings["Default"]
else:
return {}
if (
selected_workspace
and self.PKG_CONF_WS_KEY in selected_workspace
and self.PKG_CONF_TENANT_KEY in selected_workspace
):
raise TypeError(f"Unknown data environment {data_environment}")
self._environment = data_environment.name
if driver is None:
driver_class = _ENVIRONMENT_DRIVERS[data_environment]
if issubclass(driver_class, DriverBase):
driver = driver_class(**kwargs) # type: ignore
else:
raise LookupError(
"Could not find suitable data provider for", f" {self._environment}"
)
self._query_provider = driver
settings: Dict[str, Any] = config.settings.get( # type: ignore
"QueryDefinitions"
) # type: ignore
all_query_paths = []
for default_path in settings.get("Default"): # type: ignore
qry_path = self._resolve_path(default_path)
if qry_path:
all_query_paths.append(qry_path)
if settings.get("Custom") is not None:
for custom_path in settings.get("Custom"): # type: ignore
qry_path = self._resolve_path(custom_path)
if qry_path:
all_query_paths.append(qry_path)
if query_paths:
all_query_paths.extend(query_paths)
Dict[str, ProviderSettings]
Provider settings indexed by provider name.
"""
# pylint: disable=global-statement
global _SECRETS_CLIENT
# pylint: enable=global-statement
if "KeyVault" in config.settings:
if _SECRETS_CLIENT is None:
print(
"KeyVault enabled. Secrets access may require additional authentication."
)
_SECRETS_CLIENT = SecretsClient()
else:
_SECRETS_CLIENT = None
section_settings = config.settings.get(config_section)
if not section_settings:
return {}
settings = {}
for provider, item_settings in section_settings.items():
prov_args = item_settings.get("Args")
prov_settings = ProviderSettings(
name=provider,
description=item_settings.get("Description"),
args=_get_setting_args(
config_section=config_section,
provider_name=provider,
prov_args=prov_args,
),
primary=item_settings.get("Primary", False),
provider=item_settings.get("Provider", provider),
url : str
The url a screenshot is wanted for.
api_key : str (optional)
Browshot API key. If not set, msticpyconfig is checked for this.
Returns
-------
image_data: requests.models.Response
The final screenshot request response data.
"""
# Get Browshot API key from kwargs or config
if api_key is not None:
bs_api_key = api_key
elif config.settings.get("Browshot") is not None:
bs_api_key = config.settings.get("Browshot")["Args"]["AuthKey"] # type: ignore
else:
raise AttributeError("No configuration found for Browshot")
# Request screenshot from Browshot and get request ID
id_string = f"https://api.browshot.com/api/v1/screenshot/create?url={url}/&instance_id=26&size=screen&cache=0&key={bs_api_key}" # pylint: disable=line-too-long
id_data = requests.get(id_string)
bs_id = json.loads(id_data.content)["id"]
status_string = (
f"https://api.browshot.com/api/v1/screenshot/info?id={bs_id}&key={bs_api_key}"
)
image_string = f"https://api.browshot.com/api/v1/screenshot/thumbnail?id={bs_id}&zoom=50&key={bs_api_key}" # pylint: disable=line-too-long
# Wait until the screenshot is ready and keep user updated with progress
print("Getting screenshot")
progress = IntProgress(min=0, max=40)
display.display(progress)
ready = False
while ready is False:
def list_workspaces(cls) -> Dict:
    """
    Return list of available workspaces.

    Reads the "Workspaces" entry of the "AzureSentinel" section in
    ``pkg_config.settings`` and, for each workspace, keeps only the
    values stored under ``cls.PKG_CONF_WS_KEY`` and
    ``cls.PKG_CONF_TENANT_KEY``.

    Returns
    -------
    Dict
        Dictionary of workspaces with workspace and tenantIds, keyed by
        the names used in the "Workspaces" settings. Empty if the
        section or its workspaces are missing or empty.

    """
    # ``dict.get(key, {})`` only returns the default when the key is absent.
    # A section that is present but empty/None would make the chained
    # ``.get`` raise AttributeError, so normalise any falsy value to {}.
    as_settings = pkg_config.settings.get("AzureSentinel") or {}
    ws_settings = as_settings.get("Workspaces")
    if not ws_settings:
        return {}
    return {
        ws_name: {
            cls.PKG_CONF_WS_KEY: ws.get(cls.PKG_CONF_WS_KEY),
            cls.PKG_CONF_TENANT_KEY: ws.get(cls.PKG_CONF_TENANT_KEY),
        }
        for ws_name, ws in ws_settings.items()
    }