Skip to content

Commit

Permalink
Add ProjectConfig.install_dbt_deps & change operator `install_deps=…
Browse files Browse the repository at this point in the history
…True` as default (astronomer#1521)

Cosmos errors if users set different values in `RenderConfig.dbt_deps`
and `operator_args={"install_deps"}`, and since astronomer#1505, it started
raising a more graceful error to users.

The problem is that since the default value for `RenderConfig.dbt_deps`
was `True` and the default value for
`operator_args={"install_deps"=False}` was False, it meant that by
default, Cosmos would raise an exception if users wrote a simple DAG
like:
```
    dag = DbtDag(
        project_config=ProjectConfig(
            DBT_ROOT_PATH / "jaffle_shop",
        ),
        profile_config=profile_config,
        start_date=datetime(2023, 1, 1),
        dag_id="basic_cosmos_dag",
    )
```

Which was a poor experience. This PR aims to solve this.

The documentation will be updated in a separate PR.

Related to: astronomer#1458
Related to: astronomer#1457
  • Loading branch information
tatiana authored Feb 14, 2025
1 parent f584aac commit 968b80e
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 30 deletions.
11 changes: 9 additions & 2 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class RenderConfig:
:param select: A list of dbt select arguments (e.g. 'config.materialized:incremental')
:param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly')
:param selector: Name of a dbt YAML selector to use for parsing. Only supported when using ``load_method=LoadMode.DBT_LS``.
:param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing
:param dbt_deps: (deprecated) Configure to run dbt deps when using dbt ls for dag parsing
:param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``.
:param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
:param env_vars: (Deprecated since Cosmos 1.3 use ProjectConfig.env_vars) A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``.
Expand All @@ -75,7 +75,7 @@ class RenderConfig:
select: list[str] = field(default_factory=list)
exclude: list[str] = field(default_factory=list)
selector: str | None = None
dbt_deps: bool = True
dbt_deps: bool | None = None
node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None
dbt_executable_path: str | Path = get_system_dbt()
env_vars: dict[str, str] | None = None
Expand All @@ -94,6 +94,11 @@ def __post_init__(self, dbt_project_path: str | Path | None) -> None:
"RenderConfig.env_vars is deprecated since Cosmos 1.3 and will be removed in Cosmos 2.0. Use ProjectConfig.env_vars instead.",
DeprecationWarning,
)
if self.dbt_deps is not None:
warnings.warn(
"RenderConfig.dbt_deps is deprecated since Cosmos 1.9 and will be removed in Cosmos 2.0. Use ProjectConfig.install_dbt_deps instead.",
DeprecationWarning,
)
self.project_path = Path(dbt_project_path) if dbt_project_path else None
# allows us to initiate this attribute from Path objects and str
self.dbt_ls_path = Path(self.dbt_ls_path) if self.dbt_ls_path else None
Expand Down Expand Up @@ -141,6 +146,7 @@ class ProjectConfig:
Class for setting project config.
:param dbt_project_path: The path to the dbt project directory. Example: /path/to/dbt/project. Defaults to None
:param install_dbt_deps: Run dbt deps during DAG parsing and task execution. Defaults to True.
:param models_relative_path: The relative path to the dbt models directory within the project. Defaults to models
:param seeds_relative_path: The relative path to the dbt seeds directory within the project. Defaults to seeds
:param snapshots_relative_path: The relative path to the dbt snapshots directory within the project. Defaults to
Expand All @@ -159,6 +165,7 @@ class ProjectConfig:
"""

dbt_project_path: Path | None = None
install_dbt_deps: bool = True
manifest_path: Path | None = None
models_path: Path | None = None
seeds_path: Path | None = None
Expand Down
60 changes: 39 additions & 21 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,14 @@ def validate_initial_user_config(
raise CosmosValueError(
"ProjectConfig.env_vars and operator_args with 'env' are mutually exclusive and only one can be used."
)
if "install_deps" in operator_args:
warn(
"The operator argument `install_deps` is deprecated since Cosmos 1.9 and will be removed in Cosmos 2.0. Use `ProjectConfig.install_dbt_deps` instead.",
DeprecationWarning,
)

if "vars" in operator_args:
# TODO: remove the following in a separate PR
warn(
"operator_args with 'vars' is deprecated since Cosmos 1.3 and will be removed in Cosmos 2.0. Use ProjectConfig.vars instead.",
DeprecationWarning,
Expand Down Expand Up @@ -196,6 +203,31 @@ def validate_changed_config_paths(
)


def override_configuration(
project_config: ProjectConfig, render_config: RenderConfig, execution_config: ExecutionConfig, operator_args: dict
) -> None:
"""
There are a few scenarios where a configuration should override another one.
This function changes, in place, render_config, execution_config and operator_args depending on other configurations.
"""
if project_config.dbt_project_path:
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path

if render_config.dbt_deps is None:
render_config.dbt_deps = project_config.install_dbt_deps

if execution_config.dbt_executable_path:
operator_args["dbt_executable_path"] = execution_config.dbt_executable_path

if execution_config.invocation_mode:
operator_args["invocation_mode"] = execution_config.invocation_mode

if execution_config in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV):
if "install_deps" not in operator_args:
operator_args["install_deps"] = project_config.install_dbt_deps


class DbtToAirflowConverter:
"""
Logic common to build an Airflow DbtDag and DbtTaskGroup from a DBT project.
Expand Down Expand Up @@ -225,22 +257,15 @@ def __init__(
**kwargs: Any,
) -> None:

project_config.validate_project()

execution_config = execution_config or ExecutionConfig()
render_config = render_config or RenderConfig()
operator_args = operator_args or {}
# We copy the configuration so the changes introduced in this method, such as override_configuration,
# do not affect other DAGs or TaskGroups that may reuse the same original configuration
execution_config = copy.deepcopy(execution_config) if execution_config is not None else ExecutionConfig()
render_config = copy.deepcopy(render_config) if render_config is not None else RenderConfig()
operator_args = copy.deepcopy(operator_args) if operator_args is not None else {}

project_config.validate_project()
validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args)

if project_config.dbt_project_path:
# We copy the configuration so the change does not affect other DAGs or TaskGroups
# that may reuse the same original configuration
render_config = copy.deepcopy(render_config)
execution_config = copy.deepcopy(execution_config)
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path

override_configuration(project_config, render_config, execution_config, operator_args)
validate_changed_config_paths(execution_config, project_config, render_config)

env_vars = project_config.env_vars or operator_args.get("env")
Expand All @@ -252,9 +277,6 @@ def __init__(
ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV."
)

if not operator_args:
operator_args = {}

cache_dir = None
cache_identifier = None
if settings.enable_cache:
Expand Down Expand Up @@ -291,10 +313,6 @@ def __init__(
"vars": dbt_vars,
"cache_dir": cache_dir,
}
if execution_config.dbt_executable_path:
task_args["dbt_executable_path"] = execution_config.dbt_executable_path
if execution_config.invocation_mode:
task_args["invocation_mode"] = execution_config.invocation_mode

validate_arguments(
execution_config=execution_config,
Expand Down
7 changes: 5 additions & 2 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ def __init__(
self.dbt_vars = dbt_vars or {}
self.operator_args = operator_args or {}
self.log_dir: Path | None = None
self.should_install_dbt_deps = (
self.render_config.dbt_deps if isinstance(self.render_config.dbt_deps, bool) else True
)

@cached_property
def env_vars(self) -> dict[str, str]:
Expand Down Expand Up @@ -642,7 +645,7 @@ def load_via_dbt_ls_without_cache(self) -> None:
logger.debug(f"Content of the dbt project dir {project_path}: `{os.listdir(project_path)}`")
tmpdir_path = Path(tmpdir)

create_symlinks(project_path, tmpdir_path, self.render_config.dbt_deps)
create_symlinks(project_path, tmpdir_path, self.should_install_dbt_deps)

latest_partial_parse = None
if self.project.partial_parse:
Expand Down Expand Up @@ -679,7 +682,7 @@ def load_via_dbt_ls_without_cache(self) -> None:
self.log_dir = Path(env.get(DBT_LOG_PATH_ENVVAR) or tmpdir_path / DBT_LOG_DIR_NAME)
env[DBT_LOG_PATH_ENVVAR] = str(self.log_dir)

if self.render_config.dbt_deps and has_non_empty_dependencies_file(self.project_path):
if self.should_install_dbt_deps and has_non_empty_dependencies_file(self.project_path):
if is_cache_package_lockfile_enabled(project_path):
latest_package_lockfile = _get_latest_cached_package_lockfile(project_path)
if latest_package_lockfile:
Expand Down
4 changes: 2 additions & 2 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class AbstractDbtLocalBase(AbstractDbtBase):
:param profile_args: Arguments to pass to the profile. See
:py:class:`cosmos.providers.dbt.core.profiles.BaseProfileMapping`.
:param profile_config: ProfileConfig Object
:param install_deps: If true, install dependencies before running the command
:param install_deps (deprecated): If true, install dependencies before running the command
:param callback: A callback function called on after a dbt run with a path to the dbt project directory.
:param target_name: A name to use for the dbt target. If not provided, and no target is found
in your project's dbt_project.yml, "cosmos_target" is used.
Expand All @@ -151,7 +151,7 @@ def __init__(
task_id: str,
profile_config: ProfileConfig,
invocation_mode: InvocationMode | None = None,
install_deps: bool = False,
install_deps: bool = True,
callback: Callable[[str], None] | None = None,
callback_args: dict[str, Any] | None = None,
should_store_compiled_sql: bool = True,
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/operator-args.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ dbt-related
- ``vars``: (Deprecated since Cosmos 1.3 use ``ProjectConfig.dbt_vars`` instead) Supply variables to the project. This argument overrides variables defined in the ``dbt_project.yml``.
- ``warn_error``: convert ``dbt`` warnings into errors.
- ``full_refresh``: If True, then full refresh the node. This only applies to model and seed nodes.
- ``install_deps``: When using ``ExecutionMode.LOCAL`` or ``ExecutionMode.VIRTUALENV``, run ``dbt deps`` every time a task is executed.
- ``install_deps``: (deprecated in v1.9, use ``ProjectConfig.install_dbt_deps`` onwards) When using ``ExecutionMode.LOCAL`` or ``ExecutionMode.VIRTUALENV``, run ``dbt deps`` every time a task is executed.

Airflow-related
...............
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/project-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ variables that should be used for rendering and execution. It takes the followin
will only be rendered at execution time, not at render time.
- ``env_vars``: (new in v1.3) A dictionary of environment variables used for rendering and execution. Rendering with
env vars is only supported when using ``RenderConfig.LoadMode.DBT_LS`` load mode.
- ``install_dbt_deps``: (new in v1.9) Run dbt deps during DAG parsing and task execution if True (default).
- ``partial_parse``: (new in v1.4) If True, then attempt to use the ``partial_parse.msgpack`` if it exists. This is only used
for the ``LoadMode.DBT_LS`` load mode, and for the ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``
execution modes. Due to the way that dbt `partial parsing works <https://docs.getdbt.com/reference/parsing#known-limitations>`_, it does not work with Cosmos profile mapping classes. To benefit from this feature, users have to set the ``profiles_yml_filepath`` argument in ``ProfileConfig``.
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/render-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ The ``RenderConfig`` class takes the following arguments:
- ``invocation_mode``: (new in v1.9) how to run ``dbt ls``, when using ``LoadMode.DBT_LS``. Learn more about this below.
- ``select`` and ``exclude``: which models to include or exclude from your DAGs. See `Selecting & Excluding <selecting-excluding.html>`_ for more information.
- ``selector``: (new in v1.3) name of a dbt YAML selector to use for DAG parsing. Only supported when using ``load_method=LoadMode.DBT_LS``. See `Selecting & Excluding <selecting-excluding.html>`_ for more information.
- ``dbt_deps``: A Boolean to run dbt deps when using dbt ls for dag parsing. Default True
- ``dbt_deps``: (deprecated in v1.9, use ``ProjectConfig.install_dbt_deps`` onwards) A Boolean to run dbt deps when using dbt ls for dag parsing. Default True
- ``node_converters``: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. Find more information below.
- ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
- ``dbt_ls_path``: Should be set when using ``load_method=LoadMode.DBT_LS_OUTPUT``. Path of the user-managed output of ``dbt ls``.
Expand Down
1 change: 0 additions & 1 deletion tests/listeners/test_dag_run_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def test_is_cosmos_dag_is_true():
profile_config=profile_config,
start_date=datetime(2023, 1, 1),
dag_id="basic_cosmos_dag",
operator_args={"install_dep": True},
)
assert total_cosmos_tasks(dag) == 13

Expand Down

0 comments on commit 968b80e

Please sign in to comment.