Skip to content

Commit

Permalink
[DOP-22750] Add owner parsing for airflow run events (#135)
Browse files Browse the repository at this point in the history
* [DOP-22750] Add owner parsing for airflow run events

* [DOP-22750] Update tests

* [DOP-22750] add unit tests for 'airflow' owner

* [DOP-22750] make owner optional, add test for airflow dag/task without owner

* [DOP-22750] update extractor condition
  • Loading branch information
TiGrib authored Dec 24, 2024
1 parent 4731716 commit b9d33ff
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 8 deletions.
20 changes: 20 additions & 0 deletions data_rentgen/consumer/extractors/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,26 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
spark_application_details = event.run.facets.spark_applicationDetails
if spark_application_details:
run.user = UserDTO(name=spark_application_details.userName)

# Airflow DAG and task have 'owner' field, but if can be either user or group name,
# and also it does not mean that this exact user started this run.
# Airflow using different facets for version above provider-opelineage/1.11.0.
airflow_application_details = event.run.facets.airflow
if airflow_application_details and all(
(
airflow_application_details.dag.owner is not None,
airflow_application_details.dag.owner != "airflow",
),
):
run.user = UserDTO(name=airflow_application_details.dag.owner) # type: ignore[arg-type]

airflow_application_dag_details = event.run.facets.airflowDagRun
if airflow_application_dag_details and all(
(
airflow_application_dag_details.dag.owner is not None,
airflow_application_dag_details.dag.owner != "airflow",
),
):
run.user = UserDTO(name=airflow_application_dag_details.dag.owner) # type: ignore[arg-type]

return run
1 change: 1 addition & 0 deletions data_rentgen/consumer/openlineage/run_facets/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class OpenLineageAirflowDagInfo(OpenLineageBase):
"""

dag_id: str
owner: str | None = None


class OpenLineageAirflowDagRunType(Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
RunDTO,
RunStartReasonDTO,
RunStatusDTO,
UserDTO,
)


Expand Down Expand Up @@ -66,7 +67,7 @@ def airflow_dag_run_event_start() -> OpenLineageRunEvent:
openlineageAdapterVersion=Version("1.10.0"),
),
airflowDagRun=OpenLineageAirflowDagRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -137,7 +138,7 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent:
openlineageAdapterVersion=Version("1.10.0"),
),
airflow=OpenLineageAirflowTaskRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -231,6 +232,10 @@ def extracted_airflow_dag_run(
status=RunStatusDTO.SUCCEEDED,
started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
start_reason=RunStartReasonDTO.MANUAL,
user=UserDTO(
name="myuser",
id=None,
),
ended_at=datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc),
external_id="manual__2024-07-05T09:04:13:979349+00:00",
persistent_log_url="http://airflow-host:8081/dags/mydag/grid?dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00",
Expand All @@ -247,6 +252,10 @@ def extracted_airflow_task_run(
status=RunStatusDTO.SUCCEEDED,
started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
start_reason=RunStartReasonDTO.MANUAL,
user=UserDTO(
name="myuser",
id=None,
),
ended_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
external_id="manual__2024-07-05T09:04:13:979349+00:00",
attempt="1",
Expand Down Expand Up @@ -300,7 +309,6 @@ def test_extractors_extract_batch_airflow(

assert not extracted.datasets()
assert not extracted.dataset_symlinks()
assert not extracted.users()
assert not extracted.schemas()
assert not extracted.operations()
assert not extracted.inputs()
Expand Down
262 changes: 258 additions & 4 deletions tests/test_consumer/test_extractors/test_extractors_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def test_extractors_extract_run_airflow_dag_2_3_plus():
openlineageAdapterVersion=Version("1.10.0"),
),
airflowDagRun=OpenLineageAirflowDagRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -237,7 +237,7 @@ def test_extractors_extract_run_airflow_dag_2_x():
openlineageAdapterVersion=Version("1.10.0"),
),
airflowDagRun=OpenLineageAirflowDagRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -300,7 +300,7 @@ def test_extractors_extract_run_airflow_task_with_ti_persistent_log_url():
openlineageAdapterVersion=Version("1.10.0"),
),
airflow=OpenLineageAirflowTaskRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -372,7 +372,7 @@ def test_extractors_extract_run_airflow_task_2_9_plus():
openlineageAdapterVersion=Version("1.9.0"),
),
airflow=OpenLineageAirflowTaskRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="backfill__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.BACKFILL_JOB,
Expand Down Expand Up @@ -418,6 +418,260 @@ def test_extractors_extract_run_airflow_task_2_9_plus():
def test_extractors_extract_run_airflow_task_2_x():
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610")
run = OpenLineageRunEvent(
eventType=OpenLineageRunEventType.COMPLETE,
eventTime=now,
job=OpenLineageJob(
namespace="http://airflow-host:8081",
name="mydag.mytask",
facets=OpenLineageJobFacets(
jobType=OpenLineageJobTypeJobFacet(
processingType=None,
integration=OpenLineageJobIntegrationType.AIRFLOW,
jobType=OpenLineageJobType.TASK,
),
),
),
run=OpenLineageRun(
runId=run_id,
facets=OpenLineageRunFacets(
airflow=OpenLineageAirflowTaskRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.SCHEDULED,
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
),
task=OpenLineageAirflowTaskInfo(
task_id="mytask",
),
taskInstance=OpenLineageAirflowTaskInstanceInfo(
try_number=1,
),
),
),
),
)

assert extract_run(run) == RunDTO(
id=run_id,
job=JobDTO(
name="mydag.mytask",
location=LocationDTO(
type="http",
name="airflow-host:8081",
addresses={"http://airflow-host:8081"},
),
type=JobTypeDTO.AIRFLOW_TASK,
),
status=RunStatusDTO.SUCCEEDED,
started_at=None,
start_reason=RunStartReasonDTO.AUTOMATIC,
user=None,
ended_at=now,
external_id="scheduled__2024-07-05T09:04:13:979349+00:00",
attempt="1",
persistent_log_url=(
"http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
),
running_log_url=None,
)


def test_extractors_extract_run_airflow_dag_check_with_owner():
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")
run = OpenLineageRunEvent(
eventType=OpenLineageRunEventType.COMPLETE,
eventTime=now,
job=OpenLineageJob(
namespace="http://airflow-host:8081",
name="mydag",
facets=OpenLineageJobFacets(
jobType=OpenLineageJobTypeJobFacet(
processingType=None,
integration=OpenLineageJobIntegrationType.AIRFLOW,
jobType=OpenLineageJobType.DAG,
),
),
),
run=OpenLineageRun(
runId=run_id,
facets=OpenLineageRunFacets(
processing_engine=OpenLineageProcessingEngineRunFacet(
version=Version("2.1.4"),
name=OpenLineageProcessingEngineName.AIRFLOW,
openlineageAdapterVersion=Version("1.10.0"),
),
airflowDagRun=OpenLineageAirflowDagRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
),
),
),
),
)

assert extract_run(run) == RunDTO(
id=run_id,
job=JobDTO(
name="mydag",
location=LocationDTO(
type="http",
name="airflow-host:8081",
addresses={"http://airflow-host:8081"},
),
type=JobTypeDTO.AIRFLOW_DAG,
),
status=RunStatusDTO.SUCCEEDED,
started_at=None,
start_reason=RunStartReasonDTO.MANUAL,
user=UserDTO(name="myuser"),
ended_at=now,
external_id="manual__2024-07-05T09:04:13:979349+00:00",
attempt=None,
persistent_log_url=(
"http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
),
running_log_url=None,
)


def test_extractors_extract_run_airflow_task_with_owner():
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91")
run = OpenLineageRunEvent(
eventType=OpenLineageRunEventType.COMPLETE,
eventTime=now,
job=OpenLineageJob(
namespace="http://airflow-host:8081",
name="mydag.mytask",
facets=OpenLineageJobFacets(
jobType=OpenLineageJobTypeJobFacet(
processingType=None,
integration=OpenLineageJobIntegrationType.AIRFLOW,
jobType=OpenLineageJobType.TASK,
),
),
),
run=OpenLineageRun(
runId=run_id,
facets=OpenLineageRunFacets(
airflow=OpenLineageAirflowTaskRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.SCHEDULED,
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
),
task=OpenLineageAirflowTaskInfo(
task_id="mytask",
),
taskInstance=OpenLineageAirflowTaskInstanceInfo(
try_number=1,
),
),
),
),
)

assert extract_run(run) == RunDTO(
id=run_id,
job=JobDTO(
name="mydag.mytask",
location=LocationDTO(
type="http",
name="airflow-host:8081",
addresses={"http://airflow-host:8081"},
),
type=JobTypeDTO.AIRFLOW_TASK,
),
status=RunStatusDTO.SUCCEEDED,
started_at=None,
start_reason=RunStartReasonDTO.AUTOMATIC,
user=UserDTO(name="myuser"),
ended_at=now,
external_id="scheduled__2024-07-05T09:04:13:979349+00:00",
attempt="1",
persistent_log_url=(
"http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
),
running_log_url=None,
)


def test_extractors_extract_run_airflow_dag_without_owner():
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")
run = OpenLineageRunEvent(
eventType=OpenLineageRunEventType.COMPLETE,
eventTime=now,
job=OpenLineageJob(
namespace="http://airflow-host:8081",
name="mydag",
facets=OpenLineageJobFacets(
jobType=OpenLineageJobTypeJobFacet(
processingType=None,
integration=OpenLineageJobIntegrationType.AIRFLOW,
jobType=OpenLineageJobType.DAG,
),
),
),
run=OpenLineageRun(
runId=run_id,
facets=OpenLineageRunFacets(
processing_engine=OpenLineageProcessingEngineRunFacet(
version=Version("2.1.4"),
name=OpenLineageProcessingEngineName.AIRFLOW,
openlineageAdapterVersion=Version("1.10.0"),
),
airflowDagRun=OpenLineageAirflowDagRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
),
),
),
),
)

assert extract_run(run) == RunDTO(
id=run_id,
job=JobDTO(
name="mydag",
location=LocationDTO(
type="http",
name="airflow-host:8081",
addresses={"http://airflow-host:8081"},
),
type=JobTypeDTO.AIRFLOW_DAG,
),
status=RunStatusDTO.SUCCEEDED,
started_at=None,
start_reason=RunStartReasonDTO.MANUAL,
user=None,
ended_at=now,
external_id="manual__2024-07-05T09:04:13:979349+00:00",
attempt=None,
persistent_log_url=(
"http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
),
running_log_url=None,
)


def test_extractors_extract_run_airflow_task_without_owner():
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91")
run = OpenLineageRunEvent(
eventType=OpenLineageRunEventType.COMPLETE,
eventTime=now,
Expand Down
Loading

0 comments on commit b9d33ff

Please sign in to comment.