How to use the msticpy.common.utility.MsticpyException function in msticpy

To help you get started, we’ve selected a few msticpy examples, based on popular ways it is used in public projects.


def test__init__(self):
    """Model() must reject malformed session lists with MsticpyException."""
    # Empty, nested-empty, flat-string and mixed inputs are all refused at
    # construction time with the package exception type.
    with self.assertRaises(MsticpyException):
        Model(sessions=[])
    with self.assertRaises(MsticpyException):
        Model(sessions=[[]])
    with self.assertRaises(MsticpyException):
        Model(sessions=["Set-User"])
    with self.assertRaises(MsticpyException):
        Model(sessions=[["Set-User"], []])
    # A dict where a command object is expected fails too, but only with a
    # generic Exception rather than MsticpyException.
    with self.assertRaises(Exception):
        Model(sessions=[[{"Set-User": {"Identity"}}]])
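def test_risky_sudo_sessions():
    """End-to-end check of ``ls.risky_sudo_sessions`` on the sudo fixture.

    Derives the three inputs (risky command lines, suspiciously fast command
    sequences and clustered sudo sessions) from ``sudo_session_test.csv``,
    verifies the merged result, then confirms that calling without either
    risk input is rejected with ``MsticpyException``.
    """
    input_file = os.path.join(_TEST_DATA, "sudo_session_test.csv")
    sudo_events = pd.read_csv(input_file, parse_dates=["TimeGenerated"])
    risky_actions = cl.risky_cmd_line(events=sudo_events, log_type="Syslog")
    # time/events are the speed thresholds passed straight to cmd_speed.
    suspicious_events = cl.cmd_speed(
        cmd_events=sudo_events, cmd_field="Command", time=60, events=2
    )
    sudo_sessions = ls.cluster_syslog_logons_df(logon_events=sudo_events)
    output = ls.risky_sudo_sessions(
        risky_actions=risky_actions,
        suspicious_actions=suspicious_events,
        sudo_sessions=sudo_sessions,
    )
    assert len(output) == 2  # nosec
    # isinstance rather than type(...) == dict (pycodestyle E721).
    assert isinstance(output, dict)  # nosec
    # Neither risky_actions nor suspicious_actions supplied -> error.
    with raises(MsticpyException):
        ls.risky_sudo_sessions(sudo_sessions=sudo_sessions)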
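def test_risky_cmd_line():
    """Check ``cl.risky_cmd_line`` on the ``sudo_data.csv`` fixture.

    Verifies the result is a non-empty dict keyed by timestamp, that a known
    event maps to ``/bin/bash``, and that an unknown ``cmd_field`` raises
    ``MsticpyException``.
    """
    input_file = os.path.join(_TEST_DATA, "sudo_data.csv")
    input_df = pd.read_csv(input_file)
    output = cl.risky_cmd_line(events=input_df, log_type="Syslog")
    assert len(output) >= 1  # nosec
    # isinstance rather than type(...) == dict (pycodestyle E721).
    assert isinstance(output, dict)  # nosec
    assert output["2019-07-05T18:19:52.873Z"] == "/bin/bash"  # nosec
    # "Test" is not a column of the fixture, so the lookup must fail loudly.
    with raises(MsticpyException):
        cl.risky_cmd_line(events=input_df, log_type="Syslog", cmd_field="Test")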
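def test_cmd_speed():
    """Check ``cl.cmd_speed`` on the ``sudo_data_speed.csv`` fixture.

    Verifies at least one speed hit is returned as a list of dicts, and that
    an unknown ``cmd_field`` raises ``MsticpyException``.
    """
    input_file = os.path.join(_TEST_DATA, "sudo_data_speed.csv")
    input_df = pd.read_csv(input_file, parse_dates=["TimeGenerated"])
    output = cl.cmd_speed(cmd_events=input_df, cmd_field="Command")
    assert len(output) >= 1  # nosec
    # isinstance rather than type(...) == dict (pycodestyle E721).
    assert isinstance(output[0], dict)  # nosec
    # The call is expected to raise, so its result is never bound (the
    # original assigned it to `output`, which was then unused).
    with raises(MsticpyException):
        cl.cmd_speed(cmd_events=input_df, cmd_field="Test")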
def test__init__(self):
    """Model() must reject malformed session lists with MsticpyException."""
    # Every entry here is a structurally invalid `sessions` argument; the
    # constructor must refuse each with the package exception type.
    invalid_sessions = [
        [],
        [[]],
        ["Set-User"],
        [["Set-User"], []],
    ]
    for sessions in invalid_sessions:
        self.assertRaises(MsticpyException, Model, sessions=sessions)
    # A dict in place of a command object is also rejected, but only with a
    # generic Exception.
    self.assertRaises(Exception, Model, sessions=[[{"Set-User": {"Identity"}}]])
use_end_token: bool
        if set to True, the end_token will be appended to the window
        before the likelihood calculation is done
    start_token: str
        dummy command to signify the start of the session (e.g. "##START##")
    end_token: str
        dummy command to signify the end of the session (e.g. "##END##")

    Returns
    -------
    likelihood of the window

    """
    if use_end_token:
        if end_token is None:
            raise MsticpyException(
                "end_token should not be None, when use_end_token is True"
            )

    if use_start_token:
        if start_token is None:
            raise MsticpyException(
                "start_token should not be None, when use_start_token is True"
            )

    w_len = len(window)
    if w_len == 0:
        return np.nan
    prob: float = 1

    cur_cmd = window[0].name
    params = window[0].params
@attr.attrs
class InterfaceItems:
    """attr class to build network interface details dictionary.

    Plain attribute container for one network interface. No field carries a
    default, so all nine values must be supplied when constructing it.
    """

    # Identity and addressing of the interface.
    interface_id = attr.attrib()
    private_ip = attr.attrib()
    private_ip_allocation = attr.attrib()
    public_ip = attr.attrib()
    public_ip_allocation = attr.attrib()
    # Security boundary the interface sits behind.
    app_sec_group = attr.attrib()
    # Subnet placement and the subnet's own NSG / route table.
    subnet = attr.attrib()
    subnet_nsg = attr.attrib()
    subnet_route_table = attr.attrib()


class MsticpyAzureException(MsticpyException):
    """Exception class for AzureData.

    Derives from the package-wide ``MsticpyException`` so callers that catch
    the base type also handle Azure connector errors.
    """


# pylint: enable=too-few-public-methods, too-many-instance-attributes


class AzureData:
    """Class for returning data on an Azure tenant."""

    def __init__(self, connect: bool = False):
        """Initialize connector for Azure Python SDK."""
        self.connected = False
        self.credentials: Optional[ServicePrincipalCredentials] = None
        self.sub_client: Optional[SubscriptionClient] = None
        self.resource_client: Optional[ResourceManagementClient] = None
        self.network_client: Optional[NetworkManagementClient] = None
]
        )
        .set_index("TimeGenerated")
        .sort_index(ascending=True)
    )
    logons_closed = (
        (
            logon_events[
                logon_events["SyslogMessage"].str.contains("pam_unix.+session closed")
            ]
        )
        .set_index("TimeGenerated")
        .sort_index(ascending=True)
    )
    if logons_opened.empty or logons_closed.empty:
        raise MsticpyException("There are no logon sessions in the supplied data set")

    # For each session identify the likely start and end times
    while ses_opened < len(logons_opened.index) and ses_closed < len(
        logons_closed.index
    ):
        ses_start = (logons_opened.iloc[ses_opened]).name
        ses_end = (logons_closed.iloc[ses_closed]).name
        # If we can identify a user for the session add this to the details
        if "User" in logons_opened.columns:
            user = (logons_opened.iloc[ses_opened]).User
        elif "Sudoer" in logons_opened.columns:
            user = (logons_opened.iloc[ses_opened]).Sudoer
        else:
            user = "Unknown"
        if ses_start > ses_close_time or ses_opened == 0:
            pass
Dictionary of sudo sessions (as generated by cluster_syslog_logons)
    risky_actions: dict (Optional)
        Dictionary of risky sudo commands (as generated by cmd_line.risky_cmd_line)
    suspicious_actions: list (Optional)
        List of risky sudo commands (as generated by cmd_line.cmd_speed)

    Returns
    -------
    risky_sessions: dict
        A dictionary of sudo sessions with flags denoting risk

    """
    sessions = sudo_sessions[["User", "Start", "End"]].to_dict("index")

    if risky_actions is None and suspicious_actions is None:
        raise MsticpyException(
            "At least one of risky_actions or suspicious_actions must be supplied"
        )

    # Depending on whether we have risky or suspicious acitons or both
    # identify sessions which these actions occur in
    risky_act_sessions: Dict[str, Any] = {}
    susp_act_sessions: Dict[str, Any] = {}
    if risky_actions is not None:
        risky_act_sessions = _find_risky_sudo_session(
            risky_actions=risky_actions, sudo_sessions=sessions
        )
    if suspicious_actions is not None:
        susp_act_sessions = _find_suspicious_sudo_session(
            suspicious_actions=suspicious_actions, sudo_sessions=sessions
        )
    return {**risky_act_sessions, **susp_act_sessions}