Skip to content

Commit

Permalink
Merge pull request #23 from AbsaOSS/Release/2.0.3
Browse files Browse the repository at this point in the history
added timestamp to bookkeeper
  • Loading branch information
MDobransky authored Jan 27, 2025
2 parents 1ee9c53 + 24cb4e6 commit 3f3d648
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Change Log
All notable changes to this project will be documented in this file.

## 2.0.3 - 2025-01-27
### Runner
- added run time timestamp to record

## 2.0.2 - 2025-01-24
### Runner
- new bookkeeping functionality
Expand Down
2 changes: 1 addition & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
project = "rialto"
copyright = "2022, Marek Dobransky"
author = "Marek Dobransky"
release = "2.0.2"
release = "2.0.3"

# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "rialto"

version = "2.0.2"
version = "2.0.3"

packages = [
{ include = "rialto" },
Expand Down
2 changes: 1 addition & 1 deletion rialto/runner/bookkeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def add(self, record: Record) -> None:
:param record: Record to add to the table.
"""
new = self.spark.createDataFrame([record.to_spark_row()], Record.schema)
new = self.spark.createDataFrame([record.to_spark_row()], Record.get_schema())
db = self._load()
if db:
db = db.unionByName(new)
Expand Down
40 changes: 26 additions & 14 deletions rialto/runner/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional

from pyspark.sql import Row
from pyspark.sql.types import (
    DateType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)


@dataclass
Expand All @@ -34,19 +41,23 @@ class Record:
status: str
reason: str
exception: Optional[str] = None
run_timestamp: datetime.timestamp = datetime.now().isoformat(sep=" ", timespec="seconds")

schema: ClassVar[StructType] = StructType(
[
StructField("job", StringType(), nullable=False),
StructField("target", StringType(), nullable=False),
StructField("date", DateType(), nullable=False),
StructField("time", StringType(), nullable=False),
StructField("records", IntegerType(), nullable=False),
StructField("status", StringType(), nullable=False),
StructField("reason", StringType(), nullable=False),
StructField("exception", StringType(), nullable=True),
]
)
def get_schema(self) -> StructType:
"""Retrieve schema of pyspark DataFrame"""
return StructType(
[
StructField("job", StringType(), nullable=False),
StructField("target", StringType(), nullable=False),
StructField("date", DateType(), nullable=False),
StructField("time", StringType(), nullable=False),
StructField("records", IntegerType(), nullable=False),
StructField("status", StringType(), nullable=False),
StructField("reason", StringType(), nullable=False),
StructField("exception", StringType(), nullable=True),
StructField("run_timestamp", TimestampType(), nullable=False),
]
)

def to_spark_row(self) -> Row:
"""Convert Record to Spark Row"""
Expand All @@ -59,4 +70,5 @@ def to_spark_row(self) -> Row:
status=self.status,
reason=self.reason,
exception=self.exception,
run_timestamp=self.run_timestamp,
)
5 changes: 4 additions & 1 deletion tests/runner/test_bookkeeping.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import timedelta
from datetime import datetime, timedelta

from rialto.runner.date_manager import DateManager
from rialto.runner.record import Record
Expand All @@ -11,6 +11,8 @@
1,
"status",
"reason",
None,
datetime(2024, 1, 1, 1, 2, 3),
)


Expand All @@ -24,3 +26,4 @@ def test_record_to_spark(spark):
assert row.status == "status"
assert row.reason == "reason"
assert row.exception is None
assert row.run_timestamp == datetime(2024, 1, 1, 1, 2, 3)

0 comments on commit 3f3d648

Please sign in to comment.