-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathhelper.py
2105 lines (1810 loc) · 84.8 KB
/
helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import sys
from datetime import datetime, timedelta, timezone
import pytz
import pandas as pd
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
import json
import numpy as np
import av
import math
import subprocess
from scipy.signal import find_peaks, medfilt
from typing import Union, List, Tuple, Optional
# Root folder of the FIRED dataset. Must be assigned by the code using this
# module before any path helper is called; __checkBase() exits otherwise.
FIRED_BASE_FOLDER = None
# Separator between the info fields encoded in file names
# (annotation files are named "<type>__<room>__<name>...")
DIVIDER = "__"
# Time format of the time stamp embedded in file names
STORE_TIME_FORMAT = '%Y_%m_%d__%H_%M_%S'
# Column delimiter used when reading and writing CSV files (tab separated)
DELIMITER = "\t"
# Annotation type prefixes (first DIVIDER-separated field of an annotation file name)
LIGHT_ANNOTATION = "light"
SENSOR_ANNOTATION = "sensor"
DEVICE_ANNOTATION = "device"
ANNOTATION_TYPES = [LIGHT_ANNOTATION, SENSOR_ANNOTATION, DEVICE_ANNOTATION]
# Sub-folder names below FIRED_BASE_FOLDER (and below its summary folder)
ANNOTATION_FOLDER_NAME = "annotation"
INFO_FOLDER_NAME = "info"
RAW_FOLDER_NAME = "highFreq"
SUMMARY_FOLDER_NAME = "summary"
ONE_HZ_FOLDER_NAME = "1Hz"
FIFTY_HZ_FOLDER_NAME = "50Hz"
# Name of the combined file inside the 1 Hz summary folder
COMBINED_FILE_NAME = "combined.mkv"
# Appliance entry in the device mapping that marks a meter whose connected
# appliance changes during the recording
CHANGING_DEVICE = "changing"
# File (in the annotation folder) listing which appliance was connected to the
# changing meter and when.
# NOTE: currently only one changing device is supported
CHANGING_DEVICE_INFO_FILENAME = "changingDevice.csv"
# JSON files in the info folder: meter -> appliances mapping, appliance info
# and power info of the lights
DEVICE_MAPPING_FILENAME = "deviceMapping.json"
DEVICE_INFO_FILENAME = "deviceInfo.json"
LIGHTS_POWER_INFO_FILENAME = "lightsPower.json"
# rsync settings; the password file lives in the info folder.
# Presumably used by the download logic further down in this file — confirm.
RSYNC_PWD_FILE = "rsync_pass.txt"
RSYNC_ALLOWED = True
RSYNC_ADDR = "rsync://FIRED@clu.informatik.uni-freiburg.de/FIRED/"
# Print additional debug output
VERBOSE = False
# [first hour, last hour] (local time) of the nightly window from which the
# base power is extracted
BASE_NIGHT_RANGE = [1,5]
# Files fetched via rsync; delDownloads() deletes them and clears the list
_loadedFiles = []
def __checkBase():
    """Terminate the program with an error message unless FIRED_BASE_FOLDER has been set."""
    if FIRED_BASE_FOLDER is not None:
        return
    sys.exit("\033[91mNeed to set FIRED basefolder Folder\033[0m")
def time_format_ymdhms(dt: Union[datetime, float]) -> str:
    """
    Return time format as y.m.d h:m:s.ms.

    Floats are interpreted as POSIX timestamps and converted to local time.

    :param dt: The timestamp or date to convert to string
    :type dt: datetime object or float
    :return: Timeformat as \"y.m.d h:m:s.ms\" (milliseconds zero padded to 3 digits),
             \"(None)\" if dt is None
    :rtype: str
    """
    if dt is None:
        return "(None)"
    if not isinstance(dt, (datetime, timedelta)):
        dt = datetime.fromtimestamp(dt)
    dateText = dt.strftime('%Y.%m.%d %H:%M:%S')
    milliText = "%03i" % int(dt.microsecond / 1000)
    return dateText + "." + milliText
def filenameToTimestamp(filename: str, format: str="%Y_%m_%d__%H_%M_%S") -> Optional[float]:
    r"""
    Return time stamp of a given file.

    The time is taken from the file name without extension, after dropping
    everything up to and including the first underscore
    (e.g. \"meter_2020_01_02__03_04_05.mkv\" -> \"2020_01_02__03_04_05\").

    :param filename: filename or filepath
    :type filename: str
    :param format: format of time in filename, default: \"%Y_%m_%d__%H_%M_%S\"
    :type format: str
    :return: Timestamp (local time) or None if it can not be extracted
    :rtype: float or None
    """
    stem = os.path.basename(filename).split(".")[0]
    _prefix, _sep, timePart = stem.partition("_")
    try:
        parsed = datetime.strptime(timePart, format)
    except ValueError:
        return None
    return parsed.timestamp()
def prettyfyApplianceName(string: str) -> str:
    """
    Capitalize the first character of every space-separated word.

    Unlike ``str.title()`` the remaining characters keep their case
    (``"tv L1"`` -> ``"Tv L1"``). Empty words, as produced by an empty name or
    by consecutive spaces, are kept unchanged instead of raising IndexError.

    :param string: appliance name
    :type string: str
    :return: prettified name
    :rtype: str
    """
    # word[:1] is "" for an empty word, whereas word[0] would raise IndexError
    return " ".join(word[:1].upper() + word[1:] for word in string.split(" "))
def loadCSV(filepath: str, delimiter: str=DELIMITER) -> List[dict]:
    """
    Load CSV data from given file.

    The first row in the file determines the dictionary keys; every further row
    becomes one dictionary. Values keep the dtype pandas infers for their column
    (numeric time stamps stay numbers, callers do arithmetic on them).

    :param filepath: filepath
    :type filepath: str
    :param delimiter: column separator in file.
    :type delimiter: str
    :return: One dictionary per data row
    :rtype: list of dict
    """
    # 'records' is accepted by every pandas version; the short name 'r' was
    # deprecated and is rejected by pandas >= 2.0. No date parsing is requested:
    # parse_dates=True without an index_col never converted any column, and the
    # date_parser argument is deprecated.
    return pd.read_csv(filepath, delimiter=delimiter).to_dict('records')
def writeCSV(filepath: str, dataList: list, keys: Optional[List[str]]=None, delimiter: str=DELIMITER):
    """
    Write data to given CSV file.

    The first line holds the column names, every following line one entry of
    dataList with its values converted via ``str``.
    If keys are not given, all keys of the first entry in dataList are used.
    All list entries should contain these keys.
    Nothing is written if dataList is empty. Errors while writing are printed
    and otherwise ignored (best effort).

    :param filepath: filepath
    :type filepath: str
    :param dataList: Data as list of dictionaries
    :type dataList: list
    :param keys: Keys of dictionary as list. If not given (None or empty), all keys in dataList[0] are used.
    :type keys: List of str or None
    :param delimiter: column separator in file.
    :type delimiter: str
    :return: None
    """
    if len(dataList) == 0: return
    # Data must be a list of dictionaries; None (new default) and [] both mean "all keys"
    if not keys: keys = list(dataList[0].keys())
    try:
        # The context manager guarantees the file is flushed and closed
        with open(filepath, 'w') as file:
            file.write(delimiter.join(keys) + "\n")
            for event in dataList:
                file.write(delimiter.join(str(event[key]) for key in keys) + "\n")
    except Exception as e:
        print(e)
def __openJson(file: str) -> Union[dict, None]:
    """
    Read and decode a JSON file.

    :param file: filepath
    :type file: str
    :return: Decoded content of the file
    :rtype: dict or None
    """
    with open(file) as handle:
        return json.load(handle)
def getUnmonitoredDevices() -> List[str]:
    """Return all appliances that neither have a dedicated meter nor a light annotation."""
    unmetered = getUnmeteredDevices()
    lightNames = {light["name"] for light in loadAnnotations(LIGHT_ANNOTATION, loadData=False)}
    return [device for device in unmetered if device not in lightNames]
def getUnmeteredDevices() -> List[str]:
    """
    Return all appliances of the dataset that have no dedicated meter.

    Appliances listed for a meter in the device mapping, and appliances that
    were connected to the changing meter, count as metered.
    """
    allDevices = getDeviceInfo()
    mapping = getDeviceMapping()
    metered = [appliance for meter in mapping for appliance in mapping[meter]["appliances"]]
    metered += getChangingDevices()
    return [device for device in allDevices if device not in metered]
def getSmartMeter() -> Optional[str]:
    """Return the name of the smartmeter (the meter with phase 0), or None if there is none."""
    mapping = getDeviceMapping()
    return next((meter for meter, entry in mapping.items() if entry["phase"] == 0), None)
def getMeterList() -> Optional[List[str]]:
    """
    Return the names of all individual meters used in recording.

    The smartmeter (the meter with phase 0 in the device mapping) is excluded.
    The result is never None; the Optional annotation is kept for compatibility.

    :return: meter names in device-mapping order
    :rtype: list of str
    """
    mapping = getDeviceMapping()
    return [key for key in mapping if mapping[key]["phase"] != 0]
def getChangingMeter() -> Optional[str]:
    """Return the meter whose connected appliance changes (listed as CHANGING_DEVICE), or None."""
    mapping = getDeviceMapping()
    return next((meter for meter, entry in mapping.items() if CHANGING_DEVICE in entry["appliances"]), None)
def getRSYNCPwdFile() -> str:
    """Return the path of the rsync password file inside the info folder."""
    __checkBase()
    infoFolder = os.path.join(FIRED_BASE_FOLDER, INFO_FOLDER_NAME)
    return os.path.join(infoFolder, RSYNC_PWD_FILE)
def getChangingDevices() -> list:
    """Return the distinct, lower-cased names of appliances connected to the changing meter during the recording."""
    names = {entry["name"].lower() for entry in getChangingDeviceInfo()}
    return list(names)
# Cache for the content of CHANGING_DEVICE_INFO_FILENAME
__changingInfo = None
def getChangingDeviceInfo(startTs: Optional[float]=None, stopTs: Optional[float]=None) -> List[dict]:
    """
    Return info for appliances connected to the changing meter.

    The info is read once from CHANGING_DEVICE_INFO_FILENAME in the annotation
    folder and cached. Each entry's time range is widened by 60 s on both sides
    as a safety margin. Only entries overlapping [startTs, stopTs] are returned.

    :param startTs: Time range start, recording start if None
    :type startTs: float or None
    :param stopTs: Time range stop, recording stop if None
    :type stopTs: float or None
    :return: entries with (at least) the keys "name", "startTs" and "stopTs"
    :rtype: list of dict
    """
    __checkBase()
    global __changingInfo
    if startTs is None or stopTs is None:
        # getRecordingRange lists folders and reads file info; call it only once
        recStart, recStop = getRecordingRange()
        if startTs is None: startTs = recStart
        if stopTs is None: stopTs = recStop
    if __changingInfo is None:
        __changingInfo = loadCSV(os.path.join(FIRED_BASE_FOLDER, ANNOTATION_FOLDER_NAME, CHANGING_DEVICE_INFO_FILENAME))
        # Add safe margin for changing device (applied once, when filling the cache)
        for row in __changingInfo:
            row["startTs"] -= 60
            row["stopTs"] += 60
    return [e for e in __changingInfo if e["stopTs"] > startTs and e["startTs"] < stopTs]
# Cache for the content of DEVICE_MAPPING_FILENAME
__deviceMapping = None
def getDeviceMapping() -> dict:
    """
    Return mapping from recording meter to connected appliances.

    The mapping is read from DEVICE_MAPPING_FILENAME in the info folder on first
    use and cached afterwards. Other functions in this module read the entry
    keys "phase", "appliances" and "flip".
    """
    global __deviceMapping
    __checkBase()
    if __deviceMapping is not None:
        return __deviceMapping
    mappingPath = os.path.join(FIRED_BASE_FOLDER, INFO_FOLDER_NAME, DEVICE_MAPPING_FILENAME)
    __deviceMapping = __openJson(mappingPath)
    return __deviceMapping
# Cache for the content of DEVICE_INFO_FILENAME
__deviceInfo = None
def getDeviceInfo() -> dict:
    """Return info of all appliances, read from DEVICE_INFO_FILENAME on first use and cached."""
    global __deviceInfo
    __checkBase()
    if __deviceInfo is not None:
        return __deviceInfo
    infoPath = os.path.join(FIRED_BASE_FOLDER, INFO_FOLDER_NAME, DEVICE_INFO_FILENAME)
    __deviceInfo = __openJson(infoPath)
    return __deviceInfo
# Cache for the content of LIGHTS_POWER_INFO_FILENAME
__lightsPower = None
def getLightsPowerInfo() -> dict:
    """Return power info of all lights, read from LIGHTS_POWER_INFO_FILENAME on first use and cached."""
    global __lightsPower
    __checkBase()
    if __lightsPower is not None:
        return __lightsPower
    powerPath = os.path.join(FIRED_BASE_FOLDER, INFO_FOLDER_NAME, LIGHTS_POWER_INFO_FILENAME)
    __lightsPower = __openJson(powerPath)
    return __lightsPower
def get50HzSummaryPath() -> str:
    """Return the folder holding the 50 Hz summary data."""
    __checkBase()
    summaryRoot = os.path.join(FIRED_BASE_FOLDER, SUMMARY_FOLDER_NAME)
    return os.path.join(summaryRoot, FIFTY_HZ_FOLDER_NAME)
def get1HzSummaryPath() -> str:
    """Return the folder holding the 1 Hz summary data."""
    __checkBase()
    summaryRoot = os.path.join(FIRED_BASE_FOLDER, SUMMARY_FOLDER_NAME)
    return os.path.join(summaryRoot, ONE_HZ_FOLDER_NAME)
def getSummaryFilePath() -> str:
    """Return the path of the combined file inside the 1 Hz summary folder."""
    # get1HzSummaryPath performs the base-folder check
    return os.path.join(get1HzSummaryPath(), COMBINED_FILE_NAME)
def getAnnotationPath() -> str:
    """Return the folder holding the annotation files."""
    __checkBase()
    annotationFolder = os.path.join(FIRED_BASE_FOLDER, ANNOTATION_FOLDER_NAME)
    return annotationFolder
def getHighFreqPath() -> str:
    """Return the folder where the high-frequency (raw) data is stored (RAW_FOLDER_NAME below the base folder)."""
    __checkBase()
    return os.path.join(FIRED_BASE_FOLDER, RAW_FOLDER_NAME)
def _parseRangeStr(timeStr: str) -> float:
    """Parse \"%Y.%m.%d\" or \"%Y.%m.%d %H:%M:%S\" (local time) into a timestamp."""
    fmt = "%Y.%m.%d %H:%M:%S" if len(timeStr.split(" ")) > 1 else "%Y.%m.%d"
    return datetime.strptime(timeStr, fmt).timestamp()

def getRecordingRange(startStr: Optional[str]=None, endStr: Optional[str]=None) -> Tuple[float, float]:
    r"""
    Return start and stop timestamp of recording.

    The range is derived from the .mkv files in the first (lexicographically
    smallest) sub-folder of the 50 Hz summary folder. Start is the time stamp in
    the first file name. Stop is the time stamp of the last file plus that
    file's duration. A stop less than 2 s before the next midnight is rounded
    up to that midnight.
    If startStr and/or endStr is given, max(recordingStart, start) and
    min(recordingStop, end) are returned.

    :param startStr: start time in string representation or None.
                     Format is: \"%Y.%m.%d\" or \"%Y.%m.%d %H:%M:%S\".
    :type startStr: str or None
    :param endStr: end time in string representation or None.
                   Format is: \"%Y.%m.%d\" or \"%Y.%m.%d %H:%M:%S\".
    :type endStr: str or None
    :return: start and end timestamp; [None, None] if no recording folder or file is found
    :rtype: Tuple(float, float)
    """
    summaryPath = get50HzSummaryPath()
    folders = [os.path.join(summaryPath, p) for p in os.listdir(summaryPath)
               if os.path.isdir(os.path.join(summaryPath, p))]
    if len(folders) == 0: return [None, None]
    firstFolder = min(folders)
    allFiles = sorted(os.path.join(firstFolder, p) for p in os.listdir(firstFolder)
                      if os.path.isfile(os.path.join(firstFolder, p)) and "mkv" in p.split("."))
    if len(allFiles) < 1: return [None, None]
    start = filenameToTimestamp(allFiles[0])
    durLast = info(allFiles[-1])["streams"][0]["duration"]
    end = filenameToTimestamp(allFiles[-1]) + durLast
    # Timestamps are absolute epoch seconds and are used as they are: moving an
    # aware datetime between time zones never changes its .timestamp().
    endDate = datetime.fromtimestamp(end)
    nextDay = endDate.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
    # Recordings ending just before midnight are treated as ending at midnight
    if (nextDay.timestamp() - endDate.timestamp()) < 2: end = nextDay.timestamp()
    if startStr is not None: start = max(_parseRangeStr(startStr), start)
    if endStr is not None: end = min(_parseRangeStr(endStr), end)
    return start, end
def loadAnnotationInfo(filepath: str) -> Optional[dict]:
    """
    Extract info from an annotation file name.

    The file name (without extension) is split at DIVIDER into
    type, room and name. Underscores in room and name become spaces. For sensor
    annotations the name is "<room> <quantity>", with "hum"/"temp" expanded to
    "humidity"/"temperature".

    :param filepath: filepath of annotation file
    :type filepath: str
    :return: Dictionary with keys "type", "room", "name", "file", or None if the type is unknown
    :rtype: None or dict
    """
    parts = os.path.basename(filepath).split(".")[0].split(DIVIDER)
    annoType = parts[0].lower()
    if annoType not in ANNOTATION_TYPES:
        return None
    room = parts[1].replace("_", " ")
    if annoType == SENSOR_ANNOTATION:
        quantity = parts[2].replace("hum", "humidity").replace("temp", "temperature")
        name = room + " " + quantity
    else:
        # light and device annotations
        name = parts[2].replace("_", " ")
    return {"type": annoType, "room": room, "name": name, "file": filepath}
def loadAnnotationFile(filepath: str) -> Optional[dict]:
    """
    Extract info from an annotation file name and load its CSV data.

    :param filepath: filepath of annotation file
    :type filepath: str
    :return: the loadAnnotationInfo dictionary plus key "data" (rows of the CSV), or None
    :rtype: None or dict
    """
    annotation = loadAnnotationInfo(filepath)
    if annotation is not None:
        annotation["data"] = loadCSV(filepath)
    return annotation
def loadAnnotations(type: str, loadData: bool=True) -> Optional[list]:
    """
    Load all annotations of given type.

    Files in the annotation folder whose name contains "<type>__" are used,
    in sorted order.

    :param type: Annotation type, must be in ANNOTATION_TYPES
    :type type: str
    :param loadData: Load only info to speed up things or also data
    :type loadData: bool
    :return: List of dictionaries with extracted annotation info (and data), or None for an unknown type
    :rtype: None or list
    """
    if type not in ANNOTATION_TYPES: return None
    # getAnnotationPath checks the base folder like every other accessor does
    annotationFolder = getAnnotationPath()
    marker = type + DIVIDER
    files = sorted(os.path.join(annotationFolder, o) for o in os.listdir(annotationFolder) if marker in o)
    getter = loadAnnotationFile if loadData else loadAnnotationInfo
    return [getter(file) for file in files]
def _getFlip(meterOrAppliance: str) -> bool:
    """
    Return the "flip" flag of a meter from the device mapping.

    Names not in the device mapping give False; a flag of "unknown" gives True.
    """
    mapping = getDeviceMapping()
    if meterOrAppliance not in mapping:
        return False
    flip = mapping[meterOrAppliance]["flip"]
    # Devices not known, flip by default, as we measure N
    return True if flip == "unknown" else flip
def getPhase(meterOrAppliance: str) -> int:
    """
    Return the live wire the device is connected to.

    The device mapping is consulted first, then the appliance info.

    :param meterOrAppliance: meter name or appliance name
    :type meterOrAppliance: str
    :return: live wire of connected device, -1 if unknown
    :rtype: int
    """
    deviceMapping = getDeviceMapping()
    deviceInfo = getDeviceInfo()
    for source in (deviceMapping, deviceInfo):
        if meterOrAppliance in source:
            return source[meterOrAppliance]["phase"]
    return -1
def convertToTimeRange(data: list, clipLonger: Optional[float]=None, clipTo: float=10*60) -> List[dict]:
    r"""
    Convert given annotation data to time ranges.

    Each entry lasts until the timestamp of the next entry; the last entry lasts clipTo seconds.
    e.g. TS1 off data + TS2 on data -> [off startTs=TS1 stopTs=TS2]

    .. code-block:: python3

        data = [
            { "timestamp": <TS1>, <data> },
            { "timestamp": <TS2>, <data> }
        ]

    gets converted to:

    .. code-block:: python3

        data = [
            { startTs: <TS1>, stopTs: <TS2>, <data> },
            { startTs: <TS2>, stopTs: <TS2>+clipTo, <data> }
        ]

    The input entries are not modified; shallow copies are returned.

    :param data: List of annotation entries
    :type data: list
    :param clipLonger: States longer than this (in seconds) are clipped to clipTo; None disables clipping
    :type clipLonger: None or float
    :param clipTo: Duration of the last state and of clipped states
    :type clipTo: float, default: 10 minutes
    :return: List of Dictionary with converted info
    :rtype: list
    """
    ranges = []
    lastIndex = len(data) - 1
    for idx, entry in enumerate(data):
        startTs = entry["timestamp"]
        stopTs = startTs + clipTo if idx == lastIndex else data[idx + 1]["timestamp"]
        if clipLonger is not None and stopTs - startTs > clipLonger:
            stopTs = startTs + clipTo
        ranges.append({**entry, "startTs": startTs, "stopTs": stopTs})
    return ranges
def typeFromApplianceName(name):
    """
    Map an appliance name to a coarse appliance type.

    The first keyword (in the order listed below) contained in name decides the
    type; if no keyword matches, name itself is returned.
    """
    keywordTypes = (
        ("laptop", "laptop"),
        (" pc", "pc"),
        ("light", "light"),
        ("grinder", "grinder"),
        ("charger", "charger"),
        ("router", "router"),
        ("access point", "router"),
        ("display", "monitor"),
    )
    return next((applianceType for keyword, applianceType in keywordTypes if keyword in name), name)
def getApplianceList(meter: Optional[str]=None, startTs: Optional[float]=None, stopTs: Optional[float]=None) -> list:
    """
    Return sorted list of appliance names for the given time range.

    Without a meter the list contains every appliance of the device mapping
    (except the CHANGING_DEVICE and "L1,L2,L3" entries), "stove", all lights
    (regardless of their state), and the appliances connected to the changing
    meter during the time range.
    With a meter, only that meter's appliances are returned. For the changing
    meter these are the appliances connected during the time range. Unknown
    meters give an empty list.
    NOTE: What about stove?

    :param meter: one meter
    :type meter: None or str
    :param startTs: Time range start, recording start if None
    :type startTs: None or float
    :param stopTs: Time range stop, recording stop if None
    :type stopTs: None or float
    :return: List of appliance names
    :rtype: list
    """
    __checkBase()
    if startTs is None or stopTs is None:
        # getRecordingRange lists folders and reads file info; call it only once
        recStart, recStop = getRecordingRange()
        if startTs is None: startTs = recStart
        if stopTs is None: stopTs = recStop
    deviceMapping = getDeviceMapping()

    def activeChangingDevices() -> set:
        # Names of changing-meter appliances whose connection overlaps [startTs, stopTs]
        return {cdI["name"] for cdI in getChangingDeviceInfo()
                if cdI["startTs"] < stopTs and cdI["stopTs"] > startTs}

    if meter is None:
        devices = [d for key in deviceMapping for d in deviceMapping[key]["appliances"]]
        # NOTE(review): "L1,L2,L3" is presumably the smartmeter's appliance entry — confirm
        devices = [d for d in devices if d not in [CHANGING_DEVICE, "L1,L2,L3"]] + ["stove"]
        changingDevices = activeChangingDevices()
        lights = [l["name"] for l in loadAnnotations(LIGHT_ANNOTATION, loadData=False)]
        return sorted(set(devices) | changingDevices | set(lights))
    devices = list(deviceMapping[meter]["appliances"]) if meter in deviceMapping else []
    if CHANGING_DEVICE in devices:
        devices = activeChangingDevices()
    return sorted(set(devices))
def resampleRecord(data: np.recarray, inRate: float, outRate: float) -> np.recarray:
    """
    Resample a given numpy record array.

    Every field is linearly interpolated (np.interp) from the input sample grid
    onto positions spaced inRate/outRate input samples apart. Positions past the
    last input sample take the last input value.

    :param data: Record (structured) array with one numeric field per measure
    :type data: np.recarray
    :param inRate: Samplingrate of data
    :type inRate: float
    :param outRate: Desired samplingrate
    :type outRate: float
    :return: Resampled array with the same dtype, or data itself if the rates are equal.
             A np.recarray input yields a np.recarray output.
    :rtype: np.recarray
    """
    if inRate == outRate: return data
    resampleFac = inRate/outRate
    oldX = np.arange(0, len(data))
    newX = np.arange(0, len(data), resampleFac)
    resampled = np.zeros(len(newX), dtype=data.dtype)
    # np.interp raises on empty sample points; empty input simply yields an empty result
    if len(data) > 0:
        # NOTE: This is done for each measure
        for measure in data.dtype.names:
            resampled[measure] = np.interp(newX, oldX, data[measure])
    # Keep the array type consistent with the unchanged-rate path
    if isinstance(data, np.recarray): resampled = resampled.view(np.recarray)
    return resampled
def bestBasePowerTimeRange(startTs: float, stopTs: float) -> List[dict]:
    """
    Return time ranges to extract base power from, given a super time range.

    Every night window BASE_NIGHT_RANGE (local hours) that lies completely
    inside [startTs, stopTs] is returned. If there is none, the night window of
    the start day is used instead.
    NOTE: Will cause problems for datasets that start within a day and only this day given

    :param startTs: Time range start
    :type startTs: float
    :param stopTs: Time range stop
    :type stopTs: float
    :return: List of dict with startTs and stopTs keys as time ranges
    :rtype: list
    """
    startDate = datetime.fromtimestamp(startTs)
    stopDate = datetime.fromtimestamp(stopTs)
    nightStart, nightStop = BASE_NIGHT_RANGE[0], BASE_NIGHT_RANGE[1]

    def nightOf(day: datetime) -> dict:
        # Night window on the calendar day of `day`
        return {"startTs": day.replace(hour=nightStart, minute=0, second=0, microsecond=0).timestamp(),
                "stopTs": day.replace(hour=nightStop, minute=0, second=0, microsecond=0).timestamp()}

    # Iterate over midnights, so the last appended day is always the stop day
    day = startDate.replace(hour=0, minute=0, second=0, microsecond=0)
    # The start day's night is unusable if the range starts after it began
    if startDate.hour >= nightStart: day += timedelta(days=1)
    timeranges = []
    while day <= stopDate:
        timeranges.append(nightOf(day))
        day += timedelta(days=1)
    # The stop day's night is unusable if the range ends before it is over
    # (guarded: the list may be empty, e.g. for a short range within one day)
    if timeranges and stopDate.hour < nightStop: del timeranges[-1]
    if len(timeranges) == 0:
        # use night before then
        timeranges.append(nightOf(startDate))
    return timeranges
def UTCfromLocalTs(ts):
    """
    Reinterpret the wall-clock time of ts as time in the recording time zone.

    ts is turned into a naive datetime in the system's local time. That
    wall-clock time is localized to getTimeZone(), and its timestamp is returned.
    """
    recordingZone = getTimeZone()
    wallClock = datetime.fromtimestamp(ts)
    return recordingZone.localize(wallClock).timestamp()
def getTimeZone():
    """Return the pytz time zone of the recording (Europe/Berlin)."""
    zoneName = "Europe/Berlin"
    return pytz.timezone(zoneName)
def _basePowerSegment(smartmeterName: str, meterNames: List[str], samplingrate: int,
                      startTs: float, stopTs: float, phase: int, powers: List[str]) -> np.ndarray:
    """Return smartmeter power of one phase minus the power of meterNames for one time range."""
    smData = getMeterPower(smartmeterName, samplingrate, startTs=startTs, stopTs=stopTs, phase=phase)["data"]
    segment = np.zeros(len(smData), dtype=[(m, '<f4') for m in powers])
    for m in powers:
        segment[m] = smData[m]
    for meter in meterNames:
        mData = getMeterPower(meter, samplingrate, startTs=startTs, stopTs=stopTs)["data"]
        # Guard against slightly different lengths of meter and smartmeter data
        n = min(len(segment), len(mData))
        for m in powers:
            segment[m][:n] -= mData[m][:n]
    return segment

def _basePowerLevel(values: np.ndarray) -> float:
    """Return the count-weighted mean of the left bin edges of the two fullest histogram bins of values."""
    hist, binEdges = np.histogram(values)
    # Bins with most entries first
    top = np.argsort(hist)[::-1][:2]
    # NOTE(review): left bin edges (not bin centers) are averaged, as before — confirm this is intended
    return np.sum(binEdges[top] * hist[top]) / np.sum(hist[top])

def getBasePower(samplingrate: int, startTs: Optional[float] = None, stopTs: Optional[float] = None,
                 phase: Union[None, int, List[int]] = None) -> List[dict]:
    """
    Return base power dict with given samplingrate for given time and phase.

    For each phase, samples of "smartmeter minus all individual meters on that
    phase" are collected from the night ranges of bestBasePowerTimeRange()
    (negative values are set to 0). The constant base level per measure is the
    count-weighted mean of the two fullest histogram bins of these samples.

    :param samplingrate: samplingrate of returned power
    :type samplingrate: int
    :param startTs: Time range start, recording start if None
    :type startTs: float or None
    :param stopTs: Time range stop, recording stop if None
    :type stopTs: float or None
    :param phase: Given phase (grid line L<phase>). Either 1,2,3 or a combination as list or None
    :type phase: list(int), int or None
    :return: List of dict with constant power data (measures p, q, s), one per phase
    :rtype: list(dict)
    """
    __checkBase()
    if startTs is None or stopTs is None:
        # getRecordingRange lists folders and reads file info; call it only once
        recStart, recStop = getRecordingRange()
        if startTs is None: startTs = recStart
        if stopTs is None: stopTs = recStop
    if phase is None:
        phase = [1, 2, 3]
    if not isinstance(phase, list):
        phase = [phase]
    # get ranges from where to compute base powers
    ranges = bestBasePowerTimeRange(startTs, stopTs)
    smartmeterName = getSmartMeter()
    meters = getMeterList()
    deviceMapping = getDeviceMapping()
    newSize = int((stopTs - startTs) * samplingrate)
    powers = ["p", "q", "s"]
    dt = [(m, '<f4') for m in powers]
    basePowers = {}
    for p in phase:
        # Meters on this phase; their power is not part of the base load.
        # The mapping's phase is a single int elsewhere in this module; a list is accepted too.
        phaseMeters = []
        for meter in meters:
            meterPhase = deviceMapping[meter]["phase"]
            if p in (meterPhase if isinstance(meterPhase, list) else [meterPhase]):
                phaseMeters.append(meter)
        segments = [_basePowerSegment(smartmeterName, phaseMeters, samplingrate, r["startTs"], r["stopTs"], p, powers)
                    for r in ranges]
        samples = np.concatenate(segments) if segments else np.zeros(0, dtype=dt)
        # Prevent that base power can be negative
        for m in powers:
            samples[m][samples[m] < 0] = 0
        data = np.zeros(newSize, dtype=dt).view(np.recarray)
        for m in powers:
            data[m][:] = _basePowerLevel(samples[m])
        basePowers[p] = {"title": "basepower", "name": "basepower l" + str(p), "phase": p, "data": data,
                         "timestamp": startTs, "type": "audio", "samplingrate": samplingrate, "measures": powers}
    return [basePowers[p] for p in phase]
def _reconstructStovePhase(entry: dict, smartmeter: dict, base: dict, samplingrate: int):
    """Turn entry["data"] (summed submeter power of one phase) into that phase's stove power, in place."""
    measures = entry["measures"]
    values = entry["data"]
    # stove = smartmeter - submeters - base power
    for m in measures:
        values[m] = smartmeter["data"][m] - values[m] - base["data"][m]
    # Build the mask only after all measures are reconstructed, so p and q are
    # masked with the reconstructed s, not with the submeter sum.
    # NOTE(review): 800 is presumably W / VA — confirm units
    tooLow = (values["s"] < 800) | (values["p"] < 800)
    for m in measures:
        values[m][tooLow] = 0
    # Remove remaining narrow spikes (width up to `samplingrate` samples, i.e. ~1 s)
    # left over from slightly misaligned data at higher samplingrates.
    # find_peaks' threshold is the vertical distance to the neighbouring samples.
    # NOTE(review): only the peak sample itself is zeroed, not its full width
    peaks, _props = find_peaks(values["s"], threshold=800, width=(1, int(1.0*samplingrate)))
    for m in measures:
        values[m][peaks] = 0
    # median filter over ~5 s; the kernel size has to be odd
    N = max(1, int(5.0 * samplingrate))
    if (N % 2) == 0:
        N += 1
    if N > 1:
        for m in measures:
            values[m] = medfilt(values[m], N)

def getPowerStove(samplingrate: int, startTs: Optional[float]=None, stopTs: Optional[float]=None, phase: Union[None,int,List[int]]=None) -> List[dict]:
    """
    Reconstruct power of stove, if it is not directly monitored, it might be reconstructable from smartmeter data.

    Per phase: stove = smartmeter - sum of individual meters on that phase - base power.
    Lights are neglected, as the stove consumes way more power. Samples where the
    reconstructed p or s is below 800 are set to 0, narrow spikes are removed
    and the result is median filtered over about 5 s.

    :param samplingrate: samplingrate of returned power
    :type samplingrate: int
    :param startTs: Time range start, recording start if None
    :type startTs: float or None
    :param stopTs: Time range stop, recording stop if None
    :type stopTs: float or None
    :param phase: Given phase (grid line L<phase>). Either 1,2,3 or a combination as list or None
    :type phase: list(int), int or None
    :return: List of dict with power data, one per phase
    :rtype: list(dict)
    """
    __checkBase()
    if startTs is None or stopTs is None:
        # getRecordingRange lists folders and reads file info; call it only once
        recStart, recStop = getRecordingRange()
        if startTs is None: startTs = recStart
        if stopTs is None: stopTs = recStop
    if phase is None:
        phase = [1, 2, 3]
    if not isinstance(phase, list):
        phase = [phase]
    data = {p: {"phase": p, "samplingrate": samplingrate, "title": "stove", "name": "stove l" + str(p), "data": None}
            for p in phase}
    smartmeterName = getSmartMeter()
    # Calculate base power consumption
    base = getBasePower(samplingrate, startTs=startTs, stopTs=stopTs, phase=phase)
    # Get total power consumption
    smartmeterData = {p: getMeterPower(smartmeterName, samplingrate, startTs=startTs, stopTs=stopTs, phase=p)
                      for p in phase}
    # Sum the power of all meters within the requested phases
    deviceMapping = getDeviceMapping()
    powerMeters = [m for m in getMeterList() if deviceMapping[m]["phase"] in phase]
    allMeterPower = [getMeterPower(name, samplingrate, startTs=startTs, stopTs=stopTs) for name in powerMeters]
    for meter in allMeterPower:
        p = deviceMapping[meter["title"]]["phase"]
        if data[p]["data"] is None:
            # Copy, so accumulating below does not modify the array returned by getMeterPower
            data[p]["data"] = meter["data"].copy()
            data[p]["measures"] = meter["measures"]
            data[p]["timestamp"] = meter["timestamp"]
        else:
            for m in data[p]["measures"]:
                data[p]["data"][m] += meter["data"][m]
    for p in data:
        if data[p]["data"] is None:
            # No individual meter on this phase: start from zero meter power.
            # NOTE(review): assumes the smartmeter result carries "measures" and
            # "timestamp" like the meter results do — confirm
            sm = smartmeterData[p]
            data[p]["data"] = np.zeros_like(sm["data"])
            data[p]["measures"] = sm["measures"]
            data[p]["timestamp"] = sm["timestamp"]
        b = next(b for b in base if b["phase"] == p)
        _reconstructStovePhase(data[p], smartmeterData[p], b, samplingrate)
    return [data[p] for p in data]
def getReconstructibleDevices() -> dict:
    """Return a fresh mapping from reconstructible appliance name to the function computing its power."""
    return dict(stove=getPowerStove)
def delDownloads():
    """
    Delete the files loaded via rsync.

    Every path in the module-level list _loadedFiles is removed. Paths that
    cannot be removed (e.g. already deleted) are skipped. The list is reset
    afterwards.
    """
    global _loadedFiles
    for f in _loadedFiles:
        # os.remove is portable and avoids spawning an external `rm` per file
        try:
            os.remove(f)
        except OSError:
            # Errors like not exist
            pass
    _loadedFiles = []
def _finalizeMeterChunk(fileData: dict, chunk, timestamp: float, stopTs: float, samplingrate: Optional[float]=None) -> tuple:
    """
    Turn a filled chunk buffer into the dict yielded by getMeterChunk.

    All entries of ``fileData`` except its samples are copied, ``chunk``
    becomes the new "data" entry (resampled to ``samplingrate`` if that
    differs from fileData["samplingrate"]), the result is stamped with
    ``timestamp`` and cut so that it does not extend past ``stopTs``.
    :param fileData: dict of the currently loaded file (as returned by loadAudio)
    :param chunk: record array holding the chunk samples at fileData["samplingrate"]
    :param timestamp: start time of the chunk
    :param stopTs: end of the requested time range
    :param samplingrate: requested output samplingrate, None to keep the native one
    :return: (chunk dict, True if the chunk reached stopTs)
    :rtype: tuple
    """
    rate = fileData["samplingrate"]
    out = {k: v for k, v in fileData.items() if k != "data"}
    out["data"] = chunk
    if samplingrate is not None and samplingrate != rate:
        out["data"] = resampleRecord(out["data"], rate, samplingrate)
        out["samplingrate"] = samplingrate
        # All sample <-> time conversions below refer to the resampled data
        rate = samplingrate
    out["timestamp"] = timestamp
    finish = False
    distanceToStop = stopTs - (timestamp + len(out["data"])/rate)
    if distanceToStop <= 0:
        out["data"] = out["data"][:int((stopTs - timestamp)*rate)]
        finish = True
    out["samples"] = len(out["data"])
    out["duration"] = out["samples"]/rate
    return out, finish


def getMeterChunk(meter: str, timeslice: float, data: str="VI", samplingrate: Optional[float]=None, startTs: Optional[float]=None, stopTs: Optional[float]=None, channel: Optional[int]=0) -> dict:
    """
    Yield data of given meter as consecutive chunks of ``timeslice`` seconds.

    This is a generator. Gaps between consecutive files, and a missing tail
    up to ``stopTs``, are filled with zeros. The last chunk is cut at
    ``stopTs`` and may be shorter than ``timeslice``; if the data ends
    before a chunk is full, that partial chunk is still yielded.
    :param meter: Name of meter; must be in getMeterList().
    :type meter: str
    :param timeslice: time slice over which to iterate
    :type timeslice: float
    :param data: kind of data files to load, passed on to getMeterFiles
    :type data: str
    :param samplingrate: samplingrate of returned data, None for default
    :type samplingrate: int
    :param startTs: Time range start
    :type startTs: float or None
    :param stopTs: Time range stop
    :type stopTs: float or None
    :param channel: index of the stream loaded from each file
    :type channel: int
    :return: generator of chunk dicts; each holds the metadata of the loaded
        file plus "phase", "timestamp", "samples", "duration" and the
        samples under "data"
    :rtype: dict
    """
    __checkBase()
    if startTs is None: startTs = getRecordingRange()[0]
    if stopTs is None: stopTs = getRecordingRange()[1]
    # This will download it over rsync if this is required
    files = getMeterFiles(meter, samplingrate, data=data, startTs=startTs, stopTs=stopTs)
    if VERBOSE: print(files)
    if len(files) == 0: return
    phase = getDeviceMapping()[meter]["phase"]
    timestamp = startTs  # start time of the chunk currently being filled
    chunkI = 0           # samples already written into chunk
    fileIndex = 0
    chunk = None
    fileData = None
    sendSamples = 0      # samples yielded so far, at the output samplingrate
    samplesToSend = 1    # set properly once the first file is loaded
    while fileIndex < len(files) and sendSamples < samplesToSend:
        # Not inited at all: open the first file at startTs
        if chunk is None:
            try:
                inf = info(files[fileIndex])["streams"][0]
            except Exception as e:
                # Do not call info() again here, it would raise the same error
                print(e)
                print(fileIndex)
                print(files[fileIndex])
                return
            start = timestamp-inf["timestamp"]
            end = inf["timestamp"]+inf["duration"]
            dur = -1
            if stopTs < end: dur = stopTs-(inf["timestamp"]+start)
            audio = loadAudio(files[fileIndex], streamsToLoad=[channel], start=start, duration=dur)
            if audio is None or len(audio) == 0: return
            fileData = audio[0]
            fileData["phase"] = phase
            if VERBOSE and samplingrate is not None and samplingrate != fileData["samplingrate"]:
                print("Have to resample data, this is typically slow")
            # At least one sample per chunk, otherwise empty chunks are yielded forever
            chunkSize = max(1, int(timeslice*fileData["samplingrate"]))
            # Count in the units of the yielded data (resampled if requested)
            outRate = samplingrate if samplingrate is not None else fileData["samplingrate"]
            samplesToSend = (stopTs-startTs)*outRate
            chunk = np.recarray((chunkSize,), dtype=fileData["data"].dtype).view(np.recarray)
            eof = fileData["timestamp"] + fileData["duration"]
            fileI = int((timestamp - fileData["timestamp"])*fileData["samplingrate"])
            if VERBOSE:
                print(time_format_ymdhms(fileData["timestamp"]) + "->" + time_format_ymdhms(eof))
        # Fill chunks with the samples of the current file
        while fileI < len(fileData["data"]):
            fileJ = min(len(fileData["data"]), fileI + (chunkSize - chunkI))
            samples = fileJ - fileI
            chunk[chunkI:chunkI+samples] = fileData["data"][fileI:fileJ]
            chunkI += samples
            fileI += samples
            # Total chunk is written
            if chunkI >= chunkSize:
                dataReturn, finish = _finalizeMeterChunk(fileData, chunk, timestamp, stopTs, samplingrate)
                sendSamples += dataReturn["samples"]
                yield dataReturn
                if finish: return
                chunk = np.recarray((chunkSize,), dtype=fileData["data"].dtype).view(np.recarray)
                timestamp += chunkSize/fileData["samplingrate"]
                chunkI = 0
        # We reached end of file; if no file follows, zero-fill up to stopTs
        missingSamples = int((stopTs - eof)*fileData["samplingrate"])
        if VERBOSE:
            print("Missing:{}".format(missingSamples))
            print("Samples Send: {}/{}".format(sendSamples, samplesToSend))
        # Load next file and check the gap to the previous one
        fileIndex += 1
        if fileIndex < len(files):
            dur = stopTs-eof
            # Strange things if we use exact dur, so use an extra second, it is cut later
            # NOTE: Maybe we are missing samples sometimes due to milliseconds resolution of pyav?
            audio = loadAudio(files[fileIndex], streamsToLoad=[channel], duration=dur+1)
            if audio is None or len(audio) == 0: return
            fileData = audio[0]
            fileData["phase"] = phase
            fileI = 0
            missingSamples = int((fileData["timestamp"] - eof)*fileData["samplingrate"])
            if VERBOSE and missingSamples > 0:
                print("Gap between: {}->{}, {}samples".format(time_format_ymdhms(eof), time_format_ymdhms(fileData["timestamp"]), missingSamples))
            eof = fileData["timestamp"] + fileData["duration"]
        # Fill gaps (between files, or up to stopTs after the last file) with zeros
        while missingSamples > 0:
            samples = min(missingSamples, (chunkSize - chunkI))
            for m in chunk.dtype.names: chunk[m][chunkI:chunkI+samples] = 0
            chunkI += samples
            missingSamples -= samples
            # Total chunk is written
            if chunkI >= chunkSize:
                dataReturn, finish = _finalizeMeterChunk(fileData, chunk, timestamp, stopTs, samplingrate)
                sendSamples += dataReturn["samples"]
                yield dataReturn
                if finish: return
                chunk = np.recarray((chunkSize,), dtype=fileData["data"].dtype).view(np.recarray)
                timestamp += chunkSize/fileData["samplingrate"]
                chunkI = 0
    # Data ended before the last chunk was full: emit it instead of dropping it
    if chunk is not None and chunkI > 0 and timestamp < stopTs:
        dataReturn, _ = _finalizeMeterChunk(fileData, chunk[:chunkI], timestamp, stopTs, samplingrate)
        if dataReturn["samples"] > 0:
            yield dataReturn
def getMeterVI(meter: str, samplingrate: Optional[float]=None, startTs: Optional[float]=None, stopTs: Optional[float]=None, phase: Optional[int]=None, smartmeterMergePhases: Optional[bool] = False) -> dict:
"""
Return vi of given meter.
:param meter: Name of meter; must be in getMeterList().
:type meter: str
:param samplingrate: samplingrate of returned power, None for default
:type samplingrate: int
:param startTs: Time range start
:type startTs: float or None
:param stopTs: Time range stop
:type stopTs: float or None
:return: power data
:rtype: dict
"""