diff --git a/cosmos/__init__.py b/cosmos/__init__.py
index 7374e9db6..b5d730b44 100644
--- a/cosmos/__init__.py
+++ b/cosmos/__init__.py
@@ -172,6 +172,44 @@
DbtTestAwsEksOperator = MissingPackage("cosmos.operators.azure_container_instance.DbtTestAwsEksOperator", "aws_eks")
+try:
+ from cosmos.operators.aws_ecs import (
+ DbtBuildAwsEcsOperator,
+ DbtLSAwsEcsOperator,
+ DbtRunAwsEcsOperator,
+ DbtRunOperationAwsEcsOperator,
+ DbtSeedAwsEcsOperator,
+ DbtSnapshotAwsEcsOperator,
+ DbtSourceAwsEcsOperator,
+ DbtTestAwsEcsOperator,
+ )
+except ImportError:
+ DbtBuildAwsEcsOperator = MissingPackage(
+ "cosmos.operators.aws_ecs.DbtBuildAwsEcsOperator", "aws-ecs"
+ ) # pragma: no cover
+ DbtLSAwsEcsOperator = MissingPackage("cosmos.operators.aws_ecs.DbtLSAwsEcsOperator", "aws-ecs") # pragma: no cover
+ DbtRunAwsEcsOperator = MissingPackage(
+ "cosmos.operators.aws_ecs.DbtRunAwsEcsOperator", "aws-ecs"
+ ) # pragma: no cover
+ DbtRunOperationAwsEcsOperator = MissingPackage(
+ "cosmos.operators.aws_ecs.DbtRunOperationAwsEcsOperator",
+ "aws-ecs",
+ ) # pragma: no cover
+ DbtSeedAwsEcsOperator = MissingPackage(
+ "cosmos.operators.aws_ecs.DbtSeedAwsEcsOperator", "aws-ecs"
+ ) # pragma: no cover
+ DbtSnapshotAwsEcsOperator = MissingPackage(
+ "cosmos.operators.aws_ecs.DbtSnapshotAwsEcsOperator",
+ "aws-ecs",
+ ) # pragma: no cover
+ DbtTestAwsEcsOperator = MissingPackage(
+ "cosmos.operators.aws_ecs.DbtTestAwsEcsOperator", "aws-ecs"
+ ) # pragma: no cover
+ DbtSourceAwsEcsOperator = MissingPackage(
+ "cosmos.operators.aws_ecs.DbtSourceAwsEcsOperator", "aws-ecs"
+ ) # pragma: no cover
+
+
try:
from cosmos.operators.gcp_cloud_run_job import (
DbtBuildGcpCloudRunJobOperator,
@@ -267,6 +305,15 @@
"DbtSeedAwsEksOperator",
"DbtSnapshotAwsEksOperator",
"DbtTestAwsEksOperator",
+ # AWS ECS Task Run Execution Mode
+ "DbtBuildAwsEcsOperator",
+ "DbtLSAwsEcsOperator",
+ "DbtRunAwsEcsOperator",
+ "DbtRunOperationAwsEcsOperator",
+ "DbtSeedAwsEcsOperator",
+ "DbtSnapshotAwsEcsOperator",
+ "DbtTestAwsEcsOperator",
+ "DbtSourceAwsEcsOperator",
# GCP Cloud Run Job Execution Mode
"DbtBuildGcpCloudRunJobOperator",
"DbtCloneGcpCloudRunJobOperator",
diff --git a/cosmos/constants.py b/cosmos/constants.py
index a68f5a836..53c0df937 100644
--- a/cosmos/constants.py
+++ b/cosmos/constants.py
@@ -93,6 +93,7 @@ class ExecutionMode(Enum):
DOCKER = "docker"
KUBERNETES = "kubernetes"
AWS_EKS = "aws_eks"
+ AWS_ECS = "aws_ecs"
VIRTUALENV = "virtualenv"
AZURE_CONTAINER_INSTANCE = "azure_container_instance"
GCP_CLOUD_RUN_JOB = "gcp_cloud_run_job"
diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py
new file mode 100644
index 000000000..2decbcc6c
--- /dev/null
+++ b/cosmos/operators/aws_ecs.py
@@ -0,0 +1,207 @@
+from __future__ import annotations
+
+import inspect
+from typing import Any, Callable, Sequence
+
+from airflow.utils.context import Context
+
+from cosmos.config import ProfileConfig
+from cosmos.log import get_logger
+from cosmos.operators.base import (
+ AbstractDbtBase,
+ DbtBuildMixin,
+ DbtLSMixin,
+ DbtRunMixin,
+ DbtRunOperationMixin,
+ DbtSeedMixin,
+ DbtSnapshotMixin,
+ DbtSourceMixin,
+ DbtTestMixin,
+)
+
+logger = get_logger(__name__)
+
+DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {}
+
+try:
+ from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
+except ImportError: # pragma: no cover
+ raise ImportError(
+ "Could not import EcsRunTaskOperator. Ensure you've installed the Amazon Web Services provider "
+ "separately or with `pip install astronomer-cosmos[...,aws-ecs]`."
+ ) # pragma: no cover
+
+
+class DbtAwsEcsBaseOperator(AbstractDbtBase, EcsRunTaskOperator): # type: ignore
+ """
+ Executes a dbt core cli command in an ECS Task instance with dbt installed in it.
+
+ """
+
+ template_fields: Sequence[str] = tuple(
+ list(AbstractDbtBase.template_fields) + list(EcsRunTaskOperator.template_fields)
+ )
+
+ intercept_flag = False
+
+ def __init__(
+ self,
+ # arguments required by EcsRunTaskOperator
+ aws_conn_id: str,
+ cluster: str,
+ task_definition: str,
+ container_name: str,
+ #
+ profile_config: ProfileConfig | None = None,
+ command: list[str] | None = None,
+ environment_variables: dict[str, Any] | None = None,
+ **kwargs: Any,
+ ) -> None:
+ self.profile_config = profile_config
+ self.command = command
+ self.environment_variables = environment_variables or DEFAULT_ENVIRONMENT_VARIABLES
+ self.container_name = container_name
+ kwargs.update(
+ {
+ "aws_conn_id": aws_conn_id,
+ "task_definition": task_definition,
+ "cluster": cluster,
+ "overrides": None,
+ }
+ )
+ super().__init__(**kwargs)
+ # In PR #1474, we refactored cosmos.operators.base.AbstractDbtBase to remove its inheritance from BaseOperator
+ # and eliminated the super().__init__() call. This change was made to resolve conflicts in parent class
+ # initializations while adding support for ExecutionMode.AIRFLOW_ASYNC. Operators under this mode inherit
+ # Airflow provider operators that enable deferrable SQL query execution. Since super().__init__() was removed
+ # from AbstractDbtBase and different parent classes require distinct initialization arguments, we explicitly
+ # initialize them (including the BaseOperator) here by segregating the required arguments for each parent class.
+ base_operator_args = set(inspect.signature(EcsRunTaskOperator.__init__).parameters.keys())
+ base_kwargs = {}
+ for arg_key, arg_value in kwargs.items():
+ if arg_key in base_operator_args:
+ base_kwargs[arg_key] = arg_value
+ base_kwargs["task_id"] = kwargs["task_id"]
+ base_kwargs["aws_conn_id"] = aws_conn_id
+ EcsRunTaskOperator.__init__(self, **base_kwargs)
+
+ def build_and_run_cmd(
+ self,
+ context: Context,
+ cmd_flags: list[str] | None = None,
+ run_as_async: bool = False,
+ async_context: dict[str, Any] | None = None,
+ ) -> Any:
+ self.build_command(context, cmd_flags)
+ self.log.info(f"Running command: {self.command}")
+
+ result = EcsRunTaskOperator.execute(self, context)
+
+ logger.info(result)
+
+ def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None:
+ # For the first round, we're going to assume that the command is dbt
+ # This means that we don't have openlineage support, but we will create a ticket
+ # to add that in the future
+ self.dbt_executable_path = "dbt"
+ dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags)
+ self.environment_variables = {**env_vars, **self.environment_variables}
+ self.command = dbt_cmd
+ # Override Ecs Task Run default arguments with dbt command
+ self.overrides = {
+ "containerOverrides": [
+ {
+ "name": self.container_name,
+ "command": self.command,
+ "environment": [{"name": key, "value": value} for key, value in self.environment_variables.items()],
+ }
+ ]
+ }
+
+
+class DbtBuildAwsEcsOperator(DbtBuildMixin, DbtAwsEcsBaseOperator):
+ """
+ Executes a dbt core build command.
+ """
+
+ template_fields: Sequence[str] = DbtAwsEcsBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator]
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super().__init__(*args, **kwargs)
+
+
+class DbtLSAwsEcsOperator(DbtLSMixin, DbtAwsEcsBaseOperator):
+ """
+ Executes a dbt core ls command.
+ """
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super().__init__(*args, **kwargs)
+
+
+class DbtSeedAwsEcsOperator(DbtSeedMixin, DbtAwsEcsBaseOperator):
+ """
+ Executes a dbt core seed command.
+
+ :param full_refresh: dbt optional arg - dbt will treat incremental models as table models
+ """
+
+ template_fields: Sequence[str] = DbtAwsEcsBaseOperator.template_fields + DbtSeedMixin.template_fields # type: ignore[operator]
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super().__init__(*args, **kwargs)
+
+
+class DbtSnapshotAwsEcsOperator(DbtSnapshotMixin, DbtAwsEcsBaseOperator):
+ """
+ Executes a dbt core snapshot command.
+ """
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super().__init__(*args, **kwargs)
+
+
+class DbtSourceAwsEcsOperator(DbtSourceMixin, DbtAwsEcsBaseOperator):
+ """
+ Executes a dbt source freshness command.
+ """
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super().__init__(*args, **kwargs)
+
+
+class DbtRunAwsEcsOperator(DbtRunMixin, DbtAwsEcsBaseOperator):
+ """
+ Executes a dbt core run command.
+ """
+
+ template_fields: Sequence[str] = DbtAwsEcsBaseOperator.template_fields + DbtRunMixin.template_fields # type: ignore[operator]
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super().__init__(*args, **kwargs)
+
+
+class DbtTestAwsEcsOperator(DbtTestMixin, DbtAwsEcsBaseOperator):
+ """
+ Executes a dbt core test command.
+ """
+
+ def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: str) -> None:
+ super().__init__(**kwargs)
+ # as of now, on_warning_callback in the AWS ECS operator does nothing
+ self.on_warning_callback = on_warning_callback
+
+
+class DbtRunOperationAwsEcsOperator(DbtRunOperationMixin, DbtAwsEcsBaseOperator):
+ """
+ Executes a dbt core run-operation command.
+
+ :param macro_name: name of macro to execute
+ :param args: Supply arguments to the macro. This dictionary will be mapped to the keyword arguments defined in the
+ selected macro.
+ """
+
+ template_fields: Sequence[str] = DbtAwsEcsBaseOperator.template_fields + DbtRunOperationMixin.template_fields # type: ignore[operator]
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super().__init__(*args, **kwargs)
diff --git a/docs/_static/cosmos_aws_ecs_schematic.png b/docs/_static/cosmos_aws_ecs_schematic.png
new file mode 100644
index 000000000..d1e897053
Binary files /dev/null and b/docs/_static/cosmos_aws_ecs_schematic.png differ
diff --git a/docs/_static/jaffle_shop_aws_ecs_dag_run.png b/docs/_static/jaffle_shop_aws_ecs_dag_run.png
new file mode 100644
index 000000000..69879c829
Binary files /dev/null and b/docs/_static/jaffle_shop_aws_ecs_dag_run.png differ
diff --git a/docs/_static/jaffle_shop_aws_ecs_dag_run_logs.png b/docs/_static/jaffle_shop_aws_ecs_dag_run_logs.png
new file mode 100644
index 000000000..802f018f5
Binary files /dev/null and b/docs/_static/jaffle_shop_aws_ecs_dag_run_logs.png differ
diff --git a/docs/getting_started/aws-container-run-job.rst b/docs/getting_started/aws-container-run-job.rst
new file mode 100644
index 000000000..00cef7ade
--- /dev/null
+++ b/docs/getting_started/aws-container-run-job.rst
@@ -0,0 +1,191 @@
+.. title:: Getting Started with Astronomer Cosmos on AWS ECS
+
+Getting Started with Astronomer Cosmos on AWS ECS
+==================================================
+
+Astronomer Cosmos provides a unified way to run containerized workloads across multiple cloud providers. In this guide, you’ll learn how to deploy and run a Cosmos job on AWS Elastic Container Service (ECS) using Fargate.
+Schematically, the guide will walk you through the steps required to build the following architecture:
+
+.. figure:: https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/cosmos_aws_ecs_schematic.png
+ :width: 800
+
+Prerequisites
++++++++++++++
+
+Before you begin, ensure you have the following:
+
+- An active **AWS Account** with permissions to create ECS clusters, register task definitions, and run tasks.
+- The **AWS CLI** installed and configured with the proper credentials.
+- **Docker** installed for building your container image.
+- Access to your container registry (for example, **Amazon ECR**) where your job image is stored.
+- Basic familiarity with AWS ECS concepts (clusters, task definitions, services, and Fargate).
+- An existing installation of **Astronomer Cosmos** (refer to the `Cosmos documentation <https://astronomer.github.io/astronomer-cosmos/>`_ for more details).
+
+
+
+Step-by-step guide
+++++++++++++++++++
+
+**Install Airflow and Cosmos**
+
+Create a Python virtualenv, activate it, upgrade ``pip`` to the latest version, and install ``apache-airflow`` and ``astronomer-cosmos``:
+
+.. code-block:: bash
+
+ python3 -m venv venv
+ source venv/bin/activate
+ python3 -m pip install --upgrade pip
+ pip install apache-airflow
+ pip install "astronomer-cosmos[amazon]"
+ pip install "aiobotocore[boto3]"
+
+.. note::
+
+   The package ``aiobotocore[boto3]`` is optional; you will need it if you plan to use **deferrable tasks**.
+
+**Set up your ECR**
+
+1. **Set your secrets**
+ In the `cosmos-examples `_ repository, you can find a ready-to-use Docker image definition (``Dockerfile.aws_ecs``) for the AWS ECS service; replace the placeholder secrets with your own values, or create your own image.
+
+2. **AWS CLI login**
+ Before building and pushing your image, you first need to log in to the AWS service using the AWS CLI tool.
+ Use the following command:
+
+ .. code-block:: bash
+
+ aws ecr-public get-login-password --region <region> | docker login --username AWS --password-stdin <registry-uri>
+
+3. **Build and tag your image**
+ Once you have your image ready, run the following commands:
+
+ .. code-block:: bash
+
+ docker build -f Dockerfile.aws_ecs . --platform=linux/amd64 -t <image-name>
+ docker tag <image-name> <registry-uri>/<repository>:<tag>
+
+4. **Push your image**
+
+ .. code-block:: bash
+
+ docker push <registry-uri>/<repository>:<tag>
+
+**Configure Your AWS Environment**
+
+1. **Create an ECS Cluster**
+
+ Create an ECS cluster to host your Cosmos jobs. You can do this from the AWS Console or using the AWS CLI:
+
+ .. code-block:: bash
+
+ aws ecs create-cluster --cluster-name my-cosmos-cluster
+
+2. **Set Up an IAM Role for ECS Tasks**
+
+ Ensure you have an IAM role that your ECS tasks can assume. This role should include permissions for ECS, ECR, and CloudWatch (for logs). For example, you might create a role named ``ecsTaskExecutionRole`` with the managed policies:
+
+ - ``AmazonECSTaskExecutionRolePolicy``
+ - (Optional) Additional policies for custom resource access
+
+3. **Configure Networking**
+
+ For Fargate tasks, make sure you have at least one subnet (preferably in multiple Availability Zones) and a security group that permits outbound internet access if needed. Note the subnet IDs for later use (the sketch after this list shows one way to script steps 2 and 3).
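+
+If you prefer to script steps 2 and 3, here is a minimal ``boto3`` sketch (not part of Cosmos itself). The role name matches the one used throughout this guide, while ``<vpc-id>`` is a placeholder you should replace with your own VPC:
+
+.. code-block:: python
+
+    import json
+
+    import boto3
+
+    iam = boto3.client("iam")
+    ec2 = boto3.client("ec2")
+
+    # Trust policy that allows ECS tasks to assume the execution role
+    assume_role_policy = {
+        "Version": "2012-10-17",
+        "Statement": [
+            {
+                "Effect": "Allow",
+                "Principal": {"Service": "ecs-tasks.amazonaws.com"},
+                "Action": "sts:AssumeRole",
+            }
+        ],
+    }
+
+    iam.create_role(
+        RoleName="ecsTaskExecutionRole",
+        AssumeRolePolicyDocument=json.dumps(assume_role_policy),
+    )
+    iam.attach_role_policy(
+        RoleName="ecsTaskExecutionRole",
+        PolicyArn="arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy",
+    )
+
+    # List the subnets of the VPC you plan to use and note their IDs
+    subnets = ec2.describe_subnets(Filters=[{"Name": "vpc-id", "Values": ["<vpc-id>"]}])
+    print([subnet["SubnetId"] for subnet in subnets["Subnets"]])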
+
+**Prepare Your Cosmos Job Definition**
+
+Cosmos jobs are defined as container tasks. Create a task definition file (e.g., ``cosmos-task-definition.json``) with the configuration for your job.
+
+For example:
+
+.. code-block:: json
+
+    {
+      "family": "cosmos-job",
+      "networkMode": "awsvpc",
+      "requiresCompatibilities": [
+        "FARGATE"
+      ],
+      "cpu": "512",
+      "memory": "1024",
+      "executionRoleArn": "arn:aws:iam::<account-id>:role/ecsTaskExecutionRole",
+      "containerDefinitions": [
+        {
+          "name": "cosmos-job",
+          "image": "<registry-uri>/your_image:latest",
+          "essential": true,
+          "environment": [
+            { "name": "VAR1", "value": "value1" },
+            { "name": "VAR2", "value": "value2" }
+          ],
+          "logConfiguration": {
+            "logDriver": "awslogs",
+            "options": {
+              "awslogs-group": "/ecs/cosmos-job",
+              "awslogs-region": "us-east-1",
+              "awslogs-stream-prefix": "ecs"
+            }
+          }
+        }
+      ]
+    }
+
+.. note::
+
+   Replace ``<account-id>`` and ``<registry-uri>``, and adjust the CPU, memory, and environment variables as needed.
+
+**Deploy Your Cosmos Job on AWS ECS**
+
+1. **Register the Task Definition**
+
+ Use the AWS CLI to register your task definition:
+
+ .. code-block:: bash
+
+ aws ecs register-task-definition --cli-input-json file://cosmos-task-definition.json
+
+2. **Run the Task**
+
+ Run a test task on your ECS cluster. Specify the subnets and security groups in your network configuration. For example:
+
+ .. code-block:: bash
+
+ aws ecs run-task \
+ --cluster my-cosmos-cluster \
+ --launch-type FARGATE \
+ --task-definition cosmos-job \
+ --network-configuration "awsvpcConfiguration={subnets=[subnet-12345678,subnet-87654321],securityGroups=[sg-abcdef12],assignPublicIp=ENABLED}"
+
+ Once the test task succeeds, you can run the dbt commands from your Cosmos DAG:
+
+ .. figure:: https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/jaffle_shop_aws_ecs_dag_run.png
+ :width: 800
+
+ .. figure:: https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/jaffle_shop_aws_ecs_dag_run_logs.png
+ :width: 800
+
+ Remember to configure your DAG with the Airflow connection used for AWS ECS and with the database connection (dbt profile) that your SQL queries run against, as sketched below.
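+
+For reference, a minimal DAG sketch is shown below. The ``dag_id``, project path, and subnet value are placeholders, the cluster, task definition, and container names match the ones used earlier in this guide, and depending on how your dbt project and image are set up you may also need ``profile_config`` or ``render_config``:
+
+.. code-block:: python
+
+    from cosmos import DbtDag, ExecutionConfig, ProjectConfig
+    from cosmos.constants import ExecutionMode
+
+    # Depending on your setup, profile_config and render_config may also be required.
+    jaffle_shop_aws_ecs = DbtDag(
+        dag_id="jaffle_shop_aws_ecs",
+        schedule=None,
+        project_config=ProjectConfig("/path/to/jaffle_shop"),  # placeholder project path
+        execution_config=ExecutionConfig(execution_mode=ExecutionMode.AWS_ECS),
+        operator_args={
+            "aws_conn_id": "aws_default",
+            "cluster": "my-cosmos-cluster",
+            "task_definition": "cosmos-job",
+            "container_name": "cosmos-job",
+            "launch_type": "FARGATE",
+            "network_configuration": {
+                "awsvpcConfiguration": {
+                    "subnets": ["<subnet-id>"],
+                    "assignPublicIp": "ENABLED",
+                },
+            },
+        },
+    )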
+
+
+**Monitor and Debug Your Job**
+
+1. **Check Task Status**
+
+ You can view the status of your task from the AWS Console under your ECS cluster or via the CLI:
+
+ .. code-block:: bash
+
+ aws ecs describe-tasks --cluster my-cosmos-cluster --tasks <task-id>
+
+2. **View Logs**
+
+ Since the task definition configures AWS CloudWatch Logs, you can view your job’s output in the CloudWatch Logs console. Look for log streams with the prefix you set (e.g., ``ecs/cosmos-job``), or fetch them programmatically as sketched below.
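+
+If you prefer fetching logs from code, here is a small ``boto3`` sketch; the log group name and region match the task definition above, so adjust them if yours differ:
+
+.. code-block:: python
+
+    import boto3
+
+    logs = boto3.client("logs", region_name="us-east-1")
+
+    # Print the most recent events from the log group configured in the task definition
+    response = logs.filter_log_events(logGroupName="/ecs/cosmos-job", limit=50)
+    for event in response["events"]:
+        print(event["message"])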
+
+**Conclusion**
+
+
+By following this guide, you can deploy Astronomer Cosmos jobs on AWS ECS using Fargate. This integration enables you to leverage the scalability and managed infrastructure of ECS while maintaining a consistent container orchestration experience with Cosmos.
+
+For more detailed information on AWS ECS, please refer to the `AWS ECS Developer Guide <https://docs.aws.amazon.com/AmazonECS/latest/developerguide/>`_.
+
+Happy deploying! 🚀
diff --git a/docs/getting_started/execution-modes.rst b/docs/getting_started/execution-modes.rst
index 0c3fea639..3b4a3c928 100644
--- a/docs/getting_started/execution-modes.rst
+++ b/docs/getting_started/execution-modes.rst
@@ -12,7 +12,8 @@ Cosmos can run ``dbt`` commands using five different approaches, called ``execut
5. **aws_eks**: Run ``dbt`` commands from AWS EKS Pods managed by Cosmos (requires a pre-existing Docker image)
6. **azure_container_instance**: Run ``dbt`` commands from Azure Container Instances managed by Cosmos (requires a pre-existing Docker image)
7. **gcp_cloud_run_job**: Run ``dbt`` commands from GCP Cloud Run Job instances managed by Cosmos (requires a pre-existing Docker image)
-8. **airflow_async**: (Experimental and introduced since Cosmos 1.7.0) Run the dbt resources from your dbt project asynchronously, by submitting the corresponding compiled SQLs to Apache Airflow's `Deferrable operators <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html>`__
+8. **aws_ecs**: Run ``dbt`` commands from AWS ECS Tasks managed by Cosmos (requires a pre-existing Docker image)
+9. **airflow_async**: (Experimental and introduced since Cosmos 1.7.0) Run the dbt resources from your dbt project asynchronously, by submitting the corresponding compiled SQLs to Apache Airflow's `Deferrable operators <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html>`__
The choice of the ``execution mode`` can vary based on each user's needs and concerns. For more details, check each execution mode described below.
@@ -53,6 +54,10 @@ The choice of the ``execution mode`` can vary based on each user's needs and con
- Slow
- High
- No
+ * - AWS ECS
+ - Slow
+ - High
+ - No
* - Airflow Async
- Medium
- None
@@ -244,6 +249,40 @@ Each task will create a new Cloud Run Job execution, giving full isolation. The
)
+AWS ECS
+---------
+.. versionadded:: 1.9.0
+
+Using ``AWS Elastic Container Service (ECS)`` as the execution mode provides an isolated and scalable way to run ``dbt`` tasks within an AWS ECS service. This execution mode ensures that each ``dbt`` run is performed inside a dedicated container running in an ECS task.
+
+This execution mode requires the user to have an AWS environment configured to run ECS tasks (see :ref:`aws-ecs` for more details on the exact requirements). Similar to the ``Docker`` and ``Kubernetes`` execution modes, a Docker container should be available, containing the up-to-date ``dbt`` pipelines and profiles.
+
+Each task will create a new ECS task execution, providing full isolation. However, this separation introduces some overhead in execution time due to container startup and provisioning. For users who require faster execution times, configuring appropriate ECS task definitions and cluster optimizations can help mitigate these delays.
+
+Please refer to the step-by-step guide for using AWS ECS as the execution mode.
+
+.. code-block:: python
+
+    aws_ecs_cosmos_dag = DbtDag(
+        # ...
+        execution_config=ExecutionConfig(execution_mode=ExecutionMode.AWS_ECS),
+        operator_args={
+            "aws_conn_id": "aws_default",
+            "cluster": "my-ecs-cluster",
+            "task_definition": "my-dbt-task",
+            "container_name": "dbt-container",
+            "launch_type": "FARGATE",
+            "deferrable": True,
+            "network_configuration": {
+                "awsvpcConfiguration": {
+                    "subnets": ["<subnet-id>"],
+                    "assignPublicIp": "ENABLED",
+                },
+            },
+            "environment_variables": {"DBT_PROFILE_NAME": "default"},
+        },
+    )
+
Airflow Async (experimental)
----------------------------
diff --git a/pyproject.toml b/pyproject.toml
index a251028d5..58262f1bd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -99,6 +99,9 @@ kubernetes = [
aws_eks = [
"apache-airflow-providers-amazon>=8.0.0",
]
+aws-ecs = [
+ "apache-airflow-providers-amazon>=8.0.0",
+]
azure-container-instance = [
"apache-airflow-providers-microsoft-azure>=8.5.0",
]
diff --git a/tests/operators/test_aws_ecs.py b/tests/operators/test_aws_ecs.py
new file mode 100644
index 000000000..230c1616a
--- /dev/null
+++ b/tests/operators/test_aws_ecs.py
@@ -0,0 +1,221 @@
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+from airflow.utils.context import Context
+from pendulum import datetime
+
+from cosmos.operators.aws_ecs import (
+ DbtAwsEcsBaseOperator,
+ DbtBuildAwsEcsOperator,
+ DbtLSAwsEcsOperator,
+ DbtRunAwsEcsOperator,
+ DbtRunOperationAwsEcsOperator,
+ DbtSeedAwsEcsOperator,
+ DbtSnapshotAwsEcsOperator,
+ DbtSourceAwsEcsOperator,
+ DbtTestAwsEcsOperator,
+)
+
+
+class ConcreteDbtAwsEcsOperator(DbtAwsEcsBaseOperator):
+ base_cmd = ["cmd"]
+
+
+def test_dbt_aws_ecs_operator_add_global_flags() -> None:
+ """
+ Check if global flags are added correctly.
+ """
+ dbt_base_operator = ConcreteDbtAwsEcsOperator(
+ task_id="my-task",
+ aws_conn_id="my-aws-conn-id",
+ cluster="my-ecs-cluster",
+ task_definition="my-dbt-task-definition",
+ container_name="my-dbt-container-name",
+ project_dir="my/dir",
+ vars={
+ "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}",
+ "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}",
+ },
+ no_version_check=True,
+ )
+ assert dbt_base_operator.add_global_flags() == [
+ "--vars",
+ "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n"
+ "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
+ "--no-version-check",
+ ]
+
+
+@patch("cosmos.operators.base.context_to_airflow_vars")
+def test_dbt_aws_ecs_operator_get_env(p_context_to_airflow_vars: MagicMock) -> None:
+ """
+ If an end user passes in a variable via the context that is also a global flag, validate that both are kept
+ """
+ dbt_base_operator = ConcreteDbtAwsEcsOperator(
+ task_id="my-task",
+ aws_conn_id="my-aws-conn-id",
+ cluster="my-ecs-cluster",
+ task_definition="my-dbt-task-definition",
+ container_name="my-dbt-container-name",
+ project_dir="my/dir",
+ )
+ dbt_base_operator.env = {
+ "start_date": "20220101",
+ "end_date": "20220102",
+ "some_path": Path(__file__),
+ "retries": 3,
+ ("tuple", "key"): "some_value",
+ }
+ p_context_to_airflow_vars.return_value = {"START_DATE": "2023-02-15 12:30:00"}
+ env = dbt_base_operator.get_env(
+ Context(execution_date=datetime(2023, 2, 15, 12, 30)),
+ )
+ expected_env = {
+ "start_date": "20220101",
+ "end_date": "20220102",
+ "some_path": Path(__file__),
+ "START_DATE": "2023-02-15 12:30:00",
+ }
+ assert env == expected_env
+
+
+@patch("cosmos.operators.base.context_to_airflow_vars")
+def test_dbt_aws_ecs_operator_check_environment_variables(
+ p_context_to_airflow_vars: MagicMock,
+) -> None:
+ """
+ Validate that environment variables set on the operator override env values with the same key when the command is built
+ """
+ dbt_base_operator = ConcreteDbtAwsEcsOperator(
+ task_id="my-task",
+ aws_conn_id="my-aws-conn-id",
+ cluster="my-ecs-cluster",
+ task_definition="my-dbt-task-definition",
+ container_name="my-dbt-container-name",
+ project_dir="my/dir",
+ environment_variables={"FOO": "BAR"},
+ )
+ dbt_base_operator.env = {
+ "start_date": "20220101",
+ "end_date": "20220102",
+ "some_path": Path(__file__),
+ "retries": 3,
+ "FOO": "foo",
+ ("tuple", "key"): "some_value",
+ }
+ expected_env = {"start_date": "20220101", "end_date": "20220102", "some_path": Path(__file__), "FOO": "BAR"}
+ dbt_base_operator.build_command(context=MagicMock())
+
+ assert dbt_base_operator.environment_variables == expected_env
+
+
+base_kwargs = {
+ "task_id": "my-task",
+ "aws_conn_id": "my-aws-conn-id",
+ "cluster": "my-ecs-cluster",
+ "task_definition": "my-dbt-task-definition",
+ "container_name": "my-dbt-container-name",
+ "environment_variables": {"FOO": "BAR", "OTHER_FOO": "OTHER_BAR"},
+ "project_dir": "my/dir",
+ "vars": {
+ "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}",
+ "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}",
+ },
+ "no_version_check": True,
+}
+
+result_map = {
+ "ls": DbtLSAwsEcsOperator(**base_kwargs),
+ "run": DbtRunAwsEcsOperator(**base_kwargs),
+ "test": DbtTestAwsEcsOperator(**base_kwargs),
+ "source": DbtSourceAwsEcsOperator(**base_kwargs),
+ "seed": DbtSeedAwsEcsOperator(**base_kwargs),
+ "build": DbtBuildAwsEcsOperator(**base_kwargs),
+ "snapshot": DbtSnapshotAwsEcsOperator(**base_kwargs),
+ "run-operation": DbtRunOperationAwsEcsOperator(macro_name="some-macro", **base_kwargs),
+}
+
+
+def test_dbt_aws_ecs_build_command():
+ """
+ Check whether the dbt command is built correctly.
+ """
+ for command_name, command_operator in result_map.items():
+ command_operator.build_command(context=MagicMock(), cmd_flags=MagicMock())
+ if command_name not in {"run-operation", "source"}:
+ assert command_operator.command == [
+ "dbt",
+ command_name,
+ "--vars",
+ "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n"
+ "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
+ "--no-version-check",
+ ]
+ elif command_name == "source":
+ assert command_operator.command == [
+ "dbt",
+ command_name,
+ "freshness",
+ "--vars",
+ "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n"
+ "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
+ "--no-version-check",
+ ]
+ else:
+ assert command_operator.command == [
+ "dbt",
+ command_name,
+ "some-macro",
+ "--vars",
+ "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n"
+ "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
+ "--no-version-check",
+ ]
+
+
+def test_dbt_aws_ecs_overrides_parameter():
+ """
+ Check whether overrides parameter passed on to EcsRunTaskOperator is built correctly.
+ """
+
+ run_operator = DbtRunAwsEcsOperator(**base_kwargs)
+ run_operator.build_command(context=MagicMock(), cmd_flags=MagicMock())
+
+ actual_overrides = run_operator.overrides
+
+ assert "containerOverrides" in actual_overrides
+ actual_container_overrides = actual_overrides["containerOverrides"][0]
+ assert isinstance(actual_container_overrides["command"], list), "`command` should be of type list"
+
+ assert "environment" in actual_container_overrides
+ actual_env = actual_container_overrides["environment"]
+
+ expected_env_vars = [{"name": "FOO", "value": "BAR"}, {"name": "OTHER_FOO", "value": "OTHER_BAR"}]
+
+ for expected_env_var in expected_env_vars:
+ assert expected_env_var in actual_env
+
+
+@patch("cosmos.operators.aws_ecs.EcsRunTaskOperator.execute")
+def test_dbt_aws_ecs_build_and_run_cmd(mock_execute):
+ """
+ Check that building methods run correctly.
+ """
+
+ dbt_base_operator = ConcreteDbtAwsEcsOperator(
+ task_id="my-task",
+ aws_conn_id="my-aws-conn-id",
+ cluster="my-ecs-cluster",
+ task_definition="my-dbt-task-definition",
+ container_name="my-dbt-container-name",
+ project_dir="my/dir",
+ environment_variables={"FOO": "BAR"},
+ )
+ mock_build_command = MagicMock()
+ dbt_base_operator.build_command = mock_build_command
+
+ mock_context = MagicMock()
+ dbt_base_operator.build_and_run_cmd(context=mock_context)
+
+ mock_build_command.assert_called_with(mock_context, None)
+ mock_execute.assert_called_once_with(dbt_base_operator, mock_context)