-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathstore_data.py
executable file
·380 lines (299 loc) · 15.6 KB
/
store_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
#!/usr/bin/env python3
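# pylint: disable=invalid-name
"""Fetch energy data from a public API and store it in a local SQLite database.

Depending on the configured mode this requests Octopus Energy unit rates (Agile import,
Agile export, or Tracker electricity and gas) or Carbon Intensity forecasts for a particular
region, inserts them into the database (updating rows that already exist), and prunes old
data so that the DB doesn't grow infinitely."""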
import sqlite3
import os
import sys
import time
from reprlib import Repr
from datetime import datetime, timedelta
from urllib.request import pathname2url
import pytz
import requests
import argparse
import eco_indicator
# Octopus Energy API. A request URL is assembled as
# AGILE_API_BASE + <tariff fragment> + <region letter> + AGILE_API_TAIL.
AGILE_API_BASE = 'https://api.octopus.energy/v1/products/'
AGILE_IMPORT_FLEX_100 = 'AGILE-24-04-03/electricity-tariffs/E-1R-AGILE-24-04-03-'
AGILE_EXPORT = 'AGILE-OUTGOING-19-05-13/electricity-tariffs/E-1R-AGILE-OUTGOING-19-05-13-'
# Region letters accepted for the Octopus tariffs.
AGILE_REGIONS = list('ABCDEFGPNJHKLM')
AGILE_API_TAIL = "/standard-unit-rates/"
TRACKER_ELECTRICITY = 'SILVER-VAR-22-10-21/electricity-tariffs/E-1R-SILVER-VAR-22-10-21-'
TRACKER_GAS = 'SILVER-VAR-22-10-21/gas-tariffs/G-1R-SILVER-VAR-22-10-21-'

# Carbon Intensity API. Each region letter maps to a path template in which
# '{from_time}' is filled in with the request timestamp before use.
CARBON_API_BASE = 'https://api.carbonintensity.org.uk'
CARBON_REGIONS = {
    region: '/regional/intensity/{from_time}/fw48h/regionid/' + str(region_id)
    for region, region_id in (
        ('A', 10), ('B', 9), ('C', 13), ('D', 6), ('E', 8), ('F', 4), ('G', 3),
        ('P', 1), ('N', 2), ('J', 14), ('H', 12), ('K', 7), ('L', 11), ('M', 5),
    )
}
# 'Z' is the only entry that uses the non-regional intensity endpoint.
CARBON_REGIONS['Z'] = '/intensity/{from_time}/fw48h'

# give up once we've tried this many times to get the prices from the API
MAX_RETRIES = 15
# Command-line interface. Arguments are parsed at import time; `args` and
# `conf_file` are module-level names read by the rest of the script.
parser = argparse.ArgumentParser(
    description='Read data from a remote API and store it in a local SQlite database',
)
parser.add_argument(
    '--conf', '-c',
    help='specify config file',
    default='config.yaml',
)
parser.add_argument(
    '--print', '-p',
    help='print data which was retrieved (JSON format)',
    action='store_true',
)
args = parser.parse_args()
conf_file = args.conf
def get_data_from_api(_request_uri: str) -> dict:
    """Request data from the API at the given URI and return the decoded JSON.

    Failed attempts are retried with an exponentially growing delay: 2**0
    seconds after the first failure, doubling up to 2**14 (16384) seconds,
    approx 4.5 hours, after the last one - a little over 9 hours in total.
    HTTP error statuses, connection errors, timeouts and any response whose
    status is not 2xx all count as failed attempts.

    Args:
        _request_uri: the full URL to GET.

    Returns:
        The response body decoded from JSON.

    Raises:
        SystemExit: once MAX_RETRIES attempts have failed, or immediately on
            any other requests.exceptions.RequestException.
    """
    my_repr = Repr()
    my_repr.maxstring = 80  # let's avoid truncating our error messages too much

    for retry_count in range(MAX_RETRIES):
        delay = 2**retry_count
        try:
            response = requests.get(_request_uri, timeout=5)
            response.raise_for_status()
        except requests.exceptions.HTTPError:
            # raise_for_status() raised, so `response` is bound here.
            print(('API HTTP error ' + str(response.status_code) +
                   ',retrying in ' + str(delay) + 's'))
        except requests.exceptions.ConnectionError as error:
            print(('API connection error: ' + my_repr.repr(str(error)) +
                   ', retrying in ' + str(delay) + 's'))
        except requests.exceptions.Timeout:
            print('API request timeout, retrying in ' + str(delay) + 's')
        except requests.exceptions.RequestException as error:
            # Anything else (bad URL, too many redirects, ...) will not fix itself.
            raise SystemExit('API Request error: ' + str(error)) from error
        else:
            if response.status_code // 100 == 2:
                print('API request successful, status ' + str(response.status_code) + '.')
                payload = response.json()  # decode the body once, not twice
                if args.print:
                    print(payload)
                return payload
            # Not an error status but not a success either (e.g. 1xx/3xx).
            # Back off like any other failure instead of looping without delay.
            print('API returned unexpected status ' + str(response.status_code) +
                  ', retrying in ' + str(delay) + 's')
        time.sleep(delay)

    raise SystemExit('API retry limit exceeded.')
def insert_data(data: dict, is_gas: bool):
    """Insert the records of an API response one by one and print a summary.

    The mode is taken from the global config. For the 'agile_import',
    'agile_export' and 'tracker' modes the records are read from
    data['results'] (keys 'valid_from', 'value_inc_vat', 'valid_to'). For
    'carbon' they are read from data['data'] (region 'Z') or
    data['data']['data'] (any other region), using 'from' and
    ['intensity']['forecast']. Any other mode does nothing.

    Args:
        data: the decoded JSON response from the API.
        is_gas: passed through to insert_record() to select the gas column.
    """
    mode = config['Mode']
    num_rows_inserted = 0

    if mode in ('agile_import', 'agile_export', 'tracker'):
        results = data['results']
        for result in results:
            if insert_record(result['valid_from'], result['value_inc_vat'], is_gas):
                num_rows_inserted += 1
        if num_rows_inserted > 0:
            # The first result is reported as the final slot - presumably the
            # API lists the newest rate first; verify against the API docs.
            lastslot = datetime.strptime(
                results[0]['valid_to'], "%Y-%m-%dT%H:%M:%SZ").strftime("%H:%M on %A %d %b")
            print(str(num_rows_inserted) + ' prices were inserted, ending at ' + lastslot + '.')
        else:
            print('No prices were inserted - maybe we have them'
                  ' already, or Octopus are late with their update.')

    elif mode == 'carbon':
        if config['DNORegion'] == 'Z':
            carbon_data = data['data']
        else:
            carbon_data = data['data']['data']
        for result in carbon_data:
            if insert_record(result['from'], result['intensity']['forecast'], is_gas):
                num_rows_inserted += 1
        if num_rows_inserted > 0:
            # Report the last forecast actually returned. A fixed index would
            # raise IndexError on a short response (before the caller commits)
            # and would name the wrong slot on a longer one.
            lastslot = datetime.strptime(
                carbon_data[-1]['from'], "%Y-%m-%dT%H:%MZ").strftime("%H:%M on %A %d %b")
            print(str(num_rows_inserted) + ' intensities were inserted, '
                  'ending at ' + lastslot + '.')
        else:
            print('No values were inserted - maybe we have them'
                  ' already, or carbonintensity.org.uk are late with their update.')
def insert_record(valid_from: str, data_value: float, is_gas: bool) -> bool:
    """Insert one value into the 'eco' table, or update it if the slot exists.

    The target column and the expected timestamp format depend on the global
    config mode:
      * 'agile_import' / 'agile_export' -> value_inc_vat
      * 'tracker' -> gas_value_inc_vat when is_gas is true, else value_inc_vat
      * 'carbon' -> intensity (timestamps without seconds)

    Args:
        valid_from: start of the slot as given by the API, e.g.
            '2024-05-01T00:00:00Z' ('2024-05-01T00:00Z' in carbon mode).
        data_value: the value to store for that slot.
        is_gas: only used in 'tracker' mode, selects the gas column.

    Returns:
        True once the row has been written (inserted or updated), False when
        the configured mode is not one of the modes listed above.

    Raises:
        SystemExit: if there is no database cursor or the database rejects
            the statement.
    """
    if not cursor:
        raise SystemExit('Database connection lost!')

    mode = config['Mode']
    if mode in ('agile_import', 'agile_export'):
        column, timestamp_format = 'value_inc_vat', "%Y-%m-%dT%H:%M:%SZ"
    elif mode == 'tracker':
        column = 'gas_value_inc_vat' if is_gas else 'value_inc_vat'
        timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    elif mode == 'carbon':
        column, timestamp_format = 'intensity', "%Y-%m-%dT%H:%MZ"
    else:
        return False

    # make the date/time work for SQLite, it's picky about the format,
    # easier to use the built in SQLite datetime functions
    # when figuring out what records we want rather than trying to roll our own
    valid_from_formatted = datetime.strptime(
        valid_from, timestamp_format).strftime("%Y-%m-%d %H:%M:%S")

    # `column` only ever comes from the fixed names chosen above, so formatting
    # it into the statement is safe; the values themselves are bound parameters.
    statement = ("INSERT INTO 'eco'('valid_from', '{column}') VALUES (?, ?) "
                 "ON CONFLICT(valid_from) DO UPDATE SET "
                 "{column}=excluded.{column};").format(column=column)
    try:
        cursor.execute(statement, (valid_from_formatted, data_value))
    except sqlite3.Error as error:
        # SystemExit (not SystemError, which is meant for interpreter-internal
        # failures) matches how every other fatal error in this script exits.
        raise SystemExit('Database error: ' + str(error)) from error
    return True  # the record was inserted or updated
def remove_old_data(age: str):
    """Delete rows from the 'eco' table that are older than `age`.

    We don't want to display old data and we don't want the database to grow
    too big. A database error while pruning is reported but not fatal.

    Args:
        age: how far back to keep data, as the unsigned part of an SQLite
            date/time modifier, e.g. '3 days' (applied as '-3 days' to 'now').

    Raises:
        SystemExit: if there is no database cursor.
    """
    if not cursor:
        raise SystemExit('Database connection lost before pruning data!')
    try:
        # The modifier is bound as a parameter rather than spliced into the
        # SQL text, and the DELETE itself reports how many rows it removed,
        # so no separate COUNT(*) query is needed.
        cursor.execute("DELETE FROM eco WHERE valid_from < datetime('now', ?)",
                       ('-' + age,))
        num_old_rows = cursor.rowcount
    except sqlite3.Error as error:
        print('Failed while trying to remove old data points from database: ', error)
        return
    if num_old_rows > 0:
        print(str(num_old_rows) + ' unneeded data points from the past were deleted.')
    else:
        print('There were no old data points to delete.')
# Work from sys.path[0] (the script's own directory when run as a script) so
# that the relative config and database paths below resolve there.
os.chdir(sys.path[0])
config = eco_indicator.get_config(conf_file)

try:
    # connect to the database in rw mode so we can catch the error if it doesn't exist
    DB_URI = 'file:{}?mode=rw'.format(pathname2url('eco_indicator.sqlite'))
    conn = sqlite3.connect(DB_URI, uri=True)
    cursor = conn.cursor()
    print('Connected to database...')
except sqlite3.OperationalError:
    # handle missing database case
    print('No database found. Creating a new one...')
    conn = sqlite3.connect('eco_indicator.sqlite')
    cursor = conn.cursor()
    # valid_from is the primary key, so there is at most one row per slot and
    # running this script several times cannot duplicate data
    # (ON CONFLICT REPLACE: a plain re-insert replaces the existing row).
    cursor.execute('CREATE TABLE eco (valid_from STRING PRIMARY KEY ON CONFLICT REPLACE, '
                   'value_inc_vat REAL, intensity REAL, gas_value_inc_vat REAL)')
    conn.commit()
    print('Database created... ')

try:
    MODE = config['Mode']
    if MODE not in ('agile_import', 'agile_export', 'tracker', 'carbon'):
        raise SystemExit('Error: Invalid mode ' + config['Mode'] + ' passed to store_data.py')

    # Every mode needs a region; carbon mode has its own table of valid letters.
    DNO_REGION = config['DNORegion']
    if DNO_REGION in (CARBON_REGIONS if MODE == 'carbon' else AGILE_REGIONS):
        print('Selected region ' + DNO_REGION)
    else:
        raise SystemExit('Error: DNO region ' + DNO_REGION + ' is not a valid choice.')

    # All APIs used below are public, so no authentication is required.
    if MODE == 'agile_import':
        AGILE_CAP = config['AgileCap']
        # Map each supported cap to the NAME of its tariff constant. The
        # constant is looked up by name because this file may not define all
        # of them; a direct reference to a missing one raises NameError.
        AGILE_TARIFF_NAMES = {35: 'AGILE_IMPORT_35',
                              55: 'AGILE_IMPORT_55',
                              78: 'AGILE_IMPORT_78',
                              100: 'AGILE_IMPORT_VAR_100',
                              101: 'AGILE_IMPORT_FLEX_100'}
        if AGILE_CAP not in AGILE_TARIFF_NAMES:
            raise SystemExit('Error: Agile cap of ' + str(AGILE_CAP) +
                             ' refers to an unknown tariff.')
        AGILE_VERSION = globals().get(AGILE_TARIFF_NAMES[AGILE_CAP])
        if AGILE_VERSION is None:
            raise SystemExit('Error: Agile cap of ' + str(AGILE_CAP) + ' needs the tariff ' +
                             AGILE_TARIFF_NAMES[AGILE_CAP] +
                             ', which is not defined in store_data.py.')
        request_uri = (AGILE_API_BASE + AGILE_VERSION + DNO_REGION + AGILE_API_TAIL)
        data_rows = get_data_from_api(request_uri)
        insert_data(data_rows, False)

    elif MODE == 'carbon':
        request_time = datetime.now().astimezone(pytz.utc).isoformat()
        request_uri = (CARBON_API_BASE + CARBON_REGIONS[DNO_REGION])
        request_uri = request_uri.format(from_time=request_time)
        data_rows = get_data_from_api(request_uri)
        insert_data(data_rows, False)

    elif MODE == 'agile_export':
        request_uri = (AGILE_API_BASE + AGILE_EXPORT + DNO_REGION + AGILE_API_TAIL)
        data_rows = get_data_from_api(request_uri)
        insert_data(data_rows, False)

    else:  # MODE == 'tracker'
        # The timestamps carry a 'Z' (UTC) suffix, so they must be computed
        # from the current UTC time rather than from naive local time.
        now_utc = datetime.now(pytz.utc)
        period_from = (now_utc - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
        period_to = (now_utc + timedelta(days=2)).strftime("%Y-%m-%dT%H:%M:%SZ")

        # electricity rates first ...
        request_uri = (AGILE_API_BASE + TRACKER_ELECTRICITY + DNO_REGION + AGILE_API_TAIL)
        request_uri = request_uri + "?period_from=" + period_from + "&period_to=" + period_to
        data_rows = get_data_from_api(request_uri)
        insert_data(data_rows, False)

        # ... then gas rates for the same period
        request_uri = (AGILE_API_BASE + TRACKER_GAS + DNO_REGION + AGILE_API_TAIL)
        request_uri = request_uri + "?period_from=" + period_from + "&period_to=" + period_to
        data_rows = get_data_from_api(request_uri)
        insert_data(data_rows, True)

    remove_old_data('3 days')
    # finish up the database operation
    conn.commit()
finally:
    # Close the connection on every path. Without a commit, anything written
    # during a failed run is discarded, exactly as when the process just exits.
    conn.close()