Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for cluster in clusters:
cmd = r'ps -ef | grep address\ {}'.format(cluster)
content = os.popen(cmd).read()
pattern = re.compile('--monitor_port (.*?)\n', re.S)
monitors = pattern.findall(content)
if len(monitors):
monitor_port, _, master_address = monitors[0].split(' ')
monitor_address = "{}:{}".format(get_ip_address(),
monitor_port)
socket = ctx.socket(zmq.REQ)
socket.setsockopt(zmq.RCVTIMEO, 10000)
socket.connect('tcp://{}'.format(master_address))
try:
socket.send_multipart([STATUS_TAG])
monitor_info = to_str(socket.recv_multipart()[1])
except zmq.error.Again as e:
click.echo(
'Can not connect to cluster {}, please try later.'.
format(master_address))
socket.close(0)
continue
msg = """
# Cluster {} {}
# If you want to check cluster status, please view: http://{}
""".format(master_address, monitor_info, monitor_address)
status.append(msg)
socket.close(0)
else:
msg = """
# Cluster {} fails to start the cluster monitor.
2. When the remote object is deleted, the job will quit and release
related computation resources.
Args:
reply_socket (socket): main socket to accept commands of remote object.
job_address (String): address of reply_socket.
"""
while True:
message = reply_socket.recv_multipart()
tag = message[0]
if tag == remote_constants.CALL_TAG:
try:
function_name = to_str(message[1])
data = message[2]
args, kwargs = loads_argument(data)
ret = getattr(obj, function_name)(*args, **kwargs)
ret = dumps_return(ret)
reply_socket.send_multipart(
[remote_constants.NORMAL_TAG, ret])
except Exception as e:
# reset the job
error_str = str(e)
logger.error(error_str)
if type(e) == AttributeError:
reply_socket.send_multipart([
def _create_job_monitor(self, job_heartbeat_socket):
"""Send heartbeat signals to check target's status"""
job_is_alive = True
while job_is_alive and self.client_is_alive:
try:
job_heartbeat_socket.send_multipart(
[remote_constants.HEARTBEAT_TAG])
job_message = job_heartbeat_socket.recv_multipart()
stop_job = to_str(job_message[1])
job_address = to_str(job_message[2])
if stop_job == 'True':
logger.error(
'Job {} exceeds max memory usage, will stop this job.'.
format(job_address))
self.lock.acquire()
self.actor_num -= 1
self.lock.release()
job_is_alive = False
else:
time.sleep(remote_constants.HEARTBEAT_INTERVAL_S)
except zmq.error.Again as e:
job_is_alive = False
self.lock.acquire()
self.actor_num -= 1
[remote_constants.CALL_TAG,
to_byte(attr), data])
message = self.job_socket.recv_multipart()
tag = message[0]
if tag == remote_constants.NORMAL_TAG:
ret = loads_return(message[1])
elif tag == remote_constants.EXCEPTION_TAG:
error_str = to_str(message[1])
self.job_shutdown = True
raise RemoteError(attr, error_str)
elif tag == remote_constants.ATTRIBUTE_EXCEPTION_TAG:
error_str = to_str(message[1])
self.job_shutdown = True
raise RemoteAttributeError(attr, error_str)
elif tag == remote_constants.SERIALIZE_EXCEPTION_TAG:
error_str = to_str(message[1])
self.job_shutdown = True
raise RemoteSerializeError(attr, error_str)
elif tag == remote_constants.DESERIALIZE_EXCEPTION_TAG:
error_str = to_str(message[1])
self.job_shutdown = True
raise RemoteDeserializeError(attr, error_str)
else:
self.job_shutdown = True
raise NotImplementedError()
def _create_job_monitor(self, job_heartbeat_socket):
"""Send heartbeat signals to check target's status"""
job_is_alive = True
while job_is_alive and self.client_is_alive:
try:
job_heartbeat_socket.send_multipart(
[remote_constants.HEARTBEAT_TAG])
job_message = job_heartbeat_socket.recv_multipart()
stop_job = to_str(job_message[1])
job_address = to_str(job_message[2])
if stop_job == 'True':
logger.error(
'Job {} exceeds max memory usage, will stop this job.'.
format(job_address))
self.lock.acquire()
self.actor_num -= 1
self.lock.release()
job_is_alive = False
else:
time.sleep(remote_constants.HEARTBEAT_INTERVAL_S)
except zmq.error.Again as e:
job_is_alive = False
self.lock.acquire()
self.command_socket.send_multipart(
[remote_constants.NORMAL_TAG,
to_byte(attr), data])
message = self.command_socket.recv_multipart()
tag = message[0]
if tag == remote_constants.NORMAL_TAG:
ret = loads_return(message[1])
elif tag == remote_constants.EXCEPTION_TAG:
error_str = to_str(message[1])
raise RemoteError(attr, error_str)
elif tag == remote_constants.ATTRIBUTE_EXCEPTION_TAG:
error_str = to_str(message[1])
raise RemoteAttributeError(attr, error_str)
elif tag == remote_constants.SERIALIZE_EXCEPTION_TAG:
error_str = to_str(message[1])
raise RemoteSerializeError(attr, error_str)
elif tag == remote_constants.DESERIALIZE_EXCEPTION_TAG:
error_str = to_str(message[1])
raise RemoteDeserializeError(attr, error_str)
else:
raise NotImplementedError()
self.internal_lock.release()
return ret
attr, "This actor losts connection with the job.")
self.internal_lock.acquire()
data = dumps_argument(*args, **kwargs)
self.job_socket.send_multipart(
[remote_constants.CALL_TAG,
to_byte(attr), data])
message = self.job_socket.recv_multipart()
tag = message[0]
if tag == remote_constants.NORMAL_TAG:
ret = loads_return(message[1])
elif tag == remote_constants.EXCEPTION_TAG:
error_str = to_str(message[1])
self.job_shutdown = True
raise RemoteError(attr, error_str)
elif tag == remote_constants.ATTRIBUTE_EXCEPTION_TAG:
error_str = to_str(message[1])
self.job_shutdown = True
raise RemoteAttributeError(attr, error_str)
elif tag == remote_constants.SERIALIZE_EXCEPTION_TAG:
error_str = to_str(message[1])
self.job_shutdown = True
raise RemoteSerializeError(attr, error_str)
elif tag == remote_constants.DESERIALIZE_EXCEPTION_TAG:
error_str = to_str(message[1])
self.job_shutdown = True
self.job_socket.linger = 0
self.job_socket.connect("tcp://{}".format(job_address))
self.job_address = job_address
self.job_shutdown = False
self.send_file(self.job_socket)
self.job_socket.send_multipart([
remote_constants.INIT_OBJECT_TAG,
cloudpickle.dumps(cls),
cloudpickle.dumps([args, kwargs]),
])
message = self.job_socket.recv_multipart()
tag = message[0]
if tag == remote_constants.EXCEPTION_TAG:
traceback_str = to_str(message[1])
self.job_shutdown = True
raise RemoteError('__init__', traceback_str)
def update_worker_status(self, update_status, worker_address, vacant_cpus,
                         total_cpus):
    """Update the recorded status of one worker.

    Args:
        update_status (sequence): status message received for the worker.
            Only fields 1-4 are read, in the order vacant_memory,
            used_memory, load_time, load_value; field 0 is not used here.
        worker_address (str): worker address, used as the key into
            ``self.status['workers']``.
        vacant_cpus (int): vacant cpu number.
        total_cpus (int): total cpu number.

    Raises:
        KeyError: if ``worker_address`` has no entry in
            ``self.status['workers']``.
        ValueError: if a numeric field cannot be converted by ``float``.
        IndexError: if ``update_status`` has fewer than five fields.
    """
    # Decode and convert every field before touching shared state, so a
    # malformed message cannot leave the worker entry half-updated.
    vacant_memory = float(to_str(update_status[1]))
    used_memory = float(to_str(update_status[2]))
    load_time = to_str(update_status[3])
    # NOTE(review): unlike the other fields this one is not passed through
    # to_str first; kept as in the original -- confirm that is intended.
    load_value = float(update_status[4])

    # The context manager releases the lock on every exit path. The former
    # manual acquire()/release() pair kept the lock held forever when the
    # lookup or a conversion raised.
    # NOTE(review): assumes self.lock supports the context-manager protocol
    # (e.g. threading.Lock) -- confirm against where it is created.
    with self.lock:
        worker_status = self.status['workers'][worker_address]
        worker_status['vacant_memory'] = vacant_memory
        worker_status['used_memory'] = used_memory
        worker_status['load_time'].append(load_time)
        worker_status['load_value'].append(load_value)
        worker_status['vacant_cpus'] = vacant_cpus
        worker_status['used_cpus'] = total_cpus - vacant_cpus
def wrapper(*args, **kwargs):
    """Send a call of ``attr`` over ``self.command_socket`` and return the reply.

    The arguments are serialized with ``dumps_argument``, sent together
    with the attribute name, and the reply's first frame (the tag) decides
    whether the second frame is a return value or an error description.
    ``self`` and ``attr`` come from the enclosing scope.

    Returns:
        The value produced by ``loads_return`` on the reply payload.

    Raises:
        RemoteError: the reply carries ``EXCEPTION_TAG``.
        RemoteAttributeError: the reply carries ``ATTRIBUTE_EXCEPTION_TAG``.
        RemoteSerializeError: the reply carries ``SERIALIZE_EXCEPTION_TAG``.
        RemoteDeserializeError: the reply carries
            ``DESERIALIZE_EXCEPTION_TAG``.
        NotImplementedError: the reply carries any other tag.
    """
    # Hold the lock for the whole request/reply exchange. Using `with`
    # guarantees it is released when an error tag makes us raise (or when
    # serialization / socket I/O fails); the former manual
    # acquire()/release() pair skipped release() on every raise, so the
    # next call on this object blocked forever.
    # NOTE(review): assumes self.internal_lock supports the context-manager
    # protocol (e.g. threading.Lock) -- confirm against where it is created.
    with self.internal_lock:
        data = dumps_argument(*args, **kwargs)
        self.command_socket.send_multipart(
            [remote_constants.NORMAL_TAG,
             to_byte(attr), data])
        message = self.command_socket.recv_multipart()
        tag = message[0]

        if tag == remote_constants.NORMAL_TAG:
            return loads_return(message[1])
        elif tag == remote_constants.EXCEPTION_TAG:
            raise RemoteError(attr, to_str(message[1]))
        elif tag == remote_constants.ATTRIBUTE_EXCEPTION_TAG:
            raise RemoteAttributeError(attr, to_str(message[1]))
        elif tag == remote_constants.SERIALIZE_EXCEPTION_TAG:
            raise RemoteSerializeError(attr, to_str(message[1]))
        elif tag == remote_constants.DESERIALIZE_EXCEPTION_TAG:
            raise RemoteDeserializeError(attr, to_str(message[1]))
        else:
            # Unknown reply tag: keep the original exception type so
            # existing callers that catch it still work.
            raise NotImplementedError()