# Shared imports for the snippets below (emukit module paths assumed).
import GPy
import numpy as np
import pytest
from numpy.testing import assert_array_equal
from emukit.bayesian_optimization.acquisitions import ExpectedImprovement
from emukit.bayesian_optimization.loops import BayesianOptimizationLoop
from emukit.core import ContinuousParameter, ParameterSpace
from emukit.core.loop import FixedIterationsStoppingCondition, UserFunctionWrapper
from emukit.experimental_design import ExperimentalDesignLoop
from emukit.experimental_design.acquisitions import ModelVariance
from emukit.model_wrappers import GPyModelWrapper

def f(x):
    # Simple stand-in objective; the original tests define `f` elsewhere.
    return x ** 2

def test_loop():
    n_iterations = 5
    x_init = np.random.rand(5, 1)
    y_init = np.random.rand(5, 1)
    # Make GPy model
    gpy_model = GPy.models.GPRegression(x_init, y_init)
    model = GPyModelWrapper(gpy_model)
    space = ParameterSpace([ContinuousParameter('x', 0, 1)])
    acquisition = ExpectedImprovement(model)
    # Make loop and collect points
    bo = BayesianOptimizationLoop(model=model, space=space, acquisition=acquisition)
    bo.run_loop(UserFunctionWrapper(f), FixedIterationsStoppingCondition(n_iterations))
    # Check we got the correct number of points: 5 initial + n_iterations new
    assert bo.loop_state.X.shape[0] == n_iterations + 5
    # Check the obtained results
    results = bo.get_results()
    assert results.minimum_location.shape[0] == 1
    assert results.best_found_value_per_iteration.shape[0] == n_iterations + 5
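A minimal usage sketch of the same loop outside a test harness. It leans on a few behaviors the snippet above does not show, so treat them as assumptions: `acquisition` defaults to ExpectedImprovement when omitted, `run_loop` accepts a plain iteration count (as the RandomSearch test further down does), and the results object exposes `minimum_value` next to `minimum_location`.

def bo_usage_sketch():
    x_init = np.random.rand(5, 1)
    y_init = np.random.rand(5, 1)
    model = GPyModelWrapper(GPy.models.GPRegression(x_init, y_init))
    space = ParameterSpace([ContinuousParameter('x', 0, 1)])
    bo = BayesianOptimizationLoop(model=model, space=space)  # default acquisition (assumed: ExpectedImprovement)
    bo.run_loop(UserFunctionWrapper(f), 5)  # int stopping condition, as in the RandomSearch test below
    results = bo.get_results()
    print(results.minimum_location, results.minimum_value)  # minimum_value is assumed to exist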
def test_user_function_wrapper_evaluation_single_output():
    function = lambda x: 2 * x
    function_input = np.array([[1], [2], [3]])
    ufw = UserFunctionWrapper(function)
    output = ufw.evaluate(function_input)
    assert len(output) == function_input.shape[0]
    for i, record in enumerate(output):
        assert_array_equal(record.X, function_input[i])
        assert_array_equal(record.Y, function(function_input[i]))
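The happy path for extra outputs, which the failure tests below exercise negatively: when the wrapped function returns one extra array and `extra_output_names` declares a matching name, evaluation succeeds. A minimal sketch, not from the original test file:

def extra_output_sketch():
    function = lambda x: (2 * x, np.full((x.shape[0], 1), 0.5))  # returns (Y, cost)
    ufw = UserFunctionWrapper(function, extra_output_names=['cost'])
    output = ufw.evaluate(np.array([[1], [2], [3]]))
    assert len(output) == 3  # one result record per input row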
def test_experimental_design_loop():  # renamed from a second `test_loop` so it does not shadow the test above
    n_iterations = 5
    x_init = np.random.rand(5, 1)
    y_init = np.random.rand(5, 1)
    # Make GPy model
    gpy_model = GPy.models.GPRegression(x_init, y_init)
    model = GPyModelWrapper(gpy_model)
    space = ParameterSpace([ContinuousParameter('x', 0, 1)])
    acquisition = ModelVariance(model)
    # Make loop and collect points
    exp_design = ExperimentalDesignLoop(space, model, acquisition)
    exp_design.run_loop(UserFunctionWrapper(f), FixedIterationsStoppingCondition(n_iterations))
    # Check we got the correct number of points: 5 initial + 5 collected
    assert exp_design.loop_state.X.shape[0] == 10
def test_user_function_too_many_outputs_fails():
    function = lambda x: (2 * x, np.array([1]))
    function_input = np.array([[1], [2], [3]])
    ufw = UserFunctionWrapper(function)  # no extra_output_names declared, so a second output is an error
    with pytest.raises(ValueError):
        ufw.evaluate(function_input)
def test_user_function_wrapper_invalid_input():
    # invalid input
    with pytest.raises(ValueError):
        function = lambda x: 2 * x
        function_input = np.array([1])
        ufw = UserFunctionWrapper(function)
        ufw.evaluate(function_input)

    # invalid function output
    with pytest.raises(ValueError):
        function = lambda x: np.array([2])
        function_input = np.array([[1]])
        ufw = UserFunctionWrapper(function)
        ufw.evaluate(function_input)

    # invalid function output type
    with pytest.raises(ValueError):
        function = lambda x: [2]
        function_input = np.array([[1]])
        ufw = UserFunctionWrapper(function)
        ufw.evaluate(function_input)
def test_user_function_too_few_outputs_fails():
    function = lambda x: 2 * x
    function_input = np.array([[1], [2], [3]])
    ufw = UserFunctionWrapper(function, extra_output_names=['cost'])  # declared 'cost' output is never returned
    with pytest.raises(ValueError):
        ufw.evaluate(function_input)
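Taken together, the failure tests pin down the wrapper's contract. A conforming user function, sketched for illustration:

def conforming_function(x):
    # x arrives as a 2d array of shape (n_points, n_dims); return a 2d ndarray,
    # plus one extra array per name declared in extra_output_names (none here)
    return 2 * x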
# RandomSearch and branin_function come from emukit; import paths assumed:
# from emukit.bayesian_optimization.loops.random_search import RandomSearch
# from emukit.test_functions import branin_function
def test_random_search_with_init_data():
    np.random.seed(42)
    branin_fcn, parameter_space = branin_function()
    # Ensure the function also returns a value for cost
    branin_fcn_with_cost = lambda x: (branin_fcn(x), np.zeros((x.shape[0], 1)))
    wrapped_fcn = UserFunctionWrapper(branin_fcn_with_cost, extra_output_names=['cost'])
    x_init = parameter_space.sample_uniform(5)
    y_init = branin_fcn(x_init)
    cost_init = np.ones([5, 1])
    rs = RandomSearch(parameter_space, x_init=x_init, y_init=y_init, cost_init=cost_init)
    rs.run_loop(wrapped_fcn, 5)
    # 5 initial points + 5 evaluated in the loop
    assert len(rs.loop_state.Y) == 10
    assert len(rs.loop_state.X) == 10
    assert len(rs.loop_state.cost) == 10
# Excerpt from the body of a FABOLAS benchmark runner; `func`, `grid`, `subsets`,
# `X_init`, `Y_init`, `cost_init`, `space`, `s_min`, `s_max`, `marginalize_hypers`,
# `n_init` and `n_iters` are defined by the enclosing function.
for it in range(n_init):
    # Evaluate the initial grid, cycling through the dataset-size subsets
    func_val, cost = func(x=grid[it], s=subsets[it % len(subsets)])
    X_init[it] = np.concatenate((grid[it], np.array([subsets[it % len(subsets)]])))
    Y_init[it] = func_val
    cost_init[it] = cost

def wrapper(x):
    # The loop proposes the dataset-size parameter in log space, so map it back with exp
    y, c = func(x[0, :-1], np.exp(x[0, -1]))
    return np.array([[y]]), np.array([[c]])

loop = FabolasLoop(X_init=X_init, Y_init=Y_init, cost_init=cost_init, space=space, s_min=s_min,
                   s_max=s_max, marginalize_hypers=marginalize_hypers)
loop.run_loop(user_function=UserFunctionWrapper(wrapper),
              stopping_condition=FixedIterationsStoppingCondition(n_iters - n_init))
return loop.loop_state
def __init__(self, objective: Callable, input_domain: ParameterSpace) -> None:
    """
    :param objective: Python function on which the sensitivity analysis will be performed.
    :param input_domain: Parameter space describing the inputs of the objective.
    """
    self.objective = UserFunctionWrapper(objective)
    self.input_domain = input_domain
    self.random_design = RandomDesign(self.input_domain)
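The RandomDesign built on the last line draws uniform samples from the input domain. A small sketch of that piece on its own (import path assumed; it lives elsewhere in older emukit versions):

from emukit.core.initial_designs import RandomDesign  # path assumed

design = RandomDesign(ParameterSpace([ContinuousParameter('x', 0, 1)]))
samples = design.get_samples(10)  # shape (10, 1), sampled uniformly from the domain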
# Excerpt from a benchmarker's __init__; the `def` line and the start of the first
# docstring parameter are reconstructed from the body and were not in the excerpt.
def __init__(self, loops_with_names, test_function, parameter_space, metrics, initial_design=None):
    """
    :param loops_with_names: List of (name, loop_init_function) tuples; each loop_init_function is a
        function that takes in initial x and y training data and returns a loop to be benchmarked
    :param test_function: The function to benchmark the loops against
    :param parameter_space: Parameter space describing the input domain of the function to be benchmarked
    :param metrics: List of metric objects that assess the performance of the loop at every iteration
    :param initial_design: An object that returns a set of samples in the input domain that are used as
        the initial data set
    """
    self.loop_names = [loop[0] for loop in loops_with_names]
    self.loops = [loop[1] for loop in loops_with_names]
    if isinstance(test_function, UserFunction):
        self.test_function = test_function
    else:
        self.test_function = UserFunctionWrapper(test_function)
    self.parameter_space = parameter_space
    if initial_design is None:
        initial_design = RandomDesign(parameter_space)
    self.initial_design = initial_design
    self.metrics = metrics
    self.metric_names = [metric.name for metric in metrics]
    if len(set(self.metric_names)) != len(self.metric_names):
        raise ValueError('Names of metrics are not unique')
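A hedged construction sketch for the benchmarker above, assuming it is emukit's Benchmarker; the import paths, the metric class, and the `run_benchmark` method with its argument names are assumptions, not confirmed by the excerpt:

from emukit.benchmarking.loop_benchmarking.benchmarker import Benchmarker  # path assumed
from emukit.benchmarking.loop_benchmarking.metrics import MinimumObservedValueMetric  # path assumed
from emukit.test_functions import branin_function

fcn, space = branin_function()

def make_bo_loop(x_init, y_init):
    # Loop factory matching the loops_with_names contract: takes initial data, returns a loop
    model = GPyModelWrapper(GPy.models.GPRegression(x_init, y_init))
    return BayesianOptimizationLoop(model=model, space=space)

benchmarker = Benchmarker([('bo', make_bo_loop)], test_function=fcn, parameter_space=space,
                          metrics=[MinimumObservedValueMetric()])
results = benchmarker.run_benchmark(n_iterations=5, n_repeats=2)  # method and kwargs assumed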