self._fixed_delta_seconds = self._env_config["fixed_delta_seconds"]
# self.config["server_map"] = "/Game/Carla/Maps/" + args.map
# Initialize pygame fonts so the camera manager (cam_manager) can set up the HUD.
pygame.font.init() # for HUD
self._hud = HUD(self._render_x_res, self._render_y_res)
# Needed by macad_agents
if self._discrete_actions:
self.action_space = Dict({
actor_id: Discrete(len(DISCRETE_ACTIONS))
for actor_id in self._actor_configs.keys()
})
else:
self.action_space = Dict({
actor_id: Box(-1.0, 1.0, shape=(2, ))
for actor_id in self._actor_configs.keys()
})
if self._use_depth_camera:
self._image_space = Box(-1.0,
1.0,
shape=(self._y_res, self._x_res,
1 * self._framestack))
else: # Use RGB camera
self._image_space = Box(0.0,
255.0,
shape=(self._y_res, self._x_res,
3 * self._framestack))
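
# Sketch of how the stacked image space above behaves for the RGB case,
# assuming framestack frames are concatenated along the channel axis; the
# resolution and framestack values are placeholders, not this env's config.
import numpy as np
from gym.spaces import Box

x_res, y_res, framestack = 84, 84, 2
image_space = Box(0.0, 255.0, shape=(y_res, x_res, 3 * framestack))

stacked = np.concatenate(
    [np.zeros((y_res, x_res, 3)), np.full((y_res, x_res, 3), 255.0)], axis=-1
)
assert image_space.contains(stacked.astype(image_space.dtype))
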
if self._env_config["send_measurements"]:
self.observation_space = Dict({
def __init__(
        self,
        wrapped_env,
        obs_keys=None,
        goal_keys=None,
        append_goal_to_obs=False,
):
self.quick_init(locals())
super(FlatGoalEnv, self).__init__(wrapped_env)
if obs_keys is None:
obs_keys = ['observation']
if goal_keys is None:
goal_keys = ['desired_goal']
if append_goal_to_obs:
obs_keys += goal_keys
for k in obs_keys:
assert k in self.wrapped_env.observation_space.spaces
for k in goal_keys:
assert k in self.wrapped_env.observation_space.spaces
assert isinstance(self.wrapped_env.observation_space, Dict)
self.obs_keys = obs_keys
self.goal_keys = goal_keys
# TODO: handle nested dict
self.observation_space = Box(
np.hstack([
self.wrapped_env.observation_space.spaces[k].low
for k in obs_keys
]),
np.hstack([
self.wrapped_env.observation_space.spaces[k].high
for k in obs_keys
]),
)
# mirrors observation_space above, restricted to the goal keys
self.goal_space = Box(
    np.hstack([
        self.wrapped_env.observation_space.spaces[k].low
        for k in goal_keys
    ]),
    np.hstack([
        self.wrapped_env.observation_space.spaces[k].high
        for k in goal_keys
    ]),
)
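
# Self-contained sketch of the same key-concatenation idea: the low/high
# bounds of the selected sub-spaces are hstacked into one flat Box, and an
# observation dict can be flattened the same way. The key names and bounds
# below are examples, not the wrapped environment's real spaces.
import numpy as np
from gym.spaces import Box, Dict

dict_space = Dict({
    "observation": Box(-1.0, 1.0, shape=(3,), dtype=np.float32),
    "desired_goal": Box(-1.0, 1.0, shape=(2,), dtype=np.float32),
})
keys = ["observation", "desired_goal"]

flat_space = Box(
    np.hstack([dict_space.spaces[k].low for k in keys]),
    np.hstack([dict_space.spaces[k].high for k in keys]),
    dtype=np.float32,
)
obs = dict_space.sample()
flat_obs = np.hstack([obs[k] for k in keys])
assert flat_space.contains(flat_obs)
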
config=teacher_config,
with_language=False,
vocab_sequence_length=self._seq_length,
use_image_observation=teacher_use_image_observation,
resized_image_size=resized_image_size,
image_with_internal_states=teacher_image_with_internal_states)
# set up the action and observation spaces
if not self._demo_by_human:
self._teacher_control_space = self._teacher_embodied.get_control_space(
)
self._teacher_action_space = self._teacher_embodied.get_action_space(
)
teacher_observation_space = self._teacher_embodied.get_observation_space(
self._teacher)
self.action_space = gym.spaces.Dict(
learner=self.action_space, teacher=self._teacher_action_space)
self.observation_space = gym.spaces.Dict(
learner=self.observation_space,
teacher=teacher_observation_space)
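
# Sketch of the nested learner/teacher layout above: gym.spaces.Dict accepts
# keyword arguments, and samples come back as dicts with the same keys. The
# sub-spaces below are placeholders for the real learner/teacher spaces.
import numpy as np
import gym

learner_action = gym.spaces.Box(-1.0, 1.0, shape=(2,), dtype=np.float32)
teacher_action = gym.spaces.Discrete(4)
action_space = gym.spaces.Dict(learner=learner_action, teacher=teacher_action)

a = action_space.sample()
assert action_space.spaces["learner"].contains(a["learner"])
assert action_space.spaces["teacher"].contains(a["teacher"])
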
def __init__(self, number_generators, number_consumers, number_power_lines, number_substations,
n_timesteps_horizon_maintenance):
self.number_productions = number_generators
self.number_loads = number_consumers
self.number_power_lines = number_power_lines
self.number_substations = number_substations
self.n_timesteps_horizon_maintenance = n_timesteps_horizon_maintenance
self.grid_number_of_elements = self.number_productions + self.number_loads + 2 * self.number_power_lines
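
# Quick worked example of the element count above (the numbers are purely
# illustrative): each power line contributes its two ends, consistent with
# the lines_or_nodes / lines_ex_nodes entries below.
example_generators, example_loads, example_lines = 5, 11, 20
assert example_generators + example_loads + 2 * example_lines == 56
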
dict_spaces = OrderedDict([
('MinimalistACObservation', Dict(OrderedDict([
('MinimalistObservation', Dict(OrderedDict([
('active_loads', Box(low=-np.inf, high=np.inf, shape=(number_consumers,), dtype=np.float32)),
('are_loads_cut', MultiBinary(n=number_consumers)),
('planned_active_loads', Box(low=-np.inf, high=np.inf, shape=(number_consumers,),
dtype=np.float32)),
('loads_nodes', Box(-np.inf, np.inf, (number_consumers,), np.int32)),
('active_productions', Box(low=-np.inf, high=np.inf, shape=(number_generators,), dtype=np.float32)),
('are_productions_cut', MultiBinary(n=number_generators)),
('planned_active_productions', Box(low=-np.inf, high=np.inf, shape=(number_generators,),
dtype=np.float32)),
('productions_nodes', Box(-np.inf, np.inf, (number_generators,), np.int32)),
('lines_or_nodes', Box(-np.inf, np.inf, (number_power_lines,), np.int32)),
('lines_ex_nodes', Box(-np.inf, np.inf, (number_power_lines,), np.int32)),
S = Tuple([Discrete(5),
Box(-1.0, 1.0, shape=(2, 3), dtype=np.float32),
Dict({'success': Discrete(2), 'velocity': Box(-1, 1, shape=(1, 3), dtype=np.float32)})])
sample = S.sample()
assert flatdim(S) == 5 + 2 * 3 + 2 + 3  # Discrete(5) one-hot + Box(2, 3) + Discrete(2) one-hot + Box(1, 3)
assert flatten(S, sample).shape == (16,)
_sample = unflatten(S, flatten(S, sample))
assert sample[0] == _sample[0]
assert np.allclose(sample[1], _sample[1])
assert sample[2]['success'] == _sample[2]['success']
assert np.allclose(sample[2]['velocity'], _sample[2]['velocity'])
# Dict
D0 = Dict({'position': Box(-100, 100, shape=(3,), dtype=np.float32),
'velocity': Box(-1, 1, shape=(4,), dtype=np.float32)})
D = Dict({'sensors': D0, 'score': Discrete(100)})
sample = D.sample()
assert flatdim(D) == 3 + 4 + 100  # Box(3,) + Box(4,) + Discrete(100) one-hot
assert flatten(D, sample).shape == (107,)
_sample = unflatten(D, flatten(D, sample))
assert sample['score'] == _sample['score']
assert np.allclose(sample['sensors']['position'], _sample['sensors']['position'])
assert np.allclose(sample['sensors']['velocity'], _sample['sensors']['velocity'])
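
# Sketch of why the flat dimensions above include the full Discrete sizes:
# flatten() one-hot encodes Discrete entries, and unflatten() recovers the
# original integer. The space below is a small stand-alone example.
import numpy as np
from gym.spaces import Box, Dict, Discrete
from gym.spaces.utils import flatdim, flatten, unflatten

space = Dict({'mode': Discrete(4), 'pos': Box(-1.0, 1.0, shape=(2,), dtype=np.float32)})
assert flatdim(space) == 4 + 2

x = {'mode': 3, 'pos': np.zeros(2, dtype=np.float32)}
flat = flatten(space, x)
assert flat.shape == (6,)
assert unflatten(space, flat)['mode'] == 3
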
self.viewer = None
self.state = None
self.steps_beyond_done = None
self.step_id = 0
self.frame_id = 0
self.last_reward = None
self.cum_reward = None
self.time_stamp = time.time()
self.step_times = np.zeros(25)
self.last_action = None
# set up the observation space
self.observation_space = spaces.Dict({'perf': spaces.Box(0, 255, self.rl_pool.perf_shape, dtype=np.float32),
'score': spaces.Box(0, 255, self.rl_pool.score_shape, dtype=np.float32)})
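
# Sketch of consuming a two-stream Dict observation like the one above: each
# named entry is processed by its own branch before the results are merged.
# The shapes and the .mean() "encoders" are placeholders standing in for
# rl_pool.perf_shape / rl_pool.score_shape and real feature extractors.
import numpy as np
from gym import spaces

obs_space = spaces.Dict({
    'perf': spaces.Box(0, 255, (40, 20, 1), dtype=np.float32),
    'score': spaces.Box(0, 255, (80, 256, 1), dtype=np.float32),
})

obs = obs_space.sample()
perf_features = obs['perf'].mean()    # stand-in for the 'perf' branch
score_features = obs['score'].mean()  # stand-in for the 'score' branch
combined = np.array([perf_features, score_features])
assert combined.shape == (2,)
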
if len(config['actions']) == 0:
self.action_space = spaces.Box(low=-128, high=128, shape=(1,), dtype=np.float32)
else:
self.action_space = spaces.Discrete(len(self.actions))
self.reward_range = (-1, 1)
self.obs_image = None
self.prev_reward = 0.0
self.debug_info = {'song_history': self.rl_pool.get_song_history()}
self.reward = Reward(config['reward_name'], threshold=self.score_dist_threshold, pool=self.rl_pool,
params=config['reward_params'])
# resize factors for rendering
self.resz_spec = 4
# TODO: use EnvSpec?
# maximum number of steps per episode
self.tags = {'max_episode_steps': max_episode_steps}
self.logger = logger or logging.getLogger(__name__)
# starting position of the pen for each episode
self.tile_offset = mypaintlib.TILE_SIZE
self.start_x = start_x
self.start_color = (0, 0, 0) # initial color of the brush
self.imsize = imsize
self.pos_resolution = pos_resolution
# action space
self.action_space = spaces.Dict({
'position':
spaces.Discrete(self.pos_resolution**2),
'pressure':
spaces.Box(low=0, high=1.0, shape=(), dtype=float),
'color':
spaces.Box(low=0, high=1.0, shape=(3, ), dtype=float),
'prob':
spaces.Discrete(2)
})
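
# Sketch of decoding a sampled action from a Dict like the one above. Mapping
# the flat 'position' index to an (x, y) cell assumes a row-major layout over
# the pos_resolution x pos_resolution grid, which is an assumption here, not
# this environment's documented convention.
from gym import spaces

pos_resolution = 32
action_space = spaces.Dict({
    'position': spaces.Discrete(pos_resolution ** 2),
    'pressure': spaces.Box(low=0, high=1.0, shape=(), dtype=float),
    'color': spaces.Box(low=0, high=1.0, shape=(3,), dtype=float),
    'prob': spaces.Discrete(2),
})

action = action_space.sample()
x = action['position'] % pos_resolution
y = action['position'] // pos_resolution
assert 0 <= x < pos_resolution and 0 <= y < pos_resolution
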
# observation space
self.observation_space = spaces.Dict({
'image':
spaces.Box(low=0,
high=255,
shape=(self.imsize, self.imsize, 3),
(history_len * np.ones((space.nvec.shape[0], 1)), space.nvec)
)
result = spaces.MultiDiscrete(nvec)
elif isinstance(space, spaces.Box):
result = spaces.Box(
low=_extend_to_history_len(space.low),
high=_extend_to_history_len(space.high),
# shape=(history_len,) + space.shape,
dtype=space.dtype
)
elif isinstance(space, spaces.Tuple):
result = []
for value in space.spaces:
result.append(extend_space(value, history_len))
result = spaces.Tuple(result)
elif isinstance(space, spaces.Dict):
result = OrderedDict()
for key, value in space.spaces.items():
result[key] = extend_space(value, history_len)
result = spaces.Dict(result)
else:
raise NotImplementedError("not yet implemented")
return result
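
# Self-contained sketch of the recursive idea above for the Box and Dict
# branches: every leaf Box gains a leading history axis by tiling its bounds.
# This mirrors, but is not identical to, extend_space/_extend_to_history_len,
# whose exact tiling is not shown in this snippet.
import numpy as np
from gym import spaces

def extend_box_with_history(space, history_len):
    low = np.repeat(space.low[np.newaxis, ...], history_len, axis=0)
    high = np.repeat(space.high[np.newaxis, ...], history_len, axis=0)
    return spaces.Box(low=low, high=high, dtype=space.dtype)

obs_space = spaces.Dict({
    'pos': spaces.Box(-1.0, 1.0, shape=(3,), dtype=np.float32),
    'vel': spaces.Box(-2.0, 2.0, shape=(3,), dtype=np.float32),
})
extended = spaces.Dict({
    k: extend_box_with_history(v, history_len=4) for k, v in obs_space.spaces.items()
})
assert extended.spaces['pos'].shape == (4, 3)
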
def observation_space(self):
return spaces.Dict({
str(k): self._documents[k].observation_space()
for k in self._documents.keys()
})
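
# Sketch of the per-document composition above: each sub-component exposes
# its own observation_space(), and the container keys them by stringified id
# in a single Dict. The Doc class below is a stand-in for the real documents.
import numpy as np
from gym import spaces

class Doc:
    def observation_space(self):
        return spaces.Box(0.0, 1.0, shape=(4,), dtype=np.float32)

documents = {0: Doc(), 1: Doc()}
combined = spaces.Dict({str(k): documents[k].observation_space() for k in documents})
assert set(combined.spaces) == {'0', '1'}
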
self.reward_type = reward_type
self.norm_order = norm_order
self.indicator_threshold = indicator_threshold
self.fix_goal = fix_goal
self.fixed_goal = np.array(fixed_goal)
self._state_goal = None
self.hide_goal_markers = hide_goal_markers
self.action_space = Box(np.array([-1, -1, -1]), np.array([1, 1, 1]), dtype=np.float32)
self.hand_space = Box(self.hand_low, self.hand_high, dtype=np.float32)
self.observation_space = Box(
np.hstack((self.hand_low, self.hand_low, 0)),
np.hstack((self.hand_high, self.hand_high, 1)),
dtype=np.float32
)
self.observation_dict = Dict([
('observation', self.hand_space),
('desired_goal', self.hand_space),
('achieved_goal', self.hand_space),
('state_observation', self.hand_space),
('state_desired_goal', self.hand_space),
('state_achieved_goal', self.hand_space),
('proprio_observation', self.hand_space),
('proprio_desired_goal', self.hand_space),
('proprio_achieved_goal', self.hand_space),
])
self.reset()
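
# Sketch of the dual layout above: a flat Box observation_space holding
# (hand position, goal, indicator) next to a Dict view of the same
# quantities. The bounds and the three-entry Dict are illustrative; the real
# environment exposes the full set of state/proprio keys listed above.
import numpy as np
from gym.spaces import Box, Dict

hand_low = np.array([-0.5, -0.5, 0.0])
hand_high = np.array([0.5, 0.5, 0.5])
hand_space = Box(hand_low, hand_high, dtype=np.float32)

observation_space = Box(
    np.hstack((hand_low, hand_low, 0)),
    np.hstack((hand_high, hand_high, 1)),
    dtype=np.float32,
)
observation_dict = Dict([
    ('observation', hand_space),
    ('desired_goal', hand_space),
    ('achieved_goal', hand_space),
])

hand_pos = hand_space.sample()
goal = hand_space.sample()
flat_obs = np.hstack((hand_pos, goal, 0.0)).astype(np.float32)
assert observation_space.contains(flat_obs)
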