Skip to content

Commit

Permalink
Merge pull request #381 from j3-signalroom/github_issue-378
Browse files Browse the repository at this point in the history
GitHub issue 378
  • Loading branch information
j3-signalroom authored Oct 20, 2024
2 parents 9cc0d65 + a45220b commit a97205c
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 47 deletions.
Binary file modified .blog/images/streamlit-screenshot.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ All notable changes to this project will be documented in this file.

The format is base on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.51.00.000] - 2024-10-19
### Changed
- Issue [#376](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/376).
- Issue [#378](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/378).

## [0.50.00.000] - 2024-10-18
### Added
- Issue [#31](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/31).
Expand Down
2 changes: 1 addition & 1 deletion python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ The exciting part is that after running all your Flink applications, the data no
![iceberg-flink-streamlit-drawing](../.blog/images/iceberg-flink-streamlit-drawing.png)
To illustrate, I created a Streamlit script that queries the `apache_kickstarter.airlines.flight` and `apache_kickstarter.airlines.flyer_stats` Apache Iceberg Tables, respectively, harnessing Flink SQL to extract valuable insights. These insights are then brought to life through a Streamlit dashboard, transforming raw data into an accessible, visual experience.
To illustrate, I created a Streamlit script that queries the `apache_kickstarter.airlines.flight` Apache Iceberg Table, harnessing Flink SQL to extract valuable insights. These insights are then brought to life through a Streamlit dashboard, transforming raw data into an accessible, visual experience.
Here you go, run this in the docker container terminal command line:
Expand Down
139 changes: 93 additions & 46 deletions python/kickstarter/flink_kickstarter_visualization.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,46 @@ def main(args):
# Print the current database name
print(f"Current database: {tbl_env.get_current_database()}")

# Read `flight`data from the Iceberg table
flight_table = tbl_env.sql_query(f"SELECT * FROM {database_name}.flight")
df_flight_table = flight_table.to_pandas()
# Get the number of departure years and airlines in the dataset
lookups_table = tbl_env.sql_query(f"SELECT DISTINCT airline, extract(year from to_timestamp(departure_time)) as departure_year FROM {database_name}.flight")
df_lookups_table = lookups_table.to_pandas()

# Set the page configuration to wide mode
st.set_page_config(layout="wide")

# Create grid options with only specific columns
gb = GridOptionsBuilder.from_dataframe(df_flight_table)
gb.configure_columns(["email_address", "departure_time", "departure_airport_code", "arrival_time", "arrival_airport_code", "flight_number", "confirmation_code", "airline"])
gridOptions = gb.build()
AgGrid(
df_flight_table,
gridOptions=gridOptions,
height=300,
width='100%'
st.title("Apache Flink Kickstarter Dashboard")
st.write("This Streamlit application displays data from the Apache Iceberg table created by the Flink Apps.")

# Create a dropdown boxes
selected_airline = st.selectbox(
index=0,
label='Choose Airline:',
options=df_lookups_table['airline'].dropna().unique()
)
selected_departure_year = st.selectbox(
index=0,
label='Choose Depature Year:',
options=df_lookups_table['departure_year'].dropna().unique()
)

#
# Get the number of flights per month for the selected airline and year
airline_monthly_flights_table = tbl_env.sql_query(f"""
select
extract(month from to_timestamp(departure_time)) as departure_month,
count(*) as flight_count
from
airlines.flight
where
airline = '{selected_airline}' and
extract(year from to_timestamp(departure_time)) = {selected_departure_year}
group by
extract(month from to_timestamp(departure_time))
order by
departure_month asc;
""")
df_airline_monthly_flights_table = airline_monthly_flights_table.to_pandas()

# Get the top 5 airports with the most departures for selected airline
ranked_airports_table = tbl_env.sql_query(f"""
with cte_ranked as (
select
Expand All @@ -130,44 +154,67 @@ def main(args):
from
cte_ranked
where
airline = 'SkyOne' and
airline = '{selected_airline}' and
row_num <= 5;
""")
df_ranked_airports_table = ranked_airports_table.to_pandas()

fig, ax = plt.subplots()
ax.set_title('Top 5 Airports with the Most Departures for SkyOne')
ax.pie(df_ranked_airports_table['flight_count'], labels=df_ranked_airports_table['departure_airport_code'], autopct='%1.1f%%', startangle=90)
ax.axis('equal') # Equal aspect ratio ensures that the pie is drawn as a circle.

# Display the pie chart in Streamlit
st.pyplot(fig)
# Get the flight data for the selected airline and year
flight_table = tbl_env.sql_query(f"SELECT * FROM {database_name}.flight WHERE airline = '{selected_airline}' AND extract(year from to_timestamp(departure_time)) = {selected_departure_year}")
df_flight_table = flight_table.to_pandas()

#
airline_monthly_flights_table = tbl_env.sql_query(f"""
select
extract(month from to_timestamp(departure_time)) as departure_month,
count(*) as flight_count
from
airlines.flight
where
airline = 'SkyOne' and
extract(year from to_timestamp(departure_time)) = 2025
group by
extract(month from to_timestamp(departure_time))
order by
departure_month asc;
""")
df_airline_monthly_flights_table = airline_monthly_flights_table.to_pandas()

fig, ax = plt.subplots()
ax.bar(df_airline_monthly_flights_table['departure_month'], df_airline_monthly_flights_table['flight_count'])
ax.set_xlabel('departure_month')
ax.set_ylabel('flight_count')
ax.set_title('SkyOne Monthly Flights in 2025')

# Display the bar chart in Streamlit
st.pyplot(fig)
with st.container(border=True):
col1, col2 = st.columns(2)

with col1:
st.header("Airline Flights")

# Bar chart
fig, ax = plt.subplots()
ax.bar(df_airline_monthly_flights_table['departure_month'], df_airline_monthly_flights_table['flight_count'], align='center')
ax.set_xlabel("Departure Month")
ax.set_ylabel("Number of Flights")
ax.set_title(f"{selected_airline} Monthly Flights in {selected_departure_year}")
ax.set_xticks(df_airline_monthly_flights_table['departure_month'])
ax.set_xticklabels(df_airline_monthly_flights_table['departure_month'])

# Display the bar chart in Streamlit
st.pyplot(fig)

# Display the description of the bar chart
st.write(f"This bar chart displays the number of {selected_airline} monthly flights in {selected_departure_year}. The x-axis represents the month and the y-axis represents the number of flights.")

with col2:
st.header("Airport Ranking")

# Pie chart of the top 5 airports with the most departures for SkyOne
fig, ax = plt.subplots()
ax.set_title(f"Top 5 Airports with the Most Departures for {selected_airline}")
ax.pie(df_ranked_airports_table['flight_count'], labels=df_ranked_airports_table['departure_airport_code'], autopct='%1.1f%%', startangle=90)
ax.axis('equal') # Equal aspect ratio ensures that the pie is drawn as a circle.

# Display the pie chart in Streamlit
st.pyplot(fig)

# Display the description of the pie chart
st.write(f"This pie chart displays the top 5 airports with the most departures for {selected_airline}. The chart shows the percentage of flights departing from each of the top 5 airports.")

with st.container(border=True):
st.header(f"{selected_departure_year} {selected_airline} Flight Data")

# Create grid options with only specific columns
gb = GridOptionsBuilder.from_dataframe(df_flight_table)
gb.configure_columns(["email_address", "departure_time", "departure_airport_code", "arrival_time", "arrival_airport_code", "flight_number", "confirmation_code", "airline"])
gridOptions = gb.build()
AgGrid(
df_flight_table,
gridOptions=gridOptions,
height=300,
width='100%'
)

# Display the description of the table
st.write("This table displays the flight data from the Apache Iceberg table. The table shows the email address, departure time, departure airport code, arrival time, arrival airport code, flight number, confirmation code, and airline for each flight.")


if __name__ == "__main__":
Expand Down

0 comments on commit a97205c

Please sign in to comment.