Skip to content

Commit

Permalink
Fill in missing station data using id
Browse files Browse the repository at this point in the history
  • Loading branch information
Shakleen committed Oct 1, 2024
1 parent 179da9c commit 3354d58
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
43 changes: 43 additions & 0 deletions src/components/raw_to_bronze_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,49 @@ def fill_in_station_id_using_name(self, df: DataFrame) -> DataFrame:

return df_filled

def fill_in_using_station_id(self, df: DataFrame) -> DataFrame:
# Create a mapping DataFrame with distinct non-null id and corresponding non-null values
mapping_df = (
df.filter(df["id"].isNotNull())
.select("id", "name", "latitude", "longitude")
.distinct()
)
mapping_df = (
mapping_df.withColumnRenamed("name", "mapped_name")
.withColumnRenamed("latitude", "mapped_latitude")
.withColumnRenamed("longitude", "mapped_longitude")
)

# Show the mapping DataFrame
mapping_df.show()

# Join the original DataFrame with the mapping DataFrame on the id column
df_filled = df.alias("df1").join(mapping_df.alias("df2"), on="id", how="left")

# Use coalesce to fill null values in the name, latitude, and longitude columns
df_filled = (
df_filled.withColumn(
"name", coalesce(df_filled["df1.name"], df_filled["mapped_name"])
)
.withColumn(
"latitude",
coalesce(df_filled["df1.latitude"], df_filled["mapped_latitude"]),
)
.withColumn(
"longitude",
coalesce(df_filled["df1.longitude"], df_filled["mapped_longitude"]),
)
)

# Drop the extra columns from the join
return (
df_filled.drop("mapped_name")
.drop("mapped_latitude")
.drop("mapped_longitude")
.dropDuplicates()
.dropna(how="any")
)

def transform(self):
logging.info("Reading raw delta table")
df = self.read_raw_delta()
Expand Down
21 changes: 20 additions & 1 deletion test/components/raw_to_bronze_transformer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ def test_drup_duplicates_and_all_nulls(
assert isinstance(output, DataFrame)
assert before - after == 3


def test_fill_in_station_id_using_name(
dataframe_2: DataFrame,
transformer: RawToBronzeTransformer,
Expand All @@ -424,4 +425,22 @@ def test_fill_in_station_id_using_name(
output = transformer.fill_in_station_id_using_name(output)

assert isinstance(output, DataFrame)
assert output.filter(col("id").isNull()).count() is 0
assert output.filter(col("id").isNull()).count() is 0


def test_fill_in_using_station_id(
dataframe_2: DataFrame,
transformer: RawToBronzeTransformer,
):
output = transformer.get_station_dataframe(dataframe_2)
output = transformer.drup_duplicates_and_all_nulls(output)
output = transformer.fill_in_station_id_using_name(output)
output = transformer.fill_in_using_station_id(output)

assert isinstance(output, DataFrame)
assert (
output.filter(
col("name").isNull() | col("latitude").isNull() | col("longitude").isNull()
).count()
is 0
)

0 comments on commit 3354d58

Please sign in to comment.