Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def swift_auth_v2(self):
self.tenant, self.user = self.user.split(';')
auth_dict = {}
auth_dict['auth'] = {'passwordCredentials':
{
'username': self.user,
'password': self.password,
},
'tenantName': self.tenant}
auth_json = json_dumps(auth_dict)
headers = {'Content-Type': 'application/json'}
auth_httpclient = HTTPClient.from_url(
self.auth_url,
connection_timeout=self.http_timeout,
network_timeout=self.http_timeout,
)
path = urlparse.urlparse(self.auth_url).path
if not path.endswith('tokens'):
path = posixpath.join(path, 'tokens')
ret = auth_httpclient.request('POST', path,
body=auth_json,
headers=headers)
if ret.status_code < 200 or ret.status_code >= 300:
raise SwiftException('AUTH v2.0 request failed on ' +
'%s with error code %s (%s)'
% (str(auth_httpclient.get_base_url()) +
path, ret.status_code,
def swift_auth_v2(self):
self.tenant, self.user = self.user.split(';')
auth_dict = {}
auth_dict['auth'] = {'passwordCredentials':
{
'username': self.user,
'password': self.password,
},
'tenantName': self.tenant}
auth_json = json_dumps(auth_dict)
headers = {'Content-Type': 'application/json'}
auth_httpclient = HTTPClient.from_url(
self.auth_url,
connection_timeout=self.http_timeout,
network_timeout=self.http_timeout,
)
path = urlparse.urlparse(self.auth_url).path
if not path.endswith('tokens'):
path = posixpath.join(path, 'tokens')
ret = auth_httpclient.request('POST', path,
body=auth_json,
headers=headers)
if ret.status_code < 200 or ret.status_code >= 300:
raise SwiftException('AUTH v2.0 request failed on ' +
'%s with error code %s (%s)'
% (str(auth_httpclient.get_base_url()) +
path, ret.status_code,
self.conf.getint('swift', 'http_pool_length') or 10
self.region_name = self.conf.get("swift", "region_name") or "RegionOne"
self.endpoint_type = \
self.conf.get("swift", "endpoint_type") or "internalURL"
self.cache_length = self.conf.getint("swift", "cache_length") or 20
self.chunk_length = self.conf.getint("swift", "chunk_length") or 12228
self.root = root
block_size = 1024 * 12 # 12KB
if self.auth_ver == "1":
self.storage_url, self.token = self.swift_auth_v1()
else:
self.storage_url, self.token = self.swift_auth_v2()
token_header = {'X-Auth-Token': str(self.token)}
self.httpclient = \
HTTPClient.from_url(str(self.storage_url),
concurrency=self.http_pool_length,
block_size=block_size,
connection_timeout=self.http_timeout,
network_timeout=self.http_timeout,
headers=token_header)
self.base_path = str(posixpath.join(
urlparse.urlparse(self.storage_url).path, self.root))
'oauth_consumer_key': oauthlib_consumer.key,
'locations': '-122.75,36.8,-121.75,37.8' # San Francisco
}
url = URL('https://stream.twitter.com/1/statuses/filter.json')
req = oauthlib.Request.from_consumer_and_token(
oauthlib_consumer,
token=token,
http_url=str(url),
http_method='POST')
signature_method = oauthlib.SignatureMethod_HMAC_SHA1()
req = oauthlib.Request(method="POST", url=str(url), parameters=params)
req.sign_request(signature_method, oauthlib_consumer, token)
http = HTTPClient.from_url(url)
response = http.request('POST', url.request_uri,
body=req.to_postdata(),
headers={'Content-Type': 'application/x-www-form-urlencoded',
'Accept': '*/*'})
data = json.loads(response.readline())
while data:
pp(data)
data = json.loads(response.readline())
#!/usr/bin/env python
from geventhttpclient import HTTPClient, URL
if __name__ == "__main__":
url = URL('http://127.0.0.1:80/100.dat')
http = HTTPClient.from_url(url)
response = http.get(url.request_uri)
assert response.status_code == 200
CHUNK_SIZE = 1024 * 16 # 16KB
with open('/tmp/100.dat', 'w') as f:
data = response.read(CHUNK_SIZE)
while data:
f.write(data)
data = response.read(CHUNK_SIZE)
self.conf.getint('swift', 'http_pool_length') or 10
self.region_name = self.conf.get("swift", "region_name") or "RegionOne"
self.endpoint_type = \
self.conf.get("swift", "endpoint_type") or "internalURL"
self.cache_length = self.conf.getint("swift", "cache_length") or 20
self.chunk_length = self.conf.getint("swift", "chunk_length") or 12228
self.root = root
block_size = 1024 * 12 # 12KB
if self.auth_ver == "1":
self.storage_url, self.token = self.swift_auth_v1()
else:
self.storage_url, self.token = self.swift_auth_v2()
token_header = {'X-Auth-Token': str(self.token)}
self.httpclient = \
HTTPClient.from_url(str(self.storage_url),
concurrency=self.http_pool_length,
block_size=block_size,
connection_timeout=self.http_timeout,
network_timeout=self.http_timeout,
headers=token_header)
self.base_path = str(posixpath.join(
urlparse.urlparse(self.storage_url).path, self.root))