Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
scmr_handle = client._service._scmr_handle
services = scmr.enum_services_status_w(scmr_handle,
ServiceType.
SERVICE_WIN32_OWN_PROCESS,
EnumServiceState.
SERVICE_STATE_ALL)
for service in services:
if service['service_name'].lower().startswith("paexec"):
paexec_services.append(service['service_name'])
smb_tree = TreeConnect(client.session,
r"\\%s\ADMIN$" % client.connection.server_name)
smb_tree.connect()
share = Open(smb_tree, "")
share.create(ImpersonationLevel.Impersonation,
DirectoryAccessMask.FILE_READ_ATTRIBUTES |
DirectoryAccessMask.SYNCHRONIZE |
DirectoryAccessMask.FILE_LIST_DIRECTORY,
FileAttributes.FILE_ATTRIBUTE_DIRECTORY,
ShareAccess.FILE_SHARE_READ |
ShareAccess.FILE_SHARE_WRITE |
ShareAccess.FILE_SHARE_DELETE,
CreateDisposition.FILE_OPEN,
CreateOptions.FILE_DIRECTORY_FILE)
try:
paexec_files = share.query_directory("PAExec-*.exe",
FileInformationClass.
FILE_NAMES_INFORMATION)
except SMBResponseException as exc:
if exc.status != NtStatus.STATUS_NO_SUCH_FILE:
password = os.environ.get('PYPSEXEC_PASSWORD', None)
if server and username and password:
connection = Connection(uuid.uuid4(), server, 445)
session = Session(connection, username, password)
tree = TreeConnect(session, r"\\%s\ADMIN$" % server)
paexec_file = Open(tree, "PAExec.exe")
connection.connect()
try:
session.connect()
tree.connect()
paexec_file.create(ImpersonationLevel.Impersonation,
FilePipePrinterAccessMask.FILE_WRITE_DATA,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
ShareAccess.FILE_SHARE_READ,
CreateDisposition.FILE_OVERWRITE_IF,
CreateOptions.FILE_NON_DIRECTORY_FILE)
paexec_file.write(pkgutil.get_data('pypsexec', 'paexec.exe'), 0)
paexec_file.close(get_attributes=False)
yield session
finally:
paexec_file.create(ImpersonationLevel.Impersonation,
FilePipePrinterAccessMask.DELETE,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
ShareAccess.FILE_SHARE_DELETE,
CreateDisposition.FILE_OVERWRITE_IF,
CreateOptions.FILE_DELETE_ON_CLOSE)
paexec_file.close(get_attributes=False)
FilePipePrinterAccessMask.FILE_WRITE_DATA,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
ShareAccess.FILE_SHARE_READ,
CreateDisposition.FILE_OVERWRITE_IF,
CreateOptions.FILE_NON_DIRECTORY_FILE)
paexec_file.write(pkgutil.get_data('pypsexec', 'paexec.exe'), 0)
paexec_file.close(get_attributes=False)
yield session
finally:
paexec_file.create(ImpersonationLevel.Impersonation,
FilePipePrinterAccessMask.DELETE,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
ShareAccess.FILE_SHARE_DELETE,
CreateDisposition.FILE_OVERWRITE_IF,
CreateOptions.FILE_DELETE_ON_CLOSE)
paexec_file.close(get_attributes=False)
connection.disconnect(True)
else:
pytest.skip("PYPSEXEC_SERVER, PYPSEXEC_USERNAME, PYPSEXEC_PASSWORD"
" environment variables were not set. Integration "
session = Session(connection, username, password)
tree = TreeConnect(session, r"\\%s\ADMIN$" % server)
paexec_file = Open(tree, "PAExec.exe")
connection.connect()
try:
session.connect()
tree.connect()
paexec_file.create(ImpersonationLevel.Impersonation,
FilePipePrinterAccessMask.FILE_WRITE_DATA,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
ShareAccess.FILE_SHARE_READ,
CreateDisposition.FILE_OVERWRITE_IF,
CreateOptions.FILE_NON_DIRECTORY_FILE)
paexec_file.write(pkgutil.get_data('pypsexec', 'paexec.exe'), 0)
paexec_file.close(get_attributes=False)
yield session
finally:
paexec_file.create(ImpersonationLevel.Impersonation,
FilePipePrinterAccessMask.DELETE,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
ShareAccess.FILE_SHARE_DELETE,
CreateDisposition.FILE_OVERWRITE_IF,
CreateOptions.FILE_DELETE_ON_CLOSE)
paexec_file.close(get_attributes=False)
connection.disconnect(True)
else:
pytest.skip("PYPSEXEC_SERVER, PYPSEXEC_USERNAME, PYPSEXEC_PASSWORD"
smb_tree = TreeConnect(client.session,
r"\\%s\ADMIN$" % client.connection.server_name)
smb_tree.connect()
share = Open(smb_tree, "")
share.create(ImpersonationLevel.Impersonation,
DirectoryAccessMask.FILE_READ_ATTRIBUTES |
DirectoryAccessMask.SYNCHRONIZE |
DirectoryAccessMask.FILE_LIST_DIRECTORY,
FileAttributes.FILE_ATTRIBUTE_DIRECTORY,
ShareAccess.FILE_SHARE_READ |
ShareAccess.FILE_SHARE_WRITE |
ShareAccess.FILE_SHARE_DELETE,
CreateDisposition.FILE_OPEN,
CreateOptions.FILE_DIRECTORY_FILE)
try:
paexec_files = share.query_directory("PAExec-*.exe",
FileInformationClass.
FILE_NAMES_INFORMATION)
except SMBResponseException as exc:
if exc.status != NtStatus.STATUS_NO_SUCH_FILE:
raise exc
paexec_files = []
return client, paexec_services, paexec_files
def test_marshal_string_no_padding(self):
connection = Connection(uuid.uuid4(), "server", 445)
session = Session(connection, "user", "password")
api = SCMRApi(session)
expected = b"\x02\x00\x00\x00" \
b"\x00\x00\x00\x00" \
b"\x02\x00\x00\x00" \
b"\x68\x00\x00\x00"
actual = api._marshal_string("h")
assert actual == expected
def test_parse_error(self):
connection = Connection(uuid.uuid4(), "server", 445)
session = Session(connection, "user", "password")
api = SCMRApi(session)
with pytest.raises(SCMRException) as exc:
api._parse_error(5, "function_name")
assert str(exc.value) == "Exception calling function_name. Code: 5" \
", Msg: ERROR_ACCESS_DENIED"
def test_parse_error_unknown(self):
connection = Connection(uuid.uuid4(), "server", 445)
session = Session(connection, "user", "password")
api = SCMRApi(session)
with pytest.raises(SCMRException) as exc:
api._parse_error(999, "function_name")
assert str(exc.value) == "Exception calling function_name. Code: 999" \
", Msg: ERROR_UNKNOWN"
def test_parse_pdu_failure(self):
connection = Connection(uuid.uuid4(), "server", 445)
session = Session(connection, "user", "password")
api = SCMRApi(session)
fault_pdu = FaultPDU()
fault_pdu['packed_drep'] = DataRepresentationFormat()
with pytest.raises(PDUException) as exc:
api._parse_pdu(fault_pdu.pack(), 10)
assert "Expecting ResponsePDU for opnum 10 response but got: " \
"FaultPDU" in str(exc.value)
def test_marshal_string_as_referent(self):
connection = Connection(uuid.uuid4(), "server", 445)
session = Session(connection, "user", "password")
api = SCMRApi(session)
expected = b"\x00\x00\x00\x01" \
b"\x03\x00\x00\x00" \
b"\x00\x00\x00\x00" \
b"\x03\x00\x00\x00" \
b"\x68\x00\x69\x00\x00\x00" \
b"\x00\x00"
actual = api._marshal_string("hi", unique=True)
assert actual == expected