Raises:
    YARLError: If a graph_fn with the given name cannot be found in the component.
"""
# The component object that the method belongs to.
self.component = component
self.flatten_ops = flatten_ops
self.split_ops = split_ops
self.add_auto_key_as_first_param = add_auto_key_as_first_param
if isinstance(method, str):
    self.name = method
    self.method = getattr(self.component, "_graph_fn_" + method, None)
    if not self.method:
        raise YARLError("ERROR: No `_graph_fn_...` method with name '{}' found!".format(method))
else:
    self.method = method
    self.name = re.sub(r'^_graph_fn_', "", method.__name__)
# List of lists of op-records that get passed through this graph_fn.
self.inputs = list()
# List of lists of op-records that come out of this graph_fn.
self.outputs = list()
# Whether we have all necessary input-sockets for passing at least one input-op combination through
# our computation method. As long as this is False, we return prematurely and wait for more ops to come in
# (through other Sockets).
self.input_complete = False
# Registry of which incoming Sockets' op-records have already been passed through the graph_fn
# and which output op-records they generated.
# key=tuple of input-op-records (len==number of input params).
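
# A minimal, self-contained sketch of the string-vs-callable resolution above
# (the _Demo class and its method are hypothetical, for illustration only):
import re

class _Demo:
    def _graph_fn_add(self, a, b):
        return a + b

demo = _Demo()
# String input: look up the bound method "_graph_fn_" + name via getattr.
method = getattr(demo, "_graph_fn_" + "add", None)
assert method is not None and method(1, 2) == 3
# Callable input: derive the record's name by stripping the "_graph_fn_" prefix.
assert re.sub(r'^_graph_fn_', "", _Demo._graph_fn_add.__name__) == "add"
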
if shape is None or shape == ():
    if isinstance(low, (int, float)) and isinstance(high, (int, float)):
        assert low < high
        self.low = float(low)
        self.high = float(high)
        self.is_scalar = True
    elif low is None:
        assert high is None
        self.has_unknown_bounds = True
        self.has_flex_bounds = True
        self.is_scalar = True
        self.low = float("inf")
        self.high = float("-inf")
    else:
        if shape == ():
            raise YARLError("ERROR: Shape cannot be () if low and/or high are given as shape-tuples!")
        self.low = np.array(low)
        self.high = np.array(high)
        assert self.low.shape == self.high.shape
# A box (R^n) (may be bounded).
else:
    if low is None:
        assert high is None
        self.has_unknown_bounds = True
        self.has_flex_bounds = True
        self.low = np.zeros(shape)
        self.high = np.zeros(shape)
    else:
        assert np.isscalar(low) and np.isscalar(high)
        self.low = low + np.zeros(shape)
        self.high = high + np.zeros(shape)
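
# Standalone sketch of the two bound-handling cases above (shapes and values
# are assumed, purely illustrative):
import numpy as np

# Scalar space (shape is None or ()): bounds become plain floats.
low, high = -1, 1
assert float(low) < float(high)

# Shaped box space with scalar bounds: broadcast them across the shape.
shape = (2, 3)
low_arr = -1.0 + np.zeros(shape)   # every entry is -1.0
high_arr = 1.0 + np.zeros(shape)   # every entry is 1.0
assert low_arr.shape == high_arr.shape == shape
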
in_space = input_spaces["nn_output"] # type: Space
# Must not be ContainerSpace (not supported yet for NNLayers, doesn't seem to make sense).
assert not isinstance(in_space, ContainerSpace), "ERROR: Cannot handle container input Spaces " \
"in NNOutputCleanup '{}' (atm; may soon do)!".format(self.name)
# Check action/target Space.
self.target_space = action_space.with_batch_rank()
assert self.target_space.has_batch_rank, "ERROR: `self.target_space ` does not have batch rank!"
if not isinstance(self.target_space, IntBox):
raise YARLError("ERROR: `target_space` must be IntBox. Continuous target spaces will be supported later!")
# Discrete action space. Make sure, all dimensions have the same bounds and the lower bound is 0.
if self.target_space.global_bounds is False:
raise YARLError("ERROR: `target_space` must not have individual lower and upper bounds!")
elif self.target_space.num_categories is None or self.target_space.num_categories == 0:
raise YARLError("ERROR: `target_space` must have a `num_categories` of larger 0!")
# Make sure target_space matches NN output space.
self.flat_dim_target_space = self.target_space.flat_dim_with_categories
# NN output may have a batch-rank inferred or not (its first rank may be '?' or some memory-batch number).
# Hence, always assume first rank to be batch.
#flat_dim_nn_output = in_space.flat_dim if in_space.has_batch_rank else np.product(in_space.get_shape()[1:])
#assert flat_dim_nn_output == flat_dim_target_space, \
# "ERROR: `flat_dim_target_space` ({}) must match `flat_dim_nn_output` " \
# "({})!".format(flat_dim_target_space, flat_dim_nn_output)
# Do some remaining interface assembly.
self.last_nn_layer = DenseLayer(
units=self.flat_dim_target_space,
biases_spec=self.biases_spec if np.isscalar(self.biases_spec) or self.biases_spec is None else
[log(b) for _ in range_(self.target_space.flat_dim) for b in self.biases_spec]
)
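
# Standalone sketch of the biases_spec expansion above (values are assumed,
# purely illustrative): a per-category prior is log-transformed and tiled once
# per flat action dimension, matching units = flat_dim * num_categories.
from math import log

biases_spec = [0.1, 0.2, 0.7]  # hypothetical prior probability per category
flat_dim = 2                   # hypothetical number of flat action dimensions
biases = [log(b) for _ in range(flat_dim) for b in biases_spec]
assert len(biases) == flat_dim * len(biases_spec) == 6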