From c50035c8703f190fffcbbcaa2b869ae89d40f036 Mon Sep 17 00:00:00 2001
From: Shakleen Ishfar <shakleenishfar@gmail.com>
Date: Tue, 1 Oct 2024 10:01:25 -0400
Subject: [PATCH] Finished timestamp conversion logic

---
 requirements.txt                            |   2 +-
 setup.py                                    |   2 +-
 src/components/raw_to_bronze_transformer.py | 170 ++++----
 .../raw_to_bronze_transformer_test.py       | 384 ++++++++++++++++--
 4 files changed, 413 insertions(+), 145 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 9524aa5..8fecbb1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,7 +5,7 @@ blinker==1.8.2
 cachetools==5.5.0
 certifi==2024.8.30
 charset-normalizer==3.3.2
-CitiBike-Demand-Prediction==0.0.3
+CitiBike-Demand-Prediction==0.0.4
 click==8.1.7
 cloudpickle==3.0.0
 comm==0.2.2
diff --git a/setup.py b/setup.py
index 6d0d775..22c0188 100644
--- a/setup.py
+++ b/setup.py
@@ -25,7 +25,7 @@ def get_requirements(file_name: str) -> List[str]:
 
 setup(
     name="CitiBike Demand Prediction",
-    version="0.0.3",
+    version="0.0.4",
     description="An End-to-End Machine Learning project where I predict demand of bikes at citibike stations at hourly level.",
     author="Shakleen Ishfar",
     author_email="shakleenishfar@gmail.com",
diff --git a/src/components/raw_to_bronze_transformer.py b/src/components/raw_to_bronze_transformer.py
index 43988b2..f6bca58 100644
--- a/src/components/raw_to_bronze_transformer.py
+++ b/src/components/raw_to_bronze_transformer.py
@@ -45,101 +45,87 @@ def create_file_name_column(self, df: DataFrame) -> DataFrame:
             "file_name", regexp_extract(col("file_path"), regex_str, 0)
         )
 
-    def convert_timestamp_type_1(self, df: DataFrame) -> DataFrame:
-        df_1 = df.where(col("file_name").startswith("202"))
-
-        for i in range(2013, 2020, 1):
-            if i in [2014, 2015, 2016]:
-                continue
-
-            temp_df = df.where(col("file_name").startswith(str(i)))
-            df_1 = df_1.union(temp_df)
-
-        for i in range(1, 9):
-            temp_df = df.where(col("file_name").startswith(f"20140{i}"))
-            df_1 = df_1.union(temp_df)
-
-        for i in range(10, 13):
-            temp_df = df.where(col("file_name").startswith(f"2016{i}"))
-            df_1 = df_1.union(temp_df)
-
-        df_1 = df_1.withColumn(
-            "start_time", to_timestamp("start_time", "yyyy-MM-dd HH:mm:ss")
-        ).withColumn("end_time", to_timestamp("end_time", "yyyy-MM-dd HH:mm:ss"))
-
-        return (
-            df_1.withColumn(
-                "start_time",
-                when(
-                    (col("file_name").startswith("202108"))
-                    & (year("start_time") == 2021)
-                    & (month("start_time") == 7),
-                    add_months(col("start_time"), 1),
-                ).otherwise(col("start_time")),
-            )
-            .withColumn(
-                "end_time",
-                when(
-                    (col("file_name").startswith("202108"))
-                    & (year("end_time") == 2021)
-                    & (month("end_time") == 7),
-                    add_months(col("end_time"), 1),
-                ).otherwise(col("end_time")),
-            )
-            .drop("file_path", "file_name")
-        )
+    def get_dataframe_timeformat_type_1(self, df: DataFrame) -> DataFrame:
+        # Create a list of conditions
+        conditions = [
+            col("file_name").startswith(file_name_prefix)
+            for file_name_prefix in ["202", "2013", "2017", "2018", "2019"]
+        ]
 
-    def convert_timestamp_type_2(self, df: DataFrame) -> DataFrame:
-        df_2 = None
+        # Add conditions for 2014 (Jan-Aug) and 2016 (Oct-Dec)
+        conditions += [col("file_name").startswith(f"20140{i}") for i in range(1, 9)]
+        conditions += [col("file_name").startswith(f"2016{i}") for i in range(10, 13)]
 
-        for i in ["201501", "201502", "201503", "201506"]:
-            temp_df = df.where(col("file_name").startswith(i))
+        # Combine all conditions using the 'or' operation
+        filter_condition = conditions[0]
+        for condition in conditions[1:]:
+            filter_condition = filter_condition | condition
 
-            if df_2 is None:
-                df_2 = temp_df
-            else:
-                df_2 = df_2.union(temp_df)
+        # Apply the filter condition to the DataFrame
+        df_1 = df.where(filter_condition)
 
-        return (
-            df_2.withColumn("start_time", to_timestamp("start_time", "M/d/yyyy H:mm"))
-            .withColumn("end_time", to_timestamp("end_time", "M/d/yyyy H:mm"))
-            .drop("file_path", "file_name")
-        )
+        return df_1
 
-    def convert_timestamp_type_3(self, df: DataFrame) -> DataFrame:
-        df_3 = None
+    def get_dataframe_timeformat_type_2(self, df: DataFrame) -> DataFrame:
+        conditions = [
+            col("file_name").startswith(file_name_prefix)
+            for file_name_prefix in ["201501", "201502", "201503", "201506"]
+        ]
 
-        for i in range(9, 13):
-            temp_df = df.where(col("file_name").startswith(f"2014{i:02}"))
+        filter_condition = conditions[0]
+        for condition in conditions[1:]:
+            filter_condition = filter_condition | condition
 
-            if df_3 is None:
-                df_3 = temp_df
-            else:
-                df_3 = df_3.union(temp_df)
+        df_2 = df.where(filter_condition)
 
-        for i in range(4, 13):
-            if i == 6:
-                continue
+        return df_2
 
-            temp_df = df.where(col("file_name").startswith(f"2015{i:02}"))
-            df_3 = df_3.union(temp_df)
+    def get_dataframe_timeformat_type_3(self, df: DataFrame) -> DataFrame:
+        # Create a list of conditions for 2014
+        conditions_2014 = [
+            col("file_name").startswith(f"2014{i:02}") for i in range(9, 13)
+        ]
 
-        for i in range(1, 10):
-            temp_df = df.where(col("file_name").startswith(f"2016{i:02}"))
-            df_3 = df_3.union(temp_df)
+        # Create a list of conditions for 2015, excluding 201506
+        conditions_2015 = [
+            col("file_name").startswith(f"2015{i:02}") for i in range(4, 13) if i != 6
+        ]
 
-        return (
-            df_3.withColumn(
-                "start_time", to_timestamp("start_time", "M/d/yyyy HH:mm:ss")
-            )
-            .withColumn("end_time", to_timestamp("end_time", "M/d/yyyy HH:mm:ss"))
-            .drop("file_path", "file_name")
-        )
+        # Create a list of conditions for 2016
+        conditions_2016 = [
+            col("file_name").startswith(f"2016{i:02}") for i in range(1, 10)
+        ]
+
+        # Combine all conditions into a single list
+        all_conditions = conditions_2014 + conditions_2015 + conditions_2016
+
+        # Combine all conditions using the 'or' operation
+        filter_condition = all_conditions[0]
+        for condition in all_conditions[1:]:
+            filter_condition = filter_condition | condition
+
+        # Apply the filter condition to the DataFrame
+        df_3 = df.where(filter_condition)
+
+        return df_3
+
+    def set_timestamp_for_format(
+        self, df: DataFrame, time_format: str
+    ) -> DataFrame:
+        return df.withColumn(
+            "start_time", to_timestamp("start_time", time_format)
+        ).withColumn("end_time", to_timestamp("end_time", time_format))
+
+    def set_timestamp_datatype(self, df: DataFrame) -> DataFrame:
+        df_1 = self.get_dataframe_timeformat_type_1(df)
+        df_1 = self.set_timestamp_for_format(df_1, "yyyy-MM-dd HH:mm:ss")
+
+        df_2 = self.get_dataframe_timeformat_type_2(df)
+        df_2 = self.set_timestamp_for_format(df_2, "M/d/yyyy H:mm")
+
+        df_3 = self.get_dataframe_timeformat_type_3(df)
+        df_3 = self.set_timestamp_for_format(df_3, "M/d/yyyy HH:mm:ss")
 
-    def standardize_time_format(self, df: DataFrame) -> DataFrame:
-        df_1 = self.convert_timestamp_type_1(df)
-        df_2 = self.convert_timestamp_type_2(df)
-        df_3 = self.convert_timestamp_type_3(df)
         return df_1.union(df_2).union(df_3)
 
     def transform(self):
@@ -147,21 +133,9 @@ def transform(self):
         df = self.read_raw_delta()
         logging.info("Creating file name column")
         df = self.create_file_name_column(df)
-        logging.info("Standardizing time")
-        df = self.standardize_time_format(df)
-        logging.info("Dropping rows with null values for stations")
-        df = df.na.drop(
-            subset=[
-                "start_station_id",
-                "start_station_name",
-                "start_station_latitude",
-                "start_station_longitude",
-                "end_station_id",
-                "end_station_name",
-                "end_station_latitude",
-                "end_station_longitude",
-            ]
-        )
+        logging.info("Standardizing time")
+        df = self.set_timestamp_datatype(df)
+
         logging.info("Writing to bronze delta table")
         self.write_delta(df)
 
diff --git a/test/components/raw_to_bronze_transformer_test.py b/test/components/raw_to_bronze_transformer_test.py
index 76f358d..e826b2f 100644
--- a/test/components/raw_to_bronze_transformer_test.py
+++ b/test/components/raw_to_bronze_transformer_test.py
@@ -1,38 +1,266 @@
 import pytest
 from unittest.mock import Mock, patch
 from pyspark.sql import SparkSession
+from pyspark.sql.types import (
+    StructField,
+    StructType,
+    LongType,
+    IntegerType,
+    StringType,
+    FloatType,
+    TimestampType,
+)
+from pyspark.sql.functions import col
+from pyspark.sql.dataframe import DataFrame
 
 from src.components.raw_to_bronze_transformer import (
     RawToBronzeTransformerConfig,
     RawToBronzeTransformer,
 )
 
+schema = StructType(
+    [
+        StructField("start_time", StringType(), True),
+        StructField("end_time", StringType(), True),
+        StructField("start_station_name", StringType(), True),
+        StructField("start_station_latitude", FloatType(), True),
+        StructField("start_station_longitude", FloatType(), True),
+        StructField("end_station_name", StringType(), True),
+        StructField("end_station_latitude", FloatType(), True),
+        StructField("end_station_longitude", FloatType(), True),
+        StructField("start_station_id", IntegerType(), True),
+        StructField("end_station_id", IntegerType(), True),
+        StructField("member", IntegerType(), True),
+        StructField("row_number", LongType(), True),
+        StructField("file_path", StringType(), True),
+        StructField("file_name", StringType(), True),
+    ]
+)
+
 
 @pytest.fixture(scope="session")
-def dataframe_mock():
-    df = Mock(name="dataframe")
-    df.withColumnRenamed.return_value = df
-    df.withColumn.return_value = df
-    df.drop.return_value = df
-    df.union.return_value = df
-    df.write.return_value = df
-    df.save.return_value = df
-    df.select.return_value = df
-    return df
+def spark():
+    spark = (
+        SparkSession.builder.master("local[1]")
+        .appName("local-tests")
+        .config("spark.executor.cores", "1")
+        .config("spark.executor.instances", "1")
+        .config("spark.sql.shuffle.partitions", "1")
+        .config("spark.driver.bindAddress", "127.0.0.1")
+        .getOrCreate()
+    )
+    yield spark
+    spark.stop()
 
 
-@pytest.fixture(scope="session")
-def spark_mock(dataframe_mock):
-    spark = Mock(SparkSession)
-    spark.read = spark
-    spark.read.format = spark
-    spark.read.format.load = spark
-    return spark
+@pytest.fixture
+def transformer(spark: SparkSession):
+    return RawToBronzeTransformer(spark)
+
+
+@pytest.fixture
+def dataframe_2(spark: SparkSession):
+    return spark.createDataFrame(
+        data=[
+            [
+                "2021-01-19 19:43:36.986",
+                "2021-01-19 19:45:50.414",
+                "Rivington St & Ridge St",
+                40.718502044677734,
+                -73.9832992553711,
+                "Allen St & Rivington St",
+                40.72019577026367,
+                -73.98997497558594,
+                5406,
+                5414,
+                1,
+                1666447310848,
+                "file:///media/ishrak/New%20Volume/Studies/Projects/CitiBike-Demand-Prediction/Data/CSVs/post_2020/202101-citibike-tripdata_1.csv",
+                "202101-citibike-tripdata_1.csv",
+            ],
+            [
+                "2021-01-29 06:38:32.423",
+                "2021-01-29 06:40:28.603",
+                "Clark St & Henry St",
+                40.697601318359375,
+                -73.99344635009766,
+                "Columbia Heights & Cranberry St",
+                40.70037841796875,
+                -73.9954833984375,
+                4789,
+                4829,
+                1,
+                1666447310849,
"file:///media/ishrak/New%20Volume/Studies/Projects/CitiBike-Demand-Prediction/Data/CSVs/post_2020/202101-citibike-tripdata_1.csv", + "202101-citibike-tripdata_1.csv", + ], + [ + "1/1/2015 0:01", + "1/1/2015 0:24", + "Rivington St & Ridge St", + 40.718502044677734, + -73.9832992553711, + "Allen St & Rivington St", + 40.72019577026367, + -73.98997497558594, + 5406, + 5414, + 1, + 1666447310848, + "file:///media/ishrak/New%20Volume/Studies/Projects/CitiBike-Demand-Prediction/Data/CSVs/post_2020/202101-citibike-tripdata_1.csv", + "201501-citibike-tripdata_1.csv", + ], + [ + "1/1/2015 0:02", + "1/1/2015 0:08", + "Clark St & Henry St", + 40.697601318359375, + -73.99344635009766, + "Columbia Heights & Cranberry St", + 40.70037841796875, + -73.9954833984375, + 4789, + 4829, + 1, + 1666447310849, + "file:///media/ishrak/New%20Volume/Studies/Projects/CitiBike-Demand-Prediction/Data/CSVs/post_2020/202101-citibike-tripdata_1.csv", + "201501-citibike-tripdata_1.csv", + ], + [ + "9/1/2014 00:00:25", + "9/1/2014 00:00:25", + "Rivington St & Ridge St", + 40.718502044677734, + -73.9832992553711, + "Allen St & Rivington St", + 40.72019577026367, + -73.98997497558594, + 5406, + 5414, + 1, + 1666447310848, + "file:///media/ishrak/New%20Volume/Studies/Projects/CitiBike-Demand-Prediction/Data/CSVs/post_2020/202101-citibike-tripdata_1.csv", + "201409-citibike-tripdata_1.csv", + ], + [ + "9/1/2014 00:00:28", + "9/1/2014 00:00:28", + "Clark St & Henry St", + 40.697601318359375, + -73.99344635009766, + "Columbia Heights & Cranberry St", + 40.70037841796875, + -73.9954833984375, + 4789, + 4829, + 1, + 1666447310849, + "file:///media/ishrak/New%20Volume/Studies/Projects/CitiBike-Demand-Prediction/Data/CSVs/post_2020/202101-citibike-tripdata_1.csv", + "201409-citibike-tripdata_1.csv", + ], + ], + schema=schema, + ) -@pytest.fixture() -def transformer(spark_mock): - return RawToBronzeTransformer(spark_mock) +@pytest.fixture +def dataframe(spark: SparkSession): + return spark.createDataFrame( + data=[ + [ + "2019-08-01 00:00:01.4680", + "2019-08-01 00:06:35.3780", + "Forsyth St & Broome St", + 40.71894073486328, + -73.99266052246094, + "Market St & Cherry St", + 40.71076202392578, + -73.99400329589844, + 531, + 408, + 1, + 85899345920, + "file:///media/ishrak/New%20Volume/Studies/Projects/CitiBike-Demand-Prediction/Data/CSVs/pre_2020/201908-citibike-tripdata_1.csv", + ], + [ + "2019-08-01 00:00:01.9290", + "2019-08-01 00:10:29.7840", + "Lafayette Ave & Fort Greene Pl", + 40.686920166015625, + -73.9766845703125, + "Bergen St & Smith St", + 40.686744689941406, + -73.99063110351562, + 274, + 3409, + 1, + 85899345921, + "file:///media/ishrak/New%20Volume/Studies/Projects/CitiBike-Demand-Prediction/Data/CSVs/pre_2020/201908-citibike-tripdata_1.csv", + ], + [ + "2019-08-01 00:00:04.0480", + "2019-08-01 00:18:56.1650", + "Front St & Washington St", + 40.70254898071289, + -73.9894027709961, + "President St & Henry St", + 40.68280029296875, + -73.9999008178711, + 2000, + 3388, + 1, + 85899345922, + "file:///media/ishrak/New%20Volume/Studies/Projects/CitiBike-Demand-Prediction/Data/CSVs/pre_2020/201908-citibike-tripdata_1.csv", + ], + [ + "2019-08-01 00:00:04.1630", + "2019-08-01 00:29:44.7940", + "9 Ave & W 45 St", + 40.76019287109375, + -73.99125671386719, + "Rivington St & Chrystie St", + 40.721099853515625, + -73.99192810058594, + 479, + 473, + 1, + 85899345923, + "file:///media/ishrak/New%20Volume/Studies/Projects/CitiBike-Demand-Prediction/Data/CSVs/pre_2020/201908-citibike-tripdata_1.csv", + ], + [ + "2019-08-01 
00:00:05.4580", + "2019-08-01 00:25:23.4550", + "1 Ave & E 94 St", + 40.78172302246094, + -73.94593811035156, + "1 Ave & E 94 St", + 40.78172302246094, + -73.94593811035156, + 3312, + 3312, + 1, + 85899345924, + "file:///media/ishrak/New%20Volume/Studies/Projects/CitiBike-Demand-Prediction/Data/CSVs/pre_2020/201908-citibike-tripdata_1.csv", + ], + ], + schema=StructType( + StructType( + [ + StructField("start_time", StringType(), True), + StructField("end_time", StringType(), True), + StructField("start_station_name", StringType(), True), + StructField("start_station_latitude", FloatType(), True), + StructField("start_station_longitude", FloatType(), True), + StructField("end_station_name", StringType(), True), + StructField("end_station_latitude", FloatType(), True), + StructField("end_station_longitude", FloatType(), True), + StructField("start_station_id", IntegerType(), True), + StructField("end_station_id", IntegerType(), True), + StructField("member", IntegerType(), True), + StructField("row_number", LongType(), True), + StructField("file_path", StringType(), True), + ] + ) + ), + ) def test_config(): @@ -43,36 +275,98 @@ def test_config(): assert hasattr(config, "bronze_data_path") -def test_init(spark_mock: SparkSession, transformer: RawToBronzeTransformer): - assert transformer.config == RawToBronzeTransformerConfig() - assert transformer.spark is spark_mock +def test_init(transformer: RawToBronzeTransformer, spark: SparkSession): + assert hasattr(transformer, "config") + assert hasattr(transformer, "spark") + assert transformer.spark is spark + assert isinstance(transformer.config, RawToBronzeTransformerConfig) + assert isinstance(transformer.spark, SparkSession) + + +def test_read_raw_delta(dataframe: DataFrame): + spark_mock = Mock(SparkSession) + transformer = RawToBronzeTransformer(spark_mock) + spark_mock.read.format("delta").load.return_value = dataframe -def test_read_raw_delta(spark_mock: SparkSession, transformer: RawToBronzeTransformer): - _ = transformer.read_raw_delta() - spark_mock.read.assert_called_once() - spark_mock.read.format.assert_called_once_with("delta") - spark_mock.read.format.load.assert_called_once() + df = transformer.read_raw_delta() + spark_mock.read.format.assert_called_with("delta") + spark_mock.read.format("delta").load.assert_called_with( + transformer.config.raw_data_path + ) + + assert df is dataframe -def test_write_delta(dataframe_mock, transformer: RawToBronzeTransformer): - write_mock = Mock() - dataframe_mock.write = write_mock - transformer.write_delta(dataframe_mock) +def test_write_delta(): + dataframe = Mock(DataFrame) + spark_mock = Mock(SparkSession) + transformer = RawToBronzeTransformer(spark_mock) - write_mock.save.assert_called_once_with( - path=transformer.config.bronze_data_path, - format="delta", - mode="overwrite", + transformer.write_delta(dataframe) + + dataframe.write.save.assert_called_once_with( + path=transformer.config.bronze_data_path, format="delta", mode="overwrite" ) -def test_create_file_name_column(mocker, dataframe_mock, transformer: RawToBronzeTransformer): - reg_exp_mock = Mock() - mocker.patch("src.components.raw_to_bronze_transformer.col", return_value=Mock()) - mocker.patch("src.components.raw_to_bronze_transformer.regexp_extract", return_value=reg_exp_mock) - - output = transformer.create_file_name_column(dataframe_mock) - - dataframe_mock.withColumn.assert_called_once_with("file_name", reg_exp_mock) - assert output == dataframe_mock + +def test_create_file_name_column( + transformer: 
+    transformer: RawToBronzeTransformer,
+    dataframe: DataFrame,
+):
+    df = transformer.create_file_name_column(dataframe)
+
+    assert "file_name" in df.columns
+
+
+def test_get_dataframe_timeformat_type_1(
+    dataframe_2: DataFrame,
+    transformer: RawToBronzeTransformer,
+):
+    output = transformer.get_dataframe_timeformat_type_1(dataframe_2)
+
+    assert isinstance(output, DataFrame)
+    assert output.count() == 2
+
+
+def test_get_dataframe_timeformat_type_2(
+    dataframe_2: DataFrame,
+    transformer: RawToBronzeTransformer,
+):
+    output = transformer.get_dataframe_timeformat_type_2(dataframe_2)
+
+    assert isinstance(output, DataFrame)
+    assert output.count() == 2
+
+
+def test_get_dataframe_timeformat_type_3(
+    dataframe_2: DataFrame,
+    transformer: RawToBronzeTransformer,
+):
+    output = transformer.get_dataframe_timeformat_type_3(dataframe_2)
+
+    assert isinstance(output, DataFrame)
+    assert output.count() == 2
+
+
+def test_set_timestamp_for_format(
+    dataframe_2: DataFrame,
+    transformer: RawToBronzeTransformer,
+):
+    output = transformer.set_timestamp_for_format(dataframe_2, "yyyy-MM-dd HH:mm:ss")
+
+    assert isinstance(output, DataFrame)
+    assert output.schema[0] == StructField("start_time", TimestampType(), True)
+    assert output.schema[1] == StructField("end_time", TimestampType(), True)
+
+
+def test_set_timestamp_datatype(
+    dataframe_2: DataFrame,
+    transformer: RawToBronzeTransformer,
+):
+    output = transformer.set_timestamp_datatype(dataframe_2)
+
+    assert isinstance(output, DataFrame)
+    assert output.schema[0] == StructField("start_time", TimestampType(), True)
+    assert output.schema[1] == StructField("end_time", TimestampType(), True)
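
Reviewer note (outside the diff): each of the three get_dataframe_timeformat_type_* methods folds its list of startswith() conditions with the same four-line loop. A possible follow-up is to factor that fold into a single helper using functools.reduce. This is only a sketch; the helper name filter_by_prefixes and its prefixes parameter are hypothetical and do not exist in this codebase.

    from functools import reduce
    from operator import or_

    from pyspark.sql import DataFrame
    from pyspark.sql.functions import col

    def filter_by_prefixes(df: DataFrame, prefixes: list[str]) -> DataFrame:
        # OR together one startswith() condition per file-name prefix.
        filter_condition = reduce(
            or_, [col("file_name").startswith(p) for p in prefixes]
        )
        return df.where(filter_condition)

    # e.g. get_dataframe_timeformat_type_2 would then reduce to:
    # return filter_by_prefixes(df, ["201501", "201502", "201503", "201506"])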
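
Reviewer note (outside the diff): set_timestamp_datatype routes each file-name group to one of three to_timestamp() patterns. Worth double-checking against the fixture data: strings such as "2021-01-19 19:43:36.986" carry fractional seconds that "yyyy-MM-dd HH:mm:ss" does not cover, and under Spark 3's default time-parser policy that mismatch can surface as nulls or a SparkUpgradeException rather than a parsed value; the tests above assert only the column type, so they would not catch it. A minimal probe, assuming a live SparkSession named spark:

    from pyspark.sql.functions import to_timestamp

    samples = [
        ("2021-01-19 19:43:36.986", "yyyy-MM-dd HH:mm:ss"),  # type 1 files
        ("1/1/2015 0:01", "M/d/yyyy H:mm"),                  # type 2 files
        ("9/1/2014 00:00:25", "M/d/yyyy HH:mm:ss"),          # type 3 files
    ]

    for value, fmt in samples:
        df = spark.createDataFrame([(value,)], ["start_time"])
        # Inspect whether the pattern actually parses the sample or yields null.
        df.select(to_timestamp("start_time", fmt).alias("parsed")).show(truncate=False)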