Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# If we're not syncing to the cloud, we're done
if not self._cloud:
wandb.termlog("You can sync this run to the cloud by running: ")
wandb.termlog("wandb sync %s" % os.path.relpath(self._run.dir))
sys.exit(exitcode)
elif exitcode != 0 and crash_nosync_time and time.time() - START_TIME < crash_nosync_time:
wandb.termlog("Process crashed early, not syncing files")
logger.info("process only ran for %d seconds, not syncing files" % (time.time() - START_TIME))
sys.exit(exitcode)
# Show run summary/history
self._run.summary.load()
summary = self._run.summary._json_dict
if len(summary):
logger.info("rendering summary")
wandb.termlog('Run summary:')
max_len = max([len(k) for k in summary.keys()])
format_str = ' {:>%s} {}' % max_len
for k, v in summary.items():
# arrays etc. might be too large. for now we just don't print them
if isinstance(v, six.string_types):
if len(v) >= 20:
v = v[:20] + '...'
wandb.termlog(format_str.format(k, v))
elif isinstance(v, numbers.Number):
wandb.termlog(format_str.format(k, v))
self._run.history.load()
history_keys = self._run.history.keys()
# Only print sparklines if the terminal is utf-8
# In some python 2.7 tests sys.stdout is a 'cStringIO.StringO' object
# which doesn't have the attribute 'encoding'
parent.job_type = "parent"
parent.save()
#TODO: maybe call save
os.environ[env.RUN_GROUP] = run.data.tags["mlflow.parentRunId"]
os.environ[env.JOB_TYPE] = "child"
project = os.getenv(env.PROJECT, client.get_experiment(run.info.experiment_id).name)
config = run.data.tags
config["mlflow.tracking_uri"] = mlflow.get_tracking_uri()
config["mlflow.experiment_id"] = run.info.experiment_id
wandb_run = RUNS.get(run.info.run_id)
if wandb_run is None:
wandb_run = wandb.init(id=run.info.run_id, project=project,
name=name, config=config, reinit=True)
wandb.termlog("Syncing MLFlow metrics, params, and artifacts to: %s" %
wandb_run.get_url().split("/runs/")[0], repeat=False, force=True)
wandb_run.config._set_wandb('mlflow_version', mlflow.__version__)
RUNS[wandb_run.id] = {"step": 0, "last_log": time.time(), "run": wandb_run}
return wandb_run
except Exception as e:
wandb.termerror("Failed to intialize wandb, disable by setting WANDB_SYNC_MLFLOW=false", force=True)
exc_type, exc_value, exc_traceback = sys.exc_info()
lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
print('\n'.join(lines))
if exitcode == 0:
self._meta.data["state"] = "finished"
elif exitcode == 255:
self._meta.data["state"] = "killed"
else:
self._meta.data["state"] = "failed"
# TODO(adrian): these can be slow to complete (due to joining?)
logger.info("closing log streams and sending exitcode to W&B")
self._close_stdout_stderr_streams()
self.shutdown(exitcode)
crash_nosync_time = wandb_env.get_crash_nosync_time(self.CRASH_NOSYNC_TIME)
# If we're not syncing to the cloud, we're done
if not self._cloud:
wandb.termlog("You can sync this run to the cloud by running: ")
wandb.termlog("wandb sync %s" % os.path.relpath(self._run.dir))
sys.exit(exitcode)
elif exitcode != 0 and crash_nosync_time and time.time() - START_TIME < crash_nosync_time:
wandb.termlog("Process crashed early, not syncing files")
logger.info("process only ran for %d seconds, not syncing files" % (time.time() - START_TIME))
sys.exit(exitcode)
# Show run summary/history
self._run.summary.load()
summary = self._run.summary._json_dict
if len(summary):
logger.info("rendering summary")
wandb.termlog('Run summary:')
max_len = max([len(k) for k in summary.keys()])
format_str = ' {:>%s} {}' % max_len
for k, v in summary.items():
def write_netrc(host, entity, key):
"""Add our host and key to .netrc"""
key_prefix, key_suffix = key.split('-', 1) if '-' in key else ('', key)
if len(key_suffix) != 40:
wandb.termlog('API-key must be exactly 40 characters long: {} ({} chars)'.format(key_suffix, len(key_suffix)))
return None
try:
normalized_host = host.split("/")[-1].split(":")[0]
wandb.termlog("Appending key for {} to your netrc file: {}".format(normalized_host, os.path.expanduser('~/.netrc')))
machine_line = 'machine %s' % normalized_host
path = os.path.expanduser('~/.netrc')
orig_lines = None
try:
with open(path) as f:
orig_lines = f.read().strip().split('\n')
except (IOError, OSError) as e:
pass
with open(path, 'w') as f:
if orig_lines:
# delete this machine from the file if it's already there.
skip = 0
def _update(self):
    """Reload the config file from disk and push its contents upstream.

    Reads ``self.file_path`` as YAML, upserts the parsed dict as the run's
    config through ``self._api``, notifies ``self._file_pusher`` that the
    file changed, and records the send time in ``self._last_sent``.

    If the file cannot be parsed, a message is logged and the method
    returns without sending anything or updating ``self._last_sent``.
    Errors raised while opening the file are not caught here.
    """
    try:
        # Use a context manager so the handle is closed even when parsing
        # raises; the previous bare open() leaked a descriptor on each call.
        with open(self.file_path) as config_file:
            config_dict = util.load_yaml(config_file)
    except yaml.parser.ParserError:
        wandb.termlog(
            "Unable to parse config file; probably being modified by user process?")
        return

    # TODO(adrian): ensure the file content will exactly match Bucket.config
    # ie. push the file content as a string
    self._api.upsert_run(id=self._run.storage_id, config=config_dict)
    self._file_pusher.file_changed(self.save_name, self.file_path)
    self._last_sent = time.time()
else:
wandb.termlog("Checking out %s in detached mode" % commit)
api.git.repo.git.checkout(commit)
if patch_path:
# we apply the patch from the repository root so git doesn't exclude
# things outside the current directory
root = api.git.root
patch_rel_path = os.path.relpath(patch_path, start=root)
# --reject is necessary or else this fails any time a binary file
# occurs in the diff
# we use .call() instead of .check_call() for the same reason
# TODO(adrian): this means there is no error checking here
subprocess.call(['git', 'apply', '--reject',
patch_rel_path], cwd=root)
wandb.termlog("Applied patch")
# TODO: we should likely respect WANDB_DIR here.
util.mkdir_exists_ok("wandb")
config = Config(run_dir="wandb")
config.load_json(json_config)
config.persist()
wandb.termlog("Restored config variables to %s" % config._config_path())
if image:
if not metadata["program"].startswith("<") and metadata.get("args") is not None:
# TODO: we may not want to default to python here.
runner = util.find_runner(metadata["program"]) or ["python"]
command = runner + [metadata["program"]] + metadata["args"]
cmd = " ".join(command)
else:
wandb.termlog("Couldn't find original command, just restoring environment")
cmd = None
# arrays etc. might be too large. for now we just don't print them
if isinstance(v, six.string_types):
if len(v) >= 20:
v = v[:20] + '...'
wandb.termlog(format_str.format(k, v))
elif isinstance(v, numbers.Number):
wandb.termlog(format_str.format(k, v))
self._run.history.load()
history_keys = self._run.history.keys()
# Only print sparklines if the terminal is utf-8
# In some python 2.7 tests sys.stdout is a 'cStringIO.StringO' object
# which doesn't have the attribute 'encoding'
if len(history_keys) and hasattr(sys.stdout, 'encoding') and sys.stdout.encoding == "UTF_8":
logger.info("rendering history")
wandb.termlog('Run history:')
max_len = max([len(k) for k in history_keys])
for key in history_keys:
vals = util.downsample(self._run.history.column(key), 40)
if any((not isinstance(v, numbers.Number) for v in vals)):
continue
line = sparkline.sparkify(vals)
format_str = u' {:>%s} {}' % max_len
wandb.termlog(format_str.format(key, line))
wandb_files = set([save_name for save_name in self._file_pusher.files() if util.is_wandb_file(save_name)])
media_files = set([save_name for save_name in self._file_pusher.files() if save_name.startswith('media')])
other_files = set(self._file_pusher.files()) - wandb_files - media_files
logger.info("syncing files to cloud storage")
if other_files:
wandb.termlog('Syncing files in %s:' % os.path.relpath(self._run.dir))
for save_name in sorted(other_files):
try:
self.proc.kill()
except OSError:
pass
"""TODO(adrian): garbage that appears in the logs sometimes
Exception ignored in: >
Traceback (most recent call last):
File "/Users/adrian/.pyenv/versions/3.6.0/Python.framework/Versions/3.6/lib/python3.6/subprocess.py", line 760, in __del__
AttributeError: 'NoneType' object has no attribute 'warn'
"""
if exitcode is None:
exitcode = 254
wandb.termlog(
'Killing program failed; syncing files anyway. Press ctrl-c to abort syncing.')
else:
if exitcode == 0:
wandb.termlog('Program ended successfully.')
resume_path = os.path.join(wandb.wandb_dir(), wandb_run.RESUME_FNAME)
if os.path.exists(resume_path):
os.remove(resume_path)
else:
wandb.termlog(
'Program failed with code %d. Press ctrl-c to abort syncing.' % exitcode)
self._meta.data["exitcode"] = exitcode
if exitcode == 0:
self._meta.data["state"] = "finished"
elif exitcode == 255:
self._meta.data["state"] = "killed"
def download_h5(run_id, entity=None, project=None, out_dir=None):
    """Download the file named by DEEP_SUMMARY_FNAME for a run.

    ``project`` and ``entity`` fall back to the values from
    ``Api().settings(...)`` when they are falsy.

    Returns the path reported by ``download_write_file``, or ``None`` when
    the download metadata is missing or carries no md5.
    """
    api = Api()
    resolved_project = project or api.settings("project")
    resolved_entity = entity or api.settings("entity")
    meta = api.download_url(
        resolved_project, DEEP_SUMMARY_FNAME, entity=resolved_entity, run=run_id)

    # Nothing to fetch unless the metadata exists and has a non-null md5.
    if not meta or 'md5' not in meta or meta['md5'] is None:
        return None

    # TODO: make this non-blocking
    wandb.termlog("Downloading summary data...")
    local_path, _ = api.download_write_file(meta, out_dir=out_dir)
    return local_path