diff --git a/.blog/images/streamlit-screenshot.png b/.blog/images/streamlit-screenshot.png index 04511c6..e436956 100644 Binary files a/.blog/images/streamlit-screenshot.png and b/.blog/images/streamlit-screenshot.png differ diff --git a/CHANGELOG.md b/CHANGELOG.md index b9ac58c..68c7e6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,11 @@ All notable changes to this project will be documented in this file. The format is base on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.51.00.000] - 2024-10-19 +### Changed +- Issue [#376](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/376). +- Issue [#378](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/378). + ## [0.50.00.000] - 2024-10-18 ### Added - Issue [#31](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/31). diff --git a/python/README.md b/python/README.md index dfb7c54..2f82bdb 100644 --- a/python/README.md +++ b/python/README.md @@ -73,7 +73,7 @@ The exciting part is that after running all your Flink applications, the data no ![iceberg-flink-streamlit-drawing](../.blog/images/iceberg-flink-streamlit-drawing.png) -To illustrate, I created a Streamlit script that queries the `apache_kickstarter.airlines.flight` and `apache_kickstarter.airlines.flyer_stats` Apache Iceberg Tables, respectively, harnessing Flink SQL to extract valuable insights. These insights are then brought to life through a Streamlit dashboard, transforming raw data into an accessible, visual experience. +To illustrate, I created a Streamlit script that queries the `apache_kickstarter.airlines.flight` Apache Iceberg Table, harnessing Flink SQL to extract valuable insights. These insights are then brought to life through a Streamlit dashboard, transforming raw data into an accessible, visual experience. Here you go, run this in the docker container terminal command line: diff --git a/python/kickstarter/flink_kickstarter_visualization.py b/python/kickstarter/flink_kickstarter_visualization.py index 9f73a0a..bb87d4b 100644 --- a/python/kickstarter/flink_kickstarter_visualization.py +++ b/python/kickstarter/flink_kickstarter_visualization.py @@ -89,22 +89,46 @@ def main(args): # Print the current database name print(f"Current database: {tbl_env.get_current_database()}") - # Read `flight`data from the Iceberg table - flight_table = tbl_env.sql_query(f"SELECT * FROM {database_name}.flight") - df_flight_table = flight_table.to_pandas() + # Get the number of departure years and airlines in the dataset + lookups_table = tbl_env.sql_query(f"SELECT DISTINCT airline, extract(year from to_timestamp(departure_time)) as departure_year FROM {database_name}.flight") + df_lookups_table = lookups_table.to_pandas() + + # Set the page configuration to wide mode + st.set_page_config(layout="wide") - # Create grid options with only specific columns - gb = GridOptionsBuilder.from_dataframe(df_flight_table) - gb.configure_columns(["email_address", "departure_time", "departure_airport_code", "arrival_time", "arrival_airport_code", "flight_number", "confirmation_code", "airline"]) - gridOptions = gb.build() - AgGrid( - df_flight_table, - gridOptions=gridOptions, - height=300, - width='100%' + st.title("Apache Flink Kickstarter Dashboard") + st.write("This Streamlit application displays data from the Apache Iceberg table created by the Flink Apps.") + + # Create a dropdown boxes + selected_airline = st.selectbox( + index=0, + label='Choose Airline:', + options=df_lookups_table['airline'].dropna().unique() + ) + selected_departure_year = st.selectbox( + index=0, + label='Choose Depature Year:', + options=df_lookups_table['departure_year'].dropna().unique() ) - # + # Get the number of flights per month for the selected airline and year + airline_monthly_flights_table = tbl_env.sql_query(f""" + select + extract(month from to_timestamp(departure_time)) as departure_month, + count(*) as flight_count + from + airlines.flight + where + airline = '{selected_airline}' and + extract(year from to_timestamp(departure_time)) = {selected_departure_year} + group by + extract(month from to_timestamp(departure_time)) + order by + departure_month asc; + """) + df_airline_monthly_flights_table = airline_monthly_flights_table.to_pandas() + + # Get the top 5 airports with the most departures for selected airline ranked_airports_table = tbl_env.sql_query(f""" with cte_ranked as ( select @@ -130,44 +154,67 @@ def main(args): from cte_ranked where - airline = 'SkyOne' and + airline = '{selected_airline}' and row_num <= 5; """) df_ranked_airports_table = ranked_airports_table.to_pandas() - - fig, ax = plt.subplots() - ax.set_title('Top 5 Airports with the Most Departures for SkyOne') - ax.pie(df_ranked_airports_table['flight_count'], labels=df_ranked_airports_table['departure_airport_code'], autopct='%1.1f%%', startangle=90) - ax.axis('equal') # Equal aspect ratio ensures that the pie is drawn as a circle. - # Display the pie chart in Streamlit - st.pyplot(fig) + # Get the flight data for the selected airline and year + flight_table = tbl_env.sql_query(f"SELECT * FROM {database_name}.flight WHERE airline = '{selected_airline}' AND extract(year from to_timestamp(departure_time)) = {selected_departure_year}") + df_flight_table = flight_table.to_pandas() - # - airline_monthly_flights_table = tbl_env.sql_query(f""" - select - extract(month from to_timestamp(departure_time)) as departure_month, - count(*) as flight_count - from - airlines.flight - where - airline = 'SkyOne' and - extract(year from to_timestamp(departure_time)) = 2025 - group by - extract(month from to_timestamp(departure_time)) - order by - departure_month asc; - """) - df_airline_monthly_flights_table = airline_monthly_flights_table.to_pandas() - - fig, ax = plt.subplots() - ax.bar(df_airline_monthly_flights_table['departure_month'], df_airline_monthly_flights_table['flight_count']) - ax.set_xlabel('departure_month') - ax.set_ylabel('flight_count') - ax.set_title('SkyOne Monthly Flights in 2025') - - # Display the bar chart in Streamlit - st.pyplot(fig) + with st.container(border=True): + col1, col2 = st.columns(2) + + with col1: + st.header("Airline Flights") + + # Bar chart + fig, ax = plt.subplots() + ax.bar(df_airline_monthly_flights_table['departure_month'], df_airline_monthly_flights_table['flight_count'], align='center') + ax.set_xlabel("Departure Month") + ax.set_ylabel("Number of Flights") + ax.set_title(f"{selected_airline} Monthly Flights in {selected_departure_year}") + ax.set_xticks(df_airline_monthly_flights_table['departure_month']) + ax.set_xticklabels(df_airline_monthly_flights_table['departure_month']) + + # Display the bar chart in Streamlit + st.pyplot(fig) + + # Display the description of the bar chart + st.write(f"This bar chart displays the number of {selected_airline} monthly flights in {selected_departure_year}. The x-axis represents the month and the y-axis represents the number of flights.") + + with col2: + st.header("Airport Ranking") + + # Pie chart of the top 5 airports with the most departures for SkyOne + fig, ax = plt.subplots() + ax.set_title(f"Top 5 Airports with the Most Departures for {selected_airline}") + ax.pie(df_ranked_airports_table['flight_count'], labels=df_ranked_airports_table['departure_airport_code'], autopct='%1.1f%%', startangle=90) + ax.axis('equal') # Equal aspect ratio ensures that the pie is drawn as a circle. + + # Display the pie chart in Streamlit + st.pyplot(fig) + + # Display the description of the pie chart + st.write(f"This pie chart displays the top 5 airports with the most departures for {selected_airline}. The chart shows the percentage of flights departing from each of the top 5 airports.") + + with st.container(border=True): + st.header(f"{selected_departure_year} {selected_airline} Flight Data") + + # Create grid options with only specific columns + gb = GridOptionsBuilder.from_dataframe(df_flight_table) + gb.configure_columns(["email_address", "departure_time", "departure_airport_code", "arrival_time", "arrival_airport_code", "flight_number", "confirmation_code", "airline"]) + gridOptions = gb.build() + AgGrid( + df_flight_table, + gridOptions=gridOptions, + height=300, + width='100%' + ) + + # Display the description of the table + st.write("This table displays the flight data from the Apache Iceberg table. The table shows the email address, departure time, departure airport code, arrival time, arrival airport code, flight number, confirmation code, and airline for each flight.") if __name__ == "__main__":