How to use cbapi - 10 common examples

To help you get started, we’ve selected a few cbapi examples, based on popular ways it is used in public projects.


github carbonblack / cbapi-python / test / cbapi / psc / livequery / test_rest_api.py View on Github external
def test_query(monkeypatch):
    """Submit a bare LiveQuery run and check the POST request the client builds.

    ``get_object``, ``post_object``, ``put_object`` and ``delete_object`` are
    all patched on the API object; only the POST stub is expected to fire.
    """
    post_calls = []

    def mock_post_object(url, body, **kwargs):
        # The run has to be created under the org key the API object was
        # built with, and the SQL text has to arrive unchanged.
        assert url == "/livequery/v1/orgs/Z100/runs"
        assert body["sql"] == "select * from whatever;"
        post_calls.append(url)
        return MockResponse({"org_key": "Z100", "name": "FoobieBletch", "id": "abcdefg"})

    api = CbLiveQueryAPI(
        url="https://example.com",
        token="ABCD/1234",  # placeholder token for the test, not a real credential
        org_key="Z100",
        ssl_verify=True,  # certificate verification left switched on
    )
    monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
    monkeypatch.setattr(api, "post_object", mock_post_object)
    monkeypatch.setattr(api, "put_object", ConnectionMocks.get("PUT"))
    monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))

    run_query = api.query("select * from whatever;")
    assert isinstance(run_query, RunQuery)

    submitted = run_query.submit()
    assert post_calls
    assert submitted.org_key == "Z100"
    assert submitted.name == "FoobieBletch"
    assert submitted.id == "abcdefg"
github carbonblack / cbapi-python / test / cbapi / psc / livequery / test_rest_api.py View on Github external (excerpt begins inside a test function; its `def` line is not shown)
_was_called = False

    def mock_post_object(url, body, **kwargs):
        nonlocal _was_called
        assert url == "/livequery/v1/orgs/Z100/runs"
        assert body["sql"] == "select * from whatever;"
        assert body["name"] == "AmyWasHere"
        assert body["notify_on_finish"]
        df = body["device_filter"]
        assert df["device_ids"] == [1, 2, 3]
        assert df["device_types"] == ["Alpha", "Bravo", "Charlie"]
        assert df["policy_ids"] == [16, 27, 38]
        _was_called = True
        return MockResponse({"org_key": "Z100", "name": "FoobieBletch", "id": "abcdefg"})

    api = CbLiveQueryAPI(url="https://example.com", token="ABCD/1234",
                         org_key="Z100", ssl_verify=True)
    monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
    monkeypatch.setattr(api, "post_object", mock_post_object)
    monkeypatch.setattr(api, "put_object", ConnectionMocks.get("PUT"))
    monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
    query = api.query("select * from whatever;").device_ids([1, 2, 3])
    query = query.device_types(["Alpha", "Bravo", "Charlie"])
    query = query.policy_ids([16, 27, 38])
    query = query.name("AmyWasHere").notify_on_finish()
    assert isinstance(query, RunQuery)
    run = query.submit()
    assert _was_called
    assert run.org_key == "Z100"
    assert run.name == "FoobieBletch"
    assert run.id == "abcdefg"
github carbonblack / cbapi-python / test / cbapi / psc / test_rest_api.py View on Github external
def test_query_cbanalyticsalert_invalid_not_blocked_threat_categories():
    """Expect ApiError when "MINOR" is passed as a not-blocked threat category."""
    api = CbPSCBaseAPI(
        url="https://example.com",
        token="ABCD/1234",  # placeholder token for the test
        org_key="Z100",
        ssl_verify=True,
    )
    with pytest.raises(ApiError):
        alert_query = api.select(CBAnalyticsAlert)
        alert_query.not_blocked_threat_categories(["MINOR"])
github carbonblack / cbapi-python / test / cbapi / psc / test_rest_api.py View on Github external
def test_query_basealert_invalid_types():
    """Expect ApiError when the alert type filter is given "ERBOSOFT"."""
    api = CbPSCBaseAPI(
        url="https://example.com",
        token="ABCD/1234",  # placeholder token for the test
        org_key="Z100",
        ssl_verify=True,
    )
    with pytest.raises(ApiError):
        alert_query = api.select(BaseAlert)
        alert_query.types(["ERBOSOFT"])
github carbonblack / cbapi-python / test / cbapi / psc / test_models.py View on Github external
def test_BaseAlert_undismiss(monkeypatch):
    """Update a dismissed alert and check the workflow POST and the refreshed state.

    The alert starts with workflow state "DISMISS"; ``update`` is expected to
    POST state "OPEN" to the alert's workflow URL, and the stubbed response is
    expected to show up on ``alert.workflow_``.
    """
    post_calls = []

    def _fake_post(url, body, **kwargs):
        # Pin both the org-scoped workflow URL and the exact request body.
        assert url == "/appservices/v6/orgs/Z100/alerts/ESD14U2C/workflow"
        assert body == {"state": "OPEN", "remediation_state": "Fixed", "comment": "NoSir"}
        post_calls.append(url)
        workflow = {
            "state": "OPEN",
            "remediation": "Fixed",
            "comment": "NoSir",
            "changed_by": "Robocop",
            "last_update_time": "2019-10-31T16:03:13.951Z",
        }
        return StubResponse(workflow)

    api = CbPSCBaseAPI(
        url="https://example.com",
        token="ABCD/1234",  # placeholder token for the test
        org_key="Z100",
        ssl_verify=True,
    )
    patch_cbapi(monkeypatch, api, POST=_fake_post)

    alert = BaseAlert(api, "ESD14U2C", {"id": "ESD14U2C", "workflow": {"state": "DISMISS"}})
    alert.update("Fixed", "NoSir")

    assert post_calls
    updated = alert.workflow_
    assert updated.changed_by == "Robocop"
    updated = alert.workflow_
    assert updated.state == "OPEN"
    updated = alert.workflow_
    assert updated.remediation == "Fixed"
    updated = alert.workflow_
    assert updated.comment == "NoSir"
    updated = alert.workflow_
    assert updated.last_update_time == "2019-10-31T16:03:13.951Z"
github carbonblack / cbapi-python / test / cbapi / psc / test_rest_api.py View on Github external
def test_query_device_last_contact_time_start_specified_bad():
    """Expect ApiError when last_contact_time() gets both a start time and a range."""
    api = CbPSCBaseAPI(
        url="https://example.com",
        token="ABCD/1234",  # placeholder token for the test
        org_key="Z100",
        ssl_verify=True,
    )
    with pytest.raises(ApiError):
        device_query = api.select(Device)
        device_query.last_contact_time(start="2019-09-30T12:34:56", range="-3w")
github carbonblack / cbapi-python / test / cbapi / psc / test_rest_api.py View on Github external (excerpt begins partway through a test function)
assert t["os"] == [ "LINUX" ]
        assert t["policy_id"] == [ 8675309 ]
        assert t["status"] == [ "ALL" ]
        assert t["target_priority"] == [ "HIGH" ]
        t = body.get("exclusions", {})
        assert t["sensor_version"] == [ "0.1" ]
        t = body.get("sort", [])
        t2 = t[0]
        assert t2["field"] == "name"
        assert t2["order"] == "DESC"
        _was_called = True
        body = { "id": 6023, "organization_name": "thistestworks" }
        envelope = { "results": [ body ], "num_found": 1 }
        return MockResponse(envelope)
    
    api = CbPSCBaseAPI(url="https://example.com", token="ABCD/1234",
                       org_key="Z100", ssl_verify=True)
    monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
    monkeypatch.setattr(api, "post_object", mock_post_object)
    monkeypatch.setattr(api, "put_object", ConnectionMocks.get("PUT"))
    monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
    query = api.select(Device).where("foobar").ad_group_ids([ 14, 25 ]) \
        .os([ "LINUX" ]).policy_ids([ 8675309 ]).status([ "ALL" ]) \
        .target_priorities(["HIGH"]).exclude_sensor_versions(["0.1"]) \
        .sort_by("name", "DESC")
    d = query.one()
    assert _was_called
    assert d.id == 6023
    assert d.organization_name == "thistestworks"
github carbonblack / cbapi-python / test / cbapi / psc / test_rest_api.py View on Github external
def test_query_basealert_invalid_workflows():
    """Expect ApiError when the workflow filter is given "IN_LIMBO"."""
    api = CbPSCBaseAPI(
        url="https://example.com",
        token="ABCD/1234",  # placeholder token for the test
        org_key="Z100",
        ssl_verify=True,
    )
    with pytest.raises(ApiError):
        alert_query = api.select(BaseAlert)
        alert_query.workflows(["IN_LIMBO"])
github carbonblack / cbapi-python / test / cbapi / psc / test_models.py View on Github external
def test_Device_lr_session(monkeypatch):
    """Open a Live Response session for a device through a stubbed scheduler.

    The API object's ``_lr_scheduler`` is replaced with ``StubScheduler(6023)``
    before ``lr_session()`` is called, and the test checks that the stub
    reports having been used.
    """

    def _stub_get(url, parms=None, default=None):
        # Any GET the test triggers must be for device 6023 under org Z100.
        assert url == "/appservices/v6/orgs/Z100/devices/6023"
        return {"id": 6023}

    api = CbPSCBaseAPI(
        url="https://example.com",
        token="ABCD/1234",  # placeholder token for the test
        org_key="Z100",
        ssl_verify=True,
    )
    scheduler = StubScheduler(6023)
    api._lr_scheduler = scheduler
    patch_cbapi(monkeypatch, api, GET=_stub_get)

    device = Device(api, 6023, {"id": 6023})
    session = device.lr_session()

    # NOTE(review): "itworks" presumably comes from StubScheduler - confirm in the test helpers.
    assert session["itworks"]
    assert scheduler.was_called
github carbonblack / cbapi-python / test / cbapi / psc / test_rest_api.py View on Github external (excerpt begins partway through a test function)
assert t["policy_applied"] == ["APPLIED"]
        assert t["reason_code"] == ["ATTACK_VECTOR"]
        assert t["run_state"] == ["RAN"]
        assert t["sensor_action"] == ["DENY"]
        assert t["threat_cause_vector"] == ["WEB"]
        
        t = body["sort"]
        t2 = t[0]
        assert t2["field"] == "name"
        assert t2["order"] == "DESC"
        _was_called = True
        body = {"id": "S0L0", "org_key": "Z100", "threat_id": "B0RG", "workflow": {"state": "OPEN"}}
        envelope = { "results": [ body ], "num_found": 1 }
        return MockResponse(envelope)
        
    api = CbPSCBaseAPI(url="https://example.com", token="ABCD/1234",
                       org_key="Z100", ssl_verify=True)
    monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
    monkeypatch.setattr(api, "post_object", mock_post_object)
    monkeypatch.setattr(api, "put_object", ConnectionMocks.get("PUT"))
    monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
    query = api.select(CBAnalyticsAlert).where("Blort").categories(["SERIOUS", "CRITICAL"]).device_ids([6023]) \
        .device_names(["HAL"]).device_os(["LINUX"]).device_os_versions(["0.1.2"]).device_username(["JRN"]) \
        .group_results(True).alert_ids(["S0L0"]).legacy_alert_ids(["S0L0_1"]).minimum_severity(6) \
        .policy_ids([8675309]).policy_names(["Strict"]).process_names(["IEXPLORE.EXE"]) \
        .process_sha256(["0123456789ABCDEF0123456789ABCDEF"]).reputations(["SUSPECT_MALWARE"]) \
        .tags(["Frood"]).target_priorities(["HIGH"]).threat_ids(["B0RG"]).types(["WATCHLIST"]) \
        .workflows(["OPEN"]).blocked_threat_categories(["RISKY_PROGRAM"]).device_locations(["ONSITE"]) \
        .kill_chain_statuses(["EXECUTE_GOAL"]).not_blocked_threat_categories(["NEW_MALWARE"]) \
        .policy_applied(["APPLIED"]).reason_code(["ATTACK_VECTOR"]).run_states(["RAN"]) \
        .sensor_actions(["DENY"]).threat_cause_vectors(["WEB"]).sort_by("name", "DESC")
    a = query.one()