Skip to content

Commit

Permalink
Finished timestamp conversion logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Shakleen committed Oct 1, 2024
1 parent 374e7da commit c50035c
Show file tree
Hide file tree
Showing 4 changed files with 413 additions and 145 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ blinker==1.8.2
cachetools==5.5.0
certifi==2024.8.30
charset-normalizer==3.3.2
CitiBike-Demand-Prediction==0.0.3
CitiBike-Demand-Prediction==0.0.4
click==8.1.7
cloudpickle==3.0.0
comm==0.2.2
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def get_requirements(file_name: str) -> List[str]:

setup(
name="CitiBike Demand Prediction",
version="0.0.3",
version="0.0.4",
description="An End-to-End Machine Learning project where I predict demand of bikes at citibike stations at hourly level.",
author="Shakleen Ishfar",
author_email="shakleenishfar@gmail.com",
Expand Down
170 changes: 72 additions & 98 deletions src/components/raw_to_bronze_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,123 +45,97 @@ def create_file_name_column(self, df: DataFrame) -> DataFrame:
"file_name", regexp_extract(col("file_path"), regex_str, 0)
)

def convert_timestamp_type_1(self, df: DataFrame) -> DataFrame:
df_1 = df.where(col("file_name").startswith("202"))

for i in range(2013, 2020, 1):
if i in [2014, 2015, 2016]:
continue

temp_df = df.where(col("file_name").startswith(str(i)))
df_1 = df_1.union(temp_df)

for i in range(1, 9):
temp_df = df.where(col("file_name").startswith(f"20140{i}"))
df_1 = df_1.union(temp_df)

for i in range(10, 13):
temp_df = df.where(col("file_name").startswith(f"2016{i}"))
df_1 = df_1.union(temp_df)

df_1 = df_1.withColumn(
"start_time", to_timestamp("start_time", "yyyy-MM-dd HH:mm:ss")
).withColumn("end_time", to_timestamp("end_time", "yyyy-MM-dd HH:mm:ss"))

return (
df_1.withColumn(
"start_time",
when(
(col("file_name").startswith("202108"))
& (year("start_time") == 2021)
& (month("start_time") == 7),
add_months(col("start_time"), 1),
).otherwise(col("start_time")),
)
.withColumn(
"end_time",
when(
(col("file_name").startswith("202108"))
& (year("end_time") == 2021)
& (month("end_time") == 7),
add_months(col("end_time"), 1),
).otherwise(col("end_time")),
)
.drop("file_path", "file_name")
)
def get_dataframe_timeformat_type_1(self, df: DataFrame) -> DataFrame:
# Create a list of conditions
conditions = [
col("file_name").startswith(file_name_prefix)
for file_name_prefix in ["202", "2013", "2017", "2018", "2019"]
]

def convert_timestamp_type_2(self, df: DataFrame) -> DataFrame:
df_2 = None
# Add conditions for 2014 and 2015 months
conditions += [col("file_name").startswith(f"20140{i}") for i in range(1, 9)]
conditions += [col("file_name").startswith(f"2016{i}") for i in range(10, 13)]

for i in ["201501", "201502", "201503", "201506"]:
temp_df = df.where(col("file_name").startswith(i))
# Combine all conditions using the 'or' operation
filter_condition = conditions[0]
for condition in conditions[1:]:
filter_condition = filter_condition | condition

if df_2 is None:
df_2 = temp_df
else:
df_2 = df_2.union(temp_df)
# Apply the filter condition to the DataFrame
df_1 = df.where(filter_condition)

return (
df_2.withColumn("start_time", to_timestamp("start_time", "M/d/yyyy H:mm"))
.withColumn("end_time", to_timestamp("end_time", "M/d/yyyy H:mm"))
.drop("file_path", "file_name")
)
return df_1

def convert_timestamp_type_3(self, df: DataFrame) -> DataFrame:
df_3 = None
def get_dataframe_timeformat_type_2(self, df: DataFrame) -> DataFrame:
conditions = [
col("file_name").startswith(file_name_prefix)
for file_name_prefix in ["201501", "201502", "201503", "201506"]
]

for i in range(9, 13):
temp_df = df.where(col("file_name").startswith(f"2014{i:02}"))
filter_condition = conditions[0]
for condition in conditions[1:]:
filter_condition = filter_condition | condition

if df_3 is None:
df_3 = temp_df
else:
df_3 = df_3.union(temp_df)
df_2 = df.where(filter_condition)

for i in range(4, 13):
if i == 6:
continue
return df_2

temp_df = df.where(col("file_name").startswith(f"2015{i:02}"))
df_3 = df_3.union(temp_df)
def get_dataframe_timeformat_type_3(self, df: DataFrame) -> DataFrame:
# Create a list of conditions for 2014
conditions_2014 = [
col("file_name").startswith(f"2014{i:02}") for i in range(9, 13)
]

for i in range(1, 10):
temp_df = df.where(col("file_name").startswith(f"2016{i:02}"))
df_3 = df_3.union(temp_df)
# Create a list of conditions for 2015, excluding 201506
conditions_2015 = [
col("file_name").startswith(f"2015{i:02}") for i in range(4, 13) if i != 6
]

return (
df_3.withColumn(
"start_time", to_timestamp("start_time", "M/d/yyyy HH:mm:ss")
)
.withColumn("end_time", to_timestamp("end_time", "M/d/yyyy HH:mm:ss"))
.drop("file_path", "file_name")
)
# Create a list of conditions for 2016
conditions_2016 = [
col("file_name").startswith(f"2016{i:02}") for i in range(1, 10)
]

# Combine all conditions into a single list
all_conditions = conditions_2014 + conditions_2015 + conditions_2016

# Combine all conditions using the 'or' operation
filter_condition = all_conditions[0]
for condition in all_conditions[1:]:
filter_condition = filter_condition | condition

# Apply the filter condition to the DataFrame
df_3 = df.where(filter_condition)

return df_3

def set_timestamp_for_format(
self, df: DataFrame, time_format: str
) -> DataFrame:
return df.withColumn(
"start_time", to_timestamp("start_time", time_format)
).withColumn("end_time", to_timestamp("end_time", time_format))

def set_timestamp_datatype(self, df: DataFrame) -> DataFrame:
df_1 = self.get_dataframe_timeformat_type_1(df)
df_1 = self.set_timestamp_for_format(df_1, "yyyy-MM-dd HH:mm:ss")

df_2 = self.get_dataframe_timeformat_type_2(df)
df_2 = self.set_timestamp_for_format(df_2, "M/d/yyyy H:mm")

df_3 = self.get_dataframe_timeformat_type_3(df)
df_3 = self.set_timestamp_for_format(df_3, "M/d/yyyy HH:mm:ss")

def standardize_time_format(self, df: DataFrame) -> DataFrame:
df_1 = self.convert_timestamp_type_1(df)
df_2 = self.convert_timestamp_type_2(df)
df_3 = self.convert_timestamp_type_3(df)
return df_1.union(df_2).union(df_3)

def transform(self):
logging.info("Reading raw delta table")
df = self.read_raw_delta()
logging.info("Creating file name column")
df = self.create_file_name_column(df)
logging.info("Standardizing time")
df = self.standardize_time_format(df)
logging.info("Dropping rows with null values for stations")
df = df.na.drop(
subset=[
"start_station_id",
"start_station_name",
"start_station_latitude",
"start_station_longitude",
"end_station_id",
"end_station_name",
"end_station_latitude",
"end_station_longitude",
]
)

self.set_timestamp_datatype(df)

logging.info("Writing to bronze delta table")
self.write_delta(df)

Expand Down
Loading

0 comments on commit c50035c

Please sign in to comment.