diff --git a/core/base/group.py b/core/graph/group.py
similarity index 100%
rename from core/base/group.py
rename to core/graph/group.py
diff --git a/core/base/task.py b/core/graph/task.py
similarity index 100%
rename from core/base/task.py
rename to core/graph/task.py
diff --git a/core/parse/base_parser.py b/core/parse/base_parser.py
new file mode 100644
index 000000000..dd65f6e9a
--- /dev/null
+++ b/core/parse/base_parser.py
@@ -0,0 +1,16 @@
+class BaseParser:
+    """
+    A base Parser class that all parsers should inherit from.
+    """
+
+    def __init__(self):
+        pass
+
+    def parse(self):
+        """
+        Parses the given data into a `cosmos` entity.
+
+        :return: The parsed entity
+        :rtype: Task or Group
+        """
+        raise NotImplementedError()
diff --git a/core/render/dag.py b/core/render/dag.py
new file mode 100644
index 000000000..fd36ff657
--- /dev/null
+++ b/core/render/dag.py
@@ -0,0 +1,75 @@
+import importlib
+from datetime import datetime
+
+from airflow.models import DAG
+
+from core.graph.group import Group
+
+
+class CosmosDag:
+    """
+    Render a Task or Group as an Airflow DAG.
+    """
+
+    def __init__(self, group: Group):
+        """
+        :param group: The Group to render
+        :type group: Group
+        """
+        self.group = group
+
+    def render(self):
+        """
+        Render the DAG.
+
+        Every task of the group becomes one operator attached to the DAG. The
+        operators are then wired together using each task's upstream task IDs.
+
+        :return: The rendered DAG
+        :rtype: DAG
+        :raises ValueError: If a task depends on a task ID that is not part of
+            the group
+        """
+        dag = DAG(
+            # assumes Group exposes the `group_id` it was constructed with
+            dag_id=self.group.group_id,
+            default_args={
+                "owner": "airflow",
+                "start_date": datetime(2019, 1, 1),
+            },
+            schedule_interval=None,
+        )
+
+        # first pass: instantiate every operator, keyed by task ID, so that
+        # dependencies can be resolved whatever the order of the tasks
+        operators = {}
+        for task in self.group.tasks:
+            # import the operator class
+            module_name, class_name = task.operator_class.rsplit(".", 1)
+            module = importlib.import_module(module_name)
+            operator_class = getattr(module, class_name)
+
+            # instantiate the operator on this DAG; explicit task arguments
+            # take precedence over the values derived from the task itself
+            operator_kwargs = {
+                "task_id": task.task_id,
+                "dag": dag,
+                **task.arguments,
+            }
+            operators[task.task_id] = operator_class(**operator_kwargs)
+
+        # second pass: wire the dependencies between operator objects, as
+        # set_upstream does not accept bare task IDs
+        for task in self.group.tasks:
+            for upstream_task_id in task.upstream_task_ids or []:
+                if upstream_task_id not in operators:
+                    raise ValueError(
+                        f"Task {task.task_id} depends on unknown task "
+                        f"{upstream_task_id}."
+                    )
+
+                operators[task.task_id].set_upstream(
+                    operators[upstream_task_id]
+                )
+
+        return dag
diff --git a/providers/dbt/parser/project.py b/providers/dbt/parser/project.py
new file mode 100644
index 000000000..bb742b4ff
--- /dev/null
+++ b/providers/dbt/parser/project.py
@@ -0,0 +1,119 @@
+import json
+import os
+
+from core.graph.group import Group
+from core.graph.task import Task
+from core.parse.base_parser import BaseParser
+
+from .utils import validate_directory
+
+
+class DbtProjectParser(BaseParser):
+    """
+    Parses a dbt project into `cosmos` entities.
+    """
+
+    def __init__(
+        self,
+        project_path: str,
+        dbt_root_path: str,
+        dbt_profiles_dir: str,
+    ):
+        """
+        Initializes the parser.
+
+        :param project_path: The path to the dbt project, relative to the dbt root path
+        :type project_path: str
+        :param dbt_root_path: The path to the dbt root directory
+        :type dbt_root_path: str
+        :param dbt_profiles_dir: The path to the dbt profiles directory
+        :type dbt_profiles_dir: str
+        :raises ValueError: If a path is missing or is not an existing directory
+        """
+        super().__init__()
+
+        # Validate the dbt root path
+        validate_directory(dbt_root_path, "dbt_root_path")
+        self.dbt_root_path = dbt_root_path
+
+        # Validate the project path. It is relative to the dbt root path, so
+        # it is resolved against it rather than the working directory
+        if not project_path:
+            raise ValueError("project_path must be provided.")
+        validate_directory(
+            os.path.join(dbt_root_path, project_path),
+            "project_path",
+        )
+        self.project_path = project_path
+
+        # Validate the dbt profiles directory
+        validate_directory(dbt_profiles_dir, "dbt_profiles_dir")
+        self.dbt_profiles_dir = dbt_profiles_dir
+
+    def parse(self):
+        """
+        Parses the dbt project in the project_path into `cosmos` entities.
+
+        Every model in the manifest yields a run task and a test task that
+        runs after it.
+
+        :return: A Group holding the run and test tasks of every model
+        :rtype: Group
+        """
+        manifest = self.get_manifest()
+        nodes = manifest["nodes"]
+
+        tasks = []
+        for node_name in nodes:
+            # only nodes whose name starts with the "model" segment are handled
+            if node_name.split(".")[0] != "model":
+                continue
+
+            arguments = {
+                "model_name": node_name,
+                "project_path": self.project_path,
+                "dbt_root_path": self.dbt_root_path,
+                "dbt_profiles_dir": self.dbt_profiles_dir,
+            }
+
+            # make the run task; each task gets its own copy of the arguments
+            run_task = Task(
+                task_id=node_name,
+                operator_class="cosmos.providers.dbt.operators.DbtRunModel",
+                arguments=dict(arguments),
+            )
+
+            # make the test task, which runs after the run task
+            test_task = Task(
+                task_id=f"{node_name}_test",
+                operator_class="cosmos.providers.dbt.operators.DbtTestModel",
+                upstream_task_ids=[node_name],
+                arguments=dict(arguments),
+            )
+
+            tasks.extend([run_task, test_task])
+
+        # name the group after the project directory
+        group_id = os.path.basename(os.path.normpath(self.project_path))
+
+        return Group(group_id=group_id, tasks=tasks)
+
+    def get_manifest(self) -> dict:
+        """
+        Gets the dbt manifest for the project.
+
+        :return: The dbt manifest
+        :rtype: dict
+        """
+        manifest_path = os.path.join(
+            self.dbt_root_path,
+            self.project_path,
+            "target",
+            "manifest.json",
+        )
+
+        # read as UTF-8 rather than the platform default encoding
+        with open(manifest_path, encoding="utf-8") as f:
+            manifest = json.load(f)
+
+        return manifest
diff --git a/providers/dbt/parser/utils.py b/providers/dbt/parser/utils.py
new file mode 100644
index 000000000..8f43071e9
--- /dev/null
+++ b/providers/dbt/parser/utils.py
@@ -0,0 +1,31 @@
+import os
+from typing import Optional
+
+
+def validate_directory(
+    path: str,
+    param_name: Optional[str] = None,
+):
+    """
+    Validates that the given path exists and is a directory. If not, raises a
+    ValueError.
+
+    :param path: The path to validate
+    :type path: str
+    :param param_name: The name used for the path in error messages, "Path"
+        when omitted
+    :type param_name: Optional[str]
+    :raises ValueError: If the path is empty, does not exist or is not a
+        directory
+    """
+    if not param_name:
+        param_name = "Path"
+
+    if not path:
+        raise ValueError(f"{param_name} must be provided.")
+
+    if not os.path.exists(path):
+        raise ValueError(f"{param_name} {path} does not exist.")
+
+    if not os.path.isdir(path):
+        raise ValueError(f"{param_name} {path} is not a directory.")
diff --git a/providers/dbt/render/dag.py b/providers/dbt/render/dag.py
new file mode 100644
index 000000000..8216e04e7
--- /dev/null
+++ b/providers/dbt/render/dag.py
@@ -0,0 +1,61 @@
+from core.render.dag import CosmosDag
+from providers.dbt.parser.project import DbtProjectParser
+
+
+class DbtDag(CosmosDag):
+    """
+    Render a dbt project as an Airflow DAG.
+    """
+
+    def __init__(
+        self,
+        project_dir: str,
+        *,
+        dbt_root_path: str,
+        dbt_profiles_dir: str,
+        **kwargs,
+    ):
+        """
+        Parse the dbt project; call `render` to obtain the Airflow DAG.
+
+        :param project_dir: The path to the dbt project directory, relative to
+            the dbt root path
+        :type project_dir: str
+        :param dbt_root_path: The path to the dbt root directory
+        :type dbt_root_path: str
+        :param dbt_profiles_dir: The path to the dbt profiles directory
+        :type dbt_profiles_dir: str
+        :param kwargs: Additional attributes to set on the rendered DAG
+        :type kwargs: dict
+        """
+        self.project_dir = project_dir
+        self.kwargs = kwargs
+
+        # parse the dbt project and get a Group
+        parser = DbtProjectParser(
+            project_path=project_dir,
+            dbt_root_path=dbt_root_path,
+            dbt_profiles_dir=dbt_profiles_dir,
+        )
+
+        # __init__ cannot return the DAG, so the Group is stored on the
+        # instance and rendered on demand
+        super().__init__(group=parser.parse())
+
+    def render(self):
+        """
+        Render the DAG.
+
+        :return: The rendered DAG
+        :rtype: airflow.models.DAG
+        """
+        # render the parsed Group as a DAG
+        dag = super().render()
+
+        # finally, update the DAG with any additional kwargs
+        # NOTE(review): these are set as attributes after construction, not
+        # passed to the DAG constructor - confirm that is sufficient
+        for key, value in self.kwargs.items():
+            setattr(dag, key, value)
+
+        return dag