Skip to content
This repository has been archived by the owner on Jan 9, 2025. It is now read-only.

Commit

Permalink
#14: Integrate frictionless baseline validators with workflow_runner
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-yan committed Oct 17, 2024
1 parent f83f704 commit 3fc7c4c
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 38 deletions.
16 changes: 13 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
#
# This file is autogenerated by pip-compile with Python 3.12
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile requirements.in
#
--trusted-host pypi.org
--trusted-host files.pythonhosted.org
--trusted-host pypi.python.org

alembic==1.13.3
# via -r requirements.in
annotated-types==0.7.0
Expand Down Expand Up @@ -41,6 +45,10 @@ dnspython==2.6.1
# via email-validator
email-validator==2.2.0
# via fastapi
exceptiongroup==1.2.2
# via
# anyio
# pytest
fastapi[all]==0.115.0
# via
# -r requirements.in
Expand All @@ -51,8 +59,6 @@ fastapi-cli[standard]==0.0.5
# via fastapi
frictionless==5.17.1
# via -r requirements.in
greenlet==3.1.1
# via sqlalchemy
h11==0.14.0
# via
# httpcore
Expand Down Expand Up @@ -187,19 +193,23 @@ tabulate==0.9.0
# via frictionless
text-unidecode==1.3
# via python-slugify
tomli==2.0.2
# via pytest
typer==0.12.5
# via
# fastapi-cli
# frictionless
typing-extensions==4.12.2
# via
# alembic
# anyio
# fastapi
# frictionless
# pydantic
# pydantic-core
# sqlalchemy
# typer
# uvicorn
ujson==5.10.0
# via fastapi
urllib3==2.2.3
Expand Down
26 changes: 5 additions & 21 deletions server/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,39 +274,23 @@ def run_workflow(
resource = Resource(file.file.read(), format="csv")

try:
# check if the csv is even a valid csv file. Note that this is a stronger
# check than what is performed in `process_workflow` because we are checking
# for if the file adheres to the csv format, and not just the data within it
resource.infer(stats=True)
except frictionless.exception.FrictionlessException:
raise HTTPException(
status_code=400,
detail="Could not parse the input file. Please check that it is a valid .csv file!",
)

# TODO: eventually most, or all, of our `process_workflow` function should make use of
# the Frictionless library so we can take advantage of its streaming functionality
# when validating files.
# POC of validating the input csv just for basic CSV formatting
# see https://framework.frictionlessdata.io/docs/checks/baseline.html#reference-checks.baseline
# for list of baseline checks
report = validate(resource)
if not report.valid:
raise HTTPException(
status_code=400, detail={"errors": report.flatten(["message"])}
)

# get the CSV data from the Frictionless Resource to run our workflow
all_rows: list[dict[str, Any]] = [row.to_dict() for row in resource.read_rows()] # type: ignore
fieldnames = [field.name for field in resource.schema.fields]
filename = file.filename if file.filename else ''
csv_data = CsvData(
column_names=fieldnames,
data=all_rows,
)

# run our workflow
validation_results = process_workflow(filename, csv_data, {}, db_workflow.schema)
validation_results = process_workflow(filename, resource, {}, db_workflow.schema)

return WorkflowRunReport(
row_count=len(all_rows),
row_count=resource.rows,
filename=file.filename if file.filename else '',
workflow_id=workflow_id,
validation_failures=validation_results,
Expand Down
25 changes: 23 additions & 2 deletions server/workflow_runner/validators.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,31 @@
from typing import Any
from dataclasses import dataclass
import datetime
from server.models.workflow.workflow_schema import BasicFieldDataTypeSchema, FieldSchema, FieldsetSchema, FileTypeValidation, RowCountValidation, TimestampDataTypeSchema
from server.models.workflow.api_schemas import ValidationFailure

from frictionless import validate, Resource
from pydantic import BaseModel, Field

from server.models.workflow.workflow_schema import BasicFieldDataTypeSchema, FieldSchema, FieldsetSchema, FileTypeValidation, RowCountValidation, TimestampDataTypeSchema, CsvData
from server.models.workflow.api_schemas import ValidationFailure

def parse_frictionless(file_contents: str | Resource) -> tuple[Resource, list[ValidationFailure]]:
    """Validate the file using the Frictionless baseline checks"""
    # Coerce raw CSV text into a Resource; an existing Resource passes through.
    if isinstance(file_contents, str):
        resource = Resource(file_contents, format="csv")
    else:
        resource = file_contents

    report = validate(resource)
    if report.valid:
        return resource, []

    # Each flattened entry is a [rowNumber, message] pair for one baseline
    # error. NOTE(review): rowNumber may be None for file-level errors —
    # confirm ValidationFailure accepts that.
    failures = [
        ValidationFailure(row_number=row_number, message=message)
        for row_number, message in report.flatten(["rowNumber", "message"])
    ]
    return resource, failures

def validate_file_type(file_name: str, validation: FileTypeValidation) -> list[ValidationFailure]:
"""Validate the file type of a file."""
Expand Down
40 changes: 28 additions & 12 deletions server/workflow_runner/workflow_runner.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
from typing import Any
import csv
import io

from frictionless import Resource

from server.models.workflow.workflow_schema import CsvData, FieldsetSchema, FieldsetSchemaValidation, FileTypeValidation, ParamReference, RowCountValidation, WorkflowParam, WorkflowSchema
from server.models.workflow.api_schemas import ValidationFailure
from .exceptions import FieldsetSchemaNotFoundException, ParameterDefinitionNotFoundException
from .validators import validate_fieldset, validate_file_type, validate_row_count
from .validators import validate_fieldset, validate_file_type, validate_row_count, parse_frictionless

def process_workflow(
    file_name: str,
    file_contents: Resource | str,
    param_values: dict[str, Any],
    schema: WorkflowSchema,
    implicit_frictionless_validation: bool = True
) -> list[ValidationFailure]:
    """Validate and execute a workflow based on the configured schema and
    user-provided parameters.

    Args:
        file_name: Name of the uploaded file (used by file-type validations).
        file_contents: Raw CSV text, or an already-constructed Frictionless
            Resource.
        param_values: User-supplied parameter values keyed by parameter name.
        schema: Workflow schema describing the validations to run.
        implicit_frictionless_validation: When True (default), run the
            Frictionless baseline checks first and prepend their failures.
            Disable when the schema should be a completely faithful
            representation of the total validations performed.

    Returns:
        All validation failures: baseline failures (if enabled) followed by
        schema-driven failures.

    Raises:
        ParameterDefinitionNotFoundException: If a supplied parameter has no
            matching definition in the schema (via _validate_param_values).
    """
    # Generally, workflows have an implicit frictionless baseline validation,
    # but it can be turned off so the schema alone defines the validations.
    if implicit_frictionless_validation:
        file_resource, frictionless_baseline_validation_failures = parse_frictionless(file_contents)
    else:
        # BUG FIX: previously neither name was bound on this path, so the
        # code below raised NameError. Still coerce raw text to a Resource.
        if isinstance(file_contents, str):
            file_resource = Resource(file_contents, format="csv")
        else:
            file_resource = file_contents
        frictionless_baseline_validation_failures = []

    _validate_param_values(param_values, schema)

    csv_data = _get_csv_contents_from_resource(file_resource)
    workflow_validation_failures = _validate_csv(file_name, csv_data, param_values, schema)

    return frictionless_baseline_validation_failures + workflow_validation_failures


def _validate_param_values(param_values: dict[str, Any], schema: WorkflowSchema):
Expand All @@ -27,14 +41,15 @@ def _validate_param_values(param_values: dict[str, Any], schema: WorkflowSchema)
if not any(param.name == param_name for param in schema.params):
raise ParameterDefinitionNotFoundException(f"Parameter definition for {param_name} not found in schema.")

def _get_csv_contents_from_resource(file_resource: Resource) -> CsvData:
    """Extract column names and row data from a Frictionless Resource.

    Args:
        file_resource: A Resource whose schema has already been inferred
            (e.g. by `parse_frictionless` / `resource.infer`).

    Returns:
        CsvData holding the resource's field names and every row as a dict.
    """
    # NOTE(review): read_rows() materializes the entire file in memory; fine
    # for small CSVs, revisit if Frictionless streaming is ever adopted.
    all_rows: list[dict[str, Any]] = [row.to_dict() for row in file_resource.read_rows()]  # type: ignore
    fieldnames = [field.name for field in file_resource.schema.fields]

    return CsvData(
        column_names=fieldnames,
        data=all_rows,
    )


def _get_fieldset_schema(fieldset_id: str, fieldsets: list[FieldsetSchema]) -> FieldsetSchema:
Expand All @@ -52,6 +67,7 @@ def _validate_csv(
) -> list[ValidationFailure]:
"""Validate the CSV data based on the configured schema and user-provided parameters."""
validations = []

for operation in schema.operations:
match operation:
case FieldsetSchemaValidation():
Expand Down

0 comments on commit 3fc7c4c

Please sign in to comment.