Skip to content

Commit

Permalink
start scaffolding out dbt
Browse files Browse the repository at this point in the history
  • Loading branch information
jlaneve committed Dec 13, 2022
1 parent 59941af commit 719de8d
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 0 deletions.
File renamed without changes.
File renamed without changes.
16 changes: 16 additions & 0 deletions core/parse/base_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
class BaseParser:
"""
A base Parser class that all parsers should inherit from.
"""

def __init__(self):
pass

def parse(self):
"""
Parses the given data into a `cosmos` entity.
:return: The parsed entity
:rtype: Task or Group
"""
raise NotImplementedError()
51 changes: 51 additions & 0 deletions core/render/dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import importlib
from datetime import datetime

from airflow.models import DAG

from core.graph.group import Group


class CosmosDag:
"""
Render a Task or Group as an Airflow DAG.
"""

def __init__(self, group: Group):
"""
:param group: The Group to render
:type group: Group
"""
self.group = group

def render(self):
"""
Render the DAG.
:return: The rendered DAG
:rtype: DAG
"""
dag = DAG(
dag_id=self.entity.task_id,
default_args={
"owner": "airflow",
"start_date": datetime(2019, 1, 1),
},
schedule_interval=None,
)

for task in self.group.tasks:
# import the operator class
module_name, class_name = task.operator_class.rsplit(".", 1)
module = importlib.import_module(module_name)
operator = getattr(module, class_name)

# instantiate the operator
t = operator(**task.arguments)

for upstream_task_id in task.upstream_task_ids:
t.set_upstream(upstream_task_id)

t.set_downstream(task.task_id)

return dag
103 changes: 103 additions & 0 deletions providers/dbt/parser/project.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import json
import os

from core.graph.group import Group
from core.graph.task import Task
from core.parse.base_parser import BaseParser

from .utils import validate_directory


class DbtProjectParser(BaseParser):
"""
Parses a dbt project into `cosmos` entities.
"""

def __init__(
self,
project_path: str,
dbt_root_path: str,
dbt_profiles_dir: str,
):
"""
Initializes the parser.
:param project_path: The path to the dbt project, relative to the dbt root path
:type project_path: str
:param dbt_root_path: The path to the dbt root directory
:type dbt_root_path: str
:param dbt_profiles_dir: The path to the dbt profiles directory
:type dbt_profiles_dir: str
"""
# Validate the project path
validate_directory(project_path, "project_path")
self.project_path = project_path

# Validate the dbt root path
validate_directory(dbt_root_path, "dbt_root_path")
self.dbt_root_path = dbt_root_path

# Validate the dbt profiles directory
validate_directory(dbt_profiles_dir, "dbt_profiles_dir")
self.dbt_profiles_dir = dbt_profiles_dir

def parse(self):
"""
Parses the dbt project in the project_path into `cosmos` entities.
"""
manifest = self.get_manifest()
nodes = manifest["nodes"]

for node_name, node in nodes.items():
if node_name.split(".")[0] == "model":
# make the run task
run_task = Task(
task_id=node_name,
operator_class="cosmos.providers.dbt.operators.DbtRunModel",
arguments={
"model_name": node_name,
"project_path": self.project_path,
"dbt_root_path": self.dbt_root_path,
"dbt_profiles_dir": self.dbt_profiles_dir,
},
)

# make the test task
test_task = Task(
task_id=f"{node_name}_test",
operator_class="cosmos.providers.dbt.operators.DbtTestModel",
upstream_task_ids=[node_name],
arguments={
"model_name": node_name,
"project_path": self.project_path,
"dbt_root_path": self.dbt_root_path,
"dbt_profiles_dir": self.dbt_profiles_dir,
},
)

# make the group
group = Group(
group_id=node_name,
tasks=[run_task, test_task],
)

# do something with the group for now
print(group)

def get_manifest(self) -> dict:
"""
Gets the dbt manifest for the project.
:return: The dbt manifest
:rtype: dict
"""
manifest_path = os.path.join(
self.dbt_root_path,
self.project_path,
"target/manifest.json",
)

with open(manifest_path) as f:
manifest = json.load(f)

return manifest
23 changes: 23 additions & 0 deletions providers/dbt/parser/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os
from typing import Optional


def validate_directory(
path: str,
param_name: Optional[str],
):
"""
Validates that the given path exists and is a directory. If not, raises a
ValueError.
"""
if not param_name:
param_name = "Path"

if not path:
raise ValueError(f"{param_name} must be provided.")

if not os.path.exists(path):
raise ValueError(f"{param_name} {path} does not exist.")

if not os.path.isdir(path):
raise ValueError(f"{param_name} {path} is not a directory.")
45 changes: 45 additions & 0 deletions providers/dbt/render/dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from core.render.dag import CosmosDag
from providers.dbt.parser.project import DbtProjectParser


class DbtDag(CosmosDag):
"""
Render a dbt project as an Airflow DAG.
"""

def __init__(self, project_dir: str, **kwargs):
"""
:param project_dir: The path to the dbt project directory
:type project_dir: str
:param kwargs: Additional arguments to pass to the DAG constructor
:type kwargs: dict
:return: The rendered DAG
:rtype: airflow.models.DAG
"""
self.project_dir = project_dir
self.kwargs = kwargs

return self.render()

def render(self):
"""
Render the DAG.
:return: The rendered DAG
:rtype: airflow.models.DAG
"""
# first, parse the dbt project and get a Group
parser = DbtProjectParser(
project_path=self.project_dir,
)
group = parser.parse()

# then, render the Group as a DAG
dag = super().render(group)

# finally, update the DAG with any additional kwargs
for key, value in self.kwargs.items():
setattr(dag, key, value)

return dag

0 comments on commit 719de8d

Please sign in to comment.