Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run_exp(flow_params, **kwargs):
    """Run a single, minimal RLlib smoke-test experiment for *flow_params*.

    Builds the experiment via ``setup_rllib_exps`` with 1 rollout / 1 CPU,
    shrinks all batch sizes so the run finishes quickly, and launches it
    through ``run_experiments`` for one training iteration.

    Parameters
    ----------
    flow_params : dict
        Flow experiment parameters forwarded to ``setup_rllib_exps``.
    **kwargs :
        Extra keyword arguments forwarded to ``setup_rllib_exps``.
    """
    alg_run, env_name, config = setup_rllib_exps(flow_params, 1, 1, **kwargs)
    try:
        ray.init(num_cpus=1)
    except Exception as e:
        # ray may already be initialized (e.g. by a previous test in the same
        # process); report and continue with the existing cluster.
        print("ERROR", e)
    # Shrink the workload so the smoke test completes in seconds.
    config['train_batch_size'] = 50
    config['horizon'] = 50
    config['sample_batch_size'] = 50
    config['num_workers'] = 0
    config['sgd_minibatch_size'] = 32
    # NOTE(review): the original chunk truncated this call mid-literal; the
    # closing braces below were reconstructed — confirm against the full file.
    run_experiments({
        'test': {
            'run': alg_run,
            'env': env_name,
            'config': {
                **config
            },
            'checkpoint_freq': 1,
            'stop': {
                'training_iteration': 1,
            },
        }
    })
def tune_run(config):
    """Launch a tune experiment described by *config*.

    The agent name stored at ``config['env_config']['agent']`` selects which
    experiment/scheduler pair ``get_tune_experiment`` constructs.
    """
    agent_name = config['env_config']['agent']
    experiment_spec, trial_scheduler = get_tune_experiment(config, agent_name)
    tune.run_experiments(experiment_spec, scheduler=trial_scheduler)
# NOTE(review): this span appears to be the tail of a setup function whose
# ``def`` line is not visible in this chunk — the ``return`` below has no
# enclosing definition here. Code left byte-identical; confirm against the
# full file.
# Serialize the flow params as JSON so the experiment can be replayed later.
flow_json = json.dumps(
flow_params, cls=FlowParamsEncoder, sort_keys=True, indent=4)
config['env_config']['flow_params'] = flow_json
config['env_config']['run'] = alg_run
# Build the gym environment factory for this flow configuration.
create_env, gym_name = make_create_env(params=flow_params, version=0)
# Register as rllib env
register_env(gym_name, create_env)
return alg_run, gym_name, config
if __name__ == "__main__":
    # Build the experiment spec and start a local ray cluster; one CPU is
    # reserved for the driver on top of the N_CPUS worker CPUs.
    alg_run, gym_name, config = setup_exps()
    ray.init(num_cpus=N_CPUS + 1, redirect_output=False)
    # NOTE(review): the original chunk truncated this call; the closing
    # braces below were reconstructed — confirm against the full file.
    trials = run_experiments({
        flow_params["exp_tag"]: {
            "run": alg_run,
            "env": gym_name,
            "config": {
                **config
            },
            "checkpoint_freq": 20,
            "max_failures": 999,
            "stop": {
                "training_iteration": 200,
            },
            "upload_dir": "s3://kanaad.experiments",
            "num_samples": 1
        }
    })
def run(result_dir, nmaxepochs, nthreads):
    """Tune the blockperm-transpose FFT factorization, then polish the best trials.

    Runs an AsyncHyperBand search over the experiment, ranks trials by loss,
    polishes the top ``N_TRIALS_TO_POLISH`` with L-BFGS in a process pool, and
    prints the raw and sorted losses.
    """
    experiment = fft_experiment_blockperm_transpose()
    # Worker processes are spawned below, so cap MKL's internal threading to
    # avoid oversubscribing the machine.
    os.environ['MKL_NUM_THREADS'] = str(nthreads)
    ray.init()
    band_scheduler = AsyncHyperBandScheduler(reward_attr='negative_loss',
                                             max_t=nmaxepochs)
    trials = run_experiments(experiment, scheduler=band_scheduler,
                             raise_on_failed_trial=False)
    losses = [-t.last_result['negative_loss'] for t in trials]
    # Polish solutions with L-BFGS, best trials first.
    worker_pool = mp.Pool()
    ranked = sorted(trials, key=lambda t: -t.last_result['negative_loss'])
    polished = worker_pool.map(polish_fft_blockperm_transpose,
                               ranked[:N_TRIALS_TO_POLISH])
    worker_pool.close()
    worker_pool.join()
    for idx in range(min(N_TRIALS_TO_POLISH, len(trials))):
        ranked[idx].last_result['polished_negative_loss'] = -polished[idx]
    print(np.array(losses))
    print(np.sort(losses))
# NOTE(review): like the earlier identical span, this is the tail of a setup
# function whose ``def`` line is outside this chunk — the ``return`` below has
# no enclosing definition here. Code left byte-identical; confirm against the
# full file.
# Serialize the flow params as JSON so the experiment can be replayed later.
flow_json = json.dumps(
flow_params, cls=FlowParamsEncoder, sort_keys=True, indent=4)
config['env_config']['flow_params'] = flow_json
config['env_config']['run'] = alg_run
# Build the gym environment factory for this flow configuration.
create_env, gym_name = make_create_env(params=flow_params, version=0)
# Register as rllib env
register_env(gym_name, create_env)
return alg_run, gym_name, config
if __name__ == "__main__":
    alg_run, gym_name, config = setup_exps()
    ray.init(num_cpus=N_CPUS + 1, redirect_output=False)
    # NOTE(review): several keys below ("num_workers", "autoregressive",
    # "residual", "imitation", "model") look like trainer-config options
    # rather than tune trial-spec keys — they presumably belong inside
    # "config". Left in place pending confirmation against the full file.
    # The closing braces of this call were reconstructed; the original chunk
    # truncated it mid-literal.
    trials = run_experiments({
        flow_params["exp_tag"]: {
            "run": alg_run,
            "env": gym_name,
            "config": {
                **config
            },
            "checkpoint_freq": 20,
            "checkpoint_at_end": True,
            "max_failures": 999,
            "stop": {
                "training_iteration": 200,
            },
            # "soft_horizon": True,
            "num_workers": 0,
            # "remote_worker_envs": True,
            "autoregressive": True,
            "residual": True,
            "imitation": True,
            "model": {
                # "max_seq_len": unroll_length,
                "use_lstm": True,
                "lstm_cell_size": 256,
                "lstm_use_prev_action_reward": True,
                "fcnet_hiddens": [fc_width] * fc_depth,
            },
        }
    })
# Launch the imitation-learning experiment through tune. The empty "env"
# plus the commented-out alternatives suggest the environment and trainer
# were being swapped during development; only ImitationTrainer is active.
tune.run_experiments({
exp_name: {
"env": "",
# "env": imitation_env.ImitationEnv,
#"run": agents.impala.ImpalaTrainer,
"run": imitation_trainer.ImitationTrainer,
#"run": agents.a3c.A3CAgent,
#"run": agents.a3c.A2CAgent,
"checkpoint_freq": args.checkpoint_freq,
"config": config,
}},
resume=args.resume,
raise_on_failed_trial=True,
)
# PPO hyperparameters for the ramp-metering experiment.
config["sgd_minibatch_size"] = 64
config["kl_target"] = 0.02
config["num_sgd_iter"] = 10
config["horizon"] = HORIZON
# save the flow params for replay
flow_json = json.dumps(
    flow_params, cls=FlowParamsEncoder, sort_keys=True, indent=4)
config['env_config']['flow_params'] = flow_json
create_env, env_name = make_create_env(flow_params, version=0)
# Register as rllib env
register_env(env_name, create_env)
# NOTE(review): the experiment runs the hard-coded env id
# "RLRampMeterEnv-v0" rather than the freshly registered ``env_name`` —
# verify these refer to the same environment. The closing braces of this
# call were reconstructed; the original chunk truncated it mid-literal.
trials = run_experiments({
    flow_params["exp_tag"]: {
        "run": "PPO",
        "env": "RLRampMeterEnv-v0",
        "config": {
            **config
        },
        "checkpoint_freq": 20,
        "max_failures": 999,
        "stop": {
            "training_iteration": 400,
        },
        "num_samples": 1
    }
})
# Evolution-strategies settings with a custom pixel-based model.
config["stepsize"] = 0.01
config["observation_filter"] = "NoFilter"
config["model"] = {"custom_model": "pixel_flow_network",
                   "custom_options": {}, }
# save the flow params for replay
flow_json = json.dumps(
    flow_params, cls=FlowParamsEncoder, sort_keys=True, indent=4)
config['env_config']['flow_params'] = flow_json
create_env, env_name = make_create_env(params=flow_params, version=0)
# Register as rllib env
register_env(env_name, create_env)
# NOTE(review): the closing braces of this call were reconstructed; the
# original chunk truncated it mid-literal — confirm against the full file.
trials = run_experiments({
    flow_params["exp_tag"]: {
        "run": "ES",
        "env": env_name,
        "config": {
            **config
        },
        "checkpoint_freq": 10,
        "max_failures": 999,
        "stop": {
            "training_iteration": 100,
        },
        "num_samples": 6,
    },
})
# NOTE(review): this span is the tail of an ``experiments = {...}`` literal
# whose opening is not visible in this chunk — the dangling "stop"/"env"/
# "config" keys below have no enclosing dict here. Code left byte-identical;
# confirm against the full file.
"stop": {
"time_total_s": 12 * 60 * 60 # 12 hours
},
"env": "ship-gym-v1",
"config": {
"num_gpus": 1,
# Leave one core free for the driver process.
"num_workers" : multiprocessing.cpu_count() - 1,
"num_sgd_iter" : 10,
"sgd_minibatch_size" : 2048,
"train_batch_size" : 10000,
# Step-wise learning-rate decay: 1e-3 -> 1e-4 at 5M steps -> 1e-5 at 10M.
"lr_schedule" : [[0, 0.001], [5e6, 0.0001], [1e7, 0.00001]]
},
},
}
# Register the ship env under the id referenced above, then launch.
tune.register_env("ship-gym-v1", env_creator)
tune.run_experiments(experiments)
def run(self, experiments: Union[AllenNlpExperiment, List[AllenNlpExperiment]]):
    """Run one or more AllenNLP experiments through ray.

    A single experiment is wrapped in a list, each is converted with
    ``to_ray_experiment()``, and the batch is submitted to
    ``run_experiments``.

    Parameters
    ----------
    experiments :
        One experiment or a list of experiments to run.
    """
    # isinstance is the idiomatic check and also accepts list subclasses.
    if not isinstance(experiments, list):
        experiments = [experiments]
    ray_experiments = [experiment.to_ray_experiment() for experiment in experiments]
    # TODO: this needs to be investigated further — the sleep presumably
    # gives ray time to settle before submission; confirm it is required.
    time.sleep(5)
    run_experiments(ray_experiments)