Skip to content

Commit

Permalink
[DOP-22750] update extractor condition
Browse files Browse the repository at this point in the history
  • Loading branch information
TiGrib committed Dec 24, 2024
1 parent 87c3ee3 commit 015f819
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 6 deletions.
21 changes: 15 additions & 6 deletions data_rentgen/consumer/extractors/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,21 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
# and also it does not mean that this exact user started this run.
# Airflow uses different facets for versions above provider-openlineage/1.11.0.
airflow_application_details = event.run.facets.airflow
if airflow_application_details and airflow_application_details.dag.owner != "airflow":
if airflow_application_details.dag.owner is not None:
run.user = UserDTO(name=airflow_application_details.dag.owner)
if airflow_application_details and all(
(
airflow_application_details.dag.owner is not None,
airflow_application_details.dag.owner != "airflow",
),
):
run.user = UserDTO(name=airflow_application_details.dag.owner) # type: ignore[arg-type]

airflow_application_dag_details = event.run.facets.airflowDagRun
if airflow_application_dag_details and airflow_application_dag_details.dag.owner != "airflow":
if airflow_application_dag_details.dag.owner is not None:
run.user = UserDTO(name=airflow_application_dag_details.dag.owner)
if airflow_application_dag_details and all(
(
airflow_application_dag_details.dag.owner is not None,
airflow_application_dag_details.dag.owner != "airflow",
),
):
run.user = UserDTO(name=airflow_application_dag_details.dag.owner) # type: ignore[arg-type]

return run
127 changes: 127 additions & 0 deletions tests/test_consumer/test_extractors/test_extractors_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,133 @@ def test_extractors_extract_run_airflow_task_2_x():
)


def test_extractors_extract_run_airflow_dag_check_with_owner():
    """Check that a DAG run event with ``owner="myuser"`` in ``airflowDagRun`` gets that user.

    The event carries only the ``airflowDagRun`` facet (plus processing engine info);
    the extracted ``RunDTO`` is expected to have ``user=UserDTO(name="myuser")``.
    """
    event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
    run_uuid = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")
    # The same identifier is used in the incoming facet and in the expected external_id.
    dag_run_id = "manual__2024-07-05T09:04:13:979349+00:00"

    job = OpenLineageJob(
        namespace="http://airflow-host:8081",
        name="mydag",
        facets=OpenLineageJobFacets(
            jobType=OpenLineageJobTypeJobFacet(
                processingType=None,
                integration=OpenLineageJobIntegrationType.AIRFLOW,
                jobType=OpenLineageJobType.DAG,
            ),
        ),
    )

    engine_facet = OpenLineageProcessingEngineRunFacet(
        version=Version("2.1.4"),
        name=OpenLineageProcessingEngineName.AIRFLOW,
        openlineageAdapterVersion=Version("1.10.0"),
    )
    dag_run_facet = OpenLineageAirflowDagRunFacet(
        dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
        dagRun=OpenLineageAirflowDagRunInfo(
            run_id=dag_run_id,
            run_type=OpenLineageAirflowDagRunType.MANUAL,
            data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
            data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
        ),
    )

    event = OpenLineageRunEvent(
        eventType=OpenLineageRunEventType.COMPLETE,
        eventTime=event_time,
        job=job,
        run=OpenLineageRun(
            runId=run_uuid,
            facets=OpenLineageRunFacets(
                processing_engine=engine_facet,
                airflowDagRun=dag_run_facet,
            ),
        ),
    )

    actual = extract_run(event)

    expected = RunDTO(
        id=run_uuid,
        job=JobDTO(
            name="mydag",
            location=LocationDTO(
                type="http",
                name="airflow-host:8081",
                addresses={"http://airflow-host:8081"},
            ),
            type=JobTypeDTO.AIRFLOW_DAG,
        ),
        status=RunStatusDTO.SUCCEEDED,
        started_at=None,
        start_reason=RunStartReasonDTO.MANUAL,
        # Owner taken from the airflowDagRun facet.
        user=UserDTO(name="myuser"),
        ended_at=event_time,
        external_id=dag_run_id,
        attempt=None,
        persistent_log_url=(
            "http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
        ),
        running_log_url=None,
    )
    assert actual == expected


def test_extractors_extract_run_airflow_task_with_owner():
    """Check that a task run event with ``owner="myuser"`` in the ``airflow`` facet gets that user.

    The event carries only the task-level ``airflow`` run facet; the extracted
    ``RunDTO`` is expected to have ``user=UserDTO(name="myuser")``.
    """
    event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
    run_uuid = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91")
    # The same identifier is used in the incoming facet and in the expected external_id.
    dag_run_id = "scheduled__2024-07-05T09:04:13:979349+00:00"

    job = OpenLineageJob(
        namespace="http://airflow-host:8081",
        name="mydag.mytask",
        facets=OpenLineageJobFacets(
            jobType=OpenLineageJobTypeJobFacet(
                processingType=None,
                integration=OpenLineageJobIntegrationType.AIRFLOW,
                jobType=OpenLineageJobType.TASK,
            ),
        ),
    )

    task_run_facet = OpenLineageAirflowTaskRunFacet(
        dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
        dagRun=OpenLineageAirflowDagRunInfo(
            run_id=dag_run_id,
            run_type=OpenLineageAirflowDagRunType.SCHEDULED,
            data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
            data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
        ),
        task=OpenLineageAirflowTaskInfo(
            task_id="mytask",
        ),
        taskInstance=OpenLineageAirflowTaskInstanceInfo(
            try_number=1,
        ),
    )

    event = OpenLineageRunEvent(
        eventType=OpenLineageRunEventType.COMPLETE,
        eventTime=event_time,
        job=job,
        run=OpenLineageRun(
            runId=run_uuid,
            facets=OpenLineageRunFacets(airflow=task_run_facet),
        ),
    )

    actual = extract_run(event)

    expected = RunDTO(
        id=run_uuid,
        job=JobDTO(
            name="mydag.mytask",
            location=LocationDTO(
                type="http",
                name="airflow-host:8081",
                addresses={"http://airflow-host:8081"},
            ),
            type=JobTypeDTO.AIRFLOW_TASK,
        ),
        status=RunStatusDTO.SUCCEEDED,
        started_at=None,
        start_reason=RunStartReasonDTO.AUTOMATIC,
        # Owner taken from the task-level airflow facet.
        user=UserDTO(name="myuser"),
        ended_at=event_time,
        external_id=dag_run_id,
        attempt="1",
        persistent_log_url=(
            "http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
        ),
        running_log_url=None,
    )
    assert actual == expected


def test_extractors_extract_run_airflow_dag_without_owner():
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")
Expand Down

0 comments on commit 015f819

Please sign in to comment.