Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_family_to_string():
    """Check family_to_string() for AF_INET, AF_INET6 and AF_UNIX."""
    to_string = sshuttle.helpers.family_to_string

    assert to_string(AF_INET) == "AF_INET"
    assert to_string(AF_INET6) == "AF_INET6"

    # The string expected for AF_UNIX depends on the major Python version.
    running_py2 = sys.version_info < (3, 0)
    expected = "1" if running_py2 else 'AddressFamily.AF_UNIX'
    assert to_string(socket.AF_UNIX) == expected
def test_family_to_string():
    """Verify the strings family_to_string() yields for three families."""
    convert = sshuttle.helpers.family_to_string

    assert convert(AF_INET) == "AF_INET"
    assert convert(AF_INET6) == "AF_INET6"

    # AF_UNIX is compared against a different string on Python 2 than on
    # Python 3.
    if sys.version_info < (3, 0):
        expected = "1"
    else:
        expected = 'AddressFamily.AF_UNIX'
    assert convert(socket.AF_UNIX) == expected
def ipt_chain_exists(family, table, name):
    """Return True if chain *name* is listed in *table*, otherwise False.

    Runs ``iptables`` (for AF_INET) or ``ip6tables`` (for AF_INET6) with
    ``-t <table> -nL`` and looks for an output line that starts with
    ``Chain <name> ``.

    Raises:
        Exception: *family* is neither AF_INET nor AF_INET6.
        Fatal: the listing command exited with a non-zero status.
    """
    if family == socket.AF_INET6:
        cmd = 'ip6tables'
    elif family == socket.AF_INET:
        cmd = 'iptables'
    else:
        raise Exception('Unsupported family "%s"' % family_to_string(family))

    argv = [cmd, '-t', table, '-nL']
    # NOTE(review): LC_ALL=C presumably keeps the "Chain ..." header text
    # unlocalised so the prefix match below is reliable -- confirm.
    env = {
        'PATH': os.environ['PATH'],
        'LC_ALL': "C",
    }

    # Only the command invocation can raise CalledProcessError, so keep the
    # try body down to that single call.
    try:
        output = ssubprocess.check_output(argv, env=env)
    except ssubprocess.CalledProcessError as e:
        raise Fatal('%r returned %d' % (argv, e.returncode))

    # The trailing space stops "foo" from matching a chain called "foobar".
    header = 'Chain %s ' % name
    # any() gives an explicit False when the chain is absent, instead of
    # falling off the end of the function and returning None.
    return any(line.startswith(header)
               for line in output.decode('ASCII').split('\n'))
# Set up the firewall through ipfw; only AF_INET is accepted.
# NOTE(review): this definition is cut off in this excerpt (the final ipfw()
# call is left open), so the comments below cover the visible part only.
def setup_firewall(self, port, dnsport, nslist, family, subnets, udp,
user):
# IPv6 not supported
if family not in [socket.AF_INET]:
raise Exception(
'Address family "%s" unsupported by ipfw method_name'
% family_to_string(family))
# XXX: Any risk from this?
# NOTE(review): presumably the "noexit" variant tolerates a failing delete
# (e.g. rule 1 not present yet) -- confirm against ipfw_noexit.
ipfw_noexit('delete', '1')
# Drain _changedctls, passing each name with its saved _oldctls value to
# _sysctl_set.
while _changedctls:
name = _changedctls.pop()
oldval = _oldctls[name]
_sysctl_set(name, oldval)
# Rules are only added when there are subnets or a DNS port to handle.
if subnets or dnsport:
sysctl_set('net.inet.ip.fw.enable', 1)
ipfw('add', '1', 'check-state', 'ip',
'from', 'any', 'to', 'any')
ipfw('add', '1', 'skipto', '2',
def ipt(family, table, *args):
    """Run iptables or ip6tables against *table* with the given arguments.

    The binary is ``ip6tables`` for AF_INET6 and ``iptables`` for AF_INET;
    any other family raises Exception. The command line is logged through
    debug1() before it is run, and a non-zero exit status raises Fatal.
    """
    if family == socket.AF_INET6:
        binary = 'ip6tables'
    elif family == socket.AF_INET:
        binary = 'iptables'
    else:
        raise Exception('Unsupported family "%s"' % family_to_string(family))

    argv = [binary, '-t', table]
    argv.extend(args)
    debug1('>> %s\n' % ' '.join(argv))

    # The child gets only PATH and LC_ALL in its environment.
    status = ssubprocess.call(
        argv,
        env={
            'PATH': os.environ['PATH'],
            'LC_ALL': "C",
        })
    if status:
        raise Fatal('%r returned %d' % (argv, status))
# Begin tearing down the per-port sshuttle chains in the "mangle" table.
# Accepts AF_INET and AF_INET6; any other family raises Exception.
# NOTE(review): this excerpt appears truncated -- _ipt_ttl, tproxy_chain and
# divert_chain are defined here but not used in the visible part.
def restore_firewall(self, port, family, udp, user):
if family not in [socket.AF_INET, socket.AF_INET6]:
raise Exception(
'Address family "%s" unsupported by tproxy method'
% family_to_string(family))
table = "mangle"
# Local shorthands that pre-fill family and table for ipt() / ipt_ttl().
def _ipt(*args):
return ipt(family, table, *args)
def _ipt_ttl(*args):
return ipt_ttl(family, table, *args)
# Chain names are derived from the port number.
mark_chain = 'sshuttle-m-%s' % port
tproxy_chain = 'sshuttle-t-%s' % port
divert_chain = 'sshuttle-d-%s' % port
# basic cleanup/setup of chains
# Only issue the OUTPUT -D (delete rule) call when the mark chain exists.
if ipt_chain_exists(family, table, mark_chain):
_ipt('-D', 'OUTPUT', '-j', mark_chain)
def nft(family, table, action, *args):
    """Run ``nft <action> <ip|ip6> <table> ...`` with the given arguments.

    AF_INET maps to the ``ip`` keyword and AF_INET6 to ``ip6``; any other
    family raises Exception. The command line is logged through debug1()
    before it is run, and a non-zero exit status raises Fatal.
    """
    if family == socket.AF_INET:
        proto = 'ip'
    elif family == socket.AF_INET6:
        proto = 'ip6'
    else:
        raise Exception('Unsupported family "%s"' % family_to_string(family))

    argv = ['nft', action, proto, table]
    argv.extend(args)
    debug1('>> %s\n' % ' '.join(argv))

    # The child gets only PATH and LC_ALL in its environment.
    status = ssubprocess.call(
        argv,
        env={
            'PATH': os.environ['PATH'],
            'LC_ALL': "C",
        })
    if status:
        raise Fatal('%r returned %d' % (argv, status))
# Begin restoring the firewall for the nat method: validates the request and
# prepares helpers for the "nat" and "mangle" tables.
# Raises Exception for any family other than AF_INET, and when udp is truthy.
# NOTE(review): this excerpt appears truncated -- the helpers and `chain` are
# defined here but not used in the visible part.
def restore_firewall(self, port, family, udp, user):
# only ipv4 supported with NAT
if family != socket.AF_INET:
raise Exception(
'Address family "%s" unsupported by nat method_name'
% family_to_string(family))
if udp:
raise Exception("UDP not supported by nat method_name")
table = "nat"
# Local shorthands that pre-fill family and table for ipt() / ipt_ttl().
def _ipt(*args):
return ipt(family, table, *args)
def _ipt_ttl(*args):
return ipt_ttl(family, table, *args)
# Same as _ipt, but aimed at the "mangle" table instead of "nat".
def _ipm(*args):
return ipt(family, "mangle", *args)
# Chain name is derived from the port number.
chain = 'sshuttle-%s' % port
def restore_firewall(self, port, family, udp, user):
    """Disable pf for the anchor belonging to *family* and *port*.

    Raises Exception when *family* is neither AF_INET nor AF_INET6, or when
    *udp* is truthy.
    """
    supported_families = (socket.AF_INET, socket.AF_INET6)
    if family not in supported_families:
        raise Exception(
            'Address family "%s" unsupported by pf method_name'
            % family_to_string(family))

    if udp:
        raise Exception("UDP not supported by pf method_name")

    pf.disable(pf_get_anchor(family, port))
def setup_firewall(self, port, dnsport, nslist, family, subnets, udp,
user):
if family not in [socket.AF_INET, socket.AF_INET6]:
raise Exception(
'Address family "%s" unsupported by pf method_name'
% family_to_string(family))
if udp:
raise Exception("UDP not supported by pf method_name")
if subnets:
includes = []
# If a given subnet is both included and excluded, list the
# exclusion first; the table will ignore the second, opposite
# definition
for _, swidth, sexclude, snet, fport, lport \
in sorted(subnets, key=subnet_weight):
includes.append((sexclude, b"%s/%d%s" % (
snet.encode("ASCII"),
swidth,
b" port %d:%d" % (fport, lport) if fport else b"")))
anchor = pf_get_anchor(family, port)