19
19
import numpy as np
20
20
import scipy
21
21
import scipy .stats
22
+ from lib .anomalies .anomaly_detection import detect_upper_bound_anomaly , detect_lower_bound_anomaly
22
23
23
24
24
25
# rule specific parameters object, contains values received from the quality check threshold configuration
@@ -109,31 +110,23 @@ def evaluate_rule(rule_parameters: RuleExecutionRunParameters) -> RuleExecutionR
109
110
# using a 0-based calculation (scale from 0)
110
111
upper_median_multiples_array = [(difference / differences_median_float - 1.0 ) for difference
111
112
in differences_list if difference >= differences_median_float ]
112
- upper_multiples = np .array (upper_median_multiples_array , dtype = float )
113
- upper_multiples_median = np .median (upper_multiples )
114
- upper_multiples_std = scipy .stats .tstd (upper_multiples )
113
+ threshold_upper_multiple = detect_upper_bound_anomaly (values_above_median = upper_median_multiples_array ,
114
+ degrees_of_freedom = degrees_of_freedom , tail = tail )
115
115
116
- if float (upper_multiples_std ) == 0 :
117
- threshold_upper = differences_median_float
118
- else :
119
- # Assumption: the historical data follows t-student distribution
120
- upper_readout_distribution = scipy .stats .t (df = degrees_of_freedom , loc = upper_multiples_median , scale = upper_multiples_std )
121
- threshold_upper_multiple = float (upper_readout_distribution .ppf (1 - tail ))
116
+ if threshold_upper_multiple is not None :
122
117
threshold_upper = (threshold_upper_multiple + 1.0 ) * differences_median_float
118
+ else :
119
+ threshold_upper = rule_parameters .actual_value
123
120
124
121
lower_median_multiples_array = [(- 1.0 / (difference / differences_median_float )) for difference
125
122
in differences_list if difference <= differences_median_float if difference != 0 ]
126
- lower_multiples = np .array (lower_median_multiples_array , dtype = float )
127
- lower_multiples_median = np .median (lower_multiples )
128
- lower_multiples_std = scipy .stats .tstd (lower_multiples )
123
+ threshold_lower_multiple = detect_lower_bound_anomaly (values_below_median = lower_median_multiples_array ,
124
+ degrees_of_freedom = degrees_of_freedom , tail = tail )
129
125
130
- if float (lower_multiples_std ) == 0 :
131
- threshold_lower = differences_median_float
132
- else :
133
- # Assumption: the historical data follows t-student distribution
134
- lower_readout_distribution = scipy .stats .t (df = degrees_of_freedom , loc = lower_multiples_median , scale = lower_multiples_std )
135
- threshold_lower_multiple = float (lower_readout_distribution .ppf (tail ))
126
+ if threshold_lower_multiple is not None :
136
127
threshold_lower = differences_median_float * (- 1.0 / threshold_lower_multiple )
128
+ else :
129
+ threshold_lower = rule_parameters .actual_value
137
130
138
131
passed = threshold_lower <= actual_difference <= threshold_upper
139
132
@@ -145,28 +138,21 @@ def evaluate_rule(rule_parameters: RuleExecutionRunParameters) -> RuleExecutionR
145
138
else :
146
139
# using unrestricted method for both positive and negative values
147
140
upper_half_filtered = [difference for difference in differences_list if difference >= differences_median_float ]
148
- upper_half = np .array (upper_half_filtered , dtype = float )
149
- upper_half_median = np .median (upper_half )
150
- upper_half_std = scipy .stats .tstd (upper_half )
141
+ threshold_upper_result = detect_upper_bound_anomaly (values_above_median = upper_half_filtered ,
142
+ degrees_of_freedom = degrees_of_freedom , tail = tail )
151
143
152
- if float ( upper_half_std ) == 0 :
153
- threshold_upper = differences_median_float
144
+ if threshold_upper_result is not None :
145
+ threshold_upper = threshold_upper_result
154
146
else :
155
- # Assumption: the historical data follows t-student distribution
156
- upper_readout_distribution = scipy .stats .t (df = degrees_of_freedom , loc = upper_half_median , scale = upper_half_std )
157
- threshold_upper = float (upper_readout_distribution .ppf (1 - tail ))
147
+ threshold_upper = rule_parameters .actual_value
158
148
159
149
lower_half_list = [difference for difference in differences_list if difference <= differences_median_float ]
160
- lower_half = np .array (lower_half_list , dtype = float )
161
- lower_half_median = np .median (lower_half )
162
- lower_half_std = scipy .stats .tstd (lower_half )
163
-
164
- if float (lower_half_std ) == 0 :
165
- threshold_lower = differences_median_float
150
+ threshold_lower_result = detect_lower_bound_anomaly (values_below_median = lower_half_list ,
151
+ degrees_of_freedom = degrees_of_freedom , tail = tail )
152
+ if threshold_lower_result is not None :
153
+ threshold_lower = threshold_lower_result
166
154
else :
167
- # Assumption: the historical data follows t-student distribution
168
- lower_readout_distribution = scipy .stats .t (df = degrees_of_freedom , loc = lower_half_median , scale = lower_half_std )
169
- threshold_lower = float (lower_readout_distribution .ppf (tail ))
155
+ threshold_lower = rule_parameters .actual_value
170
156
171
157
passed = threshold_lower <= actual_difference <= threshold_upper
172
158
0 commit comments