def fill_in_using_station_id(self, df: DataFrame) -> DataFrame:
    """Fill null ``name``, ``latitude`` and ``longitude`` values using the station ``id``.

    A lookup of the distinct ``(id, name, latitude, longitude)`` combinations
    found on rows with a non-null ``id`` is left-joined back onto ``df``.
    Each of the three columns is then replaced by the first non-null value of
    the row's own value and the looked-up value.

    The lookup may hold several rows for one ``id`` (for example one complete
    row and one with nulls), so the join can emit several candidates per input
    row. Exact duplicates are removed with ``dropDuplicates()``, and
    ``dropna(how="any")`` removes every row that still has a null in any
    column.

    Args:
        df: DataFrame with at least the columns ``id``, ``name``,
            ``latitude`` and ``longitude``.

    Returns:
        A new DataFrame with the same columns as ``df``, de-duplicated and
        without any row containing a null value.
    """
    station_columns = ("name", "latitude", "longitude")

    # Lookup table: distinct value combinations per non-null id. The value
    # columns get a "mapped_" prefix so they do not clash with df's columns.
    mapping_df = (
        df.filter(df["id"].isNotNull())
        .select("id", *station_columns)
        .distinct()
    )
    for column in station_columns:
        mapping_df = mapping_df.withColumnRenamed(column, f"mapped_{column}")

    # NOTE: no mapping_df.show() here - it would run an extra Spark job and
    # print to stdout on every call.

    # Left join so rows whose id is null (no lookup match) are still carried
    # through to the null handling below.
    joined = df.alias("df1").join(mapping_df.alias("df2"), on="id", how="left")

    # Prefer the row's own value; fall back to the looked-up one.
    df_filled = joined
    for column in station_columns:
        df_filled = df_filled.withColumn(
            column,
            coalesce(joined[f"df1.{column}"], joined[f"mapped_{column}"]),
        )

    # Drop the helper columns from the join, then remove duplicate rows and
    # rows that could not be completed.
    return (
        df_filled.drop(*(f"mapped_{column}" for column in station_columns))
        .dropDuplicates()
        .dropna(how="any")
    )
def test_fill_in_using_station_id(
    dataframe_2: DataFrame,
    transformer: RawToBronzeTransformer,
):
    """After filling by station id, no row has a null name, latitude or longitude."""
    # Run the same preparation steps that precede fill_in_using_station_id.
    output = transformer.get_station_dataframe(dataframe_2)
    output = transformer.drup_duplicates_and_all_nulls(output)
    output = transformer.fill_in_station_id_using_name(output)
    output = transformer.fill_in_using_station_id(output)

    assert isinstance(output, DataFrame)

    rows_with_nulls = output.filter(
        col("name").isNull() | col("latitude").isNull() | col("longitude").isNull()
    ).count()
    # Compare by value: `is 0` tests object identity and only passes because
    # of CPython's small-int caching (SyntaxWarning on Python 3.8+).
    assert rows_with_nulls == 0