From 19f21627cf42b2b4008d30a54dc6d2f6f888f74e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Tue, 24 Dec 2024 10:57:57 +0300 Subject: [PATCH 1/5] [DOP-22750] Add owner parsing for airflow run events --- data_rentgen/consumer/extractors/run.py | 9 +++++++++ .../consumer/openlineage/run_facets/airflow.py | 1 + .../test_extractors/test_extractors_batch_airflow.py | 4 ++-- .../test_extractors/test_extractors_run.py | 10 +++++----- .../test_handlers/test_runs_handler_airflow.py | 6 +++++- .../test_openlineage/test_run_event_airflow.py | 2 ++ 6 files changed, 24 insertions(+), 8 deletions(-) diff --git a/data_rentgen/consumer/extractors/run.py b/data_rentgen/consumer/extractors/run.py index cd76ed6d..e67d707f 100644 --- a/data_rentgen/consumer/extractors/run.py +++ b/data_rentgen/consumer/extractors/run.py @@ -182,6 +182,15 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO: spark_application_details = event.run.facets.spark_applicationDetails if spark_application_details: run.user = UserDTO(name=spark_application_details.userName) + # Airflow DAG and task have 'owner' field, but if can be either user or group name, # and also it does not mean that this exact user started this run. + # Airflow using different facets for version above provider-opelineage/1.11.0. + airflow_application_details = event.run.facets.airflow + if airflow_application_details and airflow_application_details.dag.owner != "airflow": + run.user = UserDTO(name=airflow_application_details.dag.owner) + airflow_application_dag_details = event.run.facets.airflowDagRun + if airflow_application_dag_details and airflow_application_dag_details.dag.owner != "airflow": + run.user = UserDTO(name=airflow_application_dag_details.dag.owner) + return run diff --git a/data_rentgen/consumer/openlineage/run_facets/airflow.py b/data_rentgen/consumer/openlineage/run_facets/airflow.py index d56405b3..d89b7810 100644 --- a/data_rentgen/consumer/openlineage/run_facets/airflow.py +++ b/data_rentgen/consumer/openlineage/run_facets/airflow.py @@ -14,6 +14,7 @@ class OpenLineageAirflowDagInfo(OpenLineageBase): """ dag_id: str + owner: str class OpenLineageAirflowDagRunType(Enum): diff --git a/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py b/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py index aa9d1434..76c4926c 100644 --- a/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py +++ b/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py @@ -66,7 +66,7 @@ def airflow_dag_run_event_start() -> OpenLineageRunEvent: openlineageAdapterVersion=Version("1.10.0"), ), airflowDagRun=OpenLineageAirflowDagRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.MANUAL, @@ -137,7 +137,7 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent: openlineageAdapterVersion=Version("1.10.0"), ), airflow=OpenLineageAirflowTaskRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.MANUAL, diff --git a/tests/test_consumer/test_extractors/test_extractors_run.py b/tests/test_consumer/test_extractors/test_extractors_run.py index 136637bc..2edb2c57 100644 --- a/tests/test_consumer/test_extractors/test_extractors_run.py +++ b/tests/test_consumer/test_extractors/test_extractors_run.py @@ -174,7 +174,7 @@ def test_extractors_extract_run_airflow_dag_2_3_plus(): openlineageAdapterVersion=Version("1.10.0"), ), airflowDagRun=OpenLineageAirflowDagRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.MANUAL, @@ -237,7 +237,7 @@ def test_extractors_extract_run_airflow_dag_2_x(): openlineageAdapterVersion=Version("1.10.0"), ), airflowDagRun=OpenLineageAirflowDagRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.MANUAL, @@ -300,7 +300,7 @@ def test_extractors_extract_run_airflow_task_with_ti_persistent_log_url(): openlineageAdapterVersion=Version("1.10.0"), ), airflow=OpenLineageAirflowTaskRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.MANUAL, @@ -372,7 +372,7 @@ def test_extractors_extract_run_airflow_task_2_9_plus(): openlineageAdapterVersion=Version("1.9.0"), ), airflow=OpenLineageAirflowTaskRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), dagRun=OpenLineageAirflowDagRunInfo( run_id="backfill__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.BACKFILL_JOB, @@ -436,7 +436,7 @@ def test_extractors_extract_run_airflow_task_2_x(): runId=run_id, facets=OpenLineageRunFacets( airflow=OpenLineageAirflowTaskRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), dagRun=OpenLineageAirflowDagRunInfo( run_id="scheduled__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.SCHEDULED, diff --git a/tests/test_consumer/test_handlers/test_runs_handler_airflow.py b/tests/test_consumer/test_handlers/test_runs_handler_airflow.py index 10bd9bca..fd288d6e 100644 --- a/tests/test_consumer/test_handlers/test_runs_handler_airflow.py +++ b/tests/test_consumer/test_handlers/test_runs_handler_airflow.py @@ -17,6 +17,7 @@ Run, RunStartReason, RunStatus, + User, ) RESOURCES_PATH = Path(__file__).parent.parent.joinpath("resources").resolve() @@ -97,7 +98,10 @@ async def test_runs_handler_airflow( assert task_run.status == RunStatus.SUCCEEDED assert task_run.started_at == datetime(2024, 7, 5, 9, 4, 20, 783845, tzinfo=timezone.utc) assert task_run.ended_at == datetime(2024, 7, 5, 9, 7, 37, 858423, tzinfo=timezone.utc) - assert task_run.started_by_user_id is None + user_query = select(User).where(User.name == "myuser") + user_scalars = await async_session.scalars(user_query) + user = user_scalars.one_or_none() + assert task_run.started_by_user_id == user.id assert task_run.start_reason == RunStartReason.MANUAL assert task_run.end_reason is None assert task_run.external_id == "manual__2024-07-05T09:04:12.162809+00:00" diff --git a/tests/test_consumer/test_openlineage/test_run_event_airflow.py b/tests/test_consumer/test_openlineage/test_run_event_airflow.py index 70723263..ee2ec84a 100644 --- a/tests/test_consumer/test_openlineage/test_run_event_airflow.py +++ b/tests/test_consumer/test_openlineage/test_run_event_airflow.py @@ -150,6 +150,7 @@ def test_run_event_airflow_dag_start(): airflowDagRun=OpenLineageAirflowDagRunFacet( dag=OpenLineageAirflowDagInfo( dag_id="mydag", + owner="myuser", ), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:12.162809+00:00", @@ -400,6 +401,7 @@ def test_run_event_airflow_task_start(): airflow=OpenLineageAirflowTaskRunFacet( dag=OpenLineageAirflowDagInfo( dag_id="mydag", + owner="myuser", ), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:12.162809+00:00", From 67133f7b475bb02e221863fd7a1f790f249b0c45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Tue, 24 Dec 2024 11:27:01 +0300 Subject: [PATCH 2/5] [DOP-22750] Update tests --- .../test_extractors_batch_airflow.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py b/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py index 76c4926c..bcaa8d9c 100644 --- a/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py +++ b/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py @@ -36,6 +36,7 @@ RunDTO, RunStartReasonDTO, RunStatusDTO, + UserDTO, ) @@ -66,7 +67,7 @@ def airflow_dag_run_event_start() -> OpenLineageRunEvent: openlineageAdapterVersion=Version("1.10.0"), ), airflowDagRun=OpenLineageAirflowDagRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.MANUAL, @@ -137,7 +138,7 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent: openlineageAdapterVersion=Version("1.10.0"), ), airflow=OpenLineageAirflowTaskRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.MANUAL, @@ -231,6 +232,10 @@ def extracted_airflow_dag_run( status=RunStatusDTO.SUCCEEDED, started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), start_reason=RunStartReasonDTO.MANUAL, + user=UserDTO( + name="myuser", + id=None, + ), ended_at=datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc), external_id="manual__2024-07-05T09:04:13:979349+00:00", persistent_log_url="http://airflow-host:8081/dags/mydag/grid?dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00", @@ -247,6 +252,10 @@ def extracted_airflow_task_run( status=RunStatusDTO.SUCCEEDED, started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), start_reason=RunStartReasonDTO.MANUAL, + user=UserDTO( + name="myuser", + id=None, + ), ended_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), external_id="manual__2024-07-05T09:04:13:979349+00:00", attempt="1", @@ -300,7 +309,6 @@ def test_extractors_extract_batch_airflow( assert not extracted.datasets() assert not extracted.dataset_symlinks() - assert not extracted.users() assert not extracted.schemas() assert not extracted.operations() assert not extracted.inputs() From 1e548cca7f2ac57c7bfb4aeb6698700ffdd02f88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Tue, 24 Dec 2024 12:50:57 +0300 Subject: [PATCH 3/5] [DOP-22750] add unit tests for 'airflow' owner --- .../test_extractors_batch_airflow.py | 175 ++++++++++++++++++ .../test_runs_handler_airflow.py | 8 +- 2 files changed, 180 insertions(+), 3 deletions(-) diff --git a/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py b/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py index bcaa8d9c..508d2bb1 100644 --- a/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py +++ b/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py @@ -80,6 +80,46 @@ def airflow_dag_run_event_start() -> OpenLineageRunEvent: ) +@pytest.fixture +def airflow_dag_run_event_start_owner_airflow() -> OpenLineageRunEvent: + event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("01908223-0782-79b8-9495-b1c38aaee839") + return OpenLineageRunEvent( + eventType=OpenLineageRunEventType.START, + eventTime=event_time, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=None, + integration=OpenLineageJobIntegrationType.AIRFLOW, + jobType=OpenLineageJobType.DAG, + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + processing_engine=OpenLineageProcessingEngineRunFacet( + version=Version("2.9.2"), + name=OpenLineageProcessingEngineName.AIRFLOW, + openlineageAdapterVersion=Version("1.10.0"), + ), + airflowDagRun=OpenLineageAirflowDagRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="manual__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.MANUAL, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + ), + ), + ), + ) + + @pytest.fixture def airflow_dag_run_event_stop() -> OpenLineageRunEvent: event_time = datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc) @@ -160,6 +200,55 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent: ) +@pytest.fixture +def airflow_task_run_event_start_owner_airflow() -> OpenLineageRunEvent: + event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("01908223-0782-7fc0-9d69-b1df9dac2c60") + return OpenLineageRunEvent( + eventType=OpenLineageRunEventType.START, + eventTime=event_time, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag.mytask", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=None, + integration=OpenLineageJobIntegrationType.AIRFLOW, + jobType=OpenLineageJobType.TASK, + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + processing_engine=OpenLineageProcessingEngineRunFacet( + version=Version("2.9.2"), + name=OpenLineageProcessingEngineName.AIRFLOW, + openlineageAdapterVersion=Version("1.10.0"), + ), + airflow=OpenLineageAirflowTaskRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="manual__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.MANUAL, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + task=OpenLineageAirflowTaskInfo( + task_id="mytask", + ), + taskInstance=OpenLineageAirflowTaskInstanceInfo( + try_number=1, + log_url=( + "http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask" + ), + ), + ), + ), + ), + ) + + @pytest.fixture def airflow_task_run_event_stop() -> OpenLineageRunEvent: event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) @@ -222,6 +311,41 @@ def extracted_airflow_task_job( ) +@pytest.fixture +def extracted_airflow_dag_run_owner_airflow( + extracted_airflow_dag_job: JobDTO, +) -> RunDTO: + return RunDTO( + id=UUID("01908223-0782-79b8-9495-b1c38aaee839"), + job=extracted_airflow_dag_job, + status=RunStatusDTO.SUCCEEDED, + started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + start_reason=RunStartReasonDTO.MANUAL, + ended_at=datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc), + external_id="manual__2024-07-05T09:04:13:979349+00:00", + persistent_log_url="http://airflow-host:8081/dags/mydag/grid?dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00", + ) + + +@pytest.fixture +def extracted_airflow_task_run_owner_airflow( + extracted_airflow_task_job: JobDTO, +) -> RunDTO: + return RunDTO( + id=UUID("01908223-0782-7fc0-9d69-b1df9dac2c60"), + job=extracted_airflow_task_job, + status=RunStatusDTO.SUCCEEDED, + started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + start_reason=RunStartReasonDTO.MANUAL, + ended_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + external_id="manual__2024-07-05T09:04:13:979349+00:00", + attempt="1", + persistent_log_url=( + "http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask" + ), + ) + + @pytest.fixture def extracted_airflow_dag_run( extracted_airflow_dag_job: JobDTO, @@ -313,3 +437,54 @@ def test_extractors_extract_batch_airflow( assert not extracted.operations() assert not extracted.inputs() assert not extracted.outputs() + + +@pytest.mark.parametrize( + "input_transformation", + [ + # receiving data out of order does not change result + pytest.param( + list, + id="preserve order", + ), + pytest.param( + reversed, + id="reverse order", + ), + ], +) +def test_extractors_extract_batch_airflow_with_airflow_owner( + airflow_dag_run_event_start_owner_airflow: OpenLineageRunEvent, + airflow_dag_run_event_stop: OpenLineageRunEvent, + airflow_task_run_event_start_owner_airflow: OpenLineageRunEvent, + airflow_task_run_event_stop: OpenLineageRunEvent, + extracted_airflow_location: LocationDTO, + extracted_airflow_dag_job: JobDTO, + extracted_airflow_task_job: JobDTO, + extracted_airflow_dag_run_owner_airflow: RunDTO, + extracted_airflow_task_run_owner_airflow: RunDTO, + input_transformation, +): + events = [ + airflow_dag_run_event_start_owner_airflow, + airflow_task_run_event_start_owner_airflow, + airflow_task_run_event_stop, + airflow_dag_run_event_stop, + ] + + extracted = extract_batch(input_transformation(events)) + + assert extracted.locations() == [extracted_airflow_location] + assert extracted.jobs() == [extracted_airflow_dag_job, extracted_airflow_task_job] + assert extracted.runs() == [ + extracted_airflow_dag_run_owner_airflow, + extracted_airflow_task_run_owner_airflow, + ] + + assert not extracted.datasets() + assert not extracted.dataset_symlinks() + assert not extracted.schemas() + assert not extracted.users() + assert not extracted.operations() + assert not extracted.inputs() + assert not extracted.outputs() diff --git a/tests/test_consumer/test_handlers/test_runs_handler_airflow.py b/tests/test_consumer/test_handlers/test_runs_handler_airflow.py index fd288d6e..1a9215e8 100644 --- a/tests/test_consumer/test_handlers/test_runs_handler_airflow.py +++ b/tests/test_consumer/test_handlers/test_runs_handler_airflow.py @@ -77,6 +77,10 @@ async def test_runs_handler_airflow( runs = run_scalars.all() assert len(runs) == 2 + user_query = select(User).where(User.name == "myuser") + user_scalars = await async_session.scalars(user_query) + user = user_scalars.one_or_none() + dag_run = runs[0] assert dag_run.id == UUID("01908223-0782-79b8-9495-b1c38aaee839") assert dag_run.created_at == datetime(2024, 7, 5, 9, 4, 12, 162000, tzinfo=timezone.utc) @@ -98,9 +102,7 @@ async def test_runs_handler_airflow( assert task_run.status == RunStatus.SUCCEEDED assert task_run.started_at == datetime(2024, 7, 5, 9, 4, 20, 783845, tzinfo=timezone.utc) assert task_run.ended_at == datetime(2024, 7, 5, 9, 7, 37, 858423, tzinfo=timezone.utc) - user_query = select(User).where(User.name == "myuser") - user_scalars = await async_session.scalars(user_query) - user = user_scalars.one_or_none() + assert task_run.started_by_user_id is not None assert task_run.started_by_user_id == user.id assert task_run.start_reason == RunStartReason.MANUAL assert task_run.end_reason is None From 87c3ee3e8aa609179b7a80226bd11ccc69609346 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Tue, 24 Dec 2024 14:19:04 +0300 Subject: [PATCH 4/5] [DOP-22750] make owner optional, add test for airflow dag/task without owner --- data_rentgen/consumer/extractors/run.py | 6 +- .../openlineage/run_facets/airflow.py | 2 +- .../test_extractors_batch_airflow.py | 175 ------------------ .../test_extractors/test_extractors_run.py | 127 +++++++++++++ 4 files changed, 132 insertions(+), 178 deletions(-) diff --git a/data_rentgen/consumer/extractors/run.py b/data_rentgen/consumer/extractors/run.py index e67d707f..9ecab38b 100644 --- a/data_rentgen/consumer/extractors/run.py +++ b/data_rentgen/consumer/extractors/run.py @@ -188,9 +188,11 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO: # Airflow using different facets for version above provider-opelineage/1.11.0. airflow_application_details = event.run.facets.airflow if airflow_application_details and airflow_application_details.dag.owner != "airflow": - run.user = UserDTO(name=airflow_application_details.dag.owner) + if airflow_application_details.dag.owner is not None: + run.user = UserDTO(name=airflow_application_details.dag.owner) airflow_application_dag_details = event.run.facets.airflowDagRun if airflow_application_dag_details and airflow_application_dag_details.dag.owner != "airflow": - run.user = UserDTO(name=airflow_application_dag_details.dag.owner) + if airflow_application_dag_details.dag.owner is not None: + run.user = UserDTO(name=airflow_application_dag_details.dag.owner) return run diff --git a/data_rentgen/consumer/openlineage/run_facets/airflow.py b/data_rentgen/consumer/openlineage/run_facets/airflow.py index d89b7810..79860aa2 100644 --- a/data_rentgen/consumer/openlineage/run_facets/airflow.py +++ b/data_rentgen/consumer/openlineage/run_facets/airflow.py @@ -14,7 +14,7 @@ class OpenLineageAirflowDagInfo(OpenLineageBase): """ dag_id: str - owner: str + owner: str | None = None class OpenLineageAirflowDagRunType(Enum): diff --git a/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py b/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py index 508d2bb1..bcaa8d9c 100644 --- a/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py +++ b/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py @@ -80,46 +80,6 @@ def airflow_dag_run_event_start() -> OpenLineageRunEvent: ) -@pytest.fixture -def airflow_dag_run_event_start_owner_airflow() -> OpenLineageRunEvent: - event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) - run_id = UUID("01908223-0782-79b8-9495-b1c38aaee839") - return OpenLineageRunEvent( - eventType=OpenLineageRunEventType.START, - eventTime=event_time, - job=OpenLineageJob( - namespace="http://airflow-host:8081", - name="mydag", - facets=OpenLineageJobFacets( - jobType=OpenLineageJobTypeJobFacet( - processingType=None, - integration=OpenLineageJobIntegrationType.AIRFLOW, - jobType=OpenLineageJobType.DAG, - ), - ), - ), - run=OpenLineageRun( - runId=run_id, - facets=OpenLineageRunFacets( - processing_engine=OpenLineageProcessingEngineRunFacet( - version=Version("2.9.2"), - name=OpenLineageProcessingEngineName.AIRFLOW, - openlineageAdapterVersion=Version("1.10.0"), - ), - airflowDagRun=OpenLineageAirflowDagRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), - dagRun=OpenLineageAirflowDagRunInfo( - run_id="manual__2024-07-05T09:04:13:979349+00:00", - run_type=OpenLineageAirflowDagRunType.MANUAL, - data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), - data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), - ), - ), - ), - ), - ) - - @pytest.fixture def airflow_dag_run_event_stop() -> OpenLineageRunEvent: event_time = datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc) @@ -200,55 +160,6 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent: ) -@pytest.fixture -def airflow_task_run_event_start_owner_airflow() -> OpenLineageRunEvent: - event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) - run_id = UUID("01908223-0782-7fc0-9d69-b1df9dac2c60") - return OpenLineageRunEvent( - eventType=OpenLineageRunEventType.START, - eventTime=event_time, - job=OpenLineageJob( - namespace="http://airflow-host:8081", - name="mydag.mytask", - facets=OpenLineageJobFacets( - jobType=OpenLineageJobTypeJobFacet( - processingType=None, - integration=OpenLineageJobIntegrationType.AIRFLOW, - jobType=OpenLineageJobType.TASK, - ), - ), - ), - run=OpenLineageRun( - runId=run_id, - facets=OpenLineageRunFacets( - processing_engine=OpenLineageProcessingEngineRunFacet( - version=Version("2.9.2"), - name=OpenLineageProcessingEngineName.AIRFLOW, - openlineageAdapterVersion=Version("1.10.0"), - ), - airflow=OpenLineageAirflowTaskRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), - dagRun=OpenLineageAirflowDagRunInfo( - run_id="manual__2024-07-05T09:04:13:979349+00:00", - run_type=OpenLineageAirflowDagRunType.MANUAL, - data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), - data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), - ), - task=OpenLineageAirflowTaskInfo( - task_id="mytask", - ), - taskInstance=OpenLineageAirflowTaskInstanceInfo( - try_number=1, - log_url=( - "http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask" - ), - ), - ), - ), - ), - ) - - @pytest.fixture def airflow_task_run_event_stop() -> OpenLineageRunEvent: event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) @@ -311,41 +222,6 @@ def extracted_airflow_task_job( ) -@pytest.fixture -def extracted_airflow_dag_run_owner_airflow( - extracted_airflow_dag_job: JobDTO, -) -> RunDTO: - return RunDTO( - id=UUID("01908223-0782-79b8-9495-b1c38aaee839"), - job=extracted_airflow_dag_job, - status=RunStatusDTO.SUCCEEDED, - started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), - start_reason=RunStartReasonDTO.MANUAL, - ended_at=datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc), - external_id="manual__2024-07-05T09:04:13:979349+00:00", - persistent_log_url="http://airflow-host:8081/dags/mydag/grid?dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00", - ) - - -@pytest.fixture -def extracted_airflow_task_run_owner_airflow( - extracted_airflow_task_job: JobDTO, -) -> RunDTO: - return RunDTO( - id=UUID("01908223-0782-7fc0-9d69-b1df9dac2c60"), - job=extracted_airflow_task_job, - status=RunStatusDTO.SUCCEEDED, - started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), - start_reason=RunStartReasonDTO.MANUAL, - ended_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), - external_id="manual__2024-07-05T09:04:13:979349+00:00", - attempt="1", - persistent_log_url=( - "http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask" - ), - ) - - @pytest.fixture def extracted_airflow_dag_run( extracted_airflow_dag_job: JobDTO, @@ -437,54 +313,3 @@ def test_extractors_extract_batch_airflow( assert not extracted.operations() assert not extracted.inputs() assert not extracted.outputs() - - -@pytest.mark.parametrize( - "input_transformation", - [ - # receiving data out of order does not change result - pytest.param( - list, - id="preserve order", - ), - pytest.param( - reversed, - id="reverse order", - ), - ], -) -def test_extractors_extract_batch_airflow_with_airflow_owner( - airflow_dag_run_event_start_owner_airflow: OpenLineageRunEvent, - airflow_dag_run_event_stop: OpenLineageRunEvent, - airflow_task_run_event_start_owner_airflow: OpenLineageRunEvent, - airflow_task_run_event_stop: OpenLineageRunEvent, - extracted_airflow_location: LocationDTO, - extracted_airflow_dag_job: JobDTO, - extracted_airflow_task_job: JobDTO, - extracted_airflow_dag_run_owner_airflow: RunDTO, - extracted_airflow_task_run_owner_airflow: RunDTO, - input_transformation, -): - events = [ - airflow_dag_run_event_start_owner_airflow, - airflow_task_run_event_start_owner_airflow, - airflow_task_run_event_stop, - airflow_dag_run_event_stop, - ] - - extracted = extract_batch(input_transformation(events)) - - assert extracted.locations() == [extracted_airflow_location] - assert extracted.jobs() == [extracted_airflow_dag_job, extracted_airflow_task_job] - assert extracted.runs() == [ - extracted_airflow_dag_run_owner_airflow, - extracted_airflow_task_run_owner_airflow, - ] - - assert not extracted.datasets() - assert not extracted.dataset_symlinks() - assert not extracted.schemas() - assert not extracted.users() - assert not extracted.operations() - assert not extracted.inputs() - assert not extracted.outputs() diff --git a/tests/test_consumer/test_extractors/test_extractors_run.py b/tests/test_consumer/test_extractors/test_extractors_run.py index 2edb2c57..6b4fdadc 100644 --- a/tests/test_consumer/test_extractors/test_extractors_run.py +++ b/tests/test_consumer/test_extractors/test_extractors_run.py @@ -479,6 +479,133 @@ def test_extractors_extract_run_airflow_task_2_x(): ) +def test_extractors_extract_run_airflow_dag_without_owner(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=None, + integration=OpenLineageJobIntegrationType.AIRFLOW, + jobType=OpenLineageJobType.DAG, + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + processing_engine=OpenLineageProcessingEngineRunFacet( + version=Version("2.1.4"), + name=OpenLineageProcessingEngineName.AIRFLOW, + openlineageAdapterVersion=Version("1.10.0"), + ), + airflowDagRun=OpenLineageAirflowDagRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="manual__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.MANUAL, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + ), + ), + ), + ) + + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="mydag", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO.AIRFLOW_DAG, + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.MANUAL, + user=None, + ended_at=now, + external_id="manual__2024-07-05T09:04:13:979349+00:00", + attempt=None, + persistent_log_url=( + "http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" + ), + running_log_url=None, + ) + + +def test_extractors_extract_run_airflow_task_without_owner(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag.mytask", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=None, + integration=OpenLineageJobIntegrationType.AIRFLOW, + jobType=OpenLineageJobType.TASK, + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + airflow=OpenLineageAirflowTaskRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="scheduled__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.SCHEDULED, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + task=OpenLineageAirflowTaskInfo( + task_id="mytask", + ), + taskInstance=OpenLineageAirflowTaskInstanceInfo( + try_number=1, + ), + ), + ), + ), + ) + + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="mydag.mytask", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO.AIRFLOW_TASK, + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.AUTOMATIC, + user=None, + ended_at=now, + external_id="scheduled__2024-07-05T09:04:13:979349+00:00", + attempt="1", + persistent_log_url=( + "http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" + ), + running_log_url=None, + ) + + @pytest.mark.parametrize( ["event_type", "expected_status"], [ From 015f81995db9ab7dafbcca54fb4489a02adddd19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Tue, 24 Dec 2024 16:26:19 +0300 Subject: [PATCH 5/5] [DOP-22750] update extractor condition --- data_rentgen/consumer/extractors/run.py | 21 ++- .../test_extractors/test_extractors_run.py | 127 ++++++++++++++++++ 2 files changed, 142 insertions(+), 6 deletions(-) diff --git a/data_rentgen/consumer/extractors/run.py b/data_rentgen/consumer/extractors/run.py index 9ecab38b..a61e8fe7 100644 --- a/data_rentgen/consumer/extractors/run.py +++ b/data_rentgen/consumer/extractors/run.py @@ -187,12 +187,21 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO: # and also it does not mean that this exact user started this run. # Airflow using different facets for version above provider-opelineage/1.11.0. airflow_application_details = event.run.facets.airflow - if airflow_application_details and airflow_application_details.dag.owner != "airflow": - if airflow_application_details.dag.owner is not None: - run.user = UserDTO(name=airflow_application_details.dag.owner) + if airflow_application_details and all( + ( + airflow_application_details.dag.owner is not None, + airflow_application_details.dag.owner != "airflow", + ), + ): + run.user = UserDTO(name=airflow_application_details.dag.owner) # type: ignore[arg-type] + airflow_application_dag_details = event.run.facets.airflowDagRun - if airflow_application_dag_details and airflow_application_dag_details.dag.owner != "airflow": - if airflow_application_dag_details.dag.owner is not None: - run.user = UserDTO(name=airflow_application_dag_details.dag.owner) + if airflow_application_dag_details and all( + ( + airflow_application_dag_details.dag.owner is not None, + airflow_application_dag_details.dag.owner != "airflow", + ), + ): + run.user = UserDTO(name=airflow_application_dag_details.dag.owner) # type: ignore[arg-type] return run diff --git a/tests/test_consumer/test_extractors/test_extractors_run.py b/tests/test_consumer/test_extractors/test_extractors_run.py index 6b4fdadc..4cd922a0 100644 --- a/tests/test_consumer/test_extractors/test_extractors_run.py +++ b/tests/test_consumer/test_extractors/test_extractors_run.py @@ -479,6 +479,133 @@ def test_extractors_extract_run_airflow_task_2_x(): ) +def test_extractors_extract_run_airflow_dag_check_with_owner(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=None, + integration=OpenLineageJobIntegrationType.AIRFLOW, + jobType=OpenLineageJobType.DAG, + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + processing_engine=OpenLineageProcessingEngineRunFacet( + version=Version("2.1.4"), + name=OpenLineageProcessingEngineName.AIRFLOW, + openlineageAdapterVersion=Version("1.10.0"), + ), + airflowDagRun=OpenLineageAirflowDagRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="manual__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.MANUAL, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + ), + ), + ), + ) + + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="mydag", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO.AIRFLOW_DAG, + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.MANUAL, + user=UserDTO(name="myuser"), + ended_at=now, + external_id="manual__2024-07-05T09:04:13:979349+00:00", + attempt=None, + persistent_log_url=( + "http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" + ), + running_log_url=None, + ) + + +def test_extractors_extract_run_airflow_task_with_owner(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag.mytask", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=None, + integration=OpenLineageJobIntegrationType.AIRFLOW, + jobType=OpenLineageJobType.TASK, + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + airflow=OpenLineageAirflowTaskRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="scheduled__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.SCHEDULED, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + task=OpenLineageAirflowTaskInfo( + task_id="mytask", + ), + taskInstance=OpenLineageAirflowTaskInstanceInfo( + try_number=1, + ), + ), + ), + ), + ) + + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="mydag.mytask", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO.AIRFLOW_TASK, + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.AUTOMATIC, + user=UserDTO(name="myuser"), + ended_at=now, + external_id="scheduled__2024-07-05T09:04:13:979349+00:00", + attempt="1", + persistent_log_url=( + "http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" + ), + running_log_url=None, + ) + + def test_extractors_extract_run_airflow_dag_without_owner(): now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")