Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOP-22750] Add owner parsing for airflow run events #135

Merged
merged 5 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions data_rentgen/consumer/extractors/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,26 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
spark_application_details = event.run.facets.spark_applicationDetails
if spark_application_details:
run.user = UserDTO(name=spark_application_details.userName)

# Airflow DAG and task have an 'owner' field, but it can be either a user or a group name,
# and it does not mean that this exact user started this run.
# Airflow uses different facets for versions above provider-openlineage/1.11.0.
airflow_application_details = event.run.facets.airflow
if airflow_application_details and all(
(
airflow_application_details.dag.owner is not None,
airflow_application_details.dag.owner != "airflow",
),
):
run.user = UserDTO(name=airflow_application_details.dag.owner) # type: ignore[arg-type]

airflow_application_dag_details = event.run.facets.airflowDagRun
if airflow_application_dag_details and all(
(
airflow_application_dag_details.dag.owner is not None,
airflow_application_dag_details.dag.owner != "airflow",
),
):
run.user = UserDTO(name=airflow_application_dag_details.dag.owner) # type: ignore[arg-type]

return run
1 change: 1 addition & 0 deletions data_rentgen/consumer/openlineage/run_facets/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class OpenLineageAirflowDagInfo(OpenLineageBase):
"""

dag_id: str
owner: str | None = None


class OpenLineageAirflowDagRunType(Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
RunDTO,
RunStartReasonDTO,
RunStatusDTO,
UserDTO,
)


Expand Down Expand Up @@ -66,7 +67,7 @@ def airflow_dag_run_event_start() -> OpenLineageRunEvent:
openlineageAdapterVersion=Version("1.10.0"),
),
airflowDagRun=OpenLineageAirflowDagRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
dolfinus marked this conversation as resolved.
Show resolved Hide resolved
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -137,7 +138,7 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent:
openlineageAdapterVersion=Version("1.10.0"),
),
airflow=OpenLineageAirflowTaskRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -231,6 +232,10 @@ def extracted_airflow_dag_run(
status=RunStatusDTO.SUCCEEDED,
started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
start_reason=RunStartReasonDTO.MANUAL,
user=UserDTO(
name="myuser",
id=None,
),
ended_at=datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc),
external_id="manual__2024-07-05T09:04:13:979349+00:00",
persistent_log_url="http://airflow-host:8081/dags/mydag/grid?dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00",
Expand All @@ -247,6 +252,10 @@ def extracted_airflow_task_run(
status=RunStatusDTO.SUCCEEDED,
started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
start_reason=RunStartReasonDTO.MANUAL,
user=UserDTO(
name="myuser",
id=None,
),
ended_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
external_id="manual__2024-07-05T09:04:13:979349+00:00",
attempt="1",
Expand Down Expand Up @@ -300,7 +309,6 @@ def test_extractors_extract_batch_airflow(

assert not extracted.datasets()
assert not extracted.dataset_symlinks()
assert not extracted.users()
assert not extracted.schemas()
assert not extracted.operations()
assert not extracted.inputs()
Expand Down
262 changes: 258 additions & 4 deletions tests/test_consumer/test_extractors/test_extractors_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def test_extractors_extract_run_airflow_dag_2_3_plus():
openlineageAdapterVersion=Version("1.10.0"),
),
airflowDagRun=OpenLineageAirflowDagRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -237,7 +237,7 @@ def test_extractors_extract_run_airflow_dag_2_x():
openlineageAdapterVersion=Version("1.10.0"),
),
airflowDagRun=OpenLineageAirflowDagRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -300,7 +300,7 @@ def test_extractors_extract_run_airflow_task_with_ti_persistent_log_url():
openlineageAdapterVersion=Version("1.10.0"),
),
airflow=OpenLineageAirflowTaskRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -372,7 +372,7 @@ def test_extractors_extract_run_airflow_task_2_9_plus():
openlineageAdapterVersion=Version("1.9.0"),
),
airflow=OpenLineageAirflowTaskRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="backfill__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.BACKFILL_JOB,
Expand Down Expand Up @@ -418,6 +418,260 @@ def test_extractors_extract_run_airflow_task_2_9_plus():
def test_extractors_extract_run_airflow_task_2_x():
    """A task-run event whose DAG owner is "airflow" is extracted with ``user=None``."""
    event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
    run_uuid = UUID("01908223-0e9b-7c52-9856-6cecfc842610")

    # Job part of the event: an Airflow task identified by "<dag>.<task>".
    job_type_facet = OpenLineageJobTypeJobFacet(
        processingType=None,
        integration=OpenLineageJobIntegrationType.AIRFLOW,
        jobType=OpenLineageJobType.TASK,
    )
    job = OpenLineageJob(
        namespace="http://airflow-host:8081",
        name="mydag.mytask",
        facets=OpenLineageJobFacets(jobType=job_type_facet),
    )

    # Run part of the event: only the task-level `airflow` facet is set.
    dag_run_info = OpenLineageAirflowDagRunInfo(
        run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
        run_type=OpenLineageAirflowDagRunType.SCHEDULED,
        data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
        data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
    )
    task_facet = OpenLineageAirflowTaskRunFacet(
        dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
        dagRun=dag_run_info,
        task=OpenLineageAirflowTaskInfo(task_id="mytask"),
        taskInstance=OpenLineageAirflowTaskInstanceInfo(try_number=1),
    )

    event = OpenLineageRunEvent(
        eventType=OpenLineageRunEventType.COMPLETE,
        eventTime=event_time,
        job=job,
        run=OpenLineageRun(
            runId=run_uuid,
            facets=OpenLineageRunFacets(airflow=task_facet),
        ),
    )

    expected_location = LocationDTO(
        type="http",
        name="airflow-host:8081",
        addresses={"http://airflow-host:8081"},
    )
    expected = RunDTO(
        id=run_uuid,
        job=JobDTO(
            name="mydag.mytask",
            location=expected_location,
            type=JobTypeDTO.AIRFLOW_TASK,
        ),
        status=RunStatusDTO.SUCCEEDED,
        started_at=None,
        start_reason=RunStartReasonDTO.AUTOMATIC,
        user=None,
        ended_at=event_time,
        external_id="scheduled__2024-07-05T09:04:13:979349+00:00",
        attempt="1",
        persistent_log_url=(
            "http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
        ),
        running_log_url=None,
    )

    assert extract_run(event) == expected


def test_extractors_extract_run_airflow_dag_check_with_owner():
    """A DAG-run event with ``owner="myuser"`` is extracted with that owner as the run user."""
    event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
    run_uuid = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")

    # Job part of the event: an Airflow DAG.
    job_type_facet = OpenLineageJobTypeJobFacet(
        processingType=None,
        integration=OpenLineageJobIntegrationType.AIRFLOW,
        jobType=OpenLineageJobType.DAG,
    )
    job = OpenLineageJob(
        namespace="http://airflow-host:8081",
        name="mydag",
        facets=OpenLineageJobFacets(jobType=job_type_facet),
    )

    # Run part of the event: engine info plus the `airflowDagRun` facet carrying the owner.
    engine_facet = OpenLineageProcessingEngineRunFacet(
        version=Version("2.1.4"),
        name=OpenLineageProcessingEngineName.AIRFLOW,
        openlineageAdapterVersion=Version("1.10.0"),
    )
    dag_run_facet = OpenLineageAirflowDagRunFacet(
        dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
        dagRun=OpenLineageAirflowDagRunInfo(
            run_id="manual__2024-07-05T09:04:13:979349+00:00",
            run_type=OpenLineageAirflowDagRunType.MANUAL,
            data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
            data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
        ),
    )

    event = OpenLineageRunEvent(
        eventType=OpenLineageRunEventType.COMPLETE,
        eventTime=event_time,
        job=job,
        run=OpenLineageRun(
            runId=run_uuid,
            facets=OpenLineageRunFacets(
                processing_engine=engine_facet,
                airflowDagRun=dag_run_facet,
            ),
        ),
    )

    expected_location = LocationDTO(
        type="http",
        name="airflow-host:8081",
        addresses={"http://airflow-host:8081"},
    )
    expected = RunDTO(
        id=run_uuid,
        job=JobDTO(
            name="mydag",
            location=expected_location,
            type=JobTypeDTO.AIRFLOW_DAG,
        ),
        status=RunStatusDTO.SUCCEEDED,
        started_at=None,
        start_reason=RunStartReasonDTO.MANUAL,
        user=UserDTO(name="myuser"),
        ended_at=event_time,
        external_id="manual__2024-07-05T09:04:13:979349+00:00",
        attempt=None,
        persistent_log_url=(
            "http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
        ),
        running_log_url=None,
    )

    assert extract_run(event) == expected


def test_extractors_extract_run_airflow_task_with_owner():
    """A task-run event with ``owner="myuser"`` is extracted with that owner as the run user."""
    event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
    run_uuid = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91")

    # Job part of the event: an Airflow task identified by "<dag>.<task>".
    job_type_facet = OpenLineageJobTypeJobFacet(
        processingType=None,
        integration=OpenLineageJobIntegrationType.AIRFLOW,
        jobType=OpenLineageJobType.TASK,
    )
    job = OpenLineageJob(
        namespace="http://airflow-host:8081",
        name="mydag.mytask",
        facets=OpenLineageJobFacets(jobType=job_type_facet),
    )

    # Run part of the event: the task-level `airflow` facet carries the DAG owner.
    dag_run_info = OpenLineageAirflowDagRunInfo(
        run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
        run_type=OpenLineageAirflowDagRunType.SCHEDULED,
        data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
        data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
    )
    task_facet = OpenLineageAirflowTaskRunFacet(
        dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
        dagRun=dag_run_info,
        task=OpenLineageAirflowTaskInfo(task_id="mytask"),
        taskInstance=OpenLineageAirflowTaskInstanceInfo(try_number=1),
    )

    event = OpenLineageRunEvent(
        eventType=OpenLineageRunEventType.COMPLETE,
        eventTime=event_time,
        job=job,
        run=OpenLineageRun(
            runId=run_uuid,
            facets=OpenLineageRunFacets(airflow=task_facet),
        ),
    )

    expected_location = LocationDTO(
        type="http",
        name="airflow-host:8081",
        addresses={"http://airflow-host:8081"},
    )
    expected = RunDTO(
        id=run_uuid,
        job=JobDTO(
            name="mydag.mytask",
            location=expected_location,
            type=JobTypeDTO.AIRFLOW_TASK,
        ),
        status=RunStatusDTO.SUCCEEDED,
        started_at=None,
        start_reason=RunStartReasonDTO.AUTOMATIC,
        user=UserDTO(name="myuser"),
        ended_at=event_time,
        external_id="scheduled__2024-07-05T09:04:13:979349+00:00",
        attempt="1",
        persistent_log_url=(
            "http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
        ),
        running_log_url=None,
    )

    assert extract_run(event) == expected


def test_extractors_extract_run_airflow_dag_without_owner():
    """A DAG-run event whose DAG info has no owner is extracted with ``user=None``."""
    event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
    run_uuid = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")

    # Job part of the event: an Airflow DAG.
    job_type_facet = OpenLineageJobTypeJobFacet(
        processingType=None,
        integration=OpenLineageJobIntegrationType.AIRFLOW,
        jobType=OpenLineageJobType.DAG,
    )
    job = OpenLineageJob(
        namespace="http://airflow-host:8081",
        name="mydag",
        facets=OpenLineageJobFacets(jobType=job_type_facet),
    )

    # Run part of the event: the DAG info is built without an `owner` argument.
    engine_facet = OpenLineageProcessingEngineRunFacet(
        version=Version("2.1.4"),
        name=OpenLineageProcessingEngineName.AIRFLOW,
        openlineageAdapterVersion=Version("1.10.0"),
    )
    dag_run_facet = OpenLineageAirflowDagRunFacet(
        dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
        dagRun=OpenLineageAirflowDagRunInfo(
            run_id="manual__2024-07-05T09:04:13:979349+00:00",
            run_type=OpenLineageAirflowDagRunType.MANUAL,
            data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
            data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
        ),
    )

    event = OpenLineageRunEvent(
        eventType=OpenLineageRunEventType.COMPLETE,
        eventTime=event_time,
        job=job,
        run=OpenLineageRun(
            runId=run_uuid,
            facets=OpenLineageRunFacets(
                processing_engine=engine_facet,
                airflowDagRun=dag_run_facet,
            ),
        ),
    )

    expected_location = LocationDTO(
        type="http",
        name="airflow-host:8081",
        addresses={"http://airflow-host:8081"},
    )
    expected = RunDTO(
        id=run_uuid,
        job=JobDTO(
            name="mydag",
            location=expected_location,
            type=JobTypeDTO.AIRFLOW_DAG,
        ),
        status=RunStatusDTO.SUCCEEDED,
        started_at=None,
        start_reason=RunStartReasonDTO.MANUAL,
        user=None,
        ended_at=event_time,
        external_id="manual__2024-07-05T09:04:13:979349+00:00",
        attempt=None,
        persistent_log_url=(
            "http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
        ),
        running_log_url=None,
    )

    assert extract_run(event) == expected


def test_extractors_extract_run_airflow_task_without_owner():
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91")
run = OpenLineageRunEvent(
eventType=OpenLineageRunEventType.COMPLETE,
eventTime=now,
Expand Down
Loading
Loading