-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcramersvcorr.py
79 lines (60 loc) · 2.96 KB
/
cramersvcorr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import pandas as pd
import numpy as np
import plotly.express as px
import scipy.stats as ss
import warnings
warnings.filterwarnings('ignore')
class Cramers:
    """Pairwise Cramér's V association between categorical columns.

    The class holds no state; the methods are grouped here for convenience.
    Typical use is ``Cramers().corr(df)``, which auto-detects categorical
    columns and returns a square DataFrame of bias-corrected Cramér's V
    scores.
    """

    def __init__(self):
        """Create a calculator (no configuration is stored)."""

    def detect_categorical_columns(self, df, threshold_unique=0.05, threshold_distribution=0.95):
        """Return the columns of ``df`` that look categorical.

        A column is reported when any of the following holds:

        * its dtype is ``object``, ``category`` or a pandas string dtype;
        * the ratio of distinct non-null values to the number of rows is at
          most ``threshold_unique``;
        * the most frequent non-null value accounts for at least
          ``threshold_distribution`` of the non-null values.

        Args:
            df: DataFrame to inspect.
            threshold_unique: Upper bound on ``nunique / len(df)``.
            threshold_distribution: Lower bound on the share of the most
                frequent value.

        Returns:
            List of column labels, in the order they appear in ``df``.
        """
        n_rows = len(df)
        categorical_columns = []
        for col in df.columns:
            series = df[col]
            dtype = series.dtype

            # The dtype alone settles it; skip the more expensive statistics.
            if (dtype == 'object' or dtype.name == 'category'
                    or pd.api.types.is_string_dtype(dtype)):
                categorical_columns.append(col)
                continue

            # With no rows the ratio tests are undefined (they used to raise
            # ZeroDivisionError), so such a column is not reported.
            if n_rows == 0:
                continue

            if series.nunique() / n_rows <= threshold_unique:
                categorical_columns.append(col)
                continue

            # value_counts ignores nulls, so this is the share among the
            # non-null values.
            if series.value_counts(normalize=True).max() >= threshold_distribution:
                categorical_columns.append(col)
        return categorical_columns

    def cramers_v(self, x, y):
        """Compute the bias-corrected Cramér's V between two variables.

        The statistic is derived from the chi-squared value of the
        contingency table of ``x`` and ``y``. ``scipy.stats.chi2_contingency``
        is called with its default settings, so the continuity correction it
        applies to 2x2 tables is in effect.

        Args:
            x: First variable (pandas Series or array-like).
            y: Second variable, same length as ``x``.

        Returns:
            The association as a float in ``[0, 1]``, or ``nan`` when it is
            undefined: fewer than two observed categories on either side, or
            a variable whose every value is distinct (the corrected
            denominator is then zero).
        """
        confusion_matrix = pd.crosstab(x, y)
        r, k = confusion_matrix.shape
        if r < 2 or k < 2:
            # Constant or empty input: there is nothing to associate.
            return np.nan

        chi2 = ss.chi2_contingency(confusion_matrix)[0]
        n = confusion_matrix.sum().sum()
        phi2 = chi2 / n
        # Bias correction; n >= 2 here because both sides have two or more
        # observed categories, so n - 1 cannot be zero.
        phi2corr = max(0, phi2 - ((k - 1) * (r - 1)) / (n - 1))
        rcorr = r - ((r - 1) ** 2) / (n - 1)
        kcorr = k - ((k - 1) ** 2) / (n - 1)
        denominator = min(kcorr - 1, rcorr - 1)
        if denominator <= 0:
            # Happens when a variable has as many categories as observations.
            return np.nan
        return np.sqrt(phi2corr / denominator)

    def corr(self, data, add_cols=None, rem_cols=None, plot_htmp=False,
             threshold_unique=0.05, threshold_distribution=0.95):
        """Build the Cramér's V matrix for the categorical columns of ``data``.

        Categorical columns are auto-detected with
        ``detect_categorical_columns``; ``add_cols`` forces extra columns in
        and ``rem_cols`` removes columns from the final selection. Missing
        values are temporarily replaced by the column mode for the
        computation only; ``data`` itself is never modified.

        Args:
            data: Input DataFrame.
            add_cols: Column labels to treat as categorical in addition to
                the detected ones. Defaults to none.
            rem_cols: Column labels to exclude. Labels that were not selected
                are ignored. Defaults to none.
            plot_htmp: When truthy, show a plotly heatmap of the matrix
                instead of returning it.
            threshold_unique: Forwarded to ``detect_categorical_columns``.
            threshold_distribution: Forwarded to
                ``detect_categorical_columns``.

        Returns:
            A square DataFrame indexed and labelled by the selected columns
            (detected columns in frame order, then ``add_cols``), with
            undefined scores replaced by 0. Returns ``None`` when
            ``plot_htmp`` is truthy.

        Raises:
            KeyError: If a label in ``add_cols`` is not a column of ``data``.
        """
        extra_cols = [] if add_cols is None else list(add_cols)
        excluded = set() if rem_cols is None else set(rem_cols)

        detected = self.detect_categorical_columns(
            data,
            threshold_unique=threshold_unique,
            threshold_distribution=threshold_distribution,
        )
        for col in extra_cols:
            if col not in data.columns:
                raise KeyError(f'Unable to locate column {col} in the dataframe')

        # dict.fromkeys de-duplicates while keeping a deterministic order.
        categorical_columns = [
            col for col in dict.fromkeys([*detected, *extra_cols])
            if col not in excluded
        ]

        # Temporary treatment of missing values: impute the mode once per
        # column. A column with no non-null value has no mode; it is marked
        # None and scores as undefined against everything.
        filled = {}
        for col in categorical_columns:
            series = data[col]
            modes = series.mode()
            filled[col] = None if modes.empty else series.fillna(modes.iloc[0]).values

        size = len(categorical_columns)
        coef_scores = np.full((size, size), np.nan)
        for row, col_i in enumerate(categorical_columns):
            for column, col_j in enumerate(categorical_columns):
                left, right = filled[col_i], filled[col_j]
                if left is not None and right is not None:
                    coef_scores[row, column] = self.cramers_v(left, right)

        coef_scores_df = pd.DataFrame(
            coef_scores, index=categorical_columns, columns=categorical_columns
        ).fillna(0)

        if plot_htmp:
            fig = px.imshow(coef_scores_df, text_auto=True)
            fig.show()
            return None
        return coef_scores_df