19
19
import numpy as np
20
20
import scipy
21
21
import scipy .stats
22
+ from lib .anomalies .anomaly_detection import detect_upper_bound_anomaly , detect_lower_bound_anomaly
22
23
23
24
24
25
# rule specific parameters object, contains values received from the quality check threshold configuration
@@ -104,30 +105,22 @@ def evaluate_rule(rule_parameters: RuleExecutionRunParameters) -> RuleExecutionR
104
105
if all (readout > 0 for readout in extracted ):
105
106
# using a 0-based calculation (scale from 0)
106
107
upper_median_multiples_array = [(readout / filtered_median_float - 1.0 ) for readout in extracted if readout >= filtered_median_float ]
107
- upper_multiples = np .array (upper_median_multiples_array , dtype = float )
108
- upper_multiples_median = np .median (upper_multiples )
109
- upper_multiples_std = scipy .stats .tstd (upper_multiples )
108
+ threshold_upper_multiple = detect_upper_bound_anomaly (values_above_median = upper_median_multiples_array ,
109
+ degrees_of_freedom = degrees_of_freedom , tail = tail )
110
110
111
- if float (upper_multiples_std ) == 0 :
112
- threshold_upper = filtered_median_float
113
- else :
114
- # Assumption: the historical data follows t-student distribution
115
- upper_readout_distribution = scipy .stats .t (df = degrees_of_freedom , loc = upper_multiples_median , scale = upper_multiples_std )
116
- threshold_upper_multiple = float (upper_readout_distribution .ppf (1 - tail ))
111
+ if threshold_upper_multiple is not None :
117
112
threshold_upper = (threshold_upper_multiple + 1.0 ) * filtered_median_float
113
+ else :
114
+ threshold_upper = rule_parameters .actual_value
118
115
119
116
lower_median_multiples_array = [(- 1.0 / (readout / filtered_median_float )) for readout in extracted if readout <= filtered_median_float if readout != 0 ]
120
- lower_multiples = np .array (lower_median_multiples_array , dtype = float )
121
- lower_multiples_median = np .median (lower_multiples )
122
- lower_multiples_std = scipy .stats .tstd (lower_multiples )
117
+ threshold_lower_multiple = detect_lower_bound_anomaly (values_below_median = lower_median_multiples_array ,
118
+ degrees_of_freedom = degrees_of_freedom , tail = tail )
123
119
124
- if float (lower_multiples_std ) == 0 :
125
- threshold_lower = filtered_median_float
126
- else :
127
- # Assumption: the historical data follows t-student distribution
128
- lower_readout_distribution = scipy .stats .t (df = degrees_of_freedom , loc = lower_multiples_median , scale = lower_multiples_std )
129
- threshold_lower_multiple = float (lower_readout_distribution .ppf (tail ))
120
+ if threshold_lower_multiple is not None :
130
121
threshold_lower = filtered_median_float * (- 1.0 / threshold_lower_multiple )
122
+ else :
123
+ threshold_lower = rule_parameters .actual_value
131
124
132
125
passed = threshold_lower <= rule_parameters .actual_value <= threshold_upper
133
126
@@ -139,28 +132,21 @@ def evaluate_rule(rule_parameters: RuleExecutionRunParameters) -> RuleExecutionR
139
132
else :
140
133
# using unrestricted method
141
134
upper_half_filtered = [readout for readout in extracted if readout >= filtered_median_float ]
142
- upper_half = np .array (upper_half_filtered , dtype = float )
143
- upper_half_median = np .median (upper_half )
144
- upper_half_std = scipy .stats .tstd (upper_half )
135
+ threshold_upper_result = detect_upper_bound_anomaly (values_above_median = upper_half_filtered ,
136
+ degrees_of_freedom = degrees_of_freedom , tail = tail )
145
137
146
- if float ( upper_half_std ) == 0 :
147
- threshold_upper = filtered_median_float
138
+ if threshold_upper_result is not None :
139
+ threshold_upper = threshold_upper_result
148
140
else :
149
- # Assumption: the historical data follows t-student distribution
150
- upper_readout_distribution = scipy .stats .t (df = degrees_of_freedom , loc = upper_half_median , scale = upper_half_std )
151
- threshold_upper = float (upper_readout_distribution .ppf (1 - tail ))
141
+ threshold_upper = rule_parameters .actual_value
152
142
153
143
lower_half_list = [readout for readout in extracted if readout <= filtered_median_float ]
154
- lower_half = np .array (lower_half_list , dtype = float )
155
- lower_half_median = np .median (lower_half )
156
- lower_half_std = scipy .stats .tstd (lower_half )
157
-
158
- if float (lower_half_std ) == 0 :
159
- threshold_lower = filtered_median_float
144
+ threshold_lower_result = detect_lower_bound_anomaly (values_below_median = lower_half_list ,
145
+ degrees_of_freedom = degrees_of_freedom , tail = tail )
146
+ if threshold_lower_result is not None :
147
+ threshold_lower = threshold_lower_result
160
148
else :
161
- # Assumption: the historical data follows t-student distribution
162
- lower_readout_distribution = scipy .stats .t (df = degrees_of_freedom , loc = lower_half_median , scale = lower_half_std )
163
- threshold_lower = float (lower_readout_distribution .ppf (tail ))
149
+ threshold_lower = rule_parameters .actual_value
164
150
165
151
passed = threshold_lower <= rule_parameters .actual_value <= threshold_upper
166
152
0 commit comments