Skip to content

Commit

Permalink
Merge pull request #511 from OP-TED/feature/TED4-42
Browse files Browse the repository at this point in the history
Feature/ted4 42
  • Loading branch information
CaptainOfHacks authored Oct 30, 2023
2 parents 5aa4c19 + 7bfe169 commit 0edf76e
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 30 deletions.
22 changes: 22 additions & 0 deletions dags/reprocess_dag_params.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from airflow.models import Param

from ted_sws.core.model.notice import NoticeStatus

FORM_NUMBER_DAG_PARAM = "form_number"
START_DATE_DAG_PARAM = "start_date"
END_DATE_DAG_PARAM = "end_date"
Expand Down Expand Up @@ -41,3 +43,23 @@
description="This field is optional. If you want to filter notices by XSD version, please insert a XSD version."
)
}

# Notice-status lists shared by the reprocess DAGs. Each list is meant to be
# passed as ``notice_statuses`` to the status-based notice selector.

# Statuses targeted when re-normalising notices.
RE_NORMALISE_TARGET_NOTICE_STATES = [
    NoticeStatus.RAW,
    NoticeStatus.INDEXED,
]

# Statuses targeted when re-transforming notices.
RE_TRANSFORM_TARGET_NOTICE_STATES = [
    NoticeStatus.NORMALISED_METADATA,
    NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION,
    NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION,
    NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
    NoticeStatus.TRANSFORMED,
    NoticeStatus.DISTILLED,
]

# Statuses targeted when re-validating notices.
RE_VALIDATE_TARGET_NOTICE_STATES = [
    NoticeStatus.DISTILLED,
]

# Statuses targeted when re-packaging notices.
RE_PACKAGE_TARGET_NOTICE_STATES = [
    NoticeStatus.VALIDATED,
    NoticeStatus.INELIGIBLE_FOR_PACKAGING,
    NoticeStatus.ELIGIBLE_FOR_PACKAGING,
    NoticeStatus.INELIGIBLE_FOR_PUBLISHING,
]

# Statuses targeted when re-publishing notices.
RE_PUBLISH_TARGET_NOTICE_STATES = [
    NoticeStatus.ELIGIBLE_FOR_PUBLISHING,
    NoticeStatus.INELIGIBLE_FOR_PUBLISHING,
    NoticeStatus.PACKAGED,
    NoticeStatus.PUBLICLY_UNAVAILABLE,
]

# Statuses targeted when reprocessing notices that are already publicly available.
RE_PROCESS_PUBLISHED_PUBLICLY_AVAILABLE_TARGET_NOTICE_STATES = [
    NoticeStatus.PUBLICLY_AVAILABLE,
]
11 changes: 5 additions & 6 deletions dags/reprocess_published_in_cellar_notices.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM
from ted_sws.core.model.notice import NoticeStatus
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM, RE_PROCESS_PUBLISHED_PUBLICLY_AVAILABLE_TARGET_NOTICE_STATES
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "reprocess_published_in_cellar_notices"

RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.PUBLICLY_AVAILABLE]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"


Expand All @@ -42,9 +40,10 @@ def select_notices_for_re_transform():
start_date = get_dag_param(key=START_DATE_DAG_PARAM)
end_date = get_dag_param(key=END_DATE_DAG_PARAM)
xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM)
notice_ids = notice_ids_selector_by_status(notice_statuses=RE_TRANSFORM_TARGET_NOTICE_STATES,
form_number=form_number, start_date=start_date,
end_date=end_date, xsd_version=xsd_version)
notice_ids = notice_ids_selector_by_status(
notice_statuses=RE_PROCESS_PUBLISHED_PUBLICLY_AVAILABLE_TARGET_NOTICE_STATES,
form_number=form_number, start_date=start_date,
end_date=end_date, xsd_version=xsd_version)
push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator(
Expand Down
6 changes: 3 additions & 3 deletions dags/reprocess_unnormalised_notices_from_backlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.operators.DagBatchPipelineOperator import TriggerNoticeBatchPipelineOperator, NOTICE_IDS_KEY
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
from dags.reprocess_dag_params import START_DATE_DAG_PARAM, END_DATE_DAG_PARAM, REPROCESS_DATE_RANGE_DAG_PARAMS
from dags.reprocess_dag_params import START_DATE_DAG_PARAM, END_DATE_DAG_PARAM, REPROCESS_DATE_RANGE_DAG_PARAMS, \
RE_NORMALISE_TARGET_NOTICE_STATES

DAG_NAME = "reprocess_unnormalised_notices_from_backlog"

Expand All @@ -36,7 +36,7 @@ def reprocess_unnormalised_notices_from_backlog():
def select_all_raw_notices():
start_date = get_dag_param(key=START_DATE_DAG_PARAM)
end_date = get_dag_param(key=END_DATE_DAG_PARAM)
notice_ids = notice_ids_selector_by_status(notice_statuses=[NoticeStatus.RAW, NoticeStatus.INDEXED],
notice_ids = notice_ids_selector_by_status(notice_statuses=RE_NORMALISE_TARGET_NOTICE_STATES,
start_date=start_date,
end_date=end_date)
push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)
Expand Down
6 changes: 1 addition & 5 deletions dags/reprocess_unpackaged_notices_from_backlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,14 @@
from dags.notice_processing_pipeline import NOTICE_PACKAGE_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM, RE_PACKAGE_TARGET_NOTICE_STATES

DAG_NAME = "reprocess_unpackaged_notices_from_backlog"

RE_PACKAGE_TARGET_NOTICE_STATES = [NoticeStatus.VALIDATED, NoticeStatus.INELIGIBLE_FOR_PACKAGING,
NoticeStatus.ELIGIBLE_FOR_PACKAGING,
NoticeStatus.INELIGIBLE_FOR_PUBLISHING]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"


Expand Down
6 changes: 1 addition & 5 deletions dags/reprocess_unpublished_notices_from_backlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,14 @@
from dags.notice_processing_pipeline import NOTICE_PUBLISH_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM, RE_PUBLISH_TARGET_NOTICE_STATES
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "reprocess_unpublished_notices_from_backlog"

RE_PUBLISH_TARGET_NOTICE_STATES = [NoticeStatus.ELIGIBLE_FOR_PUBLISHING, NoticeStatus.INELIGIBLE_FOR_PUBLISHING,
NoticeStatus.PACKAGED, NoticeStatus.PUBLICLY_UNAVAILABLE
]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"


Expand Down
11 changes: 3 additions & 8 deletions dags/reprocess_untransformed_notices_from_backlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status, \
notice_ids_selector_by_mapping_suite_id
from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM
from ted_sws.core.model.notice import NoticeStatus
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM, RE_TRANSFORM_TARGET_NOTICE_STATES
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
Expand All @@ -22,11 +21,6 @@

MAPPING_SUITE_ID_DAG_PARAM = "mapping_suite_id"

RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION,
NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION,
NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
NoticeStatus.TRANSFORMED, NoticeStatus.DISTILLED
]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"

RE_TRANSFORM_DAG_PARAMS = {**REPROCESS_DAG_PARAMS,
Expand Down Expand Up @@ -65,7 +59,8 @@ def select_notices_for_re_transform():
form_number=form_number, start_date=start_date,
end_date=end_date, xsd_version=xsd_version)
if mapping_suite_id:
filtered_notice_ids_by_mapping_suite_id = notice_ids_selector_by_mapping_suite_id(mapping_suite_id=mapping_suite_id)
filtered_notice_ids_by_mapping_suite_id = notice_ids_selector_by_mapping_suite_id(
mapping_suite_id=mapping_suite_id)
notice_ids = list(set(notice_ids).intersection(set(filtered_notice_ids_by_mapping_suite_id)))
push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

Expand Down
4 changes: 1 addition & 3 deletions dags/reprocess_unvalidated_notices_from_backlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM
from ted_sws.core.model.notice import NoticeStatus
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM, RE_VALIDATE_TARGET_NOTICE_STATES
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "reprocess_unvalidated_notices_from_backlog"

RE_VALIDATE_TARGET_NOTICE_STATES = [NoticeStatus.DISTILLED]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"


Expand Down
72 changes: 72 additions & 0 deletions dags/reprocessing_backlog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
This DAG is used to reprocess notices from the backlog, selected by the chosen reprocess type.
"""

from airflow.decorators import dag, task
from airflow.models import Param

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from dags.reprocess_dag_params import REPROCESS_DAG_PARAMS, FORM_NUMBER_DAG_PARAM, START_DATE_DAG_PARAM, \
END_DATE_DAG_PARAM, XSD_VERSION_DAG_PARAM, RE_NORMALISE_TARGET_NOTICE_STATES, RE_TRANSFORM_TARGET_NOTICE_STATES, \
RE_PACKAGE_TARGET_NOTICE_STATES, RE_PROCESS_PUBLISHED_PUBLICLY_AVAILABLE_TARGET_NOTICE_STATES, \
RE_PUBLISH_TARGET_NOTICE_STATES
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

# Name reported as the process name in this DAG's event-log metadata.
DAG_NAME = "reprocess_backlog"

# Task id given to the operator that triggers the notice batch pipeline.
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"

# NOTE(review): dags.reprocess_dag_params also defines
# RE_VALIDATE_TARGET_NOTICE_STATES with the same value; consider importing it
# from there instead of redefining it locally.
RE_VALIDATE_TARGET_NOTICE_STATES = [NoticeStatus.DISTILLED]

# Maps each value selectable for the "reprocess_type" DAG param to the notice
# statuses that should be selected for it. Key order is the order in which the
# choices are listed in the param's enum.
REPROCESS_TYPE_MAP = {
    "unnormalised": RE_NORMALISE_TARGET_NOTICE_STATES,
    "untransformed": RE_TRANSFORM_TARGET_NOTICE_STATES,
    "unvalidated": RE_VALIDATE_TARGET_NOTICE_STATES,
    "unpackaged": RE_PACKAGE_TARGET_NOTICE_STATES,
    "unpublished": RE_PUBLISH_TARGET_NOTICE_STATES,
    "publicly available": RE_PROCESS_PUBLISHED_PUBLICLY_AVAILABLE_TARGET_NOTICE_STATES,
}


@dag(
    default_args=DEFAULT_DAG_ARGUMENTS,
    schedule_interval=None,
    # Short description: the module docstring up to (excluding) its first period.
    description=__doc__[:__doc__.find(".")],
    doc_md=__doc__,
    # NOTE(review): the 're-validate' tag looks inherited from the re-validate
    # DAG although this DAG handles every reprocess type -- confirm.
    tags=['selector', 're-validate'],
    params={
        **REPROCESS_DAG_PARAMS,
        # The selectable values are exactly the keys of REPROCESS_TYPE_MAP.
        "reprocess_type": Param(
            default="unnormalised",
            title="Reprocess Type",
            enum=list(REPROCESS_TYPE_MAP),
        ),
    },
)
def reprocess_backlog():
    # Two-step DAG: select the ids of the notices matching the requested
    # reprocess type and filters, then hand them to the batch pipeline trigger.

    @task
    @event_log(
        TechnicalEventMessage(
            message="select_notices_for_re_processing",
            metadata=EventMessageMetadata(
                process_type=EventMessageProcessType.DAG,
                process_name=DAG_NAME,
            ),
        )
    )
    def select_notices_for_re_validate():
        # NOTE(review): the function name still says "re_validate" while the
        # logged message says "re_processing"; the name is kept as-is here
        # because renaming it could change the Airflow task id.
        selector_filters = {
            "form_number": get_dag_param(key=FORM_NUMBER_DAG_PARAM),
            "start_date": get_dag_param(key=START_DATE_DAG_PARAM),
            "end_date": get_dag_param(key=END_DATE_DAG_PARAM),
            "xsd_version": get_dag_param(key=XSD_VERSION_DAG_PARAM),
        }
        chosen_type = get_dag_param(key="reprocess_type")

        # Translate the chosen reprocess type into its target notice statuses.
        selected_ids = notice_ids_selector_by_status(
            notice_statuses=REPROCESS_TYPE_MAP[chosen_type],
            **selector_filters,
        )
        # Publish the ids under NOTICE_IDS_KEY for the downstream trigger task.
        push_dag_downstream(key=NOTICE_IDS_KEY, value=selected_ids)

    # NOTE(review): the pipeline always starts at the transformation step,
    # whatever "reprocess_type" was chosen -- confirm this is intended for
    # e.g. "unpackaged" or "unpublished".
    trigger_step = TriggerNoticeBatchPipelineOperator(
        task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID,
        start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID,
    )
    select_notices_for_re_validate() >> trigger_step


# Calling the decorated function builds the DAG object bound to ``dag``.
dag = reprocess_backlog()

0 comments on commit 0edf76e

Please sign in to comment.