From 015f81995db9ab7dafbcca54fb4489a02adddd19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Tue, 24 Dec 2024 16:26:19 +0300 Subject: [PATCH] [DOP-22750] update extractor condition --- data_rentgen/consumer/extractors/run.py | 21 ++- .../test_extractors/test_extractors_run.py | 127 ++++++++++++++++++ 2 files changed, 142 insertions(+), 6 deletions(-) diff --git a/data_rentgen/consumer/extractors/run.py b/data_rentgen/consumer/extractors/run.py index 9ecab38b..a61e8fe7 100644 --- a/data_rentgen/consumer/extractors/run.py +++ b/data_rentgen/consumer/extractors/run.py @@ -187,12 +187,21 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO: # and also it does not mean that this exact user started this run. # Airflow using different facets for version above provider-opelineage/1.11.0. airflow_application_details = event.run.facets.airflow - if airflow_application_details and airflow_application_details.dag.owner != "airflow": - if airflow_application_details.dag.owner is not None: - run.user = UserDTO(name=airflow_application_details.dag.owner) + if airflow_application_details and all( + ( + airflow_application_details.dag.owner is not None, + airflow_application_details.dag.owner != "airflow", + ), + ): + run.user = UserDTO(name=airflow_application_details.dag.owner) # type: ignore[arg-type] + airflow_application_dag_details = event.run.facets.airflowDagRun - if airflow_application_dag_details and airflow_application_dag_details.dag.owner != "airflow": - if airflow_application_dag_details.dag.owner is not None: - run.user = UserDTO(name=airflow_application_dag_details.dag.owner) + if airflow_application_dag_details and all( + ( + airflow_application_dag_details.dag.owner is not None, + airflow_application_dag_details.dag.owner != "airflow", + ), + ): + run.user = UserDTO(name=airflow_application_dag_details.dag.owner) # type: ignore[arg-type] return run diff --git a/tests/test_consumer/test_extractors/test_extractors_run.py b/tests/test_consumer/test_extractors/test_extractors_run.py index 6b4fdadc..4cd922a0 100644 --- a/tests/test_consumer/test_extractors/test_extractors_run.py +++ b/tests/test_consumer/test_extractors/test_extractors_run.py @@ -479,6 +479,133 @@ def test_extractors_extract_run_airflow_task_2_x(): ) +def test_extractors_extract_run_airflow_dag_check_with_owner(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=None, + integration=OpenLineageJobIntegrationType.AIRFLOW, + jobType=OpenLineageJobType.DAG, + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + processing_engine=OpenLineageProcessingEngineRunFacet( + version=Version("2.1.4"), + name=OpenLineageProcessingEngineName.AIRFLOW, + openlineageAdapterVersion=Version("1.10.0"), + ), + airflowDagRun=OpenLineageAirflowDagRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="manual__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.MANUAL, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + ), + ), + ), + ) + + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="mydag", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO.AIRFLOW_DAG, + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.MANUAL, + user=UserDTO(name="myuser"), + ended_at=now, + external_id="manual__2024-07-05T09:04:13:979349+00:00", + attempt=None, + persistent_log_url=( + "http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" + ), + running_log_url=None, + ) + + +def test_extractors_extract_run_airflow_task_with_owner(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag.mytask", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=None, + integration=OpenLineageJobIntegrationType.AIRFLOW, + jobType=OpenLineageJobType.TASK, + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + airflow=OpenLineageAirflowTaskRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="scheduled__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.SCHEDULED, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + task=OpenLineageAirflowTaskInfo( + task_id="mytask", + ), + taskInstance=OpenLineageAirflowTaskInstanceInfo( + try_number=1, + ), + ), + ), + ), + ) + + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="mydag.mytask", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO.AIRFLOW_TASK, + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.AUTOMATIC, + user=UserDTO(name="myuser"), + ended_at=now, + external_id="scheduled__2024-07-05T09:04:13:979349+00:00", + attempt="1", + persistent_log_url=( + "http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" + ), + running_log_url=None, + ) + + def test_extractors_extract_run_airflow_dag_without_owner(): now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")