diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1b8b5da..63e9791 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,10 +14,12 @@ All notable changes to this project will be documented in this file.
 - transformation header changed
 - added argument to skip dependency checking
 - added overrides parameter to allow for dynamic overriding of config values
+ - removed date_from and date_until from arguments; use overrides instead
 #### Jobs
 - jobs are now the main way to create all pipelines
 - config holder removed from jobs
 - metadata_manager and feature_loader are now available arguments, depending on configuration
+ - added @config decorator, similar in use to @datasource, for parsing configuration
 #### TableReader
 - function signatures changed
 - until -> date_until
diff --git a/README.md b/README.md
index 974244b..2ac915f 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-
+from pydantic import BaseModel
+from rialto.runner.config_loader import PipelineConfig
+from rialto.jobs import config
 # Rialto
@@ -54,8 +54,6 @@ A runner by default executes all the jobs provided in the configuration file, fo
 This behavior can be modified by the various parameters and switches available.
 
 * **run_date** - date at which the runner is triggered (defaults to day of running)
-* **date_from** - starting date (defaults to rundate - config watch period)
-* **date_until** - end date (defaults to rundate)
 * **rerun** - rerun all jobs even if they already succeeded in past runs
 * **op** - run only the selected operation / pipeline
 * **skip_dependencies** - ignore dependency checks and run all jobs
@@ -131,6 +129,9 @@ pipelines: # a list of pipelines to run
       interval:
         units: "days"
         value: 6
+  target:
+    target_schema: catalog.schema # schema where tables will be created, must exist
+    target_partition_column: INFORMATION_DATE # date to partition new tables on
 ```
 The configuration can be dynamically overridden by providing a dictionary of overrides to the runner.
 All overrides must adhere to the configuration schema, with the pipeline.extras section available for a custom schema.
@@ -371,8 +372,18 @@ With that sorted out, we can now provide a quick example of the *rialto.jobs* mo
 ```python
 from pyspark.sql import DataFrame
 from rialto.common import TableReader
-from rialto.jobs.decorators import job, datasource
+from rialto.jobs.decorators import config, job, datasource
+from rialto.runner.config_loader import PipelineConfig
+from pydantic import BaseModel
+
+
+class ConfigModel(BaseModel):
+    some_value: int
+    some_other_value: str
+@config
+def my_config(config: PipelineConfig):
+    return ConfigModel(**config.extras)
 
 
 @datasource
 def my_datasource(run_date: datetime.date, table_reader: TableReader) -> DataFrame:
@@ -380,8 +391,8 @@ def my_datasource(run_date: datetime.date, table_reader: TableReader) -> DataFra
 
 
 @job
-def my_job(my_datasource: DataFrame) -> DataFrame:
-    return my_datasource.withColumn("HelloWorld", F.lit(1))
+def my_job(my_datasource: DataFrame, my_config: ConfigModel) -> DataFrame:
+    return my_datasource.withColumn("HelloWorld", F.lit(my_config.some_value))
 ```
 This piece of code
 1. creates a rialto transformation called *my_job*, which is then callable by the rialto runner.
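For readers following the README changes above, here is a minimal sketch of driving the runner through `overrides` now that `date_from`/`date_until` are gone. The config path and override keys are copied from the test changes in this diff; the `from rialto.runner import Runner` import path and the Spark session setup are assumptions.

```python
from pyspark.sql import SparkSession

from rialto.runner import Runner  # import path assumed; Runner is defined in rialto/runner/runner.py

spark = SparkSession.builder.getOrCreate()

# run_date anchors the watched period; the overrides below move date_from
# back to run_date minus 2 weeks (2023-03-17), as asserted in test_init_dates.
runner = Runner(
    spark,
    config_path="tests/runner/transformations/config.yaml",
    run_date="2023-03-31",
    overrides={"runner.watched_period_units": "weeks", "runner.watched_period_value": 2},
)
```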
diff --git a/rialto/jobs/__init__.py b/rialto/jobs/__init__.py
index 90183bd..a6ee6cb 100644
--- a/rialto/jobs/__init__.py
+++ b/rialto/jobs/__init__.py
@@ -12,4 +12,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from rialto.jobs.decorators import datasource, job
+from rialto.jobs.decorators import config, datasource, job
diff --git a/rialto/jobs/decorators/__init__.py b/rialto/jobs/decorators/__init__.py
index ba62141..6f2713a 100644
--- a/rialto/jobs/decorators/__init__.py
+++ b/rialto/jobs/decorators/__init__.py
@@ -12,4 +12,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .decorators import datasource, job
+from .decorators import config, datasource, job
diff --git a/rialto/jobs/decorators/decorators.py b/rialto/jobs/decorators/decorators.py
index 217b436..d288b7b 100644
--- a/rialto/jobs/decorators/decorators.py
+++ b/rialto/jobs/decorators/decorators.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-__all__ = ["datasource", "job"]
+__all__ = ["datasource", "job", "config"]
 
 import inspect
 import typing
@@ -24,6 +24,20 @@
 from rialto.jobs.decorators.resolver import Resolver
 
 
+def config(ds_getter: typing.Callable) -> typing.Callable:
+    """
+    Config parser functions decorator.
+
+    Registers a config parsing function as a rialto job prerequisite.
+    You can then request its result via job function arguments.
+
+    :param ds_getter: config parser function
+    :return: raw config parser function, unchanged
+    """
+    Resolver.register_callable(ds_getter)
+    return ds_getter
+
+
 def datasource(ds_getter: typing.Callable) -> typing.Callable:
     """
     Dataset reader functions decorator.
diff --git a/rialto/jobs/decorators/test_utils.py b/rialto/jobs/decorators/test_utils.py
index 5465d6e..39d76ce 100644
--- a/rialto/jobs/decorators/test_utils.py
+++ b/rialto/jobs/decorators/test_utils.py
@@ -17,9 +17,10 @@
 import importlib
 import typing
 from contextlib import contextmanager
-from unittest.mock import patch, create_autospec, MagicMock
-from rialto.jobs.decorators.resolver import Resolver, ResolverException
+from unittest.mock import MagicMock, create_autospec, patch
+
 from rialto.jobs.decorators.job_base import JobBase
+from rialto.jobs.decorators.resolver import Resolver, ResolverException
 
 
 def _passthrough_decorator(*args, **kwargs) -> typing.Callable:
@@ -34,6 +35,8 @@ def _disable_job_decorators() -> None:
     patches = [
         patch("rialto.jobs.decorators.datasource", _passthrough_decorator),
         patch("rialto.jobs.decorators.decorators.datasource", _passthrough_decorator),
+        patch("rialto.jobs.decorators.config", _passthrough_decorator),
+        patch("rialto.jobs.decorators.decorators.config", _passthrough_decorator),
         patch("rialto.jobs.decorators.job", _passthrough_decorator),
         patch("rialto.jobs.decorators.decorators.job", _passthrough_decorator),
     ]
diff --git a/rialto/runner/config_loader.py b/rialto/runner/config_loader.py
index c4ce193..86c142d 100644
--- a/rialto/runner/config_loader.py
+++ b/rialto/runner/config_loader.py
@@ -71,7 +71,6 @@ class MetadataManagerConfig(BaseModel):
 
 
 class FeatureLoaderConfig(BaseModel):
-    config_path: str
     feature_schema: str
     metadata_schema: str
 
@@ -81,7 +80,7 @@ class PipelineConfig(BaseModel):
     module: ModuleConfig
     schedule: ScheduleConfig
     dependencies: Optional[List[DependencyConfig]] = []
-    target: Optional[TargetConfig] = None
+    target: TargetConfig = None
     metadata_manager: Optional[MetadataManagerConfig] = None
     feature_loader: Optional[FeatureLoaderConfig] = None
     extras: Optional[Dict] = {}
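A small usage sketch of the new `@config` decorator added in `decorators.py` above: decorating a function registers it with the `Resolver` under its own name, and its return value can then be requested by jobs (or resolved directly, as the tests in this diff do). This mirrors `test_config_decorator`; only the standalone assert at the end is illustrative.

```python
from rialto.jobs.decorators import config
from rialto.jobs.decorators.resolver import Resolver


@config
def custom_config():
    # in a real pipeline this would typically return a parsed config object
    return "config_return"


# the decorated function is registered under its function name and can be resolved
assert Resolver.resolve("custom_config") == "config_return"
```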
diff --git a/rialto/runner/runner.py b/rialto/runner/runner.py
index 0178f05..ac9d6bc 100644
--- a/rialto/runner/runner.py
+++ b/rialto/runner/runner.py
@@ -39,8 +39,6 @@ def __init__(
         spark: SparkSession,
         config_path: str,
         run_date: str = None,
-        date_from: str = None,
-        date_until: str = None,
         rerun: bool = False,
         op: str = None,
         skip_dependencies: bool = False,
@@ -49,9 +47,6 @@
         self.spark = spark
         self.config = get_pipelines_config(config_path, overrides)
         self.reader = TableReader(spark)
-
-        self.date_from = date_from
-        self.date_until = date_until
         self.rerun = rerun
         self.skip_dependencies = skip_dependencies
         self.op = op
@@ -61,19 +56,15 @@
             run_date = DateManager.str_to_date(run_date)
         else:
             run_date = date.today()
-        if self.date_from:
-            self.date_from = DateManager.str_to_date(date_from)
-        if self.date_until:
-            self.date_until = DateManager.str_to_date(date_until)
-
-        if not self.date_from:
-            self.date_from = DateManager.date_subtract(
-                run_date=run_date,
-                units=self.config.runner.watched_period_units,
-                value=self.config.runner.watched_period_value,
-            )
-        if not self.date_until:
-            self.date_until = run_date
+
+        self.date_from = DateManager.date_subtract(
+            run_date=run_date,
+            units=self.config.runner.watched_period_units,
+            value=self.config.runner.watched_period_value,
+        )
+
+        self.date_until = run_date
+
         if self.date_from > self.date_until:
             raise ValueError(f"Invalid date range from {self.date_from} until {self.date_until}")
         logger.info(f"Running period from {self.date_from} until {self.date_until}")
diff --git a/tests/jobs/test_decorators.py b/tests/jobs/test_decorators.py
index 6496a2d..54cb4a4 100644
--- a/tests/jobs/test_decorators.py
+++ b/tests/jobs/test_decorators.py
@@ -25,6 +25,13 @@ def test_dataset_decorator():
     assert test_dataset == "dataset_return"
 
 
+def test_config_decorator():
+    _ = import_module("tests.jobs.test_job.test_job")
+    test_dataset = Resolver.resolve("custom_config")
+
+    assert test_dataset == "config_return"
+
+
 def _rialto_import_stub(module_name, class_name):
     module = import_module(module_name)
     class_obj = getattr(module, class_name)
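To make the runner change above concrete, here is a sketch of the date window the runner now derives on its own: `date_until` is the run date and `date_from` is the run date minus the configured watched period. The `DateManager` module path is an assumption; the values mirror `test_init_dates` from this diff.

```python
from rialto.runner.date_manager import DateManager  # module path assumed

run_date = DateManager.str_to_date("2023-03-31")

# equivalent of the new __init__ logic, with the watched period taken from config/overrides
date_from = DateManager.date_subtract(run_date=run_date, units="weeks", value=2)
date_until = run_date

assert date_from == DateManager.str_to_date("2023-03-17")
assert date_from <= date_until  # otherwise the runner raises ValueError
```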
diff --git a/tests/jobs/test_job/test_job.py b/tests/jobs/test_job/test_job.py
index bc3cb69..3d648b5 100644
--- a/tests/jobs/test_job/test_job.py
+++ b/tests/jobs/test_job/test_job.py
@@ -11,9 +11,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from rialto.jobs.decorators import config, datasource, job
 
-from rialto.jobs.decorators import datasource, job
+
+@config
+def custom_config():
+    return "config_return"
 
 
 @datasource
diff --git a/tests/runner/test_runner.py b/tests/runner/test_runner.py
index 9f16ea0..e23eee8 100644
--- a/tests/runner/test_runner.py
+++ b/tests/runner/test_runner.py
@@ -108,10 +108,10 @@ def test_init_dates(spark):
     runner = Runner(
         spark,
         config_path="tests/runner/transformations/config.yaml",
-        date_from="2023-03-01",
-        date_until="2023-03-31",
+        run_date="2023-03-31",
+        overrides={"runner.watched_period_units": "weeks", "runner.watched_period_value": 2},
     )
-    assert runner.date_from == DateManager.str_to_date("2023-03-01")
+    assert runner.date_from == DateManager.str_to_date("2023-03-17")
     assert runner.date_until == DateManager.str_to_date("2023-03-31")
 
     runner = Runner(
@@ -156,8 +156,7 @@ def test_check_dates_have_partition(spark, mocker):
     runner = Runner(
         spark,
         config_path="tests/runner/transformations/config.yaml",
-        date_from="2023-03-01",
-        date_until="2023-03-31",
+        run_date="2023-03-31",
     )
     runner.reader = MockReader(spark)
     dates = ["2023-03-04", "2023-03-05", "2023-03-06"]
@@ -173,8 +172,7 @@ def test_check_dates_have_partition_no_table(spark, mocker):
     runner = Runner(
         spark,
         config_path="tests/runner/transformations/config.yaml",
-        date_from="2023-03-01",
-        date_until="2023-03-31",
+        run_date="2023-03-31",
     )
     dates = ["2023-03-04", "2023-03-05", "2023-03-06"]
     dates = [DateManager.str_to_date(d) for d in dates]
@@ -193,8 +191,7 @@ def test_check_dependencies(spark, mocker, r_date, expected):
     runner = Runner(
         spark,
         config_path="tests/runner/transformations/config.yaml",
-        date_from="2023-03-01",
-        date_until="2023-03-31",
+        run_date="2023-03-31",
     )
     runner.reader = MockReader(spark)
     res = runner.check_dependencies(runner.config.pipelines[0], DateManager.str_to_date(r_date))
@@ -207,8 +204,7 @@ def test_check_no_dependencies(spark, mocker):
     runner = Runner(
         spark,
         config_path="tests/runner/transformations/config.yaml",
-        date_from="2023-03-01",
-        date_until="2023-03-31",
+        run_date="2023-03-31",
     )
     runner.reader = MockReader(spark)
     res = runner.check_dependencies(runner.config.pipelines[1], DateManager.str_to_date("2023-03-05"))
@@ -221,8 +217,8 @@ def test_select_dates(spark, mocker):
     runner = Runner(
         spark,
         config_path="tests/runner/transformations/config.yaml",
-        date_from="2023-03-01",
-        date_until="2023-03-31",
+        run_date="2023-03-31",
+        overrides={"runner.watched_period_units": "months", "runner.watched_period_value": 1},
     )
     runner.reader = MockReader(spark)
 
@@ -243,8 +239,8 @@ def test_select_dates_all_done(spark, mocker):
     runner = Runner(
         spark,
         config_path="tests/runner/transformations/config.yaml",
-        date_from="2023-03-02",
-        date_until="2023-03-02",
+        run_date="2023-03-02",
+        overrides={"runner.watched_period_units": "months", "runner.watched_period_value": 0},
     )
     runner.reader = MockReader(spark)
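The test job above returns a plain string from its `@config` function; in practice the README example parses the pipeline's `extras` section into a pydantic model instead. A self-contained sketch of that parsing step, with an illustrative extras dict standing in for `PipelineConfig.extras`:

```python
from pydantic import BaseModel


class ConfigModel(BaseModel):
    some_value: int
    some_other_value: str


# stand-in for PipelineConfig.extras, which holds free-form per-pipeline values
extras = {"some_value": 1, "some_other_value": "hello"}

parsed = ConfigModel(**extras)
assert parsed.some_value == 1
assert parsed.some_other_value == "hello"
```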