Parameters
----------
X : array (n_samples, n_features)
New observations for which to choose an action according to this policy.
exploit : bool
Whether to make a prediction according to the policy, or to just choose the
arm with the highest expected reward according to current models.
output_score : bool
Whether to output the score that this method predicted, in case it is desired to use
it with this package's offpolicy and evaluation modules.
Returns
-------
pred : array (n_samples,) or dict("choice" : array(n_samples,), "score" : array(n_samples,))
Actions chosen by the policy. If passing output_score=True, it will be a dictionary
with the chosen arm and the score that the arm was given under this policy by the classifiers used.
"""
if not self.is_fitted:
return _BasePolicy._predict_random_if_unfit(self, X, output_score)
X = _check_X_input(X)
pred = np.zeros((X.shape[0], self.nchoices))
Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self._predict)(choice, pred, exploit, X) for choice in range(self.nchoices))
if output_score:
score_max = np.max(pred, axis=1)
pred = _BasePolicy._name_arms(self, np.argmax(pred, axis = 1))
if not output_score:
return pred
else:
return {"choice" : pred, "score" : score_max}
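# --- Usage sketch (illustrative, not part of the library code) ---
# Shows the predict() interface documented above, here through the EpsilonGreedy
# class defined further below in this module. The import path, the fit(X, a, r)
# call and the sklearn base classifier are assumptions based on the docstrings.
import numpy as np
from sklearn.linear_model import LogisticRegression
from contextualbandits.online import EpsilonGreedy  # assumed module path

nchoices = 5
X = np.random.random((500, 10))                      # covariates
a = np.random.randint(nchoices, size=500)            # arms that were played
r = (np.random.random(500) > 0.5).astype("uint8")    # observed binary rewards

policy = EpsilonGreedy(LogisticRegression(), nchoices)
policy.fit(X, a, r)
out = policy.predict(X[:3], output_score=True)
print(out["choice"], out["score"])                   # chosen arms and their scores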
if not self.is_fitted:
return self._predict_random_if_unfit(X, False)
X = _check_X_input(X)
pred = self._oracles.decision_function(X)
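# When exploit=False, rows selected below with probability 'explore_prob' have their
# oracle scores replaced by the active-learning criterion from self._crit_active;
# 'explore_prob' then decays once per scored observation if a decay rate was set.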
if not exploit:
change_greedy = np.random.random(size=X.shape[0]) <= self.explore_prob
if change_greedy.sum() > 0:
pred[change_greedy, :] = self._crit_active(X[change_greedy, :], pred[change_greedy, :], gradient_calc)
if self.decay is not None:
self.explore_prob *= self.decay ** X.shape[0]
return self._name_arms(np.argmax(pred, axis = 1))
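# --- Worked example of the exploration decay above (illustrative values) ---
# 'explore_prob' is multiplied by 'decay' once per scored observation, so after n
# observations it equals explore_prob * decay**n. With decay = 0.9999, roughly 37%
# of the initial exploration probability remains after 10,000 observations.
explore_prob, decay, n_seen = 0.2, 0.9999, 10_000
print(explore_prob * decay ** n_seen)   # ~0.0736, since 0.9999**10000 ~= 0.368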
class SoftmaxExplorer(_BasePolicy):
"""
SoftMax Explorer
Selects an action according to probabilities determined by a softmax transformation
on the scores from the decision function that predicts each class.
Note
----
Will apply an inverse sigmoid transformation to the probabilities that come from the base algorithm
before applying the softmax function (a standalone sketch of this transformation appears further below).
Parameters
----------
base_algorithm : obj
Base binary classifier, a separate copy of which will be fit to the data of each arm (class).
Actions chosen by the policy. If passing output_score=True, it will be a dictionary
with the chosen arm and the score that the arm was given under this policy by the classifiers used.
"""
if not self.is_fitted:
return self._predict_random_if_unfit(X, output_score)
scores = self.decision_function(X)
pred = self._name_arms(np.argmax(scores, axis = 1))
if not output_score:
return pred
else:
score_max = np.max(scores, axis=1).reshape((-1, 1))
return {"choice" : pred, "score" : score_max}
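# --- Sketch of the score transformation described in the SoftmaxExplorer docstring
# above (illustrative; the class may clip or scale these values differently, and it
# then samples an arm at random according to the resulting probabilities).
import numpy as np

def softmax_of_logits(proba, eps=1e-8):
    # inverse sigmoid (logit) of the per-arm probabilities from the base classifiers
    p = np.clip(proba, eps, 1.0 - eps)
    logits = np.log(p / (1.0 - p))
    # row-wise softmax over arms
    z = np.exp(logits - logits.max(axis=1, keepdims=True))
    return z / z.sum(axis=1, keepdims=True)

print(softmax_of_logits(np.array([[0.1, 0.6, 0.3]])))   # ~[[0.054, 0.735, 0.210]]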
class EpsilonGreedy(_BasePolicy):
"""
Epsilon Greedy
Takes a random action with probability p, or the action with highest
estimated reward with probability 1-p.
Parameters
----------
base_algorithm : obj
Base binary classifier, a separate copy of which will be fit to the data of each arm (class).
Will look for, in this order:
1) A 'predict_proba' method with outputs (n_samples, 2), values in [0,1], rows summing to 1
2) A 'decision_function' method with unbounded outputs (n_samples,) to which it will apply a sigmoid function.
3) A 'predict' method with outputs (n_samples,) with values in [0,1].
Can also pass a list with a different (or already-fit) classifier for each arm
(a minimal compatible classifier is sketched further below).
nchoices : int or list-like
X = _check_X_input(X)
if not self.is_fitted:
raise ValueError("Object has not been fit to data.")
return self._oracles.decision_function(X)
def _predict_random_if_unfit(self, X, output_score):
warnings.warn("Model object has not been fit to data, predictions will be random.")
X = _check_X_input(X)
pred = self._name_arms(np.random.randint(self.nchoices, size = X.shape[0]))
if not output_score:
return pred
else:
return {"choice" : pred, "score" : (1.0 / self.nchoices) * np.ones(X.shape[0], dtype = "float64")}
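# --- Minimal compatible base_algorithm (illustrative sketch) ---
# The EpsilonGreedy docstring above says the policy looks for 'predict_proba', then
# 'decision_function', then 'predict'. This hypothetical classifier exposes only
# 'predict_proba' with the documented (n_samples, 2) output; any object providing
# fit(X, y) plus one of those three methods should be usable the same way.
import numpy as np

class MeanRateClassifier:
    """Predicts the empirical positive rate of the labels it was fit on."""
    def fit(self, X, y, sample_weight=None):
        self.rate_ = float(np.average(y, weights=sample_weight))
        return self
    def predict_proba(self, X):
        p = np.full(X.shape[0], self.rate_)
        return np.c_[1.0 - p, p]   # values in [0, 1], rows summing to 1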
class _BasePolicyWithExploit(_BasePolicy):
def _add_bootstrapped_inputs(self, base_algorithm, batch_sample_method, nsamples, njobs_samples, percentile):
assert (batch_sample_method == 'gamma') or (batch_sample_method == 'poisson')
assert isinstance(nsamples, int)
assert nsamples >= 2
self.batch_sample_method = batch_sample_method
self.nsamples = nsamples
self.njobs_samples = _check_njobs(njobs_samples)
if "predict_proba" in dir(base_algorithm):
self.base_algorithm = _BootstrappedClassifier_w_predict_proba(
base_algorithm, self.nsamples, percentile,
self.batch_train, self.batch_sample_method, njobs = self.njobs_samples
)
elif "decision_function" in dir(base_algorithm):
self.base_algorithm = _BootstrappedClassifier_w_decision_function(
base_algorithm, self.nsamples, percentile,
self.batch_train, self.batch_sample_method, njobs = self.njobs_samples
)
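# Offset-tree style fit for one tree node: binary labels are taken from whether the
# chosen arm falls in one side of the node's comparison, flipped for rows whose
# observed reward is >= 1/2, and sample weights are |r - 1/2| / p, rescaled to sum
# to the number of rows; degenerate label sets fall back to constant predictors.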
r_more_onehalf = r_node >= .5
y = ( np.in1d(a_node, self.tree.node_comparisons[classif][2]) ).astype('uint8')
y_node = y.copy()
y_node[r_more_onehalf] = 1 - y[r_more_onehalf]
w_node = (.5 - r_node) / p_node
w_node[r_more_onehalf] = ( (r_node - .5) / p_node )[r_more_onehalf]
w_node = w_node * w_node.shape[0] / np.sum(w_node)
if y_node.shape[0] == 0:
self._oracles[classif] = _RandomPredictor()
elif y_node.sum() == y_node.shape[0]:
self._oracles[classif] = _OnePredictor()
elif y_node.sum() == 0:
self._oracles[classif] = _ZeroPredictor()
else:
self._oracles[classif].fit(X_node, y_node, sample_weight = w_node)
def _fit_single(self, sample, ix_take_all, X, y):
ix_take = ix_take_all[:, sample]
xsample = X[ix_take, :]
ysample = y[ix_take]
nclass = ysample.sum()
if not self.partialfit:
if nclass == ysample.shape[0]:
self.bs_algos[sample] = _OnePredictor()
return None
elif nclass == 0:
self.bs_algos[sample] = _ZeroPredictor()
return None
self.bs_algos[sample].fit(xsample, ysample)
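# --- Illustrative sketch only (not the package's own code) ---
# One way an index matrix like 'ix_take_all' could be built for the per-resample
# fits above: a plain bootstrap that draws rows with replacement, one column of
# indices per resample. The library may instead rely on the gamma/poisson weighting
# configured in _add_bootstrapped_inputs.
import numpy as np

n_obs, nsamples = 1000, 10
rng = np.random.default_rng(0)
ix_take_all = rng.integers(n_obs, size=(n_obs, nsamples))
ix_take = ix_take_all[:, 3]   # rows used by _fit_single for resample number 3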
def _full_fit_single(self, choice, X, a, r):
yclass, this_choice = self._filter_arm_data(X, a, r, choice)
n_pos = yclass.sum()
if self.smooth is not None:
self.counters[0, choice] += yclass.shape[0]
if (n_pos < self.thr) or ((yclass.shape[0] - n_pos) < self.thr):
if not self.force_fit:
self.algos[choice] = _BetaPredictor(self.alpha + n_pos, self.beta + yclass.shape[0] - n_pos)
return None
if n_pos == 0:
if not self.force_fit:
self.algos[choice] = _ZeroPredictor()
return None
if n_pos == yclass.shape[0]:
if not self.force_fit:
self.algos[choice] = _OnePredictor()
return None
xclass = X[this_choice, :]
self.algos[choice].fit(xclass, yclass)
if self.force_counters or (self.thr > 0 and not self.force_fit):
self._update_beta_counters(yclass, choice)
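# --- Worked example of the beta-prior fallback used above (illustrative values) ---
# When an arm has fewer than 'thr' positive or negative labels, its oracle is replaced
# by a Beta predictor with parameters (alpha + n_pos, beta + n_neg), i.e. the standard
# conjugate update of a Beta prior with the observed 0/1 rewards for that arm.
alpha, beta = 3.0, 7.0                   # hypothetical prior counts
n_pos, n_total = 2, 5                    # observed rewards for this arm
a_post = alpha + n_pos                   # 5.0
b_post = beta + (n_total - n_pos)        # 10.0
print(a_post / (a_post + b_post))        # mean of Beta(5, 10): ~0.333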
Parameters
----------
X : array (n_samples, n_features)
Matrix of covariates for the available data.
a : array (n_samples), int type
Arms or actions that were chosen for each observation.
r : array (n_samples), {0,1}
Rewards that were observed for the chosen actions. Must be binary rewards 0/1.
p : array (n_samples)
Reward estimates for the actions that were chosen by the policy.
"""
try:
from costsensitive import RegressionOneVsRest, WeightedAllPairs
except ImportError:
raise ValueError("This functionality requires package 'costsensitive'.\nCan be installed with 'pip install costsensitive'.")
p = _check_1d_inp(p)
assert p.shape[0] == X.shape[0]
l = -r
if type(self.reward_estimator) == np.ndarray:
C = self.reward_estimator
elif 'predict_proba_separate' in dir(self.reward_estimator):
C = -self.reward_estimator.predict_proba_separate(X)
elif 'predict_proba' in dir(self.reward_estimator):
reward_estimator = SeparateClassifiers(self.reward_estimator, self.nchoices, beta_prior = self.beta_prior, smoothing = self.smoothing)
reward_estimator.fit(X, a, r)
C = -reward_estimator.predict_proba_separate(X)
else:
raise ValueError("Error: couldn't obtain reward estimates. Are you passing the right input to 'reward_estimator'?")
if self.handle_invalid:
C[C == 1] = np.random.beta(3, 1, size = C.shape)[C == 1]
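# --- Illustrative sketch: shaping logged data for the off-policy fit above ---
# The method expects covariates X, the arm 'a' chosen by the logging policy for each
# row, the observed binary reward 'r', and 'p', the logging policy's score or reward
# estimate for the chosen arm (one scalar per row). The arrays below are synthetic
# placeholders, and the final call is hypothetical.
import numpy as np

n, nchoices = 500, 4
rng = np.random.default_rng(1)
X = rng.normal(size=(n, 10))
a = rng.integers(nchoices, size=n)               # arms played by the logging policy
r = rng.integers(0, 2, size=n).astype("uint8")   # observed 0/1 rewards
p = rng.uniform(0.05, 1.0, size=n)               # logging policy's score for the chosen arm
# new_policy.fit(X, a, r, p)   # hypothetical call; see the parameters documented above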