diff --git a/cosmos/config.py b/cosmos/config.py index ea757fd46..37b4a67c8 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -55,7 +55,7 @@ class RenderConfig: :param select: A list of dbt select arguments (e.g. 'config.materialized:incremental') :param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly') :param selector: Name of a dbt YAML selector to use for parsing. Only supported when using ``load_method=LoadMode.DBT_LS``. - :param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing + :param dbt_deps: (deprecated) Configure to run dbt deps when using dbt ls for dag parsing :param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. :param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. :param env_vars: (Deprecated since Cosmos 1.3 use ProjectConfig.env_vars) A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``. @@ -75,7 +75,7 @@ class RenderConfig: select: list[str] = field(default_factory=list) exclude: list[str] = field(default_factory=list) selector: str | None = None - dbt_deps: bool = True + dbt_deps: bool | None = None node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None dbt_executable_path: str | Path = get_system_dbt() env_vars: dict[str, str] | None = None @@ -94,6 +94,11 @@ def __post_init__(self, dbt_project_path: str | Path | None) -> None: "RenderConfig.env_vars is deprecated since Cosmos 1.3 and will be removed in Cosmos 2.0. Use ProjectConfig.env_vars instead.", DeprecationWarning, ) + if self.dbt_deps is not None: + warnings.warn( + "RenderConfig.dbt_deps is deprecated since Cosmos 1.9 and will be removed in Cosmos 2.0. Use ProjectConfig.install_dbt_deps instead.", + DeprecationWarning, + ) self.project_path = Path(dbt_project_path) if dbt_project_path else None # allows us to initiate this attribute from Path objects and str self.dbt_ls_path = Path(self.dbt_ls_path) if self.dbt_ls_path else None @@ -141,6 +146,7 @@ class ProjectConfig: Class for setting project config. :param dbt_project_path: The path to the dbt project directory. Example: /path/to/dbt/project. Defaults to None + :param install_dbt_deps: Run dbt deps during DAG parsing and task execution. Defaults to True. :param models_relative_path: The relative path to the dbt models directory within the project. Defaults to models :param seeds_relative_path: The relative path to the dbt seeds directory within the project. Defaults to seeds :param snapshots_relative_path: The relative path to the dbt snapshots directory within the project. Defaults to @@ -159,6 +165,7 @@ class ProjectConfig: """ dbt_project_path: Path | None = None + install_dbt_deps: bool = True manifest_path: Path | None = None models_path: Path | None = None seeds_path: Path | None = None diff --git a/cosmos/converter.py b/cosmos/converter.py index 91a7ff43f..7c4205f37 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -155,7 +155,14 @@ def validate_initial_user_config( raise CosmosValueError( "ProjectConfig.env_vars and operator_args with 'env' are mutually exclusive and only one can be used." ) + if "install_deps" in operator_args: + warn( + "The operator argument `install_deps` is deprecated since Cosmos 1.9 and will be removed in Cosmos 2.0. Use `ProjectConfig.install_dbt_deps` instead.", + DeprecationWarning, + ) + if "vars" in operator_args: + # TODO: remove the following in a separate PR warn( "operator_args with 'vars' is deprecated since Cosmos 1.3 and will be removed in Cosmos 2.0. Use ProjectConfig.vars instead.", DeprecationWarning, @@ -196,6 +203,31 @@ def validate_changed_config_paths( ) +def override_configuration( + project_config: ProjectConfig, render_config: RenderConfig, execution_config: ExecutionConfig, operator_args: dict +) -> None: + """ + There are a few scenarios where a configuration should override another one. + This function changes, in place, render_config, execution_config and operator_args depending on other configurations. + """ + if project_config.dbt_project_path: + render_config.project_path = project_config.dbt_project_path + execution_config.project_path = project_config.dbt_project_path + + if render_config.dbt_deps is None: + render_config.dbt_deps = project_config.install_dbt_deps + + if execution_config.dbt_executable_path: + operator_args["dbt_executable_path"] = execution_config.dbt_executable_path + + if execution_config.invocation_mode: + operator_args["invocation_mode"] = execution_config.invocation_mode + + if execution_config in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV): + if "install_deps" not in operator_args: + operator_args["install_deps"] = project_config.install_dbt_deps + + class DbtToAirflowConverter: """ Logic common to build an Airflow DbtDag and DbtTaskGroup from a DBT project. @@ -225,22 +257,15 @@ def __init__( **kwargs: Any, ) -> None: - project_config.validate_project() - - execution_config = execution_config or ExecutionConfig() - render_config = render_config or RenderConfig() - operator_args = operator_args or {} + # We copy the configuration so the changes introduced in this method, such as override_configuration, + # do not affect other DAGs or TaskGroups that may reuse the same original configuration + execution_config = copy.deepcopy(execution_config) if execution_config is not None else ExecutionConfig() + render_config = copy.deepcopy(render_config) if render_config is not None else RenderConfig() + operator_args = copy.deepcopy(operator_args) if operator_args is not None else {} + project_config.validate_project() validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) - - if project_config.dbt_project_path: - # We copy the configuration so the change does not affect other DAGs or TaskGroups - # that may reuse the same original configuration - render_config = copy.deepcopy(render_config) - execution_config = copy.deepcopy(execution_config) - render_config.project_path = project_config.dbt_project_path - execution_config.project_path = project_config.dbt_project_path - + override_configuration(project_config, render_config, execution_config, operator_args) validate_changed_config_paths(execution_config, project_config, render_config) env_vars = project_config.env_vars or operator_args.get("env") @@ -252,9 +277,6 @@ def __init__( ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV." ) - if not operator_args: - operator_args = {} - cache_dir = None cache_identifier = None if settings.enable_cache: @@ -291,10 +313,6 @@ def __init__( "vars": dbt_vars, "cache_dir": cache_dir, } - if execution_config.dbt_executable_path: - task_args["dbt_executable_path"] = execution_config.dbt_executable_path - if execution_config.invocation_mode: - task_args["invocation_mode"] = execution_config.invocation_mode validate_arguments( execution_config=execution_config, diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 847172e46..a9291445a 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -347,6 +347,9 @@ def __init__( self.dbt_vars = dbt_vars or {} self.operator_args = operator_args or {} self.log_dir: Path | None = None + self.should_install_dbt_deps = ( + self.render_config.dbt_deps if isinstance(self.render_config.dbt_deps, bool) else True + ) @cached_property def env_vars(self) -> dict[str, str]: @@ -642,7 +645,7 @@ def load_via_dbt_ls_without_cache(self) -> None: logger.debug(f"Content of the dbt project dir {project_path}: `{os.listdir(project_path)}`") tmpdir_path = Path(tmpdir) - create_symlinks(project_path, tmpdir_path, self.render_config.dbt_deps) + create_symlinks(project_path, tmpdir_path, self.should_install_dbt_deps) latest_partial_parse = None if self.project.partial_parse: @@ -679,7 +682,7 @@ def load_via_dbt_ls_without_cache(self) -> None: self.log_dir = Path(env.get(DBT_LOG_PATH_ENVVAR) or tmpdir_path / DBT_LOG_DIR_NAME) env[DBT_LOG_PATH_ENVVAR] = str(self.log_dir) - if self.render_config.dbt_deps and has_non_empty_dependencies_file(self.project_path): + if self.should_install_dbt_deps and has_non_empty_dependencies_file(self.project_path): if is_cache_package_lockfile_enabled(project_path): latest_package_lockfile = _get_latest_cached_package_lockfile(project_path) if latest_package_lockfile: diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index a9c8f87a8..6652ad411 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -128,7 +128,7 @@ class AbstractDbtLocalBase(AbstractDbtBase): :param profile_args: Arguments to pass to the profile. See :py:class:`cosmos.providers.dbt.core.profiles.BaseProfileMapping`. :param profile_config: ProfileConfig Object - :param install_deps: If true, install dependencies before running the command + :param install_deps (deprecated): If true, install dependencies before running the command :param callback: A callback function called on after a dbt run with a path to the dbt project directory. :param target_name: A name to use for the dbt target. If not provided, and no target is found in your project's dbt_project.yml, "cosmos_target" is used. @@ -151,7 +151,7 @@ def __init__( task_id: str, profile_config: ProfileConfig, invocation_mode: InvocationMode | None = None, - install_deps: bool = False, + install_deps: bool = True, callback: Callable[[str], None] | None = None, callback_args: dict[str, Any] | None = None, should_store_compiled_sql: bool = True, diff --git a/docs/configuration/operator-args.rst b/docs/configuration/operator-args.rst index ad11c8b2c..185cae7c0 100644 --- a/docs/configuration/operator-args.rst +++ b/docs/configuration/operator-args.rst @@ -95,7 +95,7 @@ dbt-related - ``vars``: (Deprecated since Cosmos 1.3 use ``ProjectConfig.dbt_vars`` instead) Supply variables to the project. This argument overrides variables defined in the ``dbt_project.yml``. - ``warn_error``: convert ``dbt`` warnings into errors. - ``full_refresh``: If True, then full refresh the node. This only applies to model and seed nodes. -- ``install_deps``: When using ``ExecutionMode.LOCAL`` or ``ExecutionMode.VIRTUALENV``, run ``dbt deps`` every time a task is executed. +- ``install_deps``: (deprecated in v1.9, use ``ProjectConfig.install_dbt_deps`` onwards) When using ``ExecutionMode.LOCAL`` or ``ExecutionMode.VIRTUALENV``, run ``dbt deps`` every time a task is executed. Airflow-related ............... diff --git a/docs/configuration/project-config.rst b/docs/configuration/project-config.rst index 279d5b392..e5d4620f8 100644 --- a/docs/configuration/project-config.rst +++ b/docs/configuration/project-config.rst @@ -24,6 +24,7 @@ variables that should be used for rendering and execution. It takes the followin will only be rendered at execution time, not at render time. - ``env_vars``: (new in v1.3) A dictionary of environment variables used for rendering and execution. Rendering with env vars is only supported when using ``RenderConfig.LoadMode.DBT_LS`` load mode. +- ``install_dbt_deps``: (new in v1.9) Run dbt deps during DAG parsing and task execution if True (default). - ``partial_parse``: (new in v1.4) If True, then attempt to use the ``partial_parse.msgpack`` if it exists. This is only used for the ``LoadMode.DBT_LS`` load mode, and for the ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV`` execution modes. Due to the way that dbt `partial parsing works `_, it does not work with Cosmos profile mapping classes. To benefit from this feature, users have to set the ``profiles_yml_filepath`` argument in ``ProfileConfig``. diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index 83bc0458c..7c8603238 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -13,7 +13,7 @@ The ``RenderConfig`` class takes the following arguments: - ``invocation_mode``: (new in v1.9) how to run ``dbt ls``, when using ``LoadMode.DBT_LS``. Learn more about this below. - ``select`` and ``exclude``: which models to include or exclude from your DAGs. See `Selecting & Excluding `_ for more information. - ``selector``: (new in v1.3) name of a dbt YAML selector to use for DAG parsing. Only supported when using ``load_method=LoadMode.DBT_LS``. See `Selecting & Excluding `_ for more information. -- ``dbt_deps``: A Boolean to run dbt deps when using dbt ls for dag parsing. Default True +- ``dbt_deps``: (deprecated in v1.9, use ``ProjectConfig.install_dbt_deps`` onwards) A Boolean to run dbt deps when using dbt ls for dag parsing. Default True - ``node_converters``: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. Find more information below. - ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. - ``dbt_ls_path``: Should be set when using ``load_method=LoadMode.DBT_LS_OUTPUT``. Path of the user-managed output of ``dbt ls``. diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index cd82c0741..a547f20ad 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -37,7 +37,6 @@ def test_is_cosmos_dag_is_true(): profile_config=profile_config, start_date=datetime(2023, 1, 1), dag_id="basic_cosmos_dag", - operator_args={"install_dep": True}, ) assert total_cosmos_tasks(dag) == 13