This is an ETL pipeline project that uses batch processing to collect and process real-time bitcoin data and exchange rate conversion data. The main data source is the CoinDesk API, which provides real-time bitcoin data. The secondary data source is exchange rate data scraped from Google Finance.
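As a rough illustration of the API extraction step, the sketch below downloads a JSON document with `urlopen` and picks out a timestamp and a USD price. The endpoint URL, the payload field names (`time.updatedISO`, `bpi.USD.rate_float`), and the helper names `fetch_json` and `extract_usd_price` are assumptions made for this example and are not taken from the project code. A canned response stands in for the network call so the snippet runs offline.

```python
import io
import json
from urllib.request import urlopen

# Assumed endpoint: the README only states that the data comes from CoinDesk.
COINDESK_URL = "https://api.coindesk.com/v1/bpi/currentprice.json"


def fetch_json(url, opener=urlopen, timeout=10):
    """Download a URL and decode its body as JSON."""
    with opener(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def extract_usd_price(payload):
    """Pull the timestamp and USD rate out of the assumed payload shape."""
    return {
        "updated_iso": payload["time"]["updatedISO"],
        "usd_rate": float(payload["bpi"]["USD"]["rate_float"]),
    }


def _fake_opener(url, timeout=None):
    """Return a canned response so the example needs no network access."""
    body = {
        "time": {"updatedISO": "2023-01-01T00:00:00+00:00"},
        "bpi": {"USD": {"rate_float": 16500.5}},
    }
    return io.BytesIO(json.dumps(body).encode("utf-8"))


sample = extract_usd_price(fetch_json(COINDESK_URL, opener=_fake_opener))
print(sample)
```

Passing the opener as a parameter keeps the parsing logic testable without calling the live API; in the pipeline itself the default `urlopen` would be used.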
- Python (main programming language). Key libraries:
  - `urlopen` and `json` (for API data extraction)
  - `bs4` and `regex` (for scraping Google Finance)
- Airflow & Docker (ETL development and monitoring)
- PostgreSQL (database)
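The regex half of the scraping step can be sketched as follows. The example assumes that `bs4` has already located the page element holding the exchange rate and handed over its text; the function name `parse_rate` and the sample strings are hypothetical, and the real Google Finance markup is not reproduced here.

```python
import re

# Digits with optional thousands separators and an optional decimal part,
# for example "15,432.10" or "0.9234".
_NUMBER = re.compile(r"\d+(?:,\d{3})*(?:\.\d+)?")


def parse_rate(text):
    """Return the first number found in the scraped text as a float."""
    match = _NUMBER.search(text)
    if match is None:
        raise ValueError(f"no exchange rate found in {text!r}")
    # Strip thousands separators before converting.
    return float(match.group().replace(",", ""))


print(parse_rate("15,432.10"))
print(parse_rate("  0.9234\n"))
```

Raising an error when no number is found lets the scraping task fail visibly in Airflow instead of loading an empty rate.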
Development
- Bottom-up approach: create the functions and files first, then put them together.
- Create database.
- Configure the dump directory to prevent file conflicts during processing and to save memory (there are also cost considerations when using a paid service).
- Build pipeline.
  ```python
  with DAG(
      dag_id='api_source_etl',
      default_args=default_args,
      schedule_interval='@hourly',
      catchup=False,
  ) as dag:

      dir_prep = PythonOperator(
          task_id='prepare_dump_directory',
          python_callable=dir_preparation
      )

      extract_1 = PythonOperator(
          task_id='extract_api_data',
          python_callable=extract_coindesk
      )

      extract_2 = PythonOperator(
          task_id='scraping_data',
          python_callable=scrape_google_finance
      )

      transform = PythonOperator(
          task_id='transform_data',
          python_callable=transform,
      )

      load = PythonOperator(
          task_id='load_data',
          python_callable=load,
      )

      # pipeline
      dir_prep >> [extract_1, extract_2] >> transform >> load
  ```
- Build connection to database.
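The dump directory step could look roughly like the sketch below, which empties and recreates the directory so that each run starts clean and old files do not accumulate. The function name `prepare_dump_dir` and the default directory name `dump` are assumptions; the project's own `dir_preparation` callable may work differently.

```python
import shutil
import tempfile
from pathlib import Path


def prepare_dump_dir(dump_dir="dump"):
    """Recreate dump_dir so each run starts with an empty directory."""
    path = Path(dump_dir)
    if path.exists():
        shutil.rmtree(path)  # drop files left over from the previous run
    path.mkdir(parents=True)
    return path


# Demonstration in a temporary location with one stale file.
base = Path(tempfile.mkdtemp())
dump = base / "dump"
dump.mkdir()
(dump / "stale.json").write_text("{}")

prepare_dump_dir(dump)
leftover = sorted(p.name for p in dump.iterdir())
print(leftover)

shutil.rmtree(base)  # clean up the demonstration directory
```

Wiping the directory at the start of a run, instead of at the end, means a failed run leaves its files behind for inspection until the next scheduled run.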
Production
- Credential management.
  PostgreSQL access is protected via environment variables.
  ```python
  import os

  import psycopg2

  # get credentials
  USER = os.getenv('PROD_USER')
  PASS = os.getenv('PROD_PASS')
  HOST = os.getenv('PROD_HOST')
  PORT = os.getenv('PROD_PORT')
  DB = os.getenv('PROD_DB')
  TABLE = os.getenv('PROD_TABLE')

  # create connection
  conn = psycopg2.connect(
      database=DB,
      user=USER,
      password=PASS,
      host=HOST,
      port=PORT
  )
  ```
- Running pipeline.
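  The pipeline runs in 4 steps: prepare the dump directory, extract the data (the CoinDesk API call and the Google Finance scrape run in parallel), transform, and load.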
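The transform step presumably combines the two extracted values. A minimal sketch of that conversion is shown below, assuming the API supplies a USD price and the scrape supplies a rate from USD to some target currency. The function name `convert_price`, the two-decimal rounding, and the sample numbers are illustrative choices and are not taken from the project.

```python
from decimal import Decimal, ROUND_HALF_UP


def convert_price(usd_price, rate, places="0.01"):
    """Convert a USD price into the target currency at the given rate."""
    # Going through str() avoids carrying binary float noise into Decimal.
    converted = Decimal(str(usd_price)) * Decimal(str(rate))
    return converted.quantize(Decimal(places), rounding=ROUND_HALF_UP)


print(convert_price(20000.0, 0.5))
print(convert_price("1.005", "1"))
```

`Decimal` is used here because money values stored in PostgreSQL `NUMERIC` columns round predictably; plain floats would also work for exploratory analysis.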
Maintain
- Adding additional bitcoin data sources.
- Moving to cloud-based data warehouse technology.
- Creating dashboards and reports.
This project provides data that stakeholders and bitcoin analysts can use to monitor, analyze, and predict bitcoin prices, based on real-time price data and exchange rate conversion information.