def test_validate_against_shap(self):
    # Validate our explainer against the shap library directly
    X, y = shap.datasets.adult()
    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.02, random_state=7)
    # Fit several classifiers
    tree_classifiers = [create_sklearn_random_forest_classifier(x_train, y_train)]
    non_tree_classifiers = [create_sklearn_logistic_regressor(x_train, y_train)]
    tree_regressors = [create_sklearn_random_forest_regressor(x_train, y_train)]
    non_tree_regressors = [create_sklearn_linear_regressor(x_train, y_train)]
    # For each model, validate we get the same results as calling shap directly
    test_logger.info("Running tree classifiers in test_validate_against_shap")
    for model in tree_classifiers:
        # Run shap directly for comparison
        exp = shap.TreeExplainer(model)
        explanation = exp.shap_values(x_test)
        shap_overall_imp = get_shap_imp_classification(explanation)
        overall_imp = tabular_explainer_imp(model, x_train, x_test)
        validate_correlation(overall_imp, shap_overall_imp, 0.95)
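    # A hedged sketch (not in the original snippet) of the analogous loop for the
    # non-tree classifiers, using shap.KernelExplainer on predict_proba. Helper
    # names and the 0.95 threshold are carried over from the tree loop above and
    # are assumptions here.
    test_logger.info("Running non-tree classifiers in test_validate_against_shap")
    for model in non_tree_classifiers:
        exp = shap.KernelExplainer(model.predict_proba, x_train)
        explanation = exp.shap_values(x_test)
        shap_overall_imp = get_shap_imp_classification(explanation)
        overall_imp = tabular_explainer_imp(model, x_train, x_test)
        validate_correlation(overall_imp, shap_overall_imp, 0.95)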
def test_explain_model_local_pytorch_classification(self, tabular_explainer):
    X, y = shap.datasets.adult()
    x_train, x_test, y_train, _ = train_test_split(X, y, test_size=0.2, random_state=7)
    # Fit a DNN pytorch model
    model = create_pytorch_classifier(x_train.values, y_train)
    test_logger.info('Running explain local for test_explain_model_local_pytorch_classification')
    self._explain_model_local_dnn_classification_common(tabular_explainer, model, x_train,
                                                        x_test, y_train, X.columns.values)
def verify_explain_model_subset_classification_dense(self, is_local=True,
                                                     true_labels_required=False):
    # Verify explaining a subset of the features
    X, y = shap.datasets.adult()
    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.01, random_state=7)
    # Fit a logistic regression model
    model = create_sklearn_logistic_regressor(x_train, y_train)
    # Create tabular explainer
    classes = [" <=50K", " >50K"]
    explainer = self.create_explainer(model, x_train, features=list(range(x_train.shape[1])), classes=classes)
    self.test_logger.info('Running explain global for verify_explain_model_subset_classification_dense')
    # Get most important features
    if true_labels_required:
        o16n_explanation = explainer.explain_global(x_test, y_test)
    else:
        o16n_explanation = explainer.explain_global(x_test)
    ranked_global_names = o16n_explanation.get_ranked_global_names()
    column_subset = ranked_global_names[:5]
    # Run explain model again, but this time only on the feature subset and on a single row
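    # Hedged completion of the truncated step above (not in the original snippet):
    # re-fit on the top-5 columns and explain a single test row. The helper calls
    # reuse the names above; the exact original steps are assumptions.
    x_train_subset = x_train.iloc[:, list(column_subset)]
    x_test_subset = x_test.iloc[:1, list(column_subset)]
    subset_model = create_sklearn_logistic_regressor(x_train_subset, y_train)
    subset_explainer = self.create_explainer(subset_model, x_train_subset,
                                             features=list(column_subset), classes=classes)
    if is_local:
        subset_explanation = subset_explainer.explain_local(x_test_subset)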
def test_explain_model_random_forest_classification(self, tabular_explainer):
    X, y = shap.datasets.adult()
    x_train, x_test, y_train, _ = train_test_split(X, y, test_size=0.2, random_state=7)
    # Fit a tree model
    model = create_sklearn_random_forest_classifier(x_train, y_train)
    # Create tabular explainer
    exp = tabular_explainer(model, x_train, features=X.columns.values)
    test_logger.info('Running explain global for test_explain_model_random_forest_classification')
    explanation = exp.explain_global(x_test)
    self.verify_adult_overall_features(explanation.get_ranked_global_names(),
                                       explanation.get_ranked_global_values())
    self.verify_adult_per_class_features(explanation.get_ranked_per_class_names(),
                                         explanation.get_ranked_per_class_values())
    self.verify_top_rows_local_features_with_and_without_top_k(explanation,
                                                               self.adult_local_features_first_three_rf,
                                                               is_classification=True, top_rows=3)
def test_front_page_model_agnostic():
    import numpy as np
    import sklearn.model_selection
    import sklearn.neighbors
    import sklearn.svm
    import shap
    from sklearn.model_selection import train_test_split
    # print the JS visualization code to the notebook
    shap.initjs()
    # train an SVM classifier
    X_train, X_test, Y_train, Y_test = train_test_split(*shap.datasets.iris(), test_size=0.2, random_state=0)
    svm = sklearn.svm.SVC(kernel='rbf', probability=True)
    svm.fit(X_train, Y_train)
    # use Kernel SHAP to explain test set predictions
    # (nsamples is an argument to shap_values, not to the KernelExplainer constructor)
    explainer = shap.KernelExplainer(svm.predict_proba, X_train, link="logit")
    shap_values = explainer.shap_values(X_test, nsamples=100)
    # plot the SHAP values for the Setosa output of the first instance
    shap.force_plot(explainer.expected_value[0], shap_values[0][0, :], X_test.iloc[0, :], link="logit")
    # SVM with a linear kernel
    svc_linear = sklearn.svm.SVC(kernel='linear', probability=True)
    svc_linear.fit(X_train, Y_train)
    v = 100 * np.sum(svc_linear.predict(X_test) == Y_test) / len(Y_test)
    print("Accuracy = {0}%".format(v))
    # Explain all the predictions in the test set
    # NOTE: this bare KernelExplainer (with explain_instance and an .explainer
    # attribute) appears to be a project-level wrapper imported at module scope,
    # not shap.KernelExplainer
    shapexplainer = KernelExplainer(svc_linear.predict_proba, X_train)
    shap_values = shapexplainer.explain_instance(X_test)
    print('svc X_test')
    print(shap_values)
    print(shapexplainer.explainer.expected_value[0])
    print(shap_values[0])
    np.random.seed(1)
    X, y = shap.datasets.adult()
    X_train, X_valid, y_train, y_valid = sklearn.model_selection.train_test_split(X, y, test_size=0.2, random_state=7)
    knn = sklearn.neighbors.KNeighborsClassifier()
    knn.fit(X_train, y_train)
    f = lambda x: knn.predict_proba(x)[:, 1]
    med = X_train.median().values.reshape((1, X_train.shape[1]))
    shapexplainer = KernelExplainer(f, med)
    shap_values_single = shapexplainer.explain_instance(X.iloc[0, :], nsamples=1000)
    print('Shap Tabular Example')
    print(shapexplainer.explainer.expected_value)
    print(shap_values_single)
    print("Invoked Shap KernelExplainer")
def test_kernel_shap_with_a1a_sparse_nonzero_background():
    import numpy as np
    import scipy as sp
    import scipy.sparse  # makes sp.sparse available
    import shap
    from sklearn.model_selection import train_test_split
    from sklearn.linear_model import LinearRegression
    from sklearn.utils.sparsefuncs import csc_median_axis_0
    np.set_printoptions(threshold=100000)
    np.random.seed(0)
    X, y = shap.datasets.a1a()  # pylint: disable=unbalanced-tuple-unpacking
    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.01, random_state=0)
    linear_model = LinearRegression()
    linear_model.fit(x_train, y_train)
    # Calculate the median of the background data
    median_dense = csc_median_axis_0(x_train.tocsc())
    median = sp.sparse.csr_matrix(median_dense)
    explainer = shap.KernelExplainer(linear_model.predict, median)
    shap_values = explainer.shap_values(x_test)

    def dense_to_sparse_predict(data):
        sparse_data = sp.sparse.csr_matrix(data)
        return linear_model.predict(sparse_data)

    explainer_dense = shap.KernelExplainer(dense_to_sparse_predict, median_dense.reshape((1, len(median_dense))))
    x_test_dense = x_test.toarray()
    shap_values_dense = explainer_dense.shap_values(x_test_dense)
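    # A plausible final assertion (truncated from the snippet): the sparse and
    # dense runs should agree on the SHAP values up to numerical tolerance.
    np.testing.assert_allclose(shap_values, shap_values_dense, rtol=1e-2, atol=1e-4)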
def test_verify_pipeline_model_coefficient_explanation(self):
    # Validate our explainer against an explainable linear model
    X, y = shap.datasets.adult()
    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=7)
    # Note: in the pipeline case we use KernelExplainer;
    # in the linear case we use LinearExplainer, which is much faster
    pipeline = [True, False]
    threshold = [0.85, 0.76]
    for idx, is_pipeline in enumerate(pipeline):
        # Fit a logistic regression classifier
        model = create_sklearn_logistic_regressor(x_train, y_train, pipeline=is_pipeline)
        # Create tabular explainer
        exp = TabularExplainer(model, x_train, features=list(range(x_train.shape[1])))
        test_logger.info("Running explain model for test_verify_pipeline_model_coefficient_explanation")
        # Validate evaluation sampling
        policy = {ExplainParams.SAMPLING_POLICY: SamplingPolicy(allow_eval_sampling=True)}
        explanation = exp.explain_global(x_test, **policy)
        mean_train = np.mean(x_train.values, axis=0)
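        # Hedged sketch of the truncated comparison (not in the original snippet):
        # a logistic regression's global importances should correlate with its
        # coefficient magnitudes; mean_train and threshold[idx] suggest a check
        # along these lines, but the concrete formula is an assumption.
        lr = model.steps[-1][1] if is_pipeline else model
        coef_imp = np.abs(lr.coef_[0] * mean_train)
        validate_correlation(explanation.global_importance_values, coef_imp, threshold[idx])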
def test_front_page_xgboost():
    import xgboost
    import shap
    # load JS visualization code to notebook
    shap.initjs()
    # train an XGBoost model (shap.datasets.boston returns a 2-tuple)
    X, y = shap.datasets.boston()
    bst = xgboost.train({"learning_rate": 0.01}, xgboost.DMatrix(X, label=y), 100)
    # explain the model's predictions using SHAP values (use pred_contrib in LightGBM)
    shap_values = bst.predict(xgboost.DMatrix(X), pred_contribs=True)
    # visualize the first prediction's explanation
    # (shap.visualize is the legacy name; newer shap versions use shap.force_plot)
    shap.visualize(shap_values[0, :], X.iloc[0, :])
    # visualize the training set predictions
    shap.visualize(shap_values, X)
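    # A hedged modern equivalent of the two visualize calls (assuming a current
    # shap version): TreeExplainer yields the same contributions as pred_contribs
    # without the trailing bias column, and force_plot replaces visualize.
    explainer = shap.TreeExplainer(bst)
    shap_values = explainer.shap_values(X)
    shap.force_plot(explainer.expected_value, shap_values[0, :], X.iloc[0, :])
    shap.force_plot(explainer.expected_value, shap_values, X)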
def run_experiment(experiment, use_cache=True, cache_dir="/tmp"):
    dataset_name, model_name, method_name, metric_name = experiment
    # see if we have a cached version
    cache_id = __gen_cache_id(experiment)
    cache_file = os.path.join(cache_dir, cache_id + ".pickle")
    if use_cache and os.path.isfile(cache_file):
        with open(cache_file, "rb") as f:
            # print(cache_id.replace("__", " ") + " ...loaded from cache.")
            return pickle.load(f)
    # compute the scores
    print(cache_id.replace("__", " ", 4) + " ...")
    sys.stdout.flush()
    start = time.time()
    X, y = getattr(datasets, dataset_name)()
    score = getattr(metrics, metric_name)(
        X, y,
        getattr(models, dataset_name + "__" + model_name),
        method_name
    )
    print("...took %f seconds.\n" % (time.time() - start))
    # cache the scores
    with open(cache_file, "wb") as f:
        pickle.dump(score, f)
    return score
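# Hypothetical usage (the concrete field names are illustrative assumptions;
# run_experiment expects a (dataset, model, method, metric) tuple matching its
# unpacking, resolved against the datasets/models/metrics modules):
score = run_experiment(("adult", "random_forest", "tree_shap", "local_accuracy"))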