-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspore_db_utils.py
384 lines (339 loc) · 12.9 KB
/
spore_db_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
import psycopg2, os
from dotenv import load_dotenv
from web3 import Web3
from web3.middleware import geth_poa_middleware
from web3._utils.filters import construct_event_filter_params
from web3._utils.events import get_event_data
from eth_abi.codec import ABICodec
import json
from flask import jsonify
import threading
import time
load_dotenv()
def verify_db_connection():
try:
conn = psycopg2.connect(database=os.getenv("DATABASE_NAME", "spore_db"),
host=os.getenv("host", "localhost"),
user=os.getenv("user", "postgres"),
password=os.getenv("password", ""),
port=os.getenv("port", "5432"))
conn.close()
return True
except psycopg2.Error as e:
print("Unable to connect to the database")
print(e.pgerror)
print(e.diag.message_detail)
return False
def initialize_connection():
try:
conn = psycopg2.connect(database=os.getenv("DATABASE_NAME", "spore_db"),
host=os.getenv("host", "localhost"),
user=os.getenv("user", "postgres"),
password=os.getenv("password", ""),
port=os.getenv("port", "5432"))
return conn
except psycopg2.Error as e:
print("Unable to connect to the database")
print(e.pgerror)
print(e.diag.message_detail)
return False
#in the case of the NFTs there are two tables to be created plus one for the control; the first one is the table filtered for all buy events with tx hash and block, to calculate the total volume of the token. The second one tells the price of each of the 72 NFTs; the third one is the control, which tells you the last block indexed
def create_nft_tables():
conn = initialize_connection()
c= conn.cursor()
query = """CREATE TABLE IF NOT EXISTS nft_buys (
id SERIAL PRIMARY KEY,
blockNumber INTEGER,
transactionHash TEXT,
tokenId INTEGER,
value TEXT
)"""
c.execute(query)
conn.commit()
query = """CREATE TABLE IF NOT EXISTS nft_prices (
id SERIAL PRIMARY KEY,
tokenId INTEGER,
price TEXT
)"""
c.execute(query)
conn.commit()
query = """
CREATE TABLE IF NOT EXISTS nft_control (
id SERIAL PRIMARY KEY,
table_name TEXT,
lastBlock INTEGER
);
INSERT INTO nft_control (table_name, lastBlock)
SELECT 'nft_buys', 1747108 WHERE NOT EXISTS (SELECT 1 FROM nft_control WHERE table_name = 'nft_buys');
INSERT INTO nft_control (table_name, lastBlock)
SELECT 'nft_prices', 1747108 WHERE NOT EXISTS (SELECT 1 FROM nft_control WHERE table_name = 'nft_prices');
"""
c.execute(query)
conn.commit()
conn.close()
def connect_and_get_Bought_event():
chain_url= "https://api.avax.network/ext/bc/C/rpc"
#spore nftv1 address avalanche
contract_address= "0xc2457F6Eb241C891EF74E02CCd50E5459c2E28Ea"
with open('abi/nftv1_abi.json') as abi:
nftv1_abi = json.load(abi)
web3 = Web3(Web3.HTTPProvider(chain_url))
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
# print("clientVersion: ",web3.client_version)
# print("isConnected: ",web3.is_connected())
contract = web3.eth.contract(address=contract_address, abi=nftv1_abi)
#attributes = vars(contract.events)
event=contract.events.Bought
return web3, event
def get_last_block(table_name):
conn = initialize_connection()
c= conn.cursor()
query = "SELECT lastBlock FROM nft_control WHERE table_name = %s"
c.execute(query, (table_name,))
result = c.fetchone()
last_block = result[0] if result else None
conn.close()
return last_block
def get_ava_latest_block():
try:
chain_url= "https://api.avax.network/ext/bc/C/rpc"
web3 = Web3(Web3.HTTPProvider(chain_url))
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
latest_block= web3.eth.get_block('latest')['number']
return latest_block
except Exception as e:
print(e)
return False
def bought_event_filter(fromBlock,toBlock, web3, event):
abi = event._get_event_abi()
codec: ABICodec = web3.codec
nftv1_address= "0xc2457F6Eb241C891EF74E02CCd50E5459c2E28Ea"
data_filter_set, event_filter_params = construct_event_filter_params(
abi,
codec,
contract_address=nftv1_address,
fromBlock=fromBlock,
toBlock=toBlock,
address=None,
topics=None,
)
# Call node over JSON-RPC API
try:
logs = web3.eth.get_logs(event_filter_params)
except Exception as e:
return
for log in logs:
# Convert raw JSON-RPC log result to human readable event by using ABI data
# More information how processLog works here
# https://github.com/ethereum/web3.py/blob/fbaf1ad11b0c7fac09ba34baff2c256cffe0a148/web3/_utils/events.py#L200
evt = get_event_data(codec, abi, log)
print(evt)
exit()
# handle_bought_event(evt)
def process_nft_buy_events(web3, event, from_block, to_block):
conn = initialize_connection()
c= conn.cursor()
logs = event.get_logs(fromBlock=from_block, toBlock=to_block)
for log in logs:
print(log)
blockNumber = log['blockNumber']
transactionHash = log['transactionHash'].hex()
tokenId = log['args']['tokenId']
value = log['args']['value']
query = "INSERT INTO nft_buys (blockNumber, transactionHash, tokenId, value) VALUES (%s, %s, %s, %s)"
c.execute(query, (blockNumber, transactionHash, tokenId, value))
conn.commit()
query = "UPDATE nft_control SET lastBlock = %s WHERE table_name = 'nft_buys'"
c.execute(query, (to_block,))
conn.commit()
conn.close()
def index_nft_bought_data():
connected = verify_db_connection()
if not connected:
print("Unable to connect to the database")
return False
create_nft_tables()
web3, event = connect_and_get_Bought_event()
current_block = get_last_block("nft_buys")
latest_block= web3.eth.get_block('latest')['number']
print(f"Last block indexed: {latest_block}, Current block: {current_block}", flush=True)
initial_block = 41756273
while current_block<latest_block:
if current_block==latest_block:
print("All blocks indexed", flush=True)
break
if current_block>initial_block:
from_block=current_block+1
else:
from_block=current_block
if current_block+2047>latest_block:
to_block=latest_block
else:
to_block=current_block+2047
print("indexing from {} to {}".format(from_block, to_block), flush=True)
process_nft_buy_events(web3, event, from_block, to_block)
current_block=to_block
print ("All blocks indexed", flush=True)
return True
def add_first_data_to_nft_prices():
conn = initialize_connection()
c= conn.cursor()
query = "SELECT * FROM nft_prices"
c.execute(query)
result = c.fetchone()
if result==None:
query = "INSERT INTO nft_prices (tokenId, price) VALUES (%s, %s)"
for i in range(1, 73):
c.execute(query, (i, "0"))
conn.commit()
conn.close()
def get_token_price(tokenId):
chain_url= "https://api.avax.network/ext/bc/C/rpc"
#spore nftv1 address avalanche
contract_address= "0xc2457F6Eb241C891EF74E02CCd50E5459c2E28Ea"
with open('abi/nftv1_abi.json') as abi:
nftv1_abi = json.load(abi)
web3 = Web3(Web3.HTTPProvider(chain_url))
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
contract = web3.eth.contract(address=contract_address, abi=nftv1_abi)
bazzar_mapping = contract.functions.Bazaar(tokenId).call()
price = bazzar_mapping[1]
return price
def index_nft_price_data():
conn = initialize_connection()
if not conn:
print("Unable to connect to the database")
return False
c= conn.cursor()
current_block = get_last_block("nft_prices")
chain_url= "https://api.avax.network/ext/bc/C/rpc"
web3 = Web3(Web3.HTTPProvider(chain_url))
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
latest_block= web3.eth.get_block('latest')['number']
if current_block<latest_block:
#populate nft_prices table if there are no entries
add_first_data_to_nft_prices()
#query all the tokens and update the price
query = "SELECT * FROM nft_prices"
c.execute(query)
result = c.fetchall()
for row in result:
tokenId = row[1]
price = get_token_price(tokenId)
query = "UPDATE nft_prices SET price = %s WHERE tokenId = %s"
c.execute(query, (price, tokenId))
conn.commit()
query = "UPDATE nft_control SET lastBlock = %s WHERE table_name = 'nft_prices'"
c.execute(query, (latest_block,))
conn.commit()
conn.close()
else:
print("All blocks indexed", flush=True)
return True
def nft_get_total_volume():
conn = initialize_connection()
c= conn.cursor()
query = "SELECT SUM(CAST(value AS DECIMAL)) FROM nft_buys"
c.execute(query)
result = c.fetchone()
total_volume = result[0]
conn.close()
return int(total_volume)/10**18
def nft_get_floor_price():
conn = initialize_connection()
c= conn.cursor()
#select min price that is not 0
query = "SELECT MIN(CAST(price AS DECIMAL)) FROM nft_prices WHERE CAST(price AS DECIMAL) != 0"
c.execute(query)
result = c.fetchone()
floor_price = result[0]
conn.close()
return int(floor_price)/10**18
def nft_get_last_sale():
conn = initialize_connection()
c= conn.cursor()
query = "SELECT value FROM nft_buys ORDER BY id DESC LIMIT 1"
c.execute(query)
result = c.fetchone()
last_sale = result[0]
conn.close()
return int(last_sale)/10**18
def set_nft_indexing_json(state= False, block_started=0, status="initialized"):
data = {
"indexing_in_progress": state,
"block_started": block_started,
"status": status
}
with open('nft_indexing.json', 'w+') as file:
json.dump(data, file)
def get_nft_indexing():
if not os.path.exists('nft_indexing.json'):
set_nft_indexing_json()
with open('nft_indexing.json', 'r') as file:
data = json.load(file)
return data["indexing_in_progress"], data["block_started"], data["status"]
def index_data():
try:
index_nft_price_data()
index_nft_bought_data()
current_block = get_ava_latest_block()
set_nft_indexing_json(False, current_block, "indexed")
except Exception as e:
print(e)
current_block = get_ava_latest_block()
set_nft_indexing_json(False, current_block, "reload")
return False
return True
# class IndexingThread(threading.Thread):
# def __init__(self, target=None, daemon=False):
# super(IndexingThread, self).__init__(target=target, daemon=daemon)
# self.stop = False
# def run(self):
# while not self.stop:
# print("Thread is running...")
# time.sleep(1)
# def stop_thread(self):
# self.stop = True
def nft_update_db():
last_block = get_last_block("nft_buys")
current_block = get_ava_latest_block()
(indexing_in_progress, block_started, status) = get_nft_indexing()
http_status = 200
print(f"last_block: {last_block}, current_block: {current_block}, indexing_in_progress: {indexing_in_progress}, block_started: {block_started}, status: {status}")
if last_block+100<current_block:
if not indexing_in_progress and block_started+120<current_block:
set_nft_indexing_json(True, current_block, "indexing")
thread= threading.Thread(target=index_data, daemon=True)
thread.start()
http_status = 201
elif indexing_in_progress and block_started+120>=current_block:
http_status = 202
else:
set_nft_indexing_json(False, block_started, "indexed")
(indexing_in_progress, block_started, status) = get_nft_indexing()
return {
"last_block": last_block, "http_status": http_status}
def nft_get_all_data():
conn = initialize_connection()
c= conn.cursor()
query = "SELECT tokenid, price FROM nft_prices WHERE price::numeric > 0"
c.execute(query)
result = c.fetchall()
conn.close()
data = [[item[0], int(item[1])] for item in result]
return data
def nft_get_id_data(id):
conn = initialize_connection()
c= conn.cursor()
query = "SELECT tokenid, price FROM nft_prices WHERE tokenid = %s"
c.execute(query, (id,))
result = c.fetchone()
conn.close()
data = [result[0], int(result[1])]
return data
def nft_get_data(req):
if not req:
resp= nft_get_all_data()
else:
resp= nft_get_id_data(req)
return jsonify(resp)