Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_basic():
test_support.requires('network')
from eventlet.green import urllib
if test_support.verbose:
print "test_basic ..."
socket.RAND_status()
try:
socket.RAND_egd(1)
except TypeError:
pass
else:
print "didn't raise TypeError"
socket.RAND_add("this is a random string", 75.0)
try:
f = urllib.urlopen('https://sf.net')
except IOError, exc:
if exc.errno == errno.ETIMEDOUT:
raise test_support.ResourceDenied('HTTPS connection is timing out')
else:
raise
buf = f.read()
def _nfq_worker(self):
s = socket.fromfd(self.nfq.fd, socket.AF_UNIX, socket.SOCK_STREAM)
try:
while True:
LOG.debug('NFQ Recv')
nfad = s.recv(self.NFQ_SOCKET_BUFFER_SIZE)
self.nfq.handle_packet(nfad)
LOG.error('NFQ Worker loop leave!')
finally:
LOG.error('NFQ Worker closing socket!')
s.close()
self.nfq.close()
sys.exit(1)
def run_server():
import eventlet
import eventlet.wsgi
import eventlet.green
addresses = eventlet.green.socket.getaddrinfo(host, port)
if not addresses:
raise RuntimeError('Could not resolve host to a valid address')
eventlet_socket = eventlet.listen(addresses[0][4], addresses[0][0])
# If provided an SSL argument, use an SSL socket
ssl_args = ['keyfile', 'certfile', 'server_side', 'cert_reqs',
'ssl_version', 'ca_certs',
'do_handshake_on_connect', 'suppress_ragged_eofs',
'ciphers']
ssl_params = {k: kwargs[k] for k in kwargs if k in ssl_args}
if len(ssl_params) > 0:
for k in ssl_params:
kwargs.pop(k)
ssl_params['server_side'] = True # Listening requires true
eventlet_socket = eventlet.wrap_ssl(eventlet_socket,
**ssl_params)
def _get_local_address(node):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect((node['ip'], node['port']))
result = s.getsockname()[0]
s.shutdown(socket.SHUT_RDWR)
s.close()
return result
def setup(self):
# overriding SocketServer.setup to correctly handle SSL.Connection objects
conn = self.connection = self.request
# TCP_QUICKACK is a better alternative to disabling Nagle's algorithm
# https://news.ycombinator.com/item?id=10607422
if getattr(socket, 'TCP_QUICKACK', None):
try:
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, True)
except socket.error:
pass
try:
self.rfile = conn.makefile('rb', self.rbufsize)
self.wfile = conn.makefile('wb', self.wbufsize)
except (AttributeError, NotImplementedError):
if hasattr(conn, 'send') and hasattr(conn, 'recv'):
# it's an SSL.Connection
self.rfile = socket._fileobject(conn, "rb", self.rbufsize)
self.wfile = socket._fileobject(conn, "wb", self.wbufsize)
else:
# it's a SSLObject, or a martian
raise NotImplementedError(
# Were we using https before?
old_use_ssl = (old_conf is not None and not (
not old_conf.get('key_file') or
not old_conf.get('cert_file')))
# Do we now need to perform an SSL wrap on the socket?
wrap_sock = use_ssl is True and (old_use_ssl is False or new_sock)
# Do we now need to perform an SSL unwrap on the socket?
unwrap_sock = use_ssl is False and old_use_ssl is True
if new_sock:
self._sock = None
if old_conf is not None:
self.sock.close()
_sock = get_socket(self.default_port)
_sock.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
_sock.setsockopt(socket.SOL_SOCKET,
socket.SO_KEEPALIVE, 1)
self._sock = _sock
if wrap_sock:
self.sock = ssl_wrap_socket(self._sock)
if unwrap_sock:
self.sock = self._sock
if new_sock and not use_ssl:
self.sock = self._sock
# Pick up newly deployed certs
if old_conf is not None and use_ssl is True and old_use_ssl is True:
"Connect to a host on a given (SSL) port."
super().connect()
if self._tunnel_host:
server_hostname = self._tunnel_host
else:
server_hostname = self.host
self.sock = self._context.wrap_socket(self.sock,
server_hostname=server_hostname)
if not self._context.check_hostname and self._check_hostname:
try:
ssl.match_hostname(self.sock.getpeercert(), server_hostname)
except Exception:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
if unwrap_sock:
self.sock = self._sock
if new_sock and not use_ssl:
self.sock = self._sock
# Pick up newly deployed certs
if old_conf is not None and use_ssl is True and old_use_ssl is True:
if has_changed('cert_file'):
self.sock.certfile = self.conf.cert_file
if has_changed('key_file'):
self.sock.keyfile = self.conf.key_file
if new_sock or (old_conf is not None and has_changed('tcp_keepidle')):
# This option isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE'):
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
self.conf.tcp_keepidle)
if old_conf is not None and has_changed('backlog'):
self.sock.listen(self.conf.backlog)
not old_conf.get('cert_file')))
# Do we now need to perform an SSL wrap on the socket?
wrap_sock = use_ssl is True and (old_use_ssl is False or new_sock)
# Do we now need to perform an SSL unwrap on the socket?
unwrap_sock = use_ssl is False and old_use_ssl is True
if new_sock:
self._sock = None
if old_conf is not None:
self.sock.close()
_sock = get_socket(self.default_port)
_sock.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
_sock.setsockopt(socket.SOL_SOCKET,
socket.SO_KEEPALIVE, 1)
self._sock = _sock
if wrap_sock:
self.sock = ssl_wrap_socket(self._sock)
if unwrap_sock:
self.sock = self._sock
if new_sock and not use_ssl:
self.sock = self._sock
# Pick up newly deployed certs
if old_conf is not None and use_ssl is True and old_use_ssl is True:
if has_changed('cert_file') or has_changed('key_file'):
utils.validate_key_cert(CONF.api.key_file, CONF.api.cert_file)
if has_changed('cert_file'):