Skip to content

Commit

Permalink
Merge pull request #385 from j3-signalroom/github_issue-384
Browse files Browse the repository at this point in the history
See #384. Code complete.
  • Loading branch information
j3-signalroom authored Oct 20, 2024
2 parents 41fc269 + a184ef0 commit 1b80938
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 35 deletions.
Binary file added .blog/images/apache-flink-kickstarter.mov
Binary file not shown.
Binary file modified .blog/images/streamlit-screenshot.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.51.02.000] - 2024-10-20
### Changed
- Issue [#384](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/384).

## [0.51.01.000] - 2024-10-20
### Changed
- Issue [#382](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/382).
Expand Down
77 changes: 42 additions & 35 deletions python/kickstarter/flink_kickstarter_visualization.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,63 @@


@st.cache_data
def load_data(_tbl_env: StreamTableEnvironment, database_name: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
# Get the number of flights per month for the selected airline and year
def load_data(_tbl_env: StreamTableEnvironment, database_name: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""This function loads data from an Apache Iceberg table and returns it as a Pandas DataFrame.
Args:
_tbl_env (StreamTableEnvironment): is the Table Environment.
database_name (str): is the name of the database.
Returns:
Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: is a tuple of Pandas DataFrames.
"""
# Get the number of flights per month by airline, year, and month
airline_monthly_flights_table = _tbl_env.sql_query(f"""
select
airline,
extract(year from to_timestamp(departure_time)) as departure_year,
extract(month from to_timestamp(departure_time)) as departure_month,
date_format(to_timestamp(departure_time), 'MMM') as departure_month_abbr,
count(*) as flight_count
from
airlines.flight
group by
airline,
extract(year from to_timestamp(departure_time)),
extract(month from to_timestamp(departure_time))
extract(month from to_timestamp(departure_time)),
date_format(to_timestamp(departure_time), 'MMM')
order by
departure_year asc,
departure_month asc;
""")
df_airline_monthly_flights_table = airline_monthly_flights_table.to_pandas()

# Get the top 5 airports with the most departures for selected airline
# Get the top airports with the most departures by airport, airline, year, and rank
ranked_airports_table = _tbl_env.sql_query(f"""
with cte_ranked as (
select
airline,
departure_year,
departure_airport_code,
flight_count,
ROW_NUMBER() OVER (PARTITION BY airline ORDER BY flight_count DESC) AS row_num
ROW_NUMBER() OVER (PARTITION BY airline, departure_year ORDER BY flight_count DESC) AS row_num
from (
select
airline,
extract(year from to_timestamp(departure_time)) as departure_year,
departure_airport_code,
count(*) as flight_count
from
airlines.flight
group by
airline,
extract(year from to_timestamp(departure_time)),
departure_airport_code
) tbl
)
select
airline,
departure_year,
departure_airport_code,
flight_count,
row_num
Expand All @@ -71,15 +86,11 @@ def load_data(_tbl_env: StreamTableEnvironment, database_name: str) -> Tuple[pd.
""")
df_ranked_airports_table = ranked_airports_table.to_pandas()

# Get the flight data for the selected airline and year
# Get the flight data by airline and year
flight_table = _tbl_env.sql_query(f"SELECT *, extract(year from to_timestamp(departure_time)) as departure_year FROM {database_name}.flight")
df_flight_table = flight_table.to_pandas()

# Get the number of departure years and airlines in the dataset
lookups_table = _tbl_env.sql_query(f"SELECT DISTINCT airline, extract(year from to_timestamp(departure_time)) as departure_year FROM {database_name}.flight")
df_lookups_table = lookups_table.to_pandas()

return df_airline_monthly_flights_table, df_ranked_airports_table, df_flight_table, df_lookups_table
return df_airline_monthly_flights_table, df_ranked_airports_table, df_flight_table


def main(args):
Expand Down Expand Up @@ -158,22 +169,21 @@ def main(args):
print(f"Current database: {tbl_env.get_current_database()}")

# Load the data
df_airline_monthly_flights_table, df_ranked_airports_table, df_flight_table, df_lookups_table = load_data(tbl_env, database_name)
df_airline_monthly_flights_table, df_ranked_airports_table, df_flight_table = load_data(tbl_env, database_name)

st.title("Apache Flink Kickstarter Dashboard")
st.write("This Streamlit application displays data from the Apache Iceberg table created by the Flink Apps.")
st.write("This Streamlit application displays data from the Apache Iceberg table created by the DataGenerator and FlightImporter Flink Apps.")

# Create the dropdown boxes
global selected_airline
selected_airline = st.selectbox(
index=0,
label='Choose Airline:',
options=df_lookups_table['airline'].dropna().unique()
options=df_flight_table['airline'].dropna().unique()
)
selected_departure_year = st.selectbox(
index=0,
label='Choose Depature Year:',
options=df_lookups_table['departure_year'].dropna().unique()
options=df_flight_table['departure_year'].dropna().unique()
)

with st.container(border=True):
Expand All @@ -184,9 +194,8 @@ def main(args):

# Bar chart
st.title(f"{selected_airline} Monthly Flights in {selected_departure_year}")
df_airline_monthly_flights_table= df_airline_monthly_flights_table[(df_airline_monthly_flights_table['departure_year'] == selected_departure_year) & (df_airline_monthly_flights_table['airline'] == selected_airline)]
st.bar_chart(data=df_airline_monthly_flights_table,
x="departure_month",
st.bar_chart(data=df_airline_monthly_flights_table[(df_airline_monthly_flights_table['departure_year'] == selected_departure_year) & (df_airline_monthly_flights_table['airline'] == selected_airline)] ,
x="departure_month_abbr",
y="flight_count",
x_label="Departure Month",
y_label="Number of Flights")
Expand All @@ -196,22 +205,23 @@ def main(args):

with col2:
st.header("Airport Ranking")
st.title(f"Top {selected_departure_year} {selected_airline} Airports")

df_ranked_airports_table = df_ranked_airports_table[df_ranked_airports_table['airline'] == selected_airline]
# Filter the ranked airports table
df_filter_table = df_ranked_airports_table[(df_ranked_airports_table['airline'] == selected_airline) & (df_ranked_airports_table['departure_year'] == selected_departure_year)]

# Create a slider to select the number of airports to rank
rank_value = st.slider(label="Number of Airports to rank:",
min_value=5,
max_value=df_ranked_airports_table['row_num'].max(),
rank_value = st.slider(label="Ranking:",
min_value=3,
max_value=df_filter_table['row_num'].max(),
step=1,
value=5)
value=3)

# Pie chart
df_ranked_airports_table = df_ranked_airports_table[df_ranked_airports_table['row_num'] <= rank_value]
fig = px.pie(df_ranked_airports_table,
fig = px.pie(df_filter_table[(df_filter_table['row_num'] <= rank_value)],
values='flight_count',
names='departure_airport_code',
title=f"Top {rank_value} Airports with the most dpartures for {selected_airline}",)
title=f"Top {rank_value} based on departures",)
st.plotly_chart(fig, theme=None)

# Display the description of the pie chart
Expand All @@ -221,19 +231,16 @@ def main(args):
st.header(f"{selected_departure_year} {selected_airline} Flight Data")

# Create grid options with only specific columns
gb = GridOptionsBuilder.from_dataframe(df_flight_table)
gb.configure_columns(["email_address", "departure_time", "departure_airport_code", "arrival_time", "arrival_airport_code", "flight_number", "confirmation_code", "airline"])
gridOptions = gb.build()
df_flight_table = df_flight_table[(df_flight_table['departure_year'] == selected_departure_year) & (df_flight_table['airline'] == selected_airline)]
df_filter_table = df_flight_table[(df_flight_table['departure_year'] == selected_departure_year) & (df_flight_table['airline'] == selected_airline)]
AgGrid(
df_flight_table,
gridOptions=gridOptions,
df_filter_table,
gridOptions=GridOptionsBuilder.from_dataframe(df_flight_table).build(),
height=300,
width='100%'
)

# Display the description of the table
st.write("This table displays the flight data from the Apache Iceberg table. The table shows the email address, departure time, departure airport code, arrival time, arrival airport code, flight number, confirmation code, and airline for each flight.")
st.write("This table displays the flight data from the Apache Iceberg table. The table shows the email address, departure time, departure airport code, arrival time, arrival airport code, flight number, confirmation code, airline, and departure year for each flight.")


if __name__ == "__main__":
Expand Down

0 comments on commit 1b80938

Please sign in to comment.