AIRFLOW + PYSPARK

์ด ๊ธ€์€ ์šฐ๋ถ„ํˆฌ ๊ธฐ์ค€์œผ๋กœ ์ž‘์„ฑ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.


0. Environment setup

Pull the Docker images

$ docker pull jo1013/pyspark:0.05
$ docker pull jo1013/airflow:0.07
$ docker pull mysql:8.0.17

git clone (the author runs this from /home/workspace)

$ git clone https://github.com/jo1013/pyspark.git
$ cd pyspark

1. Run command

$ docker-compose up  ## start the mysql, pyspark, and airflow (postgresql) containers
  • (In docker-compose.yml, adjust the volumes of the 3 containers to match your own setup.)

2. Attach to a container from another terminal

$ docker exec -it airflow bash

$ docker exec -it [container name] bash

3. Start PostgreSQL

  • Start PostgreSQL
$ service postgresql start

If you don't want to keep a terminal attached to the container:

  • From the local terminal
$ docker exec -it -d airflow service postgresql start 

4. Change the linked directories

$ nano /root/airflow/airflow.cfg

Edit these to match your own volume settings (no changes needed if your folder layout matches the author's):

# dags_folder = /root/airflow/dags 
dags_folder = /home/pyspark/airflow/dags

# base_log_folder = /root/airflow/logs 
base_log_folder = /home/pyspark/airflow/logs 

# plugins_folder = /root/airflow/plugins
plugins_folder = /home/pyspark/airflow/plugins

# default_timezone = utc 
default_timezone = Asia/Seoul 

# executor = SequentialExecutor 
executor = LocalExecutor 

5. Run

Airflow start command

$ airflow webserver

If you don't want to keep a terminal attached to the container:

  • From the local terminal
$ docker exec -it -d airflow airflow webserver


AIRFLOW

Command to run only the Airflow container

$ cd Airflow
$ docker run -it -d -p 8090:8080 -v ~/workspace:/home -e LC_ALL=C.UTF-8 --name airflow6 jo1013/airflowex:0.06
$ docker run -it -d -p [local port]:[container port] -v [local directory]:[container directory] -e LC_ALL=C.[encoding] --name [container name] [dockerhubid]/[imagename]:[tag]

DB account and privilege setup

$ sudo su - postgres
$ psql
$ CREATE DATABASE airflow;
$ CREATE USER timmy with ENCRYPTED password '0000';
$ GRANT all privileges on DATABASE airflow to timmy;

Connect to the airflow DB as the postgres user

$ \c airflow
$ GRANT all privileges on all tables in schema public to timmy;
$ \q        
$ exit

PostgreSQL cluster setup

$ pg_createcluster 13 main 
$ pg_ctlcluster 13 main start

PostgreSQL configuration

$ cd /etc/postgresql/13/main
$ nano pg_hba.conf

Edit as follows

Open it up to all IPv4 addresses (should be restricted later)

# IPv4 local connections:                                                          
host        all             all             0.0.0.0/0               md5 

Restart the DB

$ service postgresql restart

Editing the Airflow config

# sql_alchemy_conn = sqlite:////root/airflow/airflow.db
# sql_alchemy_conn = postgresql+psycopg2://timmy:0000@172.17.0.2/airflow
sql_alchemy_conn = postgresql+psycopg2://timmy:0000@localhost/airflow

  • The IP-based line worked when running the image straight from Docker Hub, but under docker-compose the container's IP address differs from the single-container case.
  • Here PostgreSQL runs inside the same container as Airflow, so localhost works. If PostgreSQL runs in a separate container, localhost in sql_alchemy_conn will not reach it; put the postgres container's IP address there instead.
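To sanity-check the connection string before restarting Airflow, a quick test from Python inside the container (a minimal sketch; it assumes psycopg2 is installed and reuses the timmy/0000 account created above):

import psycopg2

# Use 'localhost' when PostgreSQL runs in the same container as Airflow;
# use the postgres container's IP (see ifconfig below) when it runs separately.
conn = psycopg2.connect(
    host="localhost",
    dbname="airflow",
    user="timmy",
    password="0000",
)
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone())
conn.close()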

Check the IP address

$ ifconfig

Allow external connections

$ cd /etc/postgresql/13/main
$ nano pg_hba.conf 

Edit as follows

# IPv4 local connections:
host        all             all             0.0.0.0/0               md5 

Restart PostgreSQL

$ service postgresql restart

Create folders

$ cd Airflow
$ mkdir dags
$ mkdir logs

Initialize the Airflow DB (use this when you cannot log in)

$ airflow db init

Save the changes (commit the container)

$ docker commit postgres postgres:airflow

Run the account-creation .py file

$ cd home

$ nano makeuser.py  

Copy makeuser.py to the ~/airflow_home location

$ cp makeuser.py airflow

Paste the contents below into makeuser.py (change the ID and password)

import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

# Create an admin user via the legacy password_auth backend (Airflow 1.x);
# on newer Airflow versions use the `airflow users create` command shown below.
user = PasswordUser(models.User())
user.username = 'sunny'
user.email = 'sunny@test.com'
user.password = 'sunny'
user.superuser = True
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()


Or create the account directly from the terminal

(change the username and password)

$ airflow users create \
          --username admin \
          --firstname FIRST_NAME \
          --lastname LAST_NAME \
          --role Admin \
          --email admin@example.org

  • Initialize the DB (creates the Airflow tables in the postgres 'airflow' database)
$ airflow db init

References

Creating a Database Cluster

-   In file system terms, a database cluster is a single directory under which all data will be stored. We call this the data directory or data area. It is completely up to you where you choose to store your data. There is no default, although locations such as /usr/local/pgsql/data or /var/lib/pgsql/data are popular. The data directory must be initialized before being used, using the program initdb which is installed with PostgreSQL.
-   In short: people usually pick a path like the ones mentioned above, but it is not required.

---

Airflow terminology

The following is a brief overview of some terms used when designing Airflow workflows.

  • An Airflow DAG is composed of tasks.

  • Each task is created by instantiating an operator class. A configured operator instance becomes a task, like this: my_task = MyOperator(...)

  • When a DAG starts, Airflow creates a DAG run entry in the database.

  • When a task is executed in the context of a particular DAG run, a task instance is created.

  • AIRFLOW_HOME is the directory that stores the DAG definition files and Airflow plugins.

Source: https://aldente0630.github.io/data-engineering/2018/06/17/developing-workflows-with-apache-airflow.html
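To make these terms concrete, here is a minimal DAG sketch (it assumes Airflow 2.x; the dag_id and callable are made up for illustration, and the file would live in the dags_folder configured above):

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def say_hello():
    # Plain Python callable executed by the second task
    print("hello from airflow")

with DAG(
    dag_id="example_terms",  # one DAG ...
    start_date=pendulum.datetime(2021, 1, 1, tz="Asia/Seoul"),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # ... composed of tasks, each an operator instance: my_task = MyOperator(...)
    print_date = BashOperator(task_id="print_date", bash_command="date")
    hello = PythonOperator(task_id="say_hello", python_callable=say_hello)

    # Every scheduled DAG run creates task instances of these two tasks
    print_date >> hello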

-> Possibly because of a version difference, following the tutorial above raises an import error. In dags/test_operators.py, change

from airflow.operators import MyFirstOperator

to

from my_operators import MyFirstOperator

(that is, from [filename] import [classname])

and it works. This looks like an error caused by the version change.
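For reference, a rough sketch of that pattern (the names MyFirstOperator and my_operators.py come from the linked tutorial; the BaseOperator import path shown here assumes Airflow 2.x):

# my_operators.py -- importable from the dags/plugins folder
from airflow.models.baseoperator import BaseOperator

class MyFirstOperator(BaseOperator):
    def __init__(self, my_operator_param, **kwargs):
        super().__init__(**kwargs)
        self.operator_param = my_operator_param

    def execute(self, context):
        # Called when a task instance of this operator runs
        print(f"Hello from MyFirstOperator, param: {self.operator_param}")

# dags/test_operators.py then imports it as:
# from my_operators import MyFirstOperator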


pyspark

Run only the pyspark container

$ docker run -it --rm -p 8888:8888 -p 8000:8000 -v ~/workspace:/home jo1013/pyspark:0.05

Attach to bash in the pyspark container

$ docker exec -it py_spark bash
$ docker exec -it [container id or container name] bash

Run Jupyter Notebook on port 8888 (run this inside the pyspark container)

$ jupyter notebook --allow-root --ip=0.0.0.0 --port=8888 --no-browser
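Once the notebook is reachable on port 8888, a minimal PySpark sanity check to run in a cell (a sketch; it assumes pyspark is already installed in the image):

from pyspark.sql import SparkSession

# Start (or reuse) a local SparkSession inside the pyspark container
spark = SparkSession.builder.appName("sanity-check").getOrCreate()

# Build a tiny DataFrame and show it to confirm Spark works
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.show()

spark.stop()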

Run only MySQL

$ docker run --name db-mysql -e MYSQL_DATABASE=testdb -e MYSQL_ROOT_PASSWORD=root -e TZ=Asia/Seoul -d -p 3306:3306 mysql:8.0.17 --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
$ docker run --name db-mysql -e MYSQL_ROOT_PASSWORD=root -d -p 3306:3306 mysql

List Airflow DAGs

$ airflow dags list

List the tasks in a DAG

$ airflow tasks list [dag_id]


Connecting to the MySQL container DB

Check the IP address from the local terminal

$ ifconfig

Since the port is published to the local machine, enter the local IP address as the DB host.
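For example, from Python inside the pyspark container (a sketch; it assumes the pymysql package is installed and reuses the root/root credentials and testdb database from the docker run command above — the host address is a placeholder for whatever ifconfig reports):

import pymysql

MYSQL_HOST = "192.168.0.10"  # placeholder: use the address shown by ifconfig

conn = pymysql.connect(
    host=MYSQL_HOST,
    port=3306,
    user="root",
    password="root",
    database="testdb",
)
with conn.cursor() as cur:
    cur.execute("SELECT NOW()")
    print(cur.fetchone())
conn.close()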

Using SSH when running Python files in another container

How to run a command in another container from an Airflow DAG

Solution: send the command over SSH


* Before doing this, SSH must be installed on both the calling container and the container receiving the command, and in the receiving container PermitRootLogin must be set to yes in /etc/ssh/sshd_config (never do this in production; it is a security risk).

To avoid keeping the SSH connection info in code:

  1. Store the connection info in the Airflow DB as an SSH Connection and load it from the operator (see the sketch below).
  2. If possible, using an external secret-manager service on top of that is recommended.
$ ssh-keygen -t rsa
  • id_rsa: private key
  • id_rsa.pub: public key
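A sketch of option 1 above (it assumes the apache-airflow-providers-ssh package is installed and that an SSH connection with the conn id ssh_pyspark has been registered in the Airflow UI/DB; the script path is a placeholder):

import pendulum
from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator

with DAG(
    dag_id="ssh_remote_example",
    start_date=pendulum.datetime(2021, 1, 1, tz="Asia/Seoul"),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Host, user and key/password come from the 'ssh_pyspark' connection stored
    # in the Airflow DB, so no credentials appear in the DAG file itself.
    run_remote = SSHOperator(
        task_id="run_remote_script",
        ssh_conn_id="ssh_pyspark",
        command="python /home/workspace/some_script.py",
    )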

Add a user on the executing container

$ useradd airflow
$ useradd [user]
$ su - airflow
$ mkdir .ssh

Grant permissions on the working folder

$ chown -R airflow:airflow /home/workspace
$ chown -R [user]:[user] [home directory path]
