Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def control_wait_for(self, goal, interval_sec=1):
    """
    Poll the control state until it equals the requested goal state.

    On every pass the state is read from
    ``self.data_snapshot()['control']['state']`` and converted with
    ``RobotState(...)`` before being compared.

    :param goal: value passed to ``RobotState(...)`` to build the target state
    :param interval_sec: seconds slept between two consecutive polls
    :return: None, as soon as the polled state equals the target state
    """
    target_state = RobotState(goal)
    while True:
        snapshot = self.data_snapshot()
        current_state = RobotState(snapshot['control']['state'])
        if current_state == RobotState.ERROR:
            # The error check comes before the goal comparison, so an ERROR
            # state is always reported, even when ERROR is the goal.
            # NOTE(review): eva_error is defined outside this view and
            # presumably raises; if it returns, polling simply continues.
            eva_error('Eva is in error control state')
        elif current_state == target_state:
            return
        time.sleep(interval_sec)
def data_snapshot(self):
    """
    Fetch 'data/snapshot' and return the 'snapshot' entry of the JSON reply.

    The request is a GET issued through ``self.api_call_with_auth``.
    Any status code other than 200 is reported through ``eva_error``
    together with the response object.

    :return: the value stored under the 'snapshot' key of the response JSON
    """
    response = self.api_call_with_auth('GET', 'data/snapshot')
    request_ok = response.status_code == 200
    if not request_ok:
        # NOTE(review): eva_error is defined outside this view; presumably
        # it raises, so the line below is only reached on success.
        eva_error('data_snapshot request error', response)
    payload = response.json()
    return payload['snapshot']
def lock_status(self):
    """
    Fetch 'controls/lock' and return the decoded JSON reply.

    The request is a GET issued through ``self.api_call_with_auth``.
    Any status code other than 200 is reported through ``eva_error``
    together with the response object.

    :return: the full JSON body of the response, as decoded by ``r.json()``
    """
    response = self.api_call_with_auth('GET', 'controls/lock')
    request_ok = response.status_code == 200
    if not request_ok:
        # NOTE(review): eva_error is defined outside this view; presumably
        # it raises, so the return below is only reached on success.
        eva_error('lock_status error', response)
    return response.json()
def name(self):
    """
    Fetch 'name' without authentication and return the decoded JSON reply.

    The request is a GET issued through ``self.api_call_no_auth``.
    Any status code other than 200 is reported through ``eva_error``
    together with the response object.

    :return: the full JSON body of the response, as decoded by ``r.json()``
    """
    r = self.api_call_no_auth('GET', 'name')
    if r.status_code != 200:
        # The message used to read 'api_versions request error', which
        # pointed at the wrong endpoint when this request failed.
        # NOTE(review): eva_error is defined outside this view; presumably
        # it raises, so the return below is only reached on success.
        eva_error('name request error', r)
    return r.json()
def calc_pose_valid(self, joints, tcp_config=None):
    """
    Send joints to 'calc/pose_valid' and return the reported validity.

    The request is a PUT issued through ``self.api_call_with_auth`` with a
    JSON-encoded body. Any status code other than 200 is reported through
    ``eva_error`` together with the response object.

    :param joints: value sent under the 'joints' key of the request body
    :param tcp_config: optional value sent under the 'tcp_config' key; the
        key is left out of the body entirely when this is None
    :return: the value found at ``['pose']['valid']`` in the response JSON
    """
    if tcp_config is None:
        request_body = {'joints': joints}
    else:
        # Key order matches insertion order, so the serialized JSON is stable.
        request_body = {'joints': joints, 'tcp_config': tcp_config}
    encoded_body = json.dumps(request_body)
    response = self.api_call_with_auth('PUT', 'calc/pose_valid', encoded_body)
    if response.status_code != 200:
        # NOTE(review): eva_error is defined outside this view; presumably
        # it raises, so the lookup below is only reached on success.
        eva_error('calc_pose_valid error', response)
    pose_info = response.json()['pose']
    return pose_info['valid']
def auth_renew_session(self):
    """
    Renew the current session by POSTing to 'auth/renew'.

    Outcome by response status code:

    * 401 - ``self.session_token`` is cleared and ``self.auth_create_session()``
      is called.
    * 204 - the renewal time is recorded in ``self.__last_renew`` using
      ``time.time()``.
    * anything else - reported through ``eva_error`` with the response object.

    :return: None
    """
    # Deliberately do not interpolate self.session_token here: the token is
    # a credential and must not end up readable in debug logs.
    self.__logger.debug('Renewing session token')
    # Bypass api_call_with_auth to avoid getting in a renewal loop
    r = self.__api_request('POST', 'auth/renew')
    if r.status_code == 401:
        self.session_token = None
        self.auth_create_session()
    elif r.status_code != 204:
        # NOTE(review): eva_error is defined outside this view; presumably
        # it raises.
        eva_error('auth_renew_session request error', r)
    else:
        self.__last_renew = time.time()
def data_snapshot_property(self, prop):
    """
    Return a single entry of the snapshot returned by ``self.data_snapshot()``.

    :param prop: key to look up in the snapshot
    :return: ``snapshot[prop]`` when the key is present; when it is absent
        the problem is reported through ``eva_error`` and, should that call
        return, the result is None
    """
    snapshot = self.data_snapshot()
    if prop not in snapshot:
        # NOTE(review): eva_error is defined outside this view; presumably
        # it raises, making the explicit None below unreachable in practice.
        eva_error('data_snapshot_property request error, property {} not found'.format(prop))
        return None
    return snapshot[prop]
def toolpaths_use(self, toolpathRepr):
    """
    POST a toolpath to 'toolpath/use'.

    The toolpath is wrapped as ``{'toolpath': toolpathRepr}``, JSON-encoded
    and sent through ``self.api_call_with_auth`` with ``timeout=300``.
    Any status code other than 200 is reported through ``eva_error``
    together with the response object.

    :param toolpathRepr: value sent under the 'toolpath' key of the body
    :return: None
    """
    encoded_body = json.dumps({'toolpath': toolpathRepr})
    response = self.api_call_with_auth('POST', 'toolpath/use', encoded_body, timeout=300)
    if response.status_code != 200:
        # NOTE(review): eva_error is defined outside this view; presumably
        # it raises.
        eva_error('toolpaths_use error', response)
return
if timeout is not None:
timeoutT = time.time() + timeout
while True:
try:
self.lock_lock()
return
except Exception as e:
if not isinstance(e, EvaError):
raise e
pass
if timeout is not None:
if timeoutT < time.time():
eva_error('lock_wait_for timeout triggered')
time.sleep(interval_sec)