pipeline.py
import os
from typing import Generic, Literal, TypeVar

import duckdb
from prefect import flow, task

T = TypeVar("T")


class Const(Generic[T]):
    # Lightweight marker used to annotate module-level constants.
    ...


SQL_INIT_FILE: Const[str] = "init.sql"

DatasetSplit = Literal["train", "test"]
Sentiment = Literal["pos", "neg"]


@task
def create_connection() -> duckdb.DuckDBPyConnection:
    return duckdb.connect()


@task
def close_connection(conn: duckdb.DuckDBPyConnection) -> None:
    conn.close()


@task
def create_tables(conn: duckdb.DuckDBPyConnection) -> duckdb.DuckDBPyRelation:
    # Run the schema script against the freshly opened database.
    with open(SQL_INIT_FILE) as file:
        init_cmd = file.read()
    return conn.sql(init_cmd)


@task
def ingest_data(conn: duckdb.DuckDBPyConnection, split: DatasetSplit, sentiment: Sentiment) -> None:
    # Reviews are stored one per file, named "<sentiment_id>_<score>.txt".
    path = f"raw-data/{split}/{sentiment}/"
    list_of_files = os.listdir(path)

    def sort_key(filename: str) -> int:
        # "123_7.txt" -> 123
        return int(filename.split(".")[0].split("_")[0])

    for filename in sorted(list_of_files, key=sort_key):
        name = filename.split(".")[0]
        sentiment_id, score = [int(x) for x in name.split("_")]
        with open(path + filename) as file:
            review = file.read()
        # Only the table name is interpolated, and it is constrained to the
        # DatasetSplit literals; the row values themselves are parameterised.
        cmd = f"INSERT INTO {split} (sentiment_id, score, review, sentiment) VALUES (?, ?, ?, ?);"
        values = (sentiment_id, score, review, sentiment)
        conn.execute(cmd, values)


@task
def write_result(conn: duckdb.DuckDBPyConnection) -> None:
    # Dump all tables to Parquet files in the 'dataset' directory.
    conn.sql("EXPORT DATABASE 'dataset' (FORMAT PARQUET);")


def yield_matrix():
    # Every (split, sentiment) combination the pipeline ingests.
    for split in ["test", "train"]:
        for sentiment in ["pos", "neg"]:
            yield split, sentiment


@flow
def main():
    conn = create_connection()
    create_tables(conn)
    for split, sentiment in yield_matrix():
        ingest_data(conn, split, sentiment)
    write_result(conn)
    close_connection(conn)


if __name__ == "__main__":
    main()
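
The script assumes two inputs that are not part of this file: an init.sql schema script and a raw-data/<split>/<sentiment>/ tree of <sentiment_id>_<score>.txt review files. The repo's actual init.sql is not shown here; the snippet below is a minimal sketch of a compatible schema, inferred only from the columns named in ingest_data's INSERT statement, and the file name sketch_init.py is hypothetical.

# sketch_init.py — hypothetical helper; the real init.sql in the repo may differ.
import duckdb

# One table per dataset split, with the four columns pipeline.py inserts into.
INIT_SQL = """
CREATE TABLE IF NOT EXISTS train (
    sentiment_id INTEGER,
    score        INTEGER,
    review       VARCHAR,
    sentiment    VARCHAR
);
CREATE TABLE IF NOT EXISTS test (
    sentiment_id INTEGER,
    score        INTEGER,
    review       VARCHAR,
    sentiment    VARCHAR
);
"""

if __name__ == "__main__":
    # Materialise the sketch as init.sql so pipeline.py can read it.
    with open("init.sql", "w") as f:
        f.write(INIT_SQL)
    # Sanity check against a throwaway in-memory database, using the same
    # conn.sql(...) call that create_tables() uses.
    duckdb.connect().sql(INIT_SQL)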