plot_name = path + '/' + plot_name
vis_object.title = idx
print("You can save plot as SVG or PNG by open three-dotted button at right =>")
watermark = alt.Chart().mark_text(
align='center', baseline='top', dy=vis_object.height // 2 + 30, fontSize=32, fontWeight=200,
color='#d3d3d3', text='Retentioneering'
)
vis_object.save(plot_name)
if kwargs.get('interactive', True):
alt.renderers.enable('notebook')
display(vis_object + watermark)
if cfg.get('mongo_client') is not None:
print(f'DB {idx}')
ml = MongoLoader(cfg.get('mongo_client'), collection=cfg.get('mongo_user'))
ml.put(plot_name if '.' in plot_name else plot_name + '.svg', idx.split(' ')[1])
return res
return altair_save_plot_wrapper
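# Hedged sketch (not the library's confirmed API): the wrapper above is the inner
# function of a save-plot decorator. A minimal outline of that decorator, assuming
# the (vis_object, name, res, cfg) return convention used by the matplotlib wrapper below;
# the outer name `altair_save_plot` is illustrative.
def altair_save_plot(func):
    def altair_save_plot_wrapper(*args, **kwargs):
        res = func(*args, **kwargs)               # run the wrapped plotting function
        vis_object, plot_name, res, cfg = res     # unpack chart, file name, payload, config
        # ... watermarking, vis_object.save(plot_name), optional Mongo upload as above ...
        return res
    return altair_save_plot_wrapper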
tmp.index = tmp.target
if plot_cnt <= 2:
ax[i].pie(tmp.target_dist.reindex(targets).fillna(0).values, labels=targets, autopct='%1.1f%%')
ax[i].set_title('Class {}\nCluster volume {}%\nMean dist from center {:.2f}'.format(
i, round(volumes[i], 1), metrics['mean_fc'][j] if (metrics or {}).get('mean_fc') is not None else 0))
else:
ax[i // 2][i % 2].pie(tmp.target_dist.reindex(targets).fillna(0).values, labels=targets, autopct='%1.1f%%')
ax[i // 2][i % 2].set_title('Class {}\nCluster volume {}%\nMean dist from center {:.2f}'.format(
i, round(volumes[i], 1), metrics['mean_fc'][j] if (metrics or {}).get('mean_fc') is not None else 0))
if plot_cnt % 2 == 1:
fig.delaxes(ax[plot_cnt // 2, 1])
plot_name = plot_name if plot_name is not None else 'clusters_pie_{}'.format(
datetime.now()).replace(':', '_').replace('.', '_') + '.svg'  # sanitize the timestamp first so the .svg extension is not mangled
plot_name = data.retention.retention_config['experiments_folder'] + '/' + plot_name
return ___FigureWrapper__(fig), plot_name, None, data.retention.retention_config
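# Hedged, self-contained illustration of the reindex/fillna step above: aligning a
# cluster's class distribution to a fixed list of targets so every pie chart shows the
# same slices, with absent classes rendered as zero. Data and names are illustrative.
import pandas as pd
targets = ['lost', 'passed']
target_dist = pd.Series({'passed': 0.7})            # a cluster where 'lost' never occurs
aligned = target_dist.reindex(targets).fillna(0)    # lost -> 0.0, passed -> 0.7
print(aligned.values)                               # the values handed to ax.pie(...)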
if isinstance(tables, str): # through folders
dyn_mat = os.path.join(path, tables)
files = sorted(
f for f in os.listdir(dyn_mat) if not f.startswith('.')) # skip hidden files such as macOS .DS_Store
else: # if tables is a list
files = tables
agents = []
for idx, file in enumerate(files):
if isinstance(num_pops, int):
agents_num = num_pops # int(pd.read_csv(os.path.join(stats, file)).users_count.iloc[0])
else:
agents_num = num_pops[idx]
if isinstance(tables, str):
clus_dyn = pd.read_csv(os.path.join(dyn_mat, file), index_col=[0])
for i in range(agents_num):
agents.append(Agent(clus_dyn, file.split('_')[1].split('.')[0], self.restriction_pos, config))
else:
step_matr = self.prepare_step_matrix(file)
for i in range(agents_num):
agents.append(Agent(step_matr, idx, self.restriction_pos, config))
return agents
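# Hedged sketch of the num_pops convention above: an int means "spawn the same number
# of agents for every table", while a sequence gives a per-table count. The helper name
# below is illustrative, not part of the library.
def resolve_agents_num(num_pops, idx):
    return num_pops if isinstance(num_pops, int) else num_pops[idx]
assert resolve_agents_num(5, 2) == 5
assert resolve_agents_num([3, 7, 9], 2) == 9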
'accept_privacy_policy': 'bad_node',
}
```
If ``node_params=None``, it is constructed from the ``retention_config`` variable, so that:
```
{
'positive_target_event': 'nice_target',
'negative_target_event': 'bad_target',
'source_event': 'source',
}
```
Default: ``None``
"""
self.show_quality_metrics(test_sample, test_target)
if hasattr(self.mod, 'coef_'):
self._plot_perm_imp(__LogRegWrapper__(self.mod.coef_[0]), test_sample, node_params, **kwargs)
return
perm = PermutationImportance(self.mod, random_state=0).fit(test_sample, test_target)
eli5.show_weights(perm, feature_names=[' '.join(i) if isinstance(i, tuple) else i for i in test_sample.columns])
self._plot_perm_imp(perm, test_sample, node_params, **kwargs)
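# Hedged, standalone example of the permutation-importance pattern used above
# (eli5's PermutationImportance fitted on a trained scikit-learn model); the dataset
# and model here are illustrative.
import eli5
from eli5.sklearn import PermutationImportance
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
X, y = make_classification(n_samples=200, n_features=5, random_state=0)
model = LogisticRegression().fit(X, y)
perm = PermutationImportance(model, random_state=0).fit(X, y)
eli5.show_weights(perm)  # in a notebook this renders the permutation weights table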
if len(res) == 2:
(vis_object, name), res, cfg = res, None, None
elif len(res) == 3:
(vis_object, name, res), cfg = res, None
else:
vis_object, name, res, cfg = res
idx = 'id: ' + str(int(datetime.now().timestamp()))
coords = vis_object.axis()
if '_3d_' not in name:
vis_object.text((coords[0] - (coords[1] - coords[0]) / 10),
(coords[3] + (coords[3] - coords[2]) / 10), idx, fontsize=8)
vis_object.text(0, 0.05, 'Retentioneering', fontsize=50, color='gray', va='bottom', alpha=0.1)
vis_object.get_figure().savefig(name, bbox_inches="tight", dpi=cfg.get('save_dpi') or 200)
if cfg.get('mongo_client') is not None:
print(f'DB {idx}')
ml = MongoLoader(cfg.get('mongo_client'), collection=cfg.get('mongo_user'))
ml.put(name if '.' in name else name + '.png', idx.split(' ')[1])
if '.html' in name:
ml.put(vis_object.get_raw(name), idx.split(' ')[1] + '_config')
return res
return save_plot_wrapper
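# Hedged illustration of the annotation logic above: ax.axis() returns
# (xmin, xmax, ymin, ymax); the run id is placed just outside the top-left corner and a
# faint 'Retentioneering' watermark is drawn inside the axes before saving. File name is illustrative.
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.plot([0, 1, 2], [2, 0, 1])
xmin, xmax, ymin, ymax = ax.axis()
ax.text(xmin - (xmax - xmin) / 10, ymax + (ymax - ymin) / 10, 'id: 1700000000', fontsize=8)
ax.text(0, 0.05, 'Retentioneering', fontsize=50, color='gray', va='bottom', alpha=0.1)
fig.savefig('demo_plot.png', bbox_inches='tight', dpi=200)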
def core_event_distribution(self, core_events, index_col=None, event_col=None,
thresh=None, plotting=True, use_greater=True, **kwargs):
self._init_cols(locals())
if isinstance(core_events, str):
core_events = [core_events]
self._obj['is_core_event'] = self._obj[self._event_col()].isin(core_events)
rates = self._obj.groupby(self._index_col()).is_core_event.mean()
if plotting:
plot.core_event_dist(rates, thresh, **kwargs)
if use_greater:
f = set(rates[rates >= thresh].index.values)
else:
f = set(rates[rates < thresh].index.values)
return self._obj[self._obj[self._index_col()].isin(f)].reset_index(drop=True)
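# Hedged, self-contained example of the per-user core-event rate computed above:
# flag events as core, take the mean of that flag per user, then keep users on one side
# of the threshold. Data and column names are illustrative.
import pandas as pd
df = pd.DataFrame({
    'user_id': [1, 1, 1, 2, 2],
    'event':   ['main', 'cart', 'main', 'main', 'lost'],
})
df['is_core_event'] = df['event'].isin(['main'])
rates = df.groupby('user_id').is_core_event.mean()   # user 1 -> 0.67, user 2 -> 0.5
kept = set(rates[rates >= 0.6].index)                 # users whose rate clears the threshold
filtered = df[df['user_id'].isin(kept)].reset_index(drop=True)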
[self._event_col()]
.value_counts()
.loc[top_cluster['index']]
/ clus2.shape[0]).reset_index()
cr1 = (
clus2[
clus2[self._event_col()] == self.retention_config['positive_target_event']
][self._index_col()].nunique()
) / clus2[self._index_col()].nunique()
top_all.columns = [self._event_col(), 'freq', ]
top_cluster.columns = [self._event_col(), 'freq', ]
top_all['hue'] = 'all' if cl2 is None else f'cluster {cl2}'
top_cluster['hue'] = f'cluster {cl1}'
plot.cluster_event_dist(
pd.concat([top_all, top_cluster], ignore_index=True, sort=False),  # DataFrame.append is removed in pandas 2.x
self._event_col(),
cl1,
[
clus[self._index_col()].nunique() / self._obj[self._index_col()].nunique(),
clus2[self._index_col()].nunique() / self._obj[self._index_col()].nunique(),
],
[cr0, cr1],
cl2
)
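# Hedged illustration of the conversion-rate formula used for cr1 above: unique users
# in the cluster who reached the positive target event, divided by all unique users in
# the cluster. Data and names are illustrative.
import pandas as pd
clus2 = pd.DataFrame({
    'user_id': [1, 1, 2, 3, 3],
    'event':   ['main', 'passed', 'main', 'main', 'cart'],
})
cr1 = clus2[clus2['event'] == 'passed']['user_id'].nunique() / clus2['user_id'].nunique()
print(round(cr1, 3))  # 1 of 3 users converted -> 0.333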
features = self.extract_features(**kwargs)
if not hasattr(self, 'clusters') or refit_cluster:
clusterer = getattr(clustering, method)
self.clusters, self._metrics = clusterer(features, **kwargs)
self._create_cluster_mapping(features.index.values)
if hasattr(self, 'datatype') and self.datatype == 'features':
target = kwargs.pop('target')
else:
target = self.get_positive_users(**kwargs)
target = features.index.isin(target)
self._metrics['homogen'] = clustering.homogeneity_score(target, self.clusters)
if hasattr(self, '_tsne'):
features.retention._tsne = self._tsne
if plot_type:
func = getattr(plot, plot_type)
res = func(
features,
clustering.aggregate_cl(self.clusters, 7) if method == 'dbscan' else self.clusters,
target,
metrics=self._metrics,
**kwargs
)
if res is not None:
self._tsne = res
return self.clusters
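# Hedged, standalone example of the homogeneity check above; the library's
# clustering.homogeneity_score is presumably scikit-learn's metric, which measures how
# well the cluster labels separate converted from non-converted users.
from sklearn.metrics import homogeneity_score
target = [True, True, False, False, True, False]
clusters = [0, 0, 1, 1, 0, 1]
print(homogeneity_score(target, clusters))  # 1.0: each cluster contains a single class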
f_cur = self._obj[self._event_col()] == event_order[0]
f_next = self._obj['next_event'] == event_order[1]
s_next = self._obj[f_cur & f_next].copy()
s_cur = self._obj[f_cur & (~f_next)].copy()
s_cur.time_diff[s_cur.time_diff < limit].hist(alpha=0.5, log=True,
bins=bins, label='Others {:.2f}'.format(
(s_cur.time_diff < limit).sum() / f_cur.sum()
))
s_next.time_diff[s_next.time_diff < limit].hist(alpha=0.7, log=True,
bins=bins,
label='Selected event order {:.2f}'.format(
(s_next.time_diff < limit).sum() / f_cur.sum()
))
plot.sns.mpl.pyplot.legend()
plot.sns.mpl.pyplot.show()
(s_cur.next_event.value_counts() / f_cur.sum()).iloc[:topk].plot.bar()
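# Hedged sketch of the split plotted above: transitions where the current event is
# followed by the selected next event versus all other continuations, compared via
# their time_diff distributions. Data and column names are illustrative.
import pandas as pd
df = pd.DataFrame({
    'event':      ['cart', 'cart', 'cart', 'main'],
    'next_event': ['pay', 'lost', 'pay', 'cart'],
    'time_diff':  [30.0, 300.0, 45.0, 10.0],
})
f_cur = df['event'] == 'cart'
f_next = df['next_event'] == 'pay'
selected = df[f_cur & f_next]                             # cart -> pay transitions
others = df[f_cur & ~f_next]                              # cart -> anything else
print((selected['time_diff'] < 60).sum() / f_cur.sum())   # share reported in the legend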