311_metrics_adjustment.py
'''
Transpose the yearly 311 metrics tables so that each year becomes its own column.

Usage:
spark-submit 311_metrics_adjustment.py hdfs:/user/amr1059/average_completion_time.parquet \
    hdfs:/user/amr1059/average_completion_time_by_incident.parquet hdfs:/user/amr1059/incidents_per_zip.parquet
'''
import sys
import datetime as dt

from pyspark.sql import SparkSession
def adjust_and_transpose(spark, parquet_file1, parquet_file2, parquet_file3):
    print('{} | Starting dataframe transpose'.format(dt.datetime.now()))

    # Average completion time per zip: pivot one row per (zip, year) into one
    # row per zip with a column per year. The year aliases are backquoted
    # because they begin with a digit.
    table = spark.read.parquet(parquet_file1)
    table.createOrReplaceTempView('average_completion_time')
    table_transpose = spark.sql("""
        SELECT incident_zip,
               SUM(coalesce(CASE WHEN year = 2010 THEN avg_job_time END, 0)) AS `2010_job_time`,
               SUM(coalesce(CASE WHEN year = 2011 THEN avg_job_time END, 0)) AS `2011_job_time`,
               SUM(coalesce(CASE WHEN year = 2012 THEN avg_job_time END, 0)) AS `2012_job_time`,
               SUM(coalesce(CASE WHEN year = 2013 THEN avg_job_time END, 0)) AS `2013_job_time`,
               SUM(coalesce(CASE WHEN year = 2014 THEN avg_job_time END, 0)) AS `2014_job_time`,
               SUM(coalesce(CASE WHEN year = 2015 THEN avg_job_time END, 0)) AS `2015_job_time`,
               SUM(coalesce(CASE WHEN year = 2016 THEN avg_job_time END, 0)) AS `2016_job_time`,
               SUM(coalesce(CASE WHEN year = 2017 THEN avg_job_time END, 0)) AS `2017_job_time`,
               SUM(coalesce(CASE WHEN year = 2018 THEN avg_job_time END, 0)) AS `2018_job_time`,
               SUM(coalesce(CASE WHEN year = 2019 THEN avg_job_time END, 0)) AS `2019_job_time`
        FROM average_completion_time
        GROUP BY incident_zip
        ORDER BY incident_zip
        """)

    # Same pivot, broken out by complaint type as well as zip.
    table2 = spark.read.parquet(parquet_file2)
    table2.createOrReplaceTempView('average_completion_time_by_incident')
    table2_transpose = spark.sql("""
        SELECT incident_zip, complaint_type,
               SUM(coalesce(CASE WHEN year = 2010 THEN avg_job_time END, 0)) AS `2010_job_time`,
               SUM(coalesce(CASE WHEN year = 2011 THEN avg_job_time END, 0)) AS `2011_job_time`,
               SUM(coalesce(CASE WHEN year = 2012 THEN avg_job_time END, 0)) AS `2012_job_time`,
               SUM(coalesce(CASE WHEN year = 2013 THEN avg_job_time END, 0)) AS `2013_job_time`,
               SUM(coalesce(CASE WHEN year = 2014 THEN avg_job_time END, 0)) AS `2014_job_time`,
               SUM(coalesce(CASE WHEN year = 2015 THEN avg_job_time END, 0)) AS `2015_job_time`,
               SUM(coalesce(CASE WHEN year = 2016 THEN avg_job_time END, 0)) AS `2016_job_time`,
               SUM(coalesce(CASE WHEN year = 2017 THEN avg_job_time END, 0)) AS `2017_job_time`,
               SUM(coalesce(CASE WHEN year = 2018 THEN avg_job_time END, 0)) AS `2018_job_time`,
               SUM(coalesce(CASE WHEN year = 2019 THEN avg_job_time END, 0)) AS `2019_job_time`
        FROM average_completion_time_by_incident
        GROUP BY incident_zip, complaint_type
        ORDER BY incident_zip
        """)

    # Incident counts per zip and complaint type, one column per year.
    table3 = spark.read.parquet(parquet_file3)
    table3.createOrReplaceTempView('incidents_per_zip')
    table3_transpose = spark.sql("""
        SELECT incident_zip, complaint_type,
               SUM(coalesce(CASE WHEN year = 2010 THEN incident_count END, 0)) AS `2010_incident_count`,
               SUM(coalesce(CASE WHEN year = 2011 THEN incident_count END, 0)) AS `2011_incident_count`,
               SUM(coalesce(CASE WHEN year = 2012 THEN incident_count END, 0)) AS `2012_incident_count`,
               SUM(coalesce(CASE WHEN year = 2013 THEN incident_count END, 0)) AS `2013_incident_count`,
               SUM(coalesce(CASE WHEN year = 2014 THEN incident_count END, 0)) AS `2014_incident_count`,
               SUM(coalesce(CASE WHEN year = 2015 THEN incident_count END, 0)) AS `2015_incident_count`,
               SUM(coalesce(CASE WHEN year = 2016 THEN incident_count END, 0)) AS `2016_incident_count`,
               SUM(coalesce(CASE WHEN year = 2017 THEN incident_count END, 0)) AS `2017_incident_count`,
               SUM(coalesce(CASE WHEN year = 2018 THEN incident_count END, 0)) AS `2018_incident_count`,
               SUM(coalesce(CASE WHEN year = 2019 THEN incident_count END, 0)) AS `2019_incident_count`
        FROM incidents_per_zip
        GROUP BY incident_zip, complaint_type
        ORDER BY incident_zip
        """)

    print('{} | Finished transpose'.format(dt.datetime.now()))
    print('{} | Writing transposed dataframes to file'.format(dt.datetime.now()))
    table_transpose.write.csv('avg_comp_time_transpose')
    table2_transpose.write.csv('avg_comp_time_incident_transpose')
    table3_transpose.write.csv('incidents_by_zip_transpose')
    print('{} | Completed'.format(dt.datetime.now()))
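
# --- Optional alternative (illustrative sketch, not used by this job) -------
# The CASE WHEN pivots above can also be expressed with the DataFrame API via
# groupBy().pivot(). The helper below is hypothetical: its name and parameters
# are not part of the original script, and it assumes `year` is stored as an
# integer column. It mirrors the SQL (sum per year, missing years filled with 0).
def transpose_with_pivot(spark, parquet_path, value_col, group_cols):
    """Return one row per group with a `<year>_<value_col>` column per year."""
    df = spark.read.parquet(parquet_path)
    years = list(range(2010, 2020))
    pivoted = (df.groupBy(*group_cols)
                 .pivot('year', years)                       # one output column per listed year
                 .sum(value_col)
                 .na.fill(0, [str(y) for y in years]))       # years with no rows -> 0
    # Rename the bare year columns ("2010", ...) to match the SQL aliases.
    for y in years:
        pivoted = pivoted.withColumnRenamed(str(y), '{}_{}'.format(y, value_col))
    return pivoted

# Example call, mirroring the third input above (hypothetical usage):
#   transpose_with_pivot(spark, sys.argv[3], 'incident_count',
#                        ['incident_zip', 'complaint_type'])
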
if __name__ == "__main__":
    spark = SparkSession.builder.appName("transpose_tables").getOrCreate()
    input_parquet1 = sys.argv[1]
    input_parquet2 = sys.argv[2]
    input_parquet3 = sys.argv[3]
    adjust_and_transpose(spark, input_parquet1, input_parquet2, input_parquet3)
    spark.stop()