Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def fetch_latest_rollout(management_service, service_name, access_token):
    """Fetch the rollouts of a service from the service management API.

    Args:
        management_service: first value substituted into
            SERVICE_MGMT_ROLLOUTS_URL_TEMPLATE.
        service_name: second value substituted into the URL template.
        access_token: bearer token sent in the Authorization header, or
            None to send the request without an Authorization header.

    Raises:
        FetchError: if the GET request raises, or if the response status
            code is not 200.
    """
    if access_token is None:
        headers = {}
    else:
        headers = {"Authorization": "Bearer {}".format(access_token)}

    client = urllib3.PoolManager(ca_certs=certifi.where())
    service_mgmt_url = SERVICE_MGMT_ROLLOUTS_URL_TEMPLATE.format(
        management_service, service_name)

    try:
        response = client.request("GET", service_mgmt_url, headers=headers)
    except Exception:
        # Catch Exception instead of using a bare ``except`` so that
        # KeyboardInterrupt and SystemExit are not converted to FetchError.
        raise FetchError(1, "Failed to fetch rollouts")

    status_code = response.status
    if status_code != 200:
        # The "reason" slot is filled with the raw response body.
        message_template = ("Fetching rollouts failed "
                            "(status code {}, reason {}, url {})")
        raise FetchError(1, message_template.format(status_code,
                                                    response.data,
                                                    service_mgmt_url))

    # NOTE(review): the decoded payload is neither used nor returned in the
    # visible part of this function - confirm against the full source.
    rollouts = json.loads(response.data)
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680 # noqa: E501
# maxsize is the number of requests to host that are allowed in parallel # noqa: E501
# Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501
# cert_reqs
if configuration.verify_ssl:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
# ca_certs
if configuration.ssl_ca_cert:
ca_certs = configuration.ssl_ca_cert
else:
# if no certificate file is set, use Mozilla's root certificates.
ca_certs = certifi.where()
addition_pool_args = {}
if configuration.assert_hostname is not None:
addition_pool_args['assert_hostname'] = configuration.assert_hostname # noqa: E501
if configuration.retries is not None:
addition_pool_args['retries'] = configuration.retries
if maxsize is None:
if configuration.connection_pool_maxsize is not None:
maxsize = configuration.connection_pool_maxsize
else:
maxsize = 4
# https pool manager
if configuration.proxy:
self.default_headers = {}
if header_name is not None:
self.default_headers[header_name] = header_value
self.cookie = cookie
# Set default User-Agent.
self.user_agent = 'Swagger-Codegen/1.0/python'
self.configuration = configuration
# ca_certs
if configuration.ssl_ca_cert:
ca_certs = configuration.ssl_ca_cert
else:
# if no certificate file is set, use Mozilla's root certificates.
ca_certs = certifi.where()
ssl_context = ssl.create_default_context(cafile=ca_certs)
if configuration.cert_file:
ssl_context.load_cert_chain(
configuration.cert_file, keyfile=configuration.key_file
)
connector = aiohttp.TCPConnector(
limit=4,
ssl_context=ssl_context,
verify_ssl=configuration.verify_ssl
)
# https pool manager
self.session = aiohttp.ClientSession(connector=connector)
def getList(listUrl):
    """Return the raw body of a GET request to ``listUrl``.

    The request is made with certificate verification forced on, using the
    Certifi CA bundle, and a 10 second timeout.
    """
    pool = urllib3.PoolManager(
        ca_certs=certifi.where(),   # Path to the Certifi bundle.
        cert_reqs='CERT_REQUIRED',  # Force certificate check.
    )
    response = pool.request('GET', listUrl, timeout=10)
    return response.data
def _env_flag(name):
    """Return the boolean value of the environment variable ``name``.

    An unset or empty variable and the explicit "off" spellings ``0``,
    ``false``, ``no`` and ``off`` (case-insensitive) are False; any other
    value is True.
    """
    raw = os.getenv(name, '')
    return raw.strip().lower() not in ('', '0', 'false', 'no', 'off')


def es_client():
    """Build an Elasticsearch client from environment variables and wait for it.

    Connection settings are read from ELASTIC_HOST, ELASTIC_PORT,
    ELASTIC_USER, ELASTIC_PASSWORD, ELASTIC_SSL and ELASTIC_VERIFY_CERTS.
    The client is pinged up to 20 times, sleeping one second after each
    failed attempt.

    Returns:
        The Elasticsearch client, as soon as a ping succeeds.

    Raises:
        Exception: if none of the 20 pings succeeds.
    """
    es = Elasticsearch(
        os.getenv('ELASTIC_HOST', 'elasticsearch'),
        port=os.getenv('ELASTIC_PORT', '9200'),
        http_auth=(os.getenv('ELASTIC_USER', ''),
                   os.getenv('ELASTIC_PASSWORD', '')),
        # bool() on the raw string would turn "false" or "0" into True,
        # so the flags are parsed explicitly.
        use_ssl=_env_flag('ELASTIC_SSL'),
        verify_certs=_env_flag('ELASTIC_VERIFY_CERTS'),
        ca_certs=certifi.where()
    )
    for _ in range(20):
        if es.ping():
            return es
        print("Elasticsearch not up yet")
        time.sleep(1)
    print("Elasticsearch doesn't respond to ping")
    raise Exception("Elasticsearch doesn't respond to ping")
def _get_log(url, user_name, password, log_file=None):
    """Request ``url`` with HTTP basic auth and optionally save the body.

    Args:
        url: address requested with GET.
        user_name: user name for the basic-auth header.
        password: password for the basic-auth header.
        log_file: path the response body is written to in 1024-byte chunks;
            when falsy, nothing is written.

    Raises:
        CLIError: if the response status is not 200.
    """
    import certifi
    import urllib3
    try:
        import urllib3.contrib.pyopenssl
        urllib3.contrib.pyopenssl.inject_into_urllib3()
    except ImportError:
        # The pyOpenSSL integration is skipped when it cannot be imported.
        pass

    http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=certifi.where())
    headers = urllib3.util.make_headers(basic_auth='{0}:{1}'.format(user_name, password))
    r = http.request(
        'GET',
        url,
        headers=headers,
        preload_content=False
    )
    try:
        if r.status != 200:
            raise CLIError("Failed to connect to '{}' with status code '{}' and reason '{}'".format(
                url, r.status, r.reason))
        if log_file:  # download logs
            with open(log_file, 'wb') as f:
                while True:
                    data = r.read(1024)
                    if not data:
                        break
                    # Persist each chunk; without this the file stays empty.
                    f.write(data)
    finally:
        # preload_content=False keeps the connection checked out until it is
        # released explicitly, so release it on success and on error.
        r.release_conn()
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Sending %s request to: %s", self.http_method, url)
logger.debug(" headers: %s", headers)
if self.http_method == 'GET':
logger.debug(" params: %s", req_params['params'])
else:
logger.debug(" params: %s", req_params['data'])
if self.http_session is None:
resp = requests.request(
self.http_method, url, timeout=self.timeout,
auth=auth, verify=certifi.where(), **req_params)
else:
resp = self.http_session.request(
self.http_method, url, timeout=self.timeout,
auth=auth, verify=certifi.where(), **req_params)
# response logging
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Response received from %s", url)
logger.debug(" encoding=%s status:%s",
resp.encoding, resp.status_code)
logger.debug(" content:\n%s", resp.content)
parsed_body = self.parse_body(resp)
self.raise_errors_on_failure(resp)
self.set_rate_limit_details(resp)
return parsed_body
# Print the path of the CA bundle reported by certifi.
from certifi import where
print(where())
def get_client(self, configuration, pools_size=4, maxsize=None, *args, **kwargs):
if configuration.verify_ssl:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
# ca_certs
if configuration.ssl_ca_cert:
ca_certs = configuration.ssl_ca_cert
else:
# if no certificate file is set, use Mozilla's root certificates.
ca_certs = certifi.where()
addition_pool_args = {}
if configuration.assert_hostname is not None:
addition_pool_args[
'assert_hostname'] = configuration.assert_hostname
if maxsize is None:
if configuration.connection_pool_maxsize is not None:
maxsize = configuration.connection_pool_maxsize
else:
maxsize = 4
# https pool manager
if configuration.proxy:
self.pool_manager = urllib3.ProxyManager(
num_pools=pools_size,