Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run_test(args, test_case): # pylint: disable=too-many-branches,too-many-statements
"""Run a test."""
util.load_kube_config()
api_client = k8s_client.ApiClient()
t = test_util.TestCase()
t.class_name = "tfjob_test"
namespace, name, env = ks_util.setup_ks_app(args)
t.name = os.path.basename(name)
try: # pylint: disable=too-many-nested-blocks
util.run(["ks", "apply", env, "-c", args.component], cwd=args.app_dir)
logging.info("Created job %s in namespaces %s", name, namespace)
logging.info("Wait for conditions Failed")
results = tf_job_client.wait_for_condition(
api_client, namespace, name, ["Succeeded", "Failed"],
status_callback=tf_job_client.log_status)
def create_namespace(self, ns_name):
self.cluster.create_namespace(client.V1Namespace(metadata=client.V1ObjectMeta(name=ns_name)))
def setUpClass(cls):
cls.config = base.get_e2e_configuration()
cls.path_prefix = "kubernetes/e2e_test/test_yaml/"
cls.test_namespace = "e2e-test-utils"
k8s_client = client.api_client.ApiClient(configuration=cls.config)
core_v1 = client.CoreV1Api(api_client=k8s_client)
body = client.V1Namespace(metadata=client.V1ObjectMeta(name=cls.test_namespace))
core_v1.create_namespace(body=body)
def refresh(self):
"""Refresh the underlying Kubernetes Api Pod object."""
self.obj = client.CoreV1Api().read_namespaced_pod_status(
name=self.name,
namespace=self.namespace,
)
account=None,
namespace=namespace)
# service component
serviceComponent = "modelServer-service"
generate_command = [ks, "generate", "tf-serving-service", serviceComponent]
util.run(generate_command, cwd=app_dir)
ks_deploy(
app_dir,
serviceComponent,
params,
env=None,
account=None,
namespace=namespace)
core_api = k8s_client.CoreV1Api(api_client)
deploy = core_api.read_namespaced_service(args.deploy_name, args.namespace)
cluster_ip = deploy.spec.cluster_ip
if not cluster_ip:
raise ValueError("inception service wasn't assigned a cluster ip.")
util.wait_for_deployment(
api_client, namespace, args.deploy_name, timeout_minutes=10)
logging.info("Verified TF serving started.")
def test_create_pod_from_yaml(self):
"""
Should be able to create a pod.
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
utils.create_from_yaml(
k8s_client, self.path_prefix + "core-pod.yaml")
core_api = client.CoreV1Api(k8s_client)
pod = core_api.read_namespaced_pod(name="myapp-pod",
namespace="default")
self.assertIsNotNone(pod)
core_api.delete_namespaced_pod(
name="myapp-pod", namespace="default",
body={})
def __init__(self, get_cmd, env, image, labels={}):
global ApiException
global k8sclient
from kubernetes import config
from kubernetes import client as k8sclient
from kubernetes.client.rest import ApiException
config.load_kube_config()
super(KubernetesServer, self).__init__(get_cmd, env)
self._namespace = 'default'
self._name = 'server-fixtures-%s' % uuid.uuid4()
self._image = image
self._run_cmd = get_cmd()
self._labels = _merge_dicts(labels, {
'server-fixtures': 'kubernetes-server-fixtures',
'server-fixtures/session-id': CONFIG.session_id,
})
self._v1api = k8sclient.CoreV1Api()
def new(cls, name: str) -> 'Namespace':
"""Create a new Namespace with object backing.
Args:
name: The name of the new Namespace.
Returns:
A new Namespace instance.
"""
return cls(client.V1Namespace(
metadata=client.V1ObjectMeta(
name=name
)
def test_ns_create_new(self, mock_ns_read, mock_api, mock_log):
# TODO: We should ideally replicate the correct API exception
mock_ns_read.side_effect = ApiException()
ns_create("a-namespace")
mock_ns_read.assert_called_once_with("a-namespace")
mock_api.create_namespace.assert_called_once()
mock_log.info.assert_called_once_with('Created namespace "a-namespace"')
def wait_to_deployment_to_be_deleted(deployment_name, name_space, time_out=None):
total_sleep_time = 0
while True:
try:
resp = client.AppsV1Api().read_namespaced_deployment(name=deployment_name, namespace=name_space)
except ApiException as e:
if e.status == 404:
print("Total time waiting for delete deployment {0}: {1} sec".format(deployment_name, total_sleep_time))
break
time.sleep(1)
total_sleep_time += 1
if time_out and total_sleep_time > time_out:
raise Exception("Timeout waiting to delete deployment")