diff --git a/PRIVACY_NOTICE.rst b/PRIVACY_NOTICE.rst
new file mode 100644
index 000000000..7477ee795
--- /dev/null
+++ b/PRIVACY_NOTICE.rst
@@ -0,0 +1,41 @@
+Privacy Notice
+==============
+
+This project follows the `Privacy Policy of Astronomer <https://www.astronomer.io/privacy/>`_.
+
+Collection of Data
+------------------
+
+Astronomer Cosmos integrates `Scarf <https://about.scarf.sh/>`_ to collect basic telemetry data during operation.
+This data assists the project maintainers in better understanding how Cosmos is used.
+Insights gained from this telemetry are critical for prioritizing patches, minor releases, and
+security fixes. Additionally, this information supports key decisions related to the development road map.
+
+Deployments and individual users can opt-out of analytics by setting the configuration:
+
+
+.. code-block::
+
+    [cosmos]
+    enable_telemetry = False
+
+
+As described in the `official documentation <https://docs.scarf.sh/gateway/#do-not-track>`_, it is also possible to opt out by setting one of the following environment variables:
+
+.. code-block::
+
+ DO_NOT_TRACK=True
+ SCARF_NO_ANALYTICS=True
+
+
+In addition to Scarf's default data collection, Cosmos collects the following information when running Cosmos-powered DAGs:
+
+- Cosmos version
+- Airflow version
+- Python version
+- Operating system & machine architecture
+- Event type
+- The DAG hash
+- Total tasks
+- Total Cosmos tasks
+
+No user-identifiable information (IP included) is stored in Scarf.
diff --git a/README.rst b/README.rst
index 7eb32bcac..e35b8a913 100644
--- a/README.rst
+++ b/README.rst
@@ -82,7 +82,9 @@ _______
Privacy Notice
______________
-This project follows `Astronomer's Privacy Policy `_
+The application and this website collect telemetry to support the project's development. End users can disable both.
+
+Read the `Privacy Notice <https://github.com/astronomer/astronomer-cosmos/blob/main/PRIVACY_NOTICE.rst>`_ to learn more about it.
.. Tracking pixel for Scarf
diff --git a/cosmos/constants.py b/cosmos/constants.py
index 8378e8d10..0513d50d2 100644
--- a/cosmos/constants.py
+++ b/cosmos/constants.py
@@ -160,3 +160,7 @@ def _missing_value_(cls, value): # type: ignore
TESTABLE_DBT_RESOURCES = {DbtResourceType.MODEL, DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED}
DBT_COMPILE_TASK_ID = "dbt_compile"
+
+TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{task_count}/{cosmos_task_count}"
+TELEMETRY_VERSION = "v1"
+TELEMETRY_TIMEOUT = 1.0
diff --git a/cosmos/listeners/__init__.py b/cosmos/listeners/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py
new file mode 100644
index 000000000..0314c3474
--- /dev/null
+++ b/cosmos/listeners/dag_run_listener.py
@@ -0,0 +1,84 @@
+from __future__ import annotations
+
+from airflow.listeners import hookimpl
+from airflow.models.dag import DAG
+from airflow.models.dagrun import DagRun
+
+from cosmos import telemetry
+from cosmos.log import get_logger
+
+logger = get_logger(__name__)
+
+
class EventStatus:
    """String constants describing the outcome of a DAG run event."""

    SUCCESS = "success"
    FAILED = "failed"


# Event type label attached to metrics emitted by the DAG run listener hooks.
DAG_RUN = "dag_run"
+
+
def total_cosmos_tasks(dag: DAG) -> int:
    """
    Return the number of Cosmos-provided tasks in the given DAG.

    Works on serialized DAGs (``airflow.serialization.serialized_objects.SerializedDAG``),
    whose tasks are ``SerializedBaseOperator`` instances; their only reference to Cosmos
    is the ``_task_module`` attribute. The approach is naive — it does not account for
    subclasses defined outside the ``cosmos`` package — but it is inexpensive and works
    as of Airflow 2.10.
    """
    return sum(
        1
        for task in dag.task_dict.values()
        # Serialized operators record the original module path in `_task_module`;
        # non-serialized operators fall back to their class' module.
        if (getattr(task, "_task_module", None) or task.__class__.__module__).startswith("cosmos.")
    )
+
+
@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str) -> None:
    """Emit telemetry for a successful DAG run, if the DAG contains Cosmos tasks."""
    logger.debug("Running on_dag_run_success")
    # In a real Airflow deployment, `dag_run.get_dag()` returns an
    # `airflow.serialization.serialized_objects.SerializedDAG`, which is not a
    # subclass of DbtDag and contains no direct references to Cosmos.
    serialized_dag = dag_run.get_dag()

    # Count once and reuse: total_cosmos_tasks iterates over every task in the DAG.
    cosmos_task_count = total_cosmos_tasks(serialized_dag)
    if not cosmos_task_count:
        logger.debug("The DAG does not use Cosmos")
        return

    additional_telemetry_metrics = {
        "dag_hash": dag_run.dag_hash,
        "status": EventStatus.SUCCESS,
        "task_count": len(serialized_dag.task_ids),
        "cosmos_task_count": cosmos_task_count,
    }

    telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics)
    logger.debug("Completed on_dag_run_success")
+
+
@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str) -> None:
    """Emit telemetry for a failed DAG run, if the DAG contains Cosmos tasks."""
    logger.debug("Running on_dag_run_failed")
    # In a real Airflow deployment, `dag_run.get_dag()` returns an
    # `airflow.serialization.serialized_objects.SerializedDAG`, which is not a
    # subclass of DbtDag and contains no direct references to Cosmos.
    serialized_dag = dag_run.get_dag()

    # Count once and reuse: total_cosmos_tasks iterates over every task in the DAG.
    cosmos_task_count = total_cosmos_tasks(serialized_dag)
    if not cosmos_task_count:
        logger.debug("The DAG does not use Cosmos")
        return

    additional_telemetry_metrics = {
        "dag_hash": dag_run.dag_hash,
        "status": EventStatus.FAILED,
        "task_count": len(serialized_dag.task_ids),
        "cosmos_task_count": cosmos_task_count,
    }

    telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics)
    logger.debug("Completed on_dag_run_failed")
diff --git a/cosmos/plugin/__init__.py b/cosmos/plugin/__init__.py
index 5997a5fe3..4bbea4fa2 100644
--- a/cosmos/plugin/__init__.py
+++ b/cosmos/plugin/__init__.py
@@ -10,6 +10,7 @@
from flask import abort, url_for
from flask_appbuilder import AppBuilder, expose
+from cosmos.listeners import dag_run_listener
from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir, dbt_docs_index_file_name, in_astro_cloud
if in_astro_cloud:
@@ -269,3 +270,4 @@ class CosmosPlugin(AirflowPlugin):
"href": conf.get("webserver", "base_url") + "/cosmos/dbt_docs",
}
appbuilder_views = [item]
+ listeners = [dag_run_listener]
diff --git a/cosmos/settings.py b/cosmos/settings.py
index 5b24321c8..ba9da106a 100644
--- a/cosmos/settings.py
+++ b/cosmos/settings.py
@@ -37,12 +37,28 @@
remote_target_path = conf.get("cosmos", "remote_target_path", fallback=None)
remote_target_path_conn_id = conf.get("cosmos", "remote_target_path_conn_id", fallback=None)
+AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")
+
+# The following environment variable is populated in Astro Cloud
+in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud"
+
try:
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)
-AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")
-# The following environment variable is populated in Astro Cloud
-in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud"
def convert_to_boolean(value: str | None) -> bool:
    """
    Convert a string that represents a boolean (or ``None``) to a Python boolean.

    Any of the common "false" spellings — ``f``, ``false``, ``no``, ``off``, ``0`` —
    as well as an empty or unset value, yield ``False``; anything else yields ``True``.
    The comparison is case-insensitive and ignores surrounding whitespace.
    """
    value = str(value).lower().strip()
    # "none" covers the literal str(None) produced when the env var is unset.
    return value not in ("f", "false", "no", "off", "0", "", "none")
+
+
+# Telemetry-related settings
+enable_telemetry = conf.getboolean("cosmos", "enable_telemetry", fallback=True)
+do_not_track = convert_to_boolean(os.getenv("DO_NOT_TRACK"))
+no_analytics = convert_to_boolean(os.getenv("SCARF_NO_ANALYTICS"))
diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py
new file mode 100644
index 000000000..0e267b28b
--- /dev/null
+++ b/cosmos/telemetry.py
@@ -0,0 +1,77 @@
+from __future__ import annotations
+
+import platform
+from urllib import parse
+from urllib.parse import urlencode
+
+import httpx
+from airflow import __version__ as airflow_version
+
+import cosmos
+from cosmos import constants, settings
+from cosmos.log import get_logger
+
+logger = get_logger(__name__)
+
+
def should_emit() -> bool:
    """
    Return True when telemetry metrics may be sent.

    Emission requires the Cosmos ``enable_telemetry`` setting to be on and neither
    of the opt-out environment toggles (DO_NOT_TRACK / SCARF_NO_ANALYTICS) to be set.
    """
    if not settings.enable_telemetry:
        return False
    if settings.do_not_track or settings.no_analytics:
        return False
    return True
+
+
def collect_standard_usage_metrics() -> dict[str, object]:
    """
    Return the standard telemetry metrics shared by all events.

    Every value here is interpolated into the path of ``TELEMETRY_URL``, so each one
    is percent-encoded — not only the Airflow version. Versions may carry local
    suffixes (e.g. ``2.10.1+astro``) and platform strings are not guaranteed to be
    URL-safe.
    """
    metrics = {
        "cosmos_version": parse.quote(cosmos.__version__),  # type: ignore[attr-defined]
        "airflow_version": parse.quote(airflow_version),
        "python_version": parse.quote(platform.python_version()),
        "platform_system": parse.quote(platform.system()),
        "platform_machine": parse.quote(platform.machine()),
        # Per-event metrics are nested here by emit_usage_metrics_if_enabled.
        "variables": {},
    }
    return metrics
+
+
def emit_usage_metrics(metrics: dict[str, object]) -> bool:
    """
    Emit the given telemetry metrics to the remote telemetry endpoint.

    The metrics must contain every field referenced by ``TELEMETRY_URL``.

    :returns: True if the telemetry backend acknowledged the event, False otherwise.
    """
    # NOTE: the previous `urlencode(metrics)` query string was dead code —
    # TELEMETRY_URL has no {query_string} placeholder and str.format ignores
    # unused keyword arguments — so it was dropped.
    telemetry_url = constants.TELEMETRY_URL.format(**metrics, telemetry_version=constants.TELEMETRY_VERSION)
    logger.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics)
    try:
        response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True)
    except httpx.HTTPError as exc:
        # Telemetry must never break a deployment: swallow transport-level
        # failures (timeouts, DNS errors, ...) and just record the miss.
        logger.warning("Unable to emit usage metrics to %s. Error: %s", telemetry_url, exc)
        return False
    if not response.is_success:
        logger.warning(
            "Unable to emit usage metrics to %s. Status code: %s. Message: %s",
            telemetry_url,
            response.status_code,
            response.text,
        )
    return response.is_success
+
+
def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, object]) -> bool:
    """
    Collect the standard metrics, merge in `additional_metrics`, and emit them for
    `event_type` — but only when telemetry is enabled.

    :returns: Whether the event was successfully sent to the telemetry backend.
    """
    if not should_emit():
        logger.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.")
        return False

    metrics = collect_standard_usage_metrics()
    metrics["event_type"] = event_type
    # Custom metrics are recorded both nested under "variables" and at the top
    # level, where they fill the per-event placeholders of the telemetry URL.
    metrics["variables"].update(additional_metrics)  # type: ignore[attr-defined]
    metrics.update(additional_metrics)
    return emit_usage_metrics(metrics)
diff --git a/docs/index.rst b/docs/index.rst
index e788bd04c..7a56b8df7 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -137,10 +137,14 @@ _______
`Apache License 2.0 `_
+
Privacy Notice
______________
-This project follows `Astronomer's Privacy Policy `_
+The application and this website collect telemetry to support the project's development. End users can disable both.
+
+Read the `Privacy Notice <https://github.com/astronomer/astronomer-cosmos/blob/main/PRIVACY_NOTICE.rst>`_ to learn more about it.
+
.. Tracking pixel for Scarf
.. raw:: html
diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py
new file mode 100644
index 000000000..a547f20ad
--- /dev/null
+++ b/tests/listeners/test_dag_run_listener.py
@@ -0,0 +1,127 @@
+import logging
+import uuid
+from datetime import datetime
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+from airflow.models import DAG
+from airflow.utils.state import State
+
+from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig
+from cosmos.airflow.dag import DbtDag
+from cosmos.airflow.task_group import DbtTaskGroup
+from cosmos.listeners.dag_run_listener import on_dag_run_failed, on_dag_run_success, total_cosmos_tasks
+from cosmos.profiles import PostgresUserPasswordProfileMapping
+
+DBT_ROOT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt"
+DBT_PROJECT_NAME = "jaffle_shop"
+
+profile_config = ProfileConfig(
+ profile_name="default",
+ target_name="dev",
+ profile_mapping=PostgresUserPasswordProfileMapping(
+ conn_id="example_conn",
+ profile_args={"schema": "public"},
+ disable_event_tracking=True,
+ ),
+)
+
+
+@pytest.mark.integration
+def test_is_cosmos_dag_is_true():
+ dag = DbtDag(
+ project_config=ProjectConfig(
+ DBT_ROOT_PATH / "jaffle_shop",
+ ),
+ profile_config=profile_config,
+ start_date=datetime(2023, 1, 1),
+ dag_id="basic_cosmos_dag",
+ )
+ assert total_cosmos_tasks(dag) == 13
+
+
+@pytest.mark.integration
+def test_total_cosmos_tasks_in_task_group():
+ with DAG("test-id-dbt-compile", start_date=datetime(2022, 1, 1)) as dag:
+ _ = DbtTaskGroup(
+ project_config=ProjectConfig(
+ DBT_ROOT_PATH / "jaffle_shop",
+ ),
+ profile_config=profile_config,
+ )
+
+ assert total_cosmos_tasks(dag) == 13
+
+
+def test_total_cosmos_tasks_is_one():
+
+ with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
+ run_operator = DbtRunLocalOperator(
+ profile_config=profile_config,
+ project_dir=DBT_ROOT_PATH / "jaffle_shop",
+ task_id="run",
+ install_deps=True,
+ append_env=True,
+ )
+ run_operator
+
+ assert total_cosmos_tasks(dag) == 1
+
+
+def test_not_cosmos_dag():
+
+ with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
+ pass
+
+ assert total_cosmos_tasks(dag) == 0
+
+
+@pytest.mark.integration
+@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled")
+def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog):
+ caplog.set_level(logging.DEBUG)
+
+ dag = DbtDag(
+ project_config=ProjectConfig(
+ DBT_ROOT_PATH / "jaffle_shop",
+ ),
+ profile_config=profile_config,
+ start_date=datetime(2023, 1, 1),
+ dag_id="basic_cosmos_dag",
+ )
+ run_id = str(uuid.uuid1())
+ dag_run = dag.create_dagrun(
+ state=State.NONE,
+ run_id=run_id,
+ )
+
+ on_dag_run_success(dag_run, msg="test success")
+ assert "Running on_dag_run_success" in caplog.text
+ assert "Completed on_dag_run_success" in caplog.text
+ assert mock_emit_usage_metrics_if_enabled.call_count == 1
+
+
+@pytest.mark.integration
+@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled")
+def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog):
+ caplog.set_level(logging.DEBUG)
+
+ dag = DbtDag(
+ project_config=ProjectConfig(
+ DBT_ROOT_PATH / "jaffle_shop",
+ ),
+ profile_config=profile_config,
+ start_date=datetime(2023, 1, 1),
+ dag_id="basic_cosmos_dag",
+ )
+ run_id = str(uuid.uuid1())
+ dag_run = dag.create_dagrun(
+ state=State.FAILED,
+ run_id=run_id,
+ )
+
+ on_dag_run_failed(dag_run, msg="test failed")
+ assert "Running on_dag_run_failed" in caplog.text
+ assert "Completed on_dag_run_failed" in caplog.text
+ assert mock_emit_usage_metrics_if_enabled.call_count == 1
diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py
new file mode 100644
index 000000000..b11caabe1
--- /dev/null
+++ b/tests/test_telemetry.py
@@ -0,0 +1,115 @@
+import logging
+from unittest.mock import patch
+
+import pytest
+
+from cosmos import telemetry
+
+
+def test_should_emit_is_true_by_default():
+ assert telemetry.should_emit()
+
+
+@patch("cosmos.settings.enable_telemetry", True)
+def test_should_emit_is_true_when_only_enable_telemetry_is_true():
+ assert telemetry.should_emit()
+
+
+@patch("cosmos.settings.do_not_track", True)
+def test_should_emit_is_false_when_do_not_track():
+ assert not telemetry.should_emit()
+
+
+@patch("cosmos.settings.no_analytics", True)
+def test_should_emit_is_false_when_no_analytics():
+ assert not telemetry.should_emit()
+
+
+def test_collect_standard_usage_metrics():
+ metrics = telemetry.collect_standard_usage_metrics()
+ expected_keys = [
+ "airflow_version",
+ "cosmos_version",
+ "platform_machine",
+ "platform_system",
+ "python_version",
+ "variables",
+ ]
+ assert sorted(metrics.keys()) == expected_keys
+
+
+class MockFailedResponse:
+ is_success = False
+ status_code = "404"
+ text = "Non existent URL"
+
+
+@patch("cosmos.telemetry.httpx.get", return_value=MockFailedResponse())
+def test_emit_usage_metrics_fails(mock_httpx_get, caplog):
+ sample_metrics = {
+ "cosmos_version": "1.8.0a4",
+ "airflow_version": "2.10.1",
+ "python_version": "3.11",
+ "platform_system": "darwin",
+ "platform_machine": "amd64",
+ "event_type": "dag_run",
+ "status": "success",
+ "dag_hash": "d151d1fa2f03270ea116cc7494f2c591",
+ "task_count": 3,
+ "cosmos_task_count": 3,
+ }
+ is_success = telemetry.emit_usage_metrics(sample_metrics)
+ mock_httpx_get.assert_called_once_with(
+ f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3/3""",
+ timeout=1.0,
+ follow_redirects=True,
+ )
+ assert not is_success
+ log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3/3. Status code: 404. Message: Non existent URL"""
+ assert caplog.text.startswith("WARNING")
+ assert log_msg in caplog.text
+
+
+@pytest.mark.integration
+def test_emit_usage_metrics_succeeds(caplog):
+ caplog.set_level(logging.DEBUG)
+ sample_metrics = {
+ "cosmos_version": "1.8.0a4",
+ "airflow_version": "2.10.1",
+ "python_version": "3.11",
+ "platform_system": "darwin",
+ "platform_machine": "amd64",
+ "event_type": "dag_run",
+ "status": "success",
+ "dag_hash": "dag-hash-ci",
+ "task_count": 33,
+ "cosmos_task_count": 33,
+ }
+ is_success = telemetry.emit_usage_metrics(sample_metrics)
+ assert is_success
+ assert caplog.text.startswith("DEBUG")
+ assert "Telemetry is enabled. Emitting the following usage metrics to" in caplog.text
+
+
+@patch("cosmos.telemetry.should_emit", return_value=False)
+def test_emit_usage_metrics_if_enabled_fails(mock_should_emit, caplog):
+ caplog.set_level(logging.DEBUG)
+ assert not telemetry.emit_usage_metrics_if_enabled("any", {})
+ assert caplog.text.startswith("DEBUG")
+ assert "Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True." in caplog.text
+
+
+@patch("cosmos.telemetry.should_emit", return_value=True)
+@patch("cosmos.telemetry.collect_standard_usage_metrics", return_value={"k1": "v1", "k2": "v2", "variables": {}})
+@patch("cosmos.telemetry.emit_usage_metrics")
+def test_emit_usage_metrics_if_enabled_succeeds(
+ mock_emit_usage_metrics, mock_collect_standard_usage_metrics, mock_should_emit
+):
+ assert telemetry.emit_usage_metrics_if_enabled("any", {"k2": "v2"})
+ mock_emit_usage_metrics.assert_called_once()
+ assert mock_emit_usage_metrics.call_args.args[0] == {
+ "k1": "v1",
+ "k2": "v2",
+ "event_type": "any",
+ "variables": {"k2": "v2"},
+ }