-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathscheduler.py
124 lines (109 loc) · 5.18 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from rocketry import Rocketry
from rocketry.conds import every, after_success
import requests
from requests.exceptions import HTTPError
from requests.exceptions import Timeout
import joblib
from dotenv import load_dotenv
import os
import time
load_dotenv('keys.env')
TOKEN_CO2 = str(os.getenv('TOKEN_CO2'))
COUNTRIES = str(os.getenv('COUNTRIES'))
# Creating the Rocketry app
app = Rocketry(config={"task_execution": "async"})
def data_zones():
url = "https://api.electricitymap.org/v3/zones"
try:
response = requests.request("GET", url, timeout=3)
# If the response was successful, no Exception will be raised
response.raise_for_status()
except HTTPError as http_err:
print(f'HTTP error occurred: {http_err}')
except Timeout as err:
print(f'The request timed out: {err}')
except Exception as err:
print(f'Other error occurred: {err}')
else:
data=response.json()
joblib.dump(data , 'data/zones')
def data_from_country(country):
url = "https://api.co2signal.com/v1/latest"
querystring = {"countryCode":country}
headers = {
'auth-token': TOKEN_CO2,
# 'cache-control': "no-cache",
}
try:
response = requests.request("GET",
url,
headers=headers,
params=querystring,
timeout=3)
# If the response was successful, no Exception will be raised
response.raise_for_status()
except HTTPError as http_err:
print(f'HTTP error occurred: {http_err}')
except Timeout as err:
print(f'The request timed out: {err}')
except Exception as err:
print(f'Other error occurred: {err}')
else:
data=response.json()
if "data" in data:
output_data = {}
output_data = process_data(data)
joblib.dump(output_data , 'data/data'+country)
time.sleep(0.3)
# Creating some tasks
def process_data(data):
processed_data = {}
renewable_share= 100 - data['data']['fossilFuelPercentage']
if renewable_share <= 20:
processed_data['name'] = 'Dependiendo de combustibles fósiles'
processed_data['description'] = "Usamos un porcentaje de energía renovable menor al 20%, estamos en proceso de mejorar nuestra huella de carbono con proyectos para consumir energía de fuentes renovables"
processed_data['image'] = 'https://github.com/LuisFDuarte/SelloChainedCO2API/raw/main/images/fossil.jpg'
if renewable_share > 20 and renewable_share <=50:
processed_data['name'] = 'Empezando la transición energética'
processed_data['description'] = "Usamos un porcentaje de energía renovable menor al 50%, estamos en camino de la transición energética para mejorar nuestra huella de carbono en nuestros productos o servicios."
processed_data['image'] = 'https://github.com/LuisFDuarte/SelloChainedCO2API/raw/main/images/greenmobility.jpg'
if renewable_share > 50 and renewable_share <=80:
processed_data['name'] = 'Desarrollando la transición energética'
processed_data['description'] = "Usamos un porcentaje de energía renovable mayor al 50%, estamos activamente desarrollando nuestra transición energética para mejorar nuestra huella de carbono."
processed_data['image'] = 'https://github.com/LuisFDuarte/SelloChainedCO2API/raw/main/images/sustainableEnergy.jpg'
if renewable_share > 80 and renewable_share <=100:
processed_data['name'] = 'Construimos productos y servicios con fuentes renovables'
processed_data['description'] = "Usamos un porcentaje de energía renovable mayor al 80%, estamos comprometidos con el uso de energías renovables para reducir la huella de carbono de nuestros productos y servicios."
processed_data['image'] = 'https://github.com/LuisFDuarte/SelloChainedCO2API/raw/main/images/sustainableHouses.jpg'
processed_data['attributes'] = [
{
'trait_type': 'Energía renovable %',
'value': renewable_share
},
{
'trait_type': 'Intensidad de carbono gCO2/kWh',
'value': data['data']['carbonIntensity']
},
{
'trait_type': 'País',
'value': data['countryCode']
}
]
processed_data['datetime'] = data['data']['datetime']
processed_data['_disclaimer'] = data['_disclaimer']
return processed_data
@app.task('daily')
async def do_daily():
data_zones()
@app.task('every 1 hour')
async def do_requests():
countries = COUNTRIES.split(",")
for country in countries:
print("Doing a task for "+country)
data_from_country(country)
# @app.task(after_success(do_requests))
# async def do_after():
# print("Task Done")
if __name__ == "__main__":
# If this script is run, only Rocketry is run
app.run()