-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathtest_4_2.dos
152 lines (122 loc) · 6.82 KB
/
test_4_2.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
login("admin", "123456")
undef all
clearAllCache()
def createDB(dbName){
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
db1 = database(, VALUE, 2020.01.01..2021.01.01)
db2 = database(, HASH, [SYMBOL, 10])
db = database(dbName, COMPO, [db1, db2], , "TSDB")
db = database(dbName)
}
def createTb_First(dbName, tbName){
db = database(dbName)
colName = `SecurityID`TradeDate`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNo`SellNo
colType = `SYMBOL`DATE`TIME`DOUBLE`INT`DOUBLE`INT`INT
tbSchema = table(1:0, colName, colType)
db.createPartitionedTable(table=tbSchema, tableName=tbName, partitionColumns=`TradeDate`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns=`SecurityID`TradeDate`TradeTime, keepDuplicates=FIRST)
}
def createTb_Last(dbName, tbName){
db = database(dbName)
colName = `SecurityID`TradeDate`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNo`SellNo
colType = `SYMBOL`DATE`TIME`DOUBLE`INT`DOUBLE`INT`INT
tbSchema = table(1:0, colName, colType)
db.createPartitionedTable(table=tbSchema, tableName=tbName, partitionColumns=`TradeDate`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns=`SecurityID`TradeDate`TradeTime, keepDuplicates=LAST)
}
def createTb_All(dbName, tbName){
db = database(dbName)
colName = `SecurityID`TradeDate`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNo`SellNo
colType = `SYMBOL`DATE`TIME`DOUBLE`INT`DOUBLE`INT`INT
tbSchema = table(1:0, colName, colType)
db.createPartitionedTable(table=tbSchema, tableName=tbName, partitionColumns=`TradeDate`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns=`SecurityID`TradeDate`TradeTime, keepDuplicates=ALL)
}
/* 模拟数据 */
def writeData(dbName, tbName, date){
n = 10000000
SecurityID = stretch("st" + string(1..10), n)
TradeDate = take(date, n)
TradeTime = segmentby(sort, 09:00:00.000 + rand(9000000, n), SecurityID)
TradePrice = stretch(rand(200, 10), n) + rand(10.0, n)
TradeQty = stretch(rand(10, 10) * 100, n) + rand(100, n)
TradeAmount = TradePrice * TradeQty
allOrderPool=rand(1..n,n)
modes=allOrderPool%2
buyOrderPool=allOrderPool[bool(modes)]//奇数作为买单号池子
sellOrderPool=allOrderPool[bool(modes==0)]//偶数作为买单号池子
BuyNo=rand(buyOrderPool,n)
SellNo=rand(sellOrderPool,n)
tick = table(SecurityID, TradeDate, TradeTime, TradePrice, TradeQty, TradeAmount, BuyNo, SellNo)
loadTable(dbName, tbName).append!(tick)
pnodeRun(flushTSDBCache)
}
setMaxMemSize(128)
// 创库创表
dbName = "dfs://SH_TSDB_tick"
createDB(dbName)
tbName_all = "tick_all"
createTb_All(dbName, tbName_all)
tbName_last = "tick_last"
createTb_Last(dbName, tbName_last)
tbName_first = "tick_first"
createTb_First(dbName, tbName_first)
// 分别写入 10 天数据
for(i in 1..10){
dateList=2023.07.01..2023.07.10
each(submitJob{,,writeData, dbName, tbName_all,}, "writeData"+ string(1..dateList.size()), string(dateList), dateList);
}
getRecentJobs(100)
for(i in 1..10){
dateList=2023.07.01..2023.07.10
each(submitJob{,,writeData, dbName, tbName_last,}, "writeData"+ string(1..dateList.size()), string(dateList), dateList);
}
getRecentJobs(100)
for(i in 1..10){
dateList=2023.07.01..2023.07.10
each(submitJob{,,writeData, dbName, tbName_first,}, "writeData"+ string(1..dateList.size()), string(dateList), dateList);
}
getRecentJobs(100)
// 检查数据是否完全写入
select count(*) from loadTable(dbName, tbName_first) // 603,730,363
select count(*) from loadTable(dbName, tbName_last) // 603,729,137
select count(*) from loadTable(dbName, tbName_all) // 1,000,000,000
getTabletsMeta()
// 合并前
re_all = select * from pnodeRun(getTSDBMetaData) where chunkPath like "%/SH_TSDB_tick%" and table like "tick_all%" context by chunkId order by files
fileNum_all = re_all.files.split(",").flatten().dropna()
re_first = select * from pnodeRun(getTSDBMetaData) where chunkPath like "%/SH_TSDB_tick%" and table like "tick_first%" context by chunkId order by files
fileNum_first = re_first.files.split(",").flatten().dropna()
re_last = select * from pnodeRun(getTSDBMetaData) where chunkPath like "%/SH_TSDB_tick%" and table like "tick_last%" context by chunkId order by files
fileNum_last = re_last.files.split(",").flatten().dropna()
// 合并后
re_all_ac = select * from pnodeRun(getTSDBMetaData) where chunkPath like "%/SH_TSDB_tick%" and table like "tick_all%" context by chunkId order by files
fileNum_all_ac = re_all_ac.files.split(",").flatten().dropna()
re_first_ac = select * from pnodeRun(getTSDBMetaData) where chunkPath like "%/SH_TSDB_tick%" and table like "tick_first%" context by chunkId order by files
fileNum_first_ac = re_first_ac.files.split(",").flatten().dropna()
re_last_ac = select * from pnodeRun(getTSDBMetaData) where chunkPath like "%/SH_TSDB_tick%" and table like "tick_last%" context by chunkId order by files
fileNum_last_ac = re_last_ac.files.split(",").flatten().dropna()
// 触发合并
chunkIDs = exec chunkID from pnodeRun(getChunksMeta{"/SH_TSDB_tick%"}) where dfsPath not like "%tbl%" and dfsPath not like "%domain%"
for(chunkID in chunkIDs){
pnodeRun(triggerTSDBCompaction{chunkID})
}
// 检测是否完成合并
select * from pnodeRun(getTSDBCompactionTaskStatus) where isNull(endTime)
// ALL 测试
timer select * from loadTable(dbName, tbName_all) where SecurityID="st5" // 8273.835 ms -> 7673.581 ms
timer select * from loadTable(dbName, tbName_all) where SecurityID="st9" and TradeDate=2023.07.10 // 1713.206 ms -> 1608.503 ms
timer select * from loadTable(dbName, tbName_all) // 63297.3 ms -> 57777.834 ms
timer select count(*) from loadTable(dbName, tbName_all) // 26.845 ms -> 18.655 ms
timer select cumsum(TradeAmount) from loadTable(dbName, tbName_all) context by TradeDate, SecurityID // 11645.403 ms -> 11490.736 ms
// FIRST 测试
timer select * from loadTable(dbName, tbName_first) where SecurityID="st5" // 7995.086 ms -> 5077.544 ms
timer select * from loadTable(dbName, tbName_first) where SecurityID="st9" and TradeDate=2023.07.10 // 1928.873 ms -> 1064.022 ms
timer select * from loadTable(dbName, tbName_first) // 49672.931 ms -> 37186.295 ms
timer select count(*) from loadTable(dbName, tbName_first) // 14206.989 ms -> 24.599 ms
timer(10) select cumsum(TradeAmount) from loadTable(dbName, tbName_first) context by TradeDate, SecurityID // 23586.545 ms -> 8255.123 ms
// LAST 测试
timer select * from loadTable(dbName, tbName_last) where SecurityID="st5" // 8078.446 ms -> 4902.045 ms
timer select * from loadTable(dbName, tbName_last) where SecurityID="st9" and TradeDate=2023.07.10 // 1924.393 ms -> 948.727 ms
timer select * from loadTable(dbName, tbName_last) // 45825.662 ms -> 30251.118 ms
timer select count(*) from loadTable(dbName, tbName_last) // 14449.656 ms -> 24.483 ms
timer select cumsum(TradeAmount) from loadTable(dbName, tbName_last) context by TradeDate, SecurityID // 27112.269 ms -> 8695.760 ms