-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflink_streaming.py
85 lines (75 loc) · 2.88 KB
/
flink_streaming.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# Task 1: stream the CSV data into Flink
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.schema import Schema
from pyflink.table.types import DataTypes
from pyflink.table import TableDescriptor, FormatDescriptor
from pyflink.table.udf import udf
import math
# Streaming execution environment plus the Table API environment layered on it.
env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(env, environment_settings=settings)

# Location of the CSV file exposed as "SourceTable".
csv_file_path = "/home/tashi/yellow_tripdata_2024-01.csv"

# Column names in file order. Every column is declared as STRING here, so no
# numeric parsing happens while the file is being read.
_source_columns = (
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "RatecodeID",
    "store_and_fwd_flag",
    "PULocationID",
    "DOLocationID",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "Airport_fee",
)

_schema_builder = Schema.new_builder()
for _column_name in _source_columns:
    _schema_builder = _schema_builder.column(_column_name, DataTypes.STRING())
schema = _schema_builder.build()

# Comma-delimited, double-quote-quoted CSV.
# NOTE(review): "ignore-first-line" does not appear in the option list of
# Flink's CSV format documentation -- confirm the header row is really skipped.
_csv_format = (
    FormatDescriptor.for_format("csv")
    .option("field-delimiter", ",")
    .option("quote-character", "\"")
    .option("ignore-first-line", "true")
    .build()
)

# Filesystem-backed table that reads csv_file_path with the schema above.
_source_descriptor = (
    TableDescriptor.for_connector("filesystem")
    .schema(schema)
    .option("path", csv_file_path)
    .format(_csv_format)
    .build()
)

table_env.create_temporary_table("SourceTable", _source_descriptor)
@udf(result_type=DataTypes.FLOAT())
def parse_float(value):
    """Convert a raw text field to a float.

    Args:
        value: The field text, or None when the field is NULL.

    Returns:
        The parsed float; None when ``value`` is None; NaN when the text is
        not a valid number.
    """
    # Propagate NULL instead of letting float(None) raise TypeError and
    # abort the whole job.
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        # Unparseable input is reported as NaN rather than failing the row.
        return math.nan
@udf(result_type=DataTypes.INT())
def parse_int(value):
    """Convert a raw text field to an int.

    Accepts plain integer text ("2") as well as float-formatted integers
    ("2.0"), which ``int()`` alone rejects.

    Args:
        value: The field text, or None when the field is NULL.

    Returns:
        The parsed int, or None when ``value`` is None, is not numeric, or
        is numeric but not a whole number (e.g. "1.5", "nan", "inf").
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        # TypeError covers None; ValueError covers text such as "1.0" or "abc".
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        # is_integer() is False for NaN and infinities, so int() below
        # cannot raise.
        return int(number) if number.is_integer() else None
# Register the Python UDFs under the names the SQL query refers to.
for _udf_name, _udf_impl in (
    ("parse_float", parse_float),
    ("parse_int", parse_int),
):
    table_env.create_temporary_function(_udf_name, _udf_impl)

# Project the pickup time and the parsed numeric fields. The WHERE clause
# tests SourceTable's own (string) columns, not the parsed aliases.
_trip_query = """
SELECT
tpep_pickup_datetime,
parse_int(passenger_count) AS passenger_count,
parse_float(trip_distance) AS trip_distance,
parse_float(fare_amount) AS fare_amount,
parse_float(tip_amount) AS tip_amount,
parse_float(total_amount) AS total_amount
FROM SourceTable
WHERE trip_distance IS NOT NULL AND fare_amount IS NOT NULL
"""
result_table = table_env.sql_query(_trip_query)

# Convert the result to a changelog DataStream and print each record.
_changelog_stream = table_env.to_changelog_stream(result_table)
_changelog_stream.print()

# Submit the DataStream job.
env.execute("Flink CSV Streaming Job")