Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.event_time = time.time ()
try:
return self.socket.send(data)
except ssl.SSLError as why:
if why.errno == ssl.SSL_ERROR_WANT_WRITE:
return 0
else:
raise
if __name__ == "__main__":
    # Manual smoke test: compose one HTML message and hand it to SMTP_SSL,
    # then run the asyncore event loop.
    from rs4 import logger
    from skitai import lifetime

    log = logger.screen_logger ()
    m = composer.Composer ("smtp.gmail.com:587", "", "")
    # The HTML body contains double quotes (src="cid:A"), so the literal is
    # single-quoted; nesting them inside a double-quoted literal is a
    # syntax error.
    m.add_content ('Hello World<div><img src="cid:A"></div>', "text/html")
    #m.add_attachment (r"1.png", cid="A")
    # Raise the range to create more than one SMTP_SSL instance.
    for i in range (1):
        SMTP_SSL (m, log)
    asyncore.loop ()
def get (server, dbname, auth, dbtype):
    """Forward the four arguments to ``pool.get`` and return its result.

    ``pool`` is the module-level pool object; this function adds no logic
    of its own.
    """
    connection = pool.get (server, dbname, auth, dbtype)
    return connection
def cleanup ():
    """Call ``cleanup`` on the module-level ``pool``; its result is discarded."""
    release = pool.cleanup
    release ()
# Manual test harness; runs only when this file is executed as a script.
if __name__ == "__main__":
# NOTE(review): `lifetime` is imported but not referenced in the visible lines.
from skitai import lifetime
from rs4 import logger
from aquests.server.threads import trigger
trigger.start_trigger ()
# Bind `pool` to a DBPool constructed with rs4's screen logger.
pool = DBPool (logger.screen_logger ())
# Sample workload: one SELECT followed by two INSERTs on a connection taken
# from `pool`, printing the status and result after each statement.
def query ():
# Positional arguments: server address, database name, then "postgres" and an
# empty string (presumably auth and dbtype - confirm against pool.get).
conn = pool.get ("mydb.us-east-1.rds.amazonaws.com:5432", "mydb", "postgres", "")
conn.execute ("SELECT * FROM cities;")
# NOTE(review): 5 is presumably a timeout in seconds - confirm.
rs = conn.fetchwait (5)
print(rs.status, rs.result)
conn.execute ("INSERT INTO weather VALUES ('San Francisco', 46, 50, 0.25, '1994-11-27');")
# The INSERTs use wait() where the SELECT used fetchwait().
rs = conn.wait (5)
print(rs.status, rs.result)
conn.execute ("INSERT INTO weather VALUES ('San Francisco', 54, 67, 0.25, '1994-11-27');")
rs = conn.wait (5)
print(rs.status, rs.result)
timeout = 10,
cookie = False,
force_http1 = False,
http2_constreams = 1,
allow_redirects = True,
qrandom = False,
use_pool = True,
tracking = False,
backend = False,
dns = []
):
global _logger, _cb_gateway, _concurrent, _initialized, _timeout
global _workers, _que, _allow_redirects, _force_h1
if logger is None:
logger = logger_f.screen_logger ()
_logger = logger
if qrandom:
_que = queue.RandomQueue ()
else:
_que = queue.Queue ()
_allow_redirects = allow_redirects
_force_h1 = request_handler.RequestHandler.FORCE_HTTP_11 = force_http1
if not use_pool:
asynconnect.AsynConnect.keep_connect = use_pool
asynconnect.AsynSSLConnect.keep_connect = use_pool
if not _force_h1:
asynconnect.AsynConnect.fifo_class = await_fifo
asynconnect.AsynSSLConnect.fifo_class = await_fifo
if testset:
item = testset.pop ()
self.handler.handle_request (item, protocol = "udp", callback = test_callback, qtype = "a")
def _print (ans):
    """Print a one-line summary of a lookup answer.

    A falsy ``ans`` prints ``FAILED``. Otherwise the ``'name'`` field of the
    first record and the ``'data'`` field of the last record are printed,
    separated by a space.
    """
    if not ans:
        print ("FAILED")
        return
    first_record, last_record = ans[0], ans[-1]
    print (first_record['name'], last_record['data'])
# Manual test harness; runs only when this file is executed as a script.
if __name__ == "__main__":
from rs4 import logger
# NOTE(review): `pprint` is imported but not referenced in the visible lines.
import pprint
# Create the pool from PUBLIC_DNS_SERVERS, logging through a screen logger.
create_pool (PUBLIC_DNS_SERVERS, logger.screen_logger ())
for i in range (4):
# Alternative hostnames are kept commented out; only the final query is active.
#query ("www.microsoft.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.cnn.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.gitlab.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.alexa.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.yahoo.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.github.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.google.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.amazon.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.almec.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.alamobeauty.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.alphaworld.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.allrightsales.com", protocol = "udp", callback = _print, qtype="a")
# Issue an "a"-type query with protocol "udp", passing _print as the callback.
query ("www.glasteel.com", protocol = "udp", callback = _print, qtype="a")
# NOTE(review): nesting is not recoverable here - confirm whether pop_all()
# runs on every loop iteration or once after the loop.
pop_all ()
if testset:
item = testset.pop ()
self.handler.handle_request (item, protocol = "udp", callback = test_callback, qtype = "a")
def _print (ans):
    """Print a one-line summary of a lookup answer.

    A falsy ``ans`` prints ``FAILED``. Otherwise the ``'name'`` field of the
    first record and the ``'data'`` field of the last record are printed,
    separated by a space.
    """
    if not ans:
        print ("FAILED")
        return
    first_record, last_record = ans[0], ans[-1]
    print (first_record['name'], last_record['data'])
# Manual test harness; runs only when this file is executed as a script.
if __name__ == "__main__":
from rs4 import logger
# NOTE(review): `pprint` is imported but not referenced in the visible lines.
import pprint
# Create the pool from PUBLIC_DNS_SERVERS, logging through a screen logger.
create_pool (PUBLIC_DNS_SERVERS, logger.screen_logger ())
for i in range (4):
# Alternative hostnames are kept commented out; only the final query is active.
#query ("www.microsoft.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.cnn.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.gitlab.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.alexa.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.yahoo.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.github.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.google.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.amazon.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.almec.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.alamobeauty.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.alphaworld.com", protocol = "udp", callback = _print, qtype="a")
#query ("www.allrightsales.com", protocol = "udp", callback = _print, qtype="a")
# Issue an "a"-type query with protocol "udp", passing _print as the callback.
query ("www.glasteel.com", protocol = "udp", callback = _print, qtype="a")
# NOTE(review): nesting is not recoverable here - confirm whether pop_all()
# runs on every loop iteration or once after the loop.
pop_all ()