Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test__init__(self):
    """Check that Model refuses empty or malformed ``sessions`` input."""
    # An empty list of sessions.
    with self.assertRaises(MsticpyException):
        Model(sessions=[])
    # A single session that contains no commands.
    with self.assertRaises(MsticpyException):
        Model(sessions=[[]])
    # A flat list of strings rather than a list of sessions.
    with self.assertRaises(MsticpyException):
        Model(sessions=["Set-User"])
    # One populated session alongside one empty session.
    with self.assertRaises(MsticpyException):
        Model(sessions=[["Set-User"], []])
    # A dict whose value is a set: only the base Exception type is asserted.
    with self.assertRaises(Exception):
        Model(sessions=[[{"Set-User": {"Identity"}}]])
def test_risky_sudo_sessions():
    """
    Check risky_sudo_sessions against the sudo session test data.

    Builds the risky and suspicious action inputs from the same CSV,
    verifies the combined result is a dict with two entries, and confirms
    that supplying neither kind of action raises MsticpyException.
    """
    input_file = os.path.join(_TEST_DATA, "sudo_session_test.csv")
    sudo_events = pd.read_csv(input_file, parse_dates=["TimeGenerated"])
    risky_actions = cl.risky_cmd_line(events=sudo_events, log_type="Syslog")
    suspicious_events = cl.cmd_speed(
        cmd_events=sudo_events, cmd_field="Command", time=60, events=2
    )
    sudo_sessions = ls.cluster_syslog_logons_df(logon_events=sudo_events)
    output = ls.risky_sudo_sessions(
        risky_actions=risky_actions,
        suspicious_actions=suspicious_events,
        sudo_sessions=sudo_sessions,
    )
    # isinstance is the idiomatic type check (type(x) == T is flagged as E721).
    assert isinstance(output, dict)  # nosec
    assert len(output) == 2  # nosec
    # Neither risky_actions nor suspicious_actions is passed here.
    with raises(MsticpyException):
        ls.risky_sudo_sessions(sudo_sessions=sudo_sessions)
def test_risky_cmd_line():
    """
    Check risky_cmd_line against the sudo test data.

    Verifies the result is a non-empty dict, that one known timestamp key
    maps to the expected command, and that an unknown ``cmd_field`` raises
    MsticpyException.
    """
    input_file = os.path.join(_TEST_DATA, "sudo_data.csv")
    input_df = pd.read_csv(input_file)
    output = cl.risky_cmd_line(events=input_df, log_type="Syslog")
    # isinstance is the idiomatic type check (type(x) == T is flagged as E721).
    assert isinstance(output, dict)  # nosec
    assert len(output) >= 1  # nosec
    assert output["2019-07-05T18:19:52.873Z"] == "/bin/bash"  # nosec
    # "Test" is passed as cmd_field to trigger the error path.
    with raises(MsticpyException):
        cl.risky_cmd_line(events=input_df, log_type="Syslog", cmd_field="Test")
def test_cmd_speed():
    """
    Check cmd_speed against the sudo speed test data.

    Verifies the result has at least one entry, that the first entry is a
    dict, and that an unknown ``cmd_field`` raises MsticpyException.
    """
    input_file = os.path.join(_TEST_DATA, "sudo_data_speed.csv")
    input_df = pd.read_csv(input_file, parse_dates=["TimeGenerated"])
    output = cl.cmd_speed(cmd_events=input_df, cmd_field="Command")
    assert len(output) >= 1  # nosec
    # isinstance is the idiomatic type check (type(x) == T is flagged as E721).
    assert isinstance(output[0], dict)  # nosec
    # The call is expected to raise, so its return value is not bound.
    with raises(MsticpyException):
        cl.cmd_speed(cmd_events=input_df, cmd_field="Test")
def test__init__(self):
    """Verify the Model constructor raises for invalid ``sessions`` values."""
    # Inputs expected to be rejected with MsticpyException, checked in order.
    malformed_inputs = (
        [],
        [[]],
        ["Set-User"],
        [["Set-User"], []],
    )
    for bad_sessions in malformed_inputs:
        self.assertRaises(MsticpyException, Model, sessions=bad_sessions)
    # For a dict with a set as its value only the base Exception is asserted.
    self.assertRaises(Exception, Model, sessions=[[{"Set-User": {"Identity"}}]])
use_end_token: bool
if set to True, the end_token will be appended to the window
before the likelihood calculation is done
start_token: str
dummy command to signify the start of the session (e.g. "##START##")
end_token: str
dummy command to signify the end of the session (e.g. "##END##")
Returns
-------
likelihood of the window
"""
if use_end_token:
if end_token is None:
raise MsticpyException(
"end_token should not be None, when use_end_token is True"
)
if use_start_token:
if start_token is None:
raise MsticpyException(
"start_token should not be None, when use_start_token is True"
)
w_len = len(window)
if w_len == 0:
return np.nan
prob: float = 1
cur_cmd = window[0].name
params = window[0].params
@attr.s
class InterfaceItems:
    """
    attr class to build network interface details dictionary.

    Every attribute is declared with a bare ``attr.ib()``; no default,
    validator or converter is specified in this class.
    """

    # NOTE(review): the meanings below are inferred from the field names
    # only - confirm against the code that populates this class.
    # Identifier of the interface.
    interface_id = attr.ib()
    # Private / public IP details and (presumably) their allocation method.
    private_ip = attr.ib()
    private_ip_allocation = attr.ib()
    public_ip = attr.ib()
    public_ip_allocation = attr.ib()
    # Presumably the application security group for the interface.
    app_sec_group = attr.ib()
    # Subnet details, presumably with its NSG and route table.
    subnet = attr.ib()
    subnet_nsg = attr.ib()
    subnet_route_table = attr.ib()
class MsticpyAzureException(MsticpyException):
    """
    Exception class for AzureData.

    Subclasses MsticpyException and defines no members of its own.
    """
# pylint: enable=too-few-public-methods, too-many-instance-attributes
class AzureData:
    """Class for returning data on an Azure tenant."""

    def __init__(self, connect: bool = False):
        """
        Initialize connector for Azure Python SDK.

        Parameters
        ----------
        connect : bool, optional
            Defaults to False.
            NOTE(review): this flag is not referenced in the statements
            visible here - confirm how it is used.

        """
        # The instance starts in a disconnected state with no credentials.
        self.connected = False
        self.credentials: Optional[ServicePrincipalCredentials] = None
        # Each SDK client attribute is initialised to None here.
        self.sub_client: Optional[SubscriptionClient] = None
        self.resource_client: Optional[ResourceManagementClient] = None
        self.network_client: Optional[NetworkManagementClient] = None
]
)
.set_index("TimeGenerated")
.sort_index(ascending=True)
)
logons_closed = (
(
logon_events[
logon_events["SyslogMessage"].str.contains("pam_unix.+session closed")
]
)
.set_index("TimeGenerated")
.sort_index(ascending=True)
)
if logons_opened.empty or logons_closed.empty:
raise MsticpyException("There are no logon sessions in the supplied data set")
# For each session identify the likely start and end times
while ses_opened < len(logons_opened.index) and ses_closed < len(
logons_closed.index
):
ses_start = (logons_opened.iloc[ses_opened]).name
ses_end = (logons_closed.iloc[ses_closed]).name
# If we can identify a user for the session add this to the details
if "User" in logons_opened.columns:
user = (logons_opened.iloc[ses_opened]).User
elif "Sudoer" in logons_opened.columns:
user = (logons_opened.iloc[ses_opened]).Sudoer
else:
user = "Unknown"
if ses_start > ses_close_time or ses_opened == 0:
pass
Dictionary of sudo sessions (as generated by cluster_syslog_logons)
risky_actions: dict (Optional)
Dictionary of risky sudo commands (as generated by cmd_line.risky_cmd_line)
suspicious_actions: list (Optional)
List of suspicious sudo commands (as generated by cmd_line.cmd_speed)
Returns
-------
risky_sessions: dict
A dictionary of sudo sessions with flags denoting risk
"""
sessions = sudo_sessions[["User", "Start", "End"]].to_dict("index")
if risky_actions is None and suspicious_actions is None:
raise MsticpyException(
"At least one of risky_actions or suspicious_actions must be supplied"
)
# Depending on whether we have risky or suspicious actions or both
# identify sessions which these actions occur in
risky_act_sessions: Dict[str, Any] = {}
susp_act_sessions: Dict[str, Any] = {}
if risky_actions is not None:
risky_act_sessions = _find_risky_sudo_session(
risky_actions=risky_actions, sudo_sessions=sessions
)
if suspicious_actions is not None:
susp_act_sessions = _find_suspicious_sudo_session(
suspicious_actions=suspicious_actions, sudo_sessions=sessions
)
return {**risky_act_sessions, **susp_act_sessions}