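"""
main.py

FastAPI backend for Morpheus (MOR) dashboard statistics. Endpoints serve
staking, supply, price/volume, liquidity, capital and code metrics, backed by
a JSON file cache that a scheduled task refreshes every 12 hours and reports
to Slack.
"""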
import glob
import json
import logging
import os
from contextlib import asynccontextmanager
from datetime import datetime, date

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware

from helpers.capital_helpers.capital_main import get_capital_metrics
from helpers.code_helpers.get_github_commits_metrics import get_commits_data
from helpers.staking_helpers.staking_main import (get_wallet_stake_info,
                                                  give_more_reward_response,
                                                  get_analyze_mor_master_dict)
from helpers.staking_helpers.get_mor_amount_staked_over_time import get_mor_staked_over_time
from helpers.supply_helpers.supply_main import (get_combined_supply_data,
                                                get_historical_prices_and_trading_volume,
                                                get_market_cap,
                                                get_mor_holders,
                                                get_historical_locked_and_burnt_mor)
from helpers.uniswap_helpers.get_total_combined_uniswap_position import get_combined_uniswap_position
from helpers.code_helpers.code_main import get_total_weights_and_contributors
from helpers.supply_helpers.get_chain_wise_supplies import get_chain_wise_circ_supply
from sheets_config.slack_notify import slack_notification

scheduler = AsyncIOScheduler()
LAST_CACHE_UPDATE_TIME = None

# Add debug logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.debug("Starting application...")

@asynccontextmanager
async def lifespan(app: FastAPI):
    global LAST_CACHE_UPDATE_TIME
    logger.info("Starting application initialization")
    try:
        # Check for required environment variables
        required_vars = [
            'RPC_URL', 'ARB_RPC_URL', 'BASE_RPC_URL', 'ETHERSCAN_API_KEY',
            'ARBISCAN_API_KEY', 'BASESCAN_API_KEY', 'DUNE_API_KEY',
            'DUNE_QUERY_ID', 'SPREADSHEET_ID', 'GITHUB_API_KEY',
            'GOOGLE_APPLICATION_CREDENTIALS'
        ]
        missing_vars = [var for var in required_vars if not os.getenv(var)]
        if missing_vars:
            logger.warning(f"Missing environment variables: {', '.join(missing_vars)}")
            # Don't fail startup, just log a warning

        # Verify GOOGLE_APPLICATION_CREDENTIALS is valid JSON if present
        google_creds = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
        if google_creds:
            try:
                json.loads(google_creds)
                logger.info("Successfully validated Google credentials JSON")
            except json.JSONDecodeError:
                logger.warning("GOOGLE_APPLICATION_CREDENTIALS is present but contains invalid JSON")

        # Initialize the cache file if it does not exist yet
        if not os.path.exists(CACHE_FILE):
            with open(CACHE_FILE, 'w') as f:
                f.write('{}')
            logger.info(f"Created empty cache file at {CACHE_FILE}")

        try:
            logger.info("Setting up scheduler")
            scheduler.add_job(update_cache_task, CronTrigger(hour='*/12'))
            scheduler.start()
        except Exception as scheduler_error:
            logger.error(f"Scheduler error: {str(scheduler_error)}")
            # Continue without the scheduler

        LAST_CACHE_UPDATE_TIME = datetime.now().isoformat()
        logger.info("Application startup complete")
        yield
    except Exception as e:
        logger.error(f"Error during startup: {str(e)}", exc_info=True)
        yield  # Still yield to allow the application to start
    finally:
        try:
            logger.info("Shutting down scheduler")
            scheduler.shutdown()
        except Exception as shutdown_error:
            logger.error(f"Error during shutdown: {str(shutdown_error)}")

app = FastAPI(lifespan=lifespan)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:3000",
        "http://localhost:3001",
        "http://localhost:3002",
        "https://*.vercel.app",
        "https://morpheus-stats-frontend.vercel.app",
        "https://mor-stats-backend-cfcfatfxejhphfg9.centralus-01.azurewebsites.net"
    ],  # List of allowed origins
    allow_credentials=True,
    allow_methods=["*"],  # Allow all methods
    allow_headers=["*"],  # Allow all headers
)
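# Note: Starlette's CORSMiddleware matches allow_origins entries literally, so the
# "https://*.vercel.app" entry above is not treated as a subdomain wildcard. If
# preview deployments on *.vercel.app need access, an allow_origin_regex is the
# usual approach, e.g. (illustrative only):
#
#   app.add_middleware(
#       CORSMiddleware,
#       allow_origin_regex=r"https://.*\.vercel\.app",
#       allow_credentials=True,
#       allow_methods=["*"],
#       allow_headers=["*"],
#   )
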
# Silence noisy third-party loggers
logging.getLogger("httpx").disabled = True
logging.getLogger("dune-client").disabled = True
logging.getLogger("DuneClient").disabled = True
logging.getLogger("dune_client.models").disabled = True
logging.getLogger("dune_client").disabled = True

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

CACHE_FILE = 'cache.json'

# Function to read the cache from a file
def read_cache() -> dict:
    if os.path.exists(CACHE_FILE):
        try:
            with open(CACHE_FILE, 'r') as file:
                data = file.read()
                if data.strip():  # Check if the file is not empty
                    return json.loads(data)
                else:
                    return {}  # Return an empty dictionary if the file is empty
        except json.JSONDecodeError as e:
            # Log the error and return an empty dictionary if the JSON is invalid
            logger.error(f"Error reading cache file: {e}")
            return {}
    return {}

def json_serial(obj):
    """JSON serializer for objects not serializable by default json code"""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")

def ensure_serializable(obj):
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    elif isinstance(obj, dict):
        return {k: ensure_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [ensure_serializable(i) for i in obj]
    elif isinstance(obj, (int, float, str, bool, type(None))):
        return obj
    else:
        return str(obj)  # Convert any other type to string
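# Example usage (illustrative): nested datetimes are converted recursively, e.g.
#   ensure_serializable({"t": datetime(2024, 1, 1), "vals": [1, date(2024, 1, 2)]})
#   -> {"t": "2024-01-01T00:00:00", "vals": [1, "2024-01-02"]}
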
def write_cache(cache_data: dict):
    try:
        with open(CACHE_FILE, 'w') as file:
            json.dump(cache_data, file, default=json_serial)
    except TypeError as e:
        # This will catch issues with non-serializable objects
        logging.error(f"TypeError in write_cache: {e}")
        # Identify which key is causing the problem
        for key, value in cache_data.items():
            try:
                json.dumps({key: value}, default=json_serial)
            except TypeError:
                logging.error(f"Non-serializable value for key: {key}")
    except Exception as e:
        logging.error(f"Error writing to cache file: {e}")
async def update_cache_task() -> None:
    global LAST_CACHE_UPDATE_TIME
    try:
        cache_data = read_cache()

        cache_data['staking_metrics'] = await get_analyze_mor_master_dict()
        cache_data['total_and_circ_supply'] = await get_combined_supply_data()
        cache_data['prices_and_volume'] = await get_historical_prices_and_trading_volume()
        cache_data['market_cap'] = await get_market_cap()
        cache_data['give_mor_reward'] = give_more_reward_response()
        cache_data['stake_info'] = get_wallet_stake_info()
        cache_data['mor_holders_by_range'] = await get_mor_holders()
        cache_data['locked_and_burnt_mor'] = await get_historical_locked_and_burnt_mor()
        cache_data['protocol_liquidity'] = get_combined_uniswap_position()
        cache_data['capital_metrics'] = get_capital_metrics()
        cache_data['github_commits'] = get_commits_data()
        cache_data['historical_mor_rewards_locked'] = ensure_serializable(await get_mor_staked_over_time())
        cache_data['code_metrics'] = await get_total_weights_and_contributors()
        cache_data['chain_wise_supplies'] = get_chain_wise_circ_supply()

        slack_notification("Finished updating cache")

        # Write the updated cache data to the cache file
        try:
            write_cache(cache_data)
            # After all cache updates are done, record the last cache update time
            LAST_CACHE_UPDATE_TIME = datetime.now().isoformat()
            slack_notification(f"Finished writing cache at {LAST_CACHE_UPDATE_TIME}")
        except Exception as cache_write_error:
            slack_notification(f"Error writing to cache: {cache_write_error}")
            logger.error(f"Error writing to cache: {cache_write_error}")
    except Exception as e:
        slack_notification(f"Error in cache update task: {str(e)}")
        logger.error(f"Error in cache update task: {str(e)}")
@app.get("/")
async def root():
return {"message": "Hello World"}
@app.get("/analyze-mor-stakers")
async def get_mor_staker_analysis():
cache_data = read_cache()
if 'staking_metrics' in cache_data:
return cache_data['staking_metrics']
# If cache not available, load the data and cache it
try:
result = await get_analyze_mor_master_dict()
cache_data['staking_metrics'] = result
write_cache(cache_data)
return result
except Exception as e:
logger.error(f"Error fetching stakers: {str(e)}")
raise HTTPException(status_code=500, detail=f"An error occurred fetching stakers")
@app.get("/give_mor_reward")
async def give_more_reward():
cache_data = read_cache()
if 'give_mor_reward' in cache_data:
return cache_data['give_mor_reward']
try:
# Call the function to generate the response
res = give_more_reward_response()
# Cache the result
# Ensure that keys in the response are strings before saving to cache
if isinstance(res, dict):
res = {str(key): value for key, value in res.items()}
cache_data['give_mor_reward'] = res
write_cache(cache_data)
return res
except Exception as e:
raise HTTPException(status_code=500, detail=f"An error occurred")
@app.get("/get_stake_info")
async def get_stake_info():
cache_data = read_cache()
if 'stake_info' in cache_data:
return cache_data['stake_info']
try:
# Call the function to get the stake information
result = get_wallet_stake_info()
# Ensure all keys and values in the result are serializable
serializable_result = {str(key): value for key, value in result.items()}
# Cache the result
cache_data['stake_info'] = serializable_result
write_cache(cache_data)
return serializable_result
except Exception as e:
raise HTTPException(status_code=500, detail=f"An error occurred")
@app.get("/total_and_circ_supply")
async def total_and_circ_supply():
cache_data = read_cache()
if 'total_and_circ_supply' in cache_data:
logger.info("Returning cached total_and_circ_supply data")
# Return the combined supply data from the cache file with the 'data' key
return {"data": cache_data['total_and_circ_supply']}
# If cache not available, load the data and cache it
try:
logger.info("Cache miss for total_and_circ_supply, fetching new data")
# Add the combined supply data to the cache file without the 'data' key
cache_data['total_and_circ_supply'] = await get_combined_supply_data()
write_cache(cache_data)
# Return the combined supply data from the freshly written cache with the 'data' key
return {"data": cache_data['total_and_circ_supply']}
except Exception as e:
logger.info(f"Error fetching total_and_circ_supply data")
raise HTTPException(status_code=500, detail=f"An error occurred")
@app.get("/prices_and_trading_volume")
async def historical_prices_and_volume():
cache_data = read_cache()
if 'prices_and_volume' in cache_data:
return cache_data['prices_and_volume']
# If cache not available, load the data and cache it
try:
cache_data['prices_and_volume'] = await get_historical_prices_and_trading_volume()
write_cache(cache_data)
return cache_data['prices_and_volume']
except Exception as e:
raise HTTPException(status_code=500, detail=f"An error occurred")
@app.get("/get_market_cap")
async def market_cap():
cache_data = read_cache()
if 'market_cap' in cache_data:
return cache_data['market_cap']
# If cache not available, load the data and cache it
try:
result = await get_market_cap()
if "error" in result:
raise HTTPException(status_code=500, detail=f"An error occurred")
cache_data['market_cap'] = result
write_cache(cache_data)
return result
except Exception as e:
logger.error(f"Error fetching market cap: {str(e)}")
raise HTTPException(status_code=500, detail=f"An error occurred")
@app.get("/mor_holders_by_range")
async def mor_holders_by_range():
cache_data = read_cache()
if 'mor_holders_by_range' in cache_data:
logger.info("Returning cached data")
return cache_data['mor_holders_by_range']
logger.info("Cache miss, fetching new data")
try:
result = await get_mor_holders()
cache_data['mor_holders_by_range'] = result
write_cache(cache_data)
logger.info("New data fetched and cached")
return result
except Exception as e:
logger.exception(f"An error occurred in mor_holders_by_range: {str(e)}")
raise HTTPException(status_code=500, detail=f"An error occurred")
@app.get("/locked_and_burnt_mor")
async def locked_and_burnt_mor():
cache_data = read_cache()
if 'locked_and_burnt_mor' in cache_data:
return cache_data['locked_and_burnt_mor']
try:
# Cache the result
result = await get_historical_locked_and_burnt_mor()
cache_data['locked_and_burnt_mor'] = result
write_cache(cache_data)
# Return the combined data
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"An error occurred")
@app.get("/protocol_liquidity")
async def get_protocol_liquidity():
cache_data = read_cache()
if 'protocol_liquidity' in cache_data:
return cache_data['protocol_liquidity']
try:
# Call the protocol_liquidity function with the default address
result = get_combined_uniswap_position()
if not result:
raise HTTPException(status_code=404, detail="Not Found")
# Cache the result
cache_data['protocol_liquidity'] = result
write_cache(cache_data)
return result # Return the calculated liquidity in USD, MOR, and stETH values
except Exception as e:
raise HTTPException(status_code=500, detail=f"An error occurred")
# Endpoint to get the last cache update time
@app.get("/last_cache_update_time")
async def get_last_cache_update_time():
    return {"last_updated_time": LAST_CACHE_UPDATE_TIME}

@app.get("/capital_metrics")
async def capital_metrics():
cache_data = read_cache()
if 'capital_metrics' in cache_data:
return cache_data['capital_metrics']
try:
result = get_capital_metrics()
# Cache the result
cache_data['capital_metrics'] = result
write_cache(cache_data)
return result
except Exception as e:
logger.error(f"Error fetching capital metrics: {str(e)}")
raise HTTPException(status_code=500, detail="An error occurred while fetching capital metrics")
@app.get("/github_commits")
async def get_github_commits():
cache_data = read_cache()
if 'github_commits' in cache_data:
return cache_data['github_commits']
try:
result = get_commits_data()
cache_data['github_commits'] = result
write_cache(cache_data)
return result
except Exception as e:
logger.error(f"Error fetching github commits: {str(e)}")
raise HTTPException(status_code=500, detail="An error occurred while fetching github commits")
@app.get("/historical_mor_rewards_locked")
async def get_historical_mor_staked():
cache_data = read_cache()
if 'historical_mor_rewards_locked' in cache_data:
return cache_data['historical_mor_rewards_locked']
try:
result = await get_mor_staked_over_time()
serializable_result = ensure_serializable(result)
cache_data['historical_mor_rewards_locked'] = serializable_result
write_cache(cache_data)
return serializable_result
except Exception as e:
logger.error(f"Error fetching mor rewards locked: {str(e)}")
raise HTTPException(status_code=500, detail="An error occurred")
@app.get("/code_metrics")
async def get_code_metrics():
cache_data = read_cache()
if 'code_metrics' in cache_data:
return cache_data['code_metrics']
try:
result = await get_total_weights_and_contributors()
cache_data['code_metrics'] = result
write_cache(cache_data)
return result
except Exception as e:
logger.error(f"Error fetching code metrics: {str(e)}")
raise HTTPException(status_code=500, detail="An error occurred")
@app.get("/chain_wise_supplies")
async def get_circ_supply_by_chains():
cache_data = read_cache()
if 'chain_wise_supplies' in cache_data:
return cache_data['chain_wise_supplies']
try:
result = get_chain_wise_circ_supply()
cache_data['chain_wise_supplies'] = result
write_cache(cache_data)
return result
except Exception as e:
logger.error(f"Error fetching code in chain-wise supplies: {str(e)}")
raise HTTPException(status_code=500, detail="An error occurred")
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"env_vars_present": {
var: bool(os.getenv(var)) for var in [
'RPC_URL', 'ARB_RPC_URL', 'BASE_RPC_URL', 'ETHERSCAN_API_KEY',
'ARBISCAN_API_KEY', 'BASESCAN_API_KEY', 'DUNE_API_KEY',
'DUNE_QUERY_ID', 'SPREADSHEET_ID', 'GITHUB_API_KEY', 'SLACK_URL'
]
}
}
@app.get("/debug/check-credentials")
async def check_credentials():
mounted_path = "/config/credentials.json"
env_var = bool(os.getenv("GOOGLE_SHEETS_CREDENTIALS"))
return {
"mounted_file_exists": os.path.exists(mounted_path),
"mounted_file_readable": os.access(mounted_path, os.R_OK) if os.path.exists(mounted_path) else False,
"env_var_exists": env_var,
}
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
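# Local run (illustrative): `python main.py` serves the API on port 8000, after
# which the endpoints can be exercised with, for example:
#   curl http://localhost:8000/health
#   curl http://localhost:8000/last_cache_update_time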