-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.py
305 lines (271 loc) · 9.57 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
import functools
import json
import os
import queue
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from configparser import ConfigParser
from threading import Lock
from threading import Thread,Condition

import joblib
import psutil
from redis import ConnectionPool

from util.dataPretreatment import DataPretreatment
from util.dealWithStream import DWS
from util.esTransmission import Estransmission
from util.od import AbnomalDector
# Applications whose traffic is monitored.
APP_LIST = [
    'HTTP.TikTok',
]

# Very important!
# Add one new module-level flag here per application; the flags control model
# iteration. The keys of MODEL_LIST must be the same as the entries of APP_LIST.
MODEL_FLAG_TIKTOK = 0  # range is 0,1
MODEL_LIST = {
    'HTTP.TikTok': MODEL_FLAG_TIKTOK,
}

# Algorithm list.
# NOTE: the remarks below must stay `#` comments. A triple-quoted string placed
# inside the brackets is a real list element, not a comment.
ALGORITHM_LIST = [
    "iforest",  # isolation forest
    "lscp",  # locally selective combination in parallel outlier ensembles
    "hbos",  # histogram-based outlier detection
    "sod",  # subspace outlier detection
    # The entries below are optional algorithms. When one is needed, train its
    # model in advance, save it under the ../models folder, then uncomment it.
    # "mcd",  # minimum covariance determinant (finds the best ellipsoid containing H samples)
    # "abod",  # angle-based outlier detection
    # "ae",  # auto-encoder outlier detection
    # "lof",  # local outlier factor
    # "mogaal",  # unsupervised outlier detection with generative adversarial active learning
    # "ocsvm",  # one-class SVM
    # "sos",  # stochastic outlier selection (affinity based)
    # "vae",  # variational auto-encoder
    # "xgbod",  # XGBoost-based outlier detection
]

# Flow attributes handed to the helper classes as `attrlist`.
ATTR_LIST = [
    "bidirectional_duration_ms",
    "bidirectional_ip_bytes",
    "bidirectional_max_piat_ms",
    "bidirectional_mean_piat_ms",
    "bidirectional_min_piat_ms",
    "bidirectional_packets",
    "bidirectional_raw_bytes",
    "bidirectional_stdev_piat_ms",
    "dst2src_ack_packets",
    "dst2src_duration_ms",
    "dst2src_ip_bytes",
    "dst2src_max_piat_ms",
    "dst2src_mean_piat_ms",
    "dst2src_packets",
    "dst2src_raw_bytes",
    "src2dst_ack_packets",
    "src2dst_duration_ms",
    "src2dst_ip_bytes",
    "src2dst_max_piat_ms",
    "src2dst_mean_piat_ms",
    "dst2src_min_raw_ps",
    "src2dst_raw_bytes",
    "src2dst_stdev_piat_ms",
]
class CountDownLatch():
    '''
    Thread synchronization helper: threads block in ``await_()`` until
    ``countDown()`` has brought the counter down to zero.

    ``await`` has been a reserved keyword since Python 3.7, so a method cannot
    be defined under that name any more. The blocking method is therefore
    called ``await_``; the historical attribute name is still reachable through
    ``getattr(latch, "await")``.
    '''
    def __init__(self,count):
        '''
        :param count: number of ``countDown()`` calls needed to release waiters
        '''
        self.count=count
        self.condition=Condition()

    def await_(self):
        '''
        Block the calling thread until the counter is no longer positive.
        :return: None
        '''
        with self.condition:
            # Re-check in a loop: wait() may return before the counter hits zero.
            while self.count>0:
                self.condition.wait()

    def countDown(self):
        '''
        Decrease the counter by one and wake every waiting thread.
        The counter never drops below zero.
        :return: None
        '''
        with self.condition:
            if self.count>0:
                self.count-=1
            self.condition.notify_all()

    def get_count(self):
        '''
        :return: current value of the counter
        '''
        return self.count


# Keep the pre-3.7 method name available for attribute-based callers.
setattr(CountDownLatch, "await", CountDownLatch.await_)
def timer(func):
    '''
    Performance test: time.
    Decorator that prints how long each call of ``func`` took, in seconds.

    The elapsed time is printed only when ``func`` returns normally; an
    exception raised by ``func`` propagates without any output.

    :param func: callable to measure
    :return: wrapper that forwards all arguments and returns ``func``'s result
    '''
    @functools.wraps(func)  # keep the name/docstring of the decorated function
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so wall-clock adjustments cannot skew the interval.
        start = time.perf_counter()
        res = func(*args, **kwargs)
        print(func.__name__+'共耗时约 {:.2f} 秒'.format(time.perf_counter() - start))
        return res
    return wrapper
'''
Memory, CPU and disk information
'''
# def systemStatus(func):
# def wrapper(*args,**kwargs):
# mem = psutil.virtual_memory().percent
# cpu_status = psutil.cpu_times()
# disk_status = psutil.disk_usage("./").percent
# res = func(*args, **kwargs)
# memd=psutil.virtual_memory().percent
# cpu_statusd = psutil.cpu_times()
# disk_statusd = psutil.disk_usage("./").percent
# return res
# return wrapper
@timer
def model_controller(model_first,model_second,data_pretreatment,cycle,elasticsearch):
    '''
    Retrain, once per cycle, one of the two detectors kept for every application
    and then flip that application's flag in MODEL_LIST.

    For each application the data returned by ``data_pretreatment.getData`` is
    used to train ``model_first`` when the flag is 1 and ``model_second`` when it
    is 0. Afterwards the flag is flipped, ``slide_controller`` is run for the
    application and ``data_pretreatment.deleteData`` is called.

    This function loops forever and is meant to run in a worker thread.

    :param model_first: list holding the first model set of each application
    :param model_second: list holding the second model set of each application
    :param data_pretreatment: data helper object
    :param cycle: iteration cycle (days)
    :param elasticsearch: es object
    :return: None (never returns)
    :raises RuntimeError: if a flag in MODEL_LIST is neither 0 nor 1
    '''
    cycle_seconds = cycle*24*60*60
    poll_seconds = 60  # upper bound for one nap between two deadline checks
    start=time.time()
    while True:
        now = time.time()
        if start + cycle_seconds >= now:
            # Not due yet: sleep instead of spinning, so this thread does not
            # burn a CPU core while waiting for the cycle to elapse.
            time.sleep(min(start + cycle_seconds - now, poll_seconds))
            continue
        start = now
        for app in MODEL_LIST:
            # The list position is looked up by the application being iterated.
            index = APP_LIST.index(app)
            data = data_pretreatment.getData(app)
            if MODEL_LIST[app] == 1:
                model_first[index]._models_train(data,app)
            elif MODEL_LIST[app] == 0:
                model_second[index]._models_train(data,app)
            else:
                raise RuntimeError("except 0 or 1 of "+str(app)+" flag,but get "+
                                   str(MODEL_LIST[app])+" instead")
            # Flip 0 <-> 1 now that the other model set has been trained.
            MODEL_LIST[app] = 1 - MODEL_LIST[app]
            slide_controller(elasticsearch,app)
            data_pretreatment.deleteData(app)
@timer
def slide_controller(elasticsearch,app):
    '''
    Push the data of one application to es by delegating to
    ``elasticsearch.redis2es``. Data is only added to es here.

    :param elasticsearch: es object
    :param app: application name
    :return: None
    '''
    transfer = elasticsearch.redis2es
    transfer(app)
    return None
if __name__ == '__main__':
    # Initialise the latch used for thread synchronization and the thread pool.
    thread_pool = ThreadPoolExecutor(1000)
    latch = CountDownLatch(count=2)

    def _report_worker_failure(future):
        '''Print the traceback of a background task that ended with an exception.'''
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            traceback.print_exception(type(error), error, error.__traceback__)

    # Initialise the redis connection pool.
    conf = ConfigParser()
    conf.read("./config/config.ini")
    try:
        port = int(conf['default']['redis_port'])
        pool = ConnectionPool(host='localhost', port=port, decode_responses=True)
    except (KeyError, ValueError) as e:
        # KeyError: config file, section or option missing; ValueError: not a number.
        raise RuntimeError("fail to connect redis, please check the parameter again") from e

    # Initialisation work:
    # --- process the csv files
    # --- create the indices
    # --- upload the csv files
    try:
        _csv_dir=conf['default']['csv_dir']
    except KeyError:
        _csv_dir=""
    es = Estransmission(12,[],"./config/base.txt",attrlist=ATTR_LIST,pool=pool)
    dws = DWS(pool=pool,attrlist=ATTR_LIST)
    _data_pretreatment=DataPretreatment(pool=pool,attrlist=ATTR_LIST)
    if _csv_dir == "":
        for app in APP_LIST:
            # es.createIndex(app)  # done inside the call below, to avoid inserting duplicate data
            es.redis2es(app)
    else:
        # Walk the directory entries; iterating the path string itself would
        # yield single characters.
        for file in sorted(os.listdir(_csv_dir)):
            csv_path = os.path.join(_csv_dir, file)
            es.solveCSV(csv_path)
            # NOTE(review): split(".")[0] keeps only the text before the FIRST dot,
            # so "HTTP.TikTok.csv" would give "HTTP" - confirm the csv naming scheme.
            es.createIndex(file.split(".")[0])
            es.sendCSV(csv_path)

    # ---- Goal: iterate the anomaly detectors (two models per application).
    # ---- Idea: keep two model lists holding model 1 and model 2 of every
    #      application and load the pre-trained algorithm models into them.
    #      Model 1 predicts during cycle t; when cycle t ends its data trains
    #      model 2. Cycle t+1 then predicts with model 2 while its data trains a
    #      new model 1, and so on.
    try:
        cycle = int(conf['default']['cycle'])
    except (KeyError, ValueError) as e:
        raise RuntimeError("expect a model cycle in config.ini") from e
    _model_first=[]
    _model_second=[]
    for app in APP_LIST:
        # NOTE(review): the detectors are built from MODEL_LIST, yet _model_list is
        # indexed per ALGORITHM_LIST entry below - confirm the expected argument.
        _od_algorithms_first = AbnomalDector(MODEL_LIST)
        _od_algorithms_second = AbnomalDector(MODEL_LIST)
        _model_first.append(_od_algorithms_first)
        _model_second.append(_od_algorithms_second)

    # Load the pre-trained models into BOTH model sets.
    for i, app in enumerate(APP_LIST):
        for j, model in enumerate(ALGORITHM_LIST):
            # NOTE(review): `model` is a str taken from ALGORITHM_LIST, so
            # model.__class__.__name__ is always "str" and every algorithm slot
            # reads "<app>-str.model". Kept unchanged because the naming used
            # when the models are saved is not visible here - confirm and fix.
            model_path = "./models/" + app + str("-") + str(model.__class__.__name__) + ".model"
            # Two separate loads so the two sets never share one estimator object.
            _model_first[i]._model_list[j] = joblib.load(model_path)
            _model_second[i]._model_list[j] = joblib.load(model_path)

    # Control the iteration of the anomaly detection models.
    controller_task = thread_pool.submit(model_controller,_model_first,_model_second,_data_pretreatment,cycle,es)
    # Without a callback an exception in the worker would vanish with the future.
    controller_task.add_done_callback(_report_worker_failure)
    # '''
    # Thread controlling the periodic update of the sliding-window model
    # '''
    # thread_pool.submit(slide_controller, (es))
    # for app in app_list:
    # data=_data_pretreatment.getData(app)
    # _od_algorithms_first._models_train(data, app)

    # Network-flow buffers built on thread-safe queues.
    # The first buffer carries the JSON strings for the sliding-window model;
    # the second carries the input for the anomaly detection algorithms
    # (numpy arrays, per the original author's note).
    _buffer_slide=queue.Queue()
    _buffer_od=queue.Queue()

    # Network-card monitor: captures traffic and appends it to the two buffers.
    capture_task = thread_pool.submit(dws.run,_buffer_slide,_buffer_od)
    capture_task.add_done_callback(_report_worker_failure)

    # Detection loop: every flow is checked by the sliding-window model and by
    # the anomaly detector that is active for its application.
    # lock=Lock()
    while True:
        try:
            print(_buffer_slide.qsize(),_buffer_od.qsize())
            # Blocking reads: wait for the next flow instead of polling empty
            # queues in a tight loop.
            # lock.acquire()
            flow = json.loads(_buffer_slide.get())
            flow_matrix = _buffer_od.get()
            # lock.release()
            _slide_res=es.slidwindowOD(flow)
            app_name = flow['application_name']
            index = APP_LIST.index(app_name)
            if MODEL_LIST[app_name] == 0:
                _od_res = _model_first[index]._model_predict(flow_matrix)
            elif MODEL_LIST[app_name] == 1:
                _od_res = _model_second[index]._model_predict(flow_matrix)
            else:
                raise RuntimeError("except 0 or 1 of "+str(app_name)+" flag,but get "+str(MODEL_LIST[app_name])+" instead")
            if _slide_res or _od_res:
                with open("./log/"+str(app_name),"a+")as f:
                    # One record per line, so consecutive flows stay separable.
                    f.write(str(flow)+"\n")
        except Exception as e:
            raise RuntimeError("error when detecting flow")from e
    # _check_thread=Thread(target=es.slidwindowOD(buffer,))
    # _check_thread.start()