-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevents.py
125 lines (95 loc) · 4.33 KB
/
events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""
Quick and dirty evaluation of petl's ETL features.
"""
import os
import sys
import petl
# regexes
# petl runs strip() so leading whitespace is cleared
# pattern covers some fiddly cases:
# line 1: handle leading "- " (strip() prefix for 2nd event within a day) or a date/date to date range
# line 2: extract event title
# line 3: extract optional tag block within "[]" section
EVENT_TITLE_PATTERN = r"^(?:(?P<date>\d{1,2}\/\d{1,2}\/\d{4})(?: to (?P<end>\d{1,2}\/\d{1,2}\/\d{4}))? )?(?:- )" \
r"(?P<title>[\w ]*)" \
r"(?P<tags>\[[\w,\/\s]+])?"
TAG_PATTERN = r"\[(?P<distance>[SML])/(?P<difficulty>[EMR])(?P<wet>W?)\]"
# constants
NA = ""
EVENT_ACTIVITY_KEYWORDS = ("MTB", "Bushwalk")
PETL_OUTPUT_KEY = "PETL_OUTPUT"
# configure output file
if PETL_OUTPUT_KEY in os.environ:
petl_output_path = os.environ[PETL_OUTPUT_KEY]
else:
petl_output_path = None
def event_classifier(title, keywords=None):
    """Return the activity keywords found in *title* (case-insensitive).

    :param title: event title text to scan.
    :param keywords: iterable of keywords to match; defaults to
        EVENT_ACTIVITY_KEYWORDS when None (backward compatible).
    :return: list of matched keywords in declaration order, or None when
        nothing matches (petl renders None rather than an empty list).
    """
    if keywords is None:
        keywords = EVENT_ACTIVITY_KEYWORDS
    # lower-case the title once instead of per keyword
    haystack = title.lower()
    return [kw for kw in keywords if kw.lower() in haystack] or None
# TODO: implement the following steps to assess petl library (ignoring architecture)
# DONE load raw data
# split data into relevant category strings (do regex funcs work?)
# DONE transform relevant fields (e.g. dates)
# DONE classify event by keyword / add activity compound field
# DONE error check date order
# DONE save to JSON lines
# read in JSON lines (data index) / bypass processing pipeline
data_path = sys.argv[1]
# assert is stripped under `python -O`; raise so a bad path always fails loudly
if not os.path.exists(data_path):
    raise FileNotFoundError(f"input data file not found: {data_path}")
raw_table = petl.fromtext(data_path)
# split each raw line into date / end-date / title / tag-block via regex groups
tokenised_table = petl.capture(raw_table,
                               "lines",
                               EVENT_TITLE_PATTERN,
                               ["date", "end_date", "title", "raw_tags"])
# replace None in raw tags with NA string to prevent future regex failure
tokenised_raw_tag_table = petl.convert(tokenised_table, "raw_tags", lambda v: v if v else NA)
print(petl.lookall(tokenised_raw_tag_table))
# break the "[distance/difficulty(W)]" tag block into individual fields;
# untagged rows (empty raw_tags) fall back to the NA fill values
tokenised_tag_table = petl.capture(tokenised_raw_tag_table,
                                   "raw_tags",
                                   TAG_PATTERN,
                                   ["distance", "difficulty", "wet"],
                                   fill=(NA, NA, NA))
# compound field: list of activity keywords detected in the title (or None)
tokenised_activity_tag_table = petl.addfield(tokenised_tag_table,
                                             "activities",
                                             lambda rec: event_classifier(rec["title"]))
# perform date ETL operations in chained/functional form
# fill missing dates where multiple events fall on the same day
# add date validation fields
def check_prev_date(previous, current, _):
    """Return True when *current* starts no earlier than the last event.

    Intended for petl.addfieldusingcontext: *previous* is None on the first
    data row, in which case None is returned (nothing to compare against).
    Two events may share a day, unless the previous event spanned a date
    range — then the current one must start strictly after the range begins
    and not before the range ends.
    """
    if previous is None:
        # first row has no predecessor to validate against
        return None
    starts_in_order = current.date >= previous.date
    if not previous.end_date:
        return starts_in_order
    # previous event was a range: must not start before or during it
    return current.date > previous.date and current.date >= previous.end_date
date_parser = petl.dateparser("%d/%m/%Y")
processed_table = (petl
.convert(tokenised_activity_tag_table, "date", date_parser)
.convert("end_date", date_parser)
.filldown("date")
.addfieldusingcontext("is_ordered", check_prev_date)
.movefield("is_ordered", 2))
# validate against data constraints
# NOTE: TAG_PATTERN captures distance in [SML] and difficulty in [EMR]
# (plus the NA fill for untagged rows), so the validation alphabets below
# mirror the tokeniser — the previous "EMHX" check would have flagged the
# valid difficulty "R". Set membership also avoids the substring quirk of
# `x in "SMLX"`, which accepts multi-char values such as "SM". The ordering
# check uses `test=` because petl treats `assertion=` callables as ones that
# raise AssertionError; a lambda returning False would never flag a row.
constraints = [
    dict(name="Dates Chronologically Ordered", field="is_ordered", test=lambda x: x is not False),
    dict(name="distance_valid", field="distance", test=lambda x: x in {"S", "M", "L", NA}),
    dict(name="difficulty_valid", field="difficulty", test=lambda x: x in {"E", "M", "R", NA})
]
error_table = petl.validate(processed_table, constraints=constraints)
# display rough summary of data
print("Full form data")
print(petl.lookall(processed_table))
# nrows counts data rows only, so no header row adjustment is needed
n_errors = petl.nrows(error_table)
if n_errors:
    print(f"\nError report table (errors={n_errors})")
    print(error_table)
print("\nShort form data")
print(processed_table)
# export to JSON
# HACK: convert dates to strs on the fly to facilitate JSON export...
if petl_output_path:
    print(f"\nExporting processed table to JSON {petl_output_path}")
    # datetime.date is not JSON-serialisable, so stringify both date fields
    export_table = (petl
                    .convert(processed_table, "date", str)
                    .convert("end_date", str))
    # tojson returns None — the original wrapped it in print() and emitted
    # a stray "None" line; just execute it
    export_table.tojson(petl_output_path, lines=True)
    # assert vanishes under `python -O`; raise so a failed export is detected
    if not os.path.exists(petl_output_path):
        raise RuntimeError(f"JSON export failed, file missing: {petl_output_path}")