-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
137 lines (115 loc) · 4.41 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import re
from datetime import datetime, timedelta
from dateutil import tz
from dateutil.tz.tz import tzfile
from typing import Optional
from suntime import Sun
import fiftyone as fo
import fiftyone.operators as foo
from fiftyone import ViewField as F
def timestamp_from_filepath(f: str, regex: str) -> datetime:
    """Parse a timestamp out of a file path using a six-group regex.

    Args:
        f: File path (or any string) to search.
        regex: Pattern with exactly six capture groups, in the order year,
            month, day, hour, minute, second. Groups that do not take part
            in the match are treated as ``"00"``.

    Returns:
        A naive ``datetime`` built from the captured fields.

    Raises:
        ValueError: If ``regex`` does not match ``f``, if the pattern does
            not yield exactly six groups, or if the captured values do not
            form a valid date/time.
    """
    match = re.search(regex, f)
    if match is None:
        # Fail with a clear message instead of AttributeError on None.groups().
        raise ValueError(f"No timestamp matching {regex!r} found in {f!r}")

    # Optional groups that did not participate come back as None -> "00".
    year, month, day, hour, minute, second = (
        "00" if part is None else part for part in match.groups()
    )
    return datetime.strptime(
        f"{year}-{month}-{day}_{hour}-{minute}-{second}",
        "%Y-%m-%d_%H-%M-%S",
    )
def get_timeofday(
    dt: datetime,
    geo: list,
    tzinfo: tzfile,
    dawn_length: timedelta = timedelta(minutes=15),
    dusk_length: timedelta = timedelta(minutes=15),
    sunrise_length: timedelta = timedelta(minutes=15),
    sunset_length: timedelta = timedelta(minutes=15),
    morning_length: timedelta = timedelta(hours=1),
    evening_length: timedelta = timedelta(hours=1)
) -> str:
    """Classify ``dt`` into a named phase of the day relative to sunrise/sunset.

    Args:
        dt: Moment to classify; it is compared directly against the sunrise
            and sunset values returned by ``Sun``.
        geo: Positional arguments unpacked into ``Sun(*geo)``
            (presumably latitude and longitude -- confirm against suntime).
        tzinfo: Time zone forwarded to the sunrise/sunset lookups.
        dawn_length: Width of the "dawn" window preceding the sunrise window.
        dusk_length: Width of the "dusk" window following the sunset window.
        sunrise_length: Half-width of the "sunrise" window around sunrise.
        sunset_length: Half-width of the "sunset" window around sunset.
        morning_length: Offset after sunrise at which "morning" ends.
        evening_length: Offset before sunset at which "evening" begins.

    Returns:
        One of "night", "dawn", "sunrise", "morning", "day", "evening",
        "sunset" or "dusk".
    """
    solar = Sun(*geo)
    sunrise = solar.get_sunrise_time(dt, tzinfo)
    sunset = solar.get_sunset_time(dt, tzinfo)
    # Fix sunset as stated here: https://github.com/SatAgro/suntime/issues/12#issuecomment-621755084
    if sunset < sunrise:
        sunset += timedelta(days=1)

    # Ordered (upper bound, bound is inclusive, label); the first hit wins.
    phases = (
        (sunrise - sunrise_length - dawn_length, True, "night"),
        (sunrise - sunrise_length, True, "dawn"),
        (sunrise + sunrise_length, True, "sunrise"),
        (sunrise + morning_length, True, "morning"),
        (sunset - evening_length, False, "day"),
        (sunset - sunset_length, False, "evening"),
        (sunset + sunset_length, True, "sunset"),
        (sunset + sunset_length + dusk_length, True, "dusk"),
    )
    for upper, inclusive, label in phases:
        within = (dt <= upper) if inclusive else (dt < upper)
        if within:
            return label

    # Past the end of dusk.
    return "night"
def compute_timestamps(dt: datetime, tzinfo: "tzfile", geo: Optional[list] = None) -> tuple:
    """Derive weekday, fractional hour and (optionally) time-of-day for ``dt``.

    Args:
        dt: Timestamp to analyse. ``tzinfo`` is attached via
            ``datetime.replace``, i.e. the wall-clock fields are kept as-is
            and not converted.
        tzinfo: Time zone to attach to ``dt``; also forwarded to
            ``get_timeofday`` when ``geo`` is given.
        geo: Optional location forwarded to ``get_timeofday``. When falsy,
            no time-of-day label is computed.

    Returns:
        A tuple ``(weekday, time, timeofday)`` where ``weekday`` is
        ``dt.weekday()`` (Monday == 0), ``time`` is the hour of day as a
        float including minute and second fractions, and ``timeofday`` is
        the label from ``get_timeofday`` or ``None``.
    """
    dt = dt.replace(tzinfo=tzinfo)
    weekday = dt.weekday()
    # An hour has 3600 seconds (the previous divisor of 6000 under-weighted seconds).
    hour_of_day = dt.hour + dt.minute / 60 + dt.second / 3600
    timeofday = get_timeofday(dt=dt, geo=geo, tzinfo=tzinfo) if geo else None
    return weekday, hour_of_day, timeofday
################################################################
################################################################
class ComputeTimestamps(foo.Operator):
    """Operator that stores timestamp-derived values on the samples of a view.

    Timestamps come either from each sample's ``_id`` (converted to a date)
    or from the sample's file path via a regular expression.
    """

    @property
    def config(self):
        """Return the operator configuration (name, label, description)."""
        return foo.OperatorConfig(
            name="compute_timestamps",
            label="Compute Timestamps from filepath or sample's creation date",
            description="Filter by various timestamp informations found in filepath or sample`s creation date"
        )

    def execute(self, ctx):
        """Compute the timestamp values and write them onto the target view.

        Reads the ``source``, ``regex``, ``geo`` and ``timezone`` params from
        ``ctx``. Writes the ``datetime``, ``weekday`` and ``time`` values, plus
        ``timeofday`` when ``geo`` is truthy. Returns the string
        ``"parameters not allowed"`` when ``source`` is neither
        ``"created_at"`` nor ``"filepath"`` with a non-empty ``regex``.
        """
        # Fall back to the whole dataset when no view is active.
        target = ctx.view
        if target is None:
            target = ctx.dataset

        params = ctx.params
        source = params.get("source")
        regex = params.get("regex")
        geo = params.get("geo")
        tzinfo = tz.gettz(params.get("timezone"))

        if source == "created_at":
            dts = target.values(F("_id").to_date())
        elif source == "filepath" and regex:
            dts = [
                timestamp_from_filepath(path, regex)
                for path in target.values("filepath")
            ]
        else:
            return "parameters not allowed"

        # One (weekday, time, timeofday) triple per timestamp.
        triples = [compute_timestamps(dt, tzinfo, geo) for dt in dts]

        target.set_values("datetime", dts)
        target.set_values("weekday", [triple[0] for triple in triples])
        target.set_values("time", [triple[1] for triple in triples])
        if geo:
            target.set_values("timeofday", [triple[2] for triple in triples])

    def __call__(
        self,
        sample_collection,
        source: str = "filepath",
        regex: str = r".*([0-9]{4})-?([0-9]{2})-?([0-9]{2})_([0-9]{2})?-?([0-9]{2})?-?([0-9]{2})?.*?",
        geo: Optional[list] = None,
        timezone: str = 'Europe/Berlin'
    ):
        """Run the operator programmatically on ``sample_collection``.

        The keyword arguments are forwarded as operator params, and the
        result of ``foo.execute_operator`` is returned.
        """
        ctx = {"view": sample_collection.view()}
        params = {
            "target": "CURRENT_VIEW",
            "source": source,
            "regex": regex,
            "geo": geo,
            "timezone": timezone,
        }
        return foo.execute_operator(self.uri, ctx, params=params)
def register(p):
    """Register the ``ComputeTimestamps`` operator on the given plugin object."""
    p.register(ComputeTimestamps)