Skip to content

Commit

Permalink
Improve validation report output (#320)
Browse files Browse the repository at this point in the history
* improve report output

* working on orshapes

* add reason to orshape

* add orshape diff to report

* hashable

* fixing result_uri?

* remove print

* use actual validation report

* fix imports

* formatting

* fix tests to match new shacl interpretation output

* fix tests to match new shacl interpretation output

* add test case for property shapes

* use pyshacl parsing of paths

* formatting and imports

* remove Literal import

* clean up commented code

* add test case for Or

* format

* qname for paths in validation reasons

* handle prefixes whe nconverting to sparql paths

* format code

* fixing up shape_to_query and adding tests

* dedup count code

* move count formatting function to graphdiff and fix test
  • Loading branch information
gtfierro authored Nov 19, 2024
1 parent b1275d5 commit dd5b048
Show file tree
Hide file tree
Showing 7 changed files with 975 additions and 769 deletions.
171 changes: 109 additions & 62 deletions buildingmotif/dataclasses/shape_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union

import rdflib
from pyshacl.helper.path_helper import shacl_path_to_sparql_path
from rdflib import RDF, RDFS, Graph, URIRef
from rdflib.paths import ZeroOrMore, ZeroOrOne
from rdflib.term import Node
Expand Down Expand Up @@ -267,7 +268,7 @@ def shape_to_query(self, shape: URIRef) -> str:
- `<shape> sh:property [ sh:path <path>; sh:hasValue <value>]` ->
?target <path> <value>
"""
clauses, project = _shape_to_where(self.graph, shape)
clauses, project = _shape_to_where(self.graph, shape, "?target")
preamble = """PREFIX sh: <http://www.w3.org/ns/shacl#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
Expand All @@ -279,30 +280,73 @@ def _is_list(graph: Graph, node: Node):
return (node, RDF.first, None) in graph


def _sh_path_to_path(graph: Graph, sh_path_value: Node):
# check if sh:path points to a list
if _is_list(graph, sh_path_value):
components = list(
graph.objects(sh_path_value, (RDF.rest * ZeroOrMore) / RDF.first) # type: ignore
def _target_to_sparql(graph: Graph, nodeshape: Node, root_var: str = "?target") -> str:
"""
Takes the nodeshape and returns the SPARQL query that would be used to
find the target nodes of that nodeshape. This is a helper function for
_shape_to_where
Handles:
- targetClass
- targetSubjectsOf
- targetObjectsOf
- targetNode
If there is more than one of these clauses on the nodeshape, they are
combined with a UNION.
Returns the string of the query.
"""

# get all the clauses for the targetClass
targetClasses = graph.objects(nodeshape, SH.targetClass)
tc_clauses = [
f"{root_var} rdf:type/rdfs:subClassOf* {tc.n3()} .\n" for tc in targetClasses # type: ignore
]
# get all the clauses for the targetSubjectsOf
targetSubjectsOf = graph.objects(nodeshape, SH.targetSubjectsOf)
tso_clauses = [
f"{root_var} {tso.n3()} ?ignore .\n" for tso in targetSubjectsOf # type: ignore
]
# get all the clauses for the targetObjectsOf
targetObjectsOf = graph.objects(nodeshape, SH.targetObjectsOf)
too_clauses = [
f"?ignore {too.n3()} {root_var} .\n" for too in targetObjectsOf # type: ignore
]

# get all the clauses for the targetNode
targetNode = list(graph.objects(nodeshape, SH.targetNode))
tn_clauses = [
f"BIND({tn.n3()} AS {root_var}) .\n" for tn in targetNode # type: ignore
]

# combine all the clauses with a UNION
all_clauses = tc_clauses + tso_clauses + too_clauses + tn_clauses
return " UNION ".join(f"{{ {clause} }}" for clause in all_clauses)


def _clauses_on_nodeshape(
graph: Graph, nodeshape: Node, root_variable: str = "?target"
) -> str:
"""handles the constraint components on a node shape (other than targetClass, targetSubjectsOf, targetObjectsOf, targetNode).
Builds up the SPARQL query for the given node shape, starting with the given root variable.
"""
clauses = []
# handle sh:class
for class_constraint in graph.objects(nodeshape, SH["class"]):
clauses.append(
f"{root_variable} rdf:type/rdfs:subClassOf* {class_constraint.n3()} .\n"
)
return "/".join([_sh_path_to_path(graph, comp) for comp in components])
part = graph.value(sh_path_value, SH.oneOrMorePath)
if part is not None:
return f"{_sh_path_to_path(graph, part)}+"
part = graph.value(sh_path_value, SH.zeroOrMorePath)
if part is not None:
return f"{_sh_path_to_path(graph, part)}*"
part = graph.value(sh_path_value, SH.zeroOrOnePath)
if part is not None:
return f"{_sh_path_to_path(graph, part)}?"
return sh_path_value.n3()


def _shape_to_where(graph: Graph, shape: URIRef) -> Tuple[str, List[str]]:

return " ".join(clauses)


def _shape_to_where(
graph: Graph, shape: URIRef, root_var: str = "?target"
) -> Tuple[str, List[str]]:
# we will build the query as a string
clauses: str = ""
# build up the SELECT clause as a set of vars
project: Set[str] = {"?target"}
project: Set[str] = {root_var}

# local state for generating unique variable names
prefix = "".join(random.choice(string.ascii_lowercase) for _ in range(2))
Expand All @@ -314,62 +358,55 @@ def gensym():
variable_counter += 1
return varname

# `<shape> sh:targetClass <class>` -> `?target rdf:type/rdfs:subClassOf* <class>`
targetClasses = graph.objects(shape, SH.targetClass | SH["class"])
tc_clauses = [
f"?target rdf:type/rdfs:subClassOf* {tc.n3()} .\n" for tc in targetClasses # type: ignore
]
clauses += " UNION ".join(tc_clauses)

# handle targetSubjectsOf
targetSubjectsOf = graph.objects(shape, SH.targetSubjectsOf)
tso_clauses = [
f"?target {tso.n3()} ?ignore .\n" for tso in targetSubjectsOf # type: ignore
]
clauses += " UNION ".join(tso_clauses)
# get all the target clauses
clauses += _target_to_sparql(graph, shape, root_var)

# handle targetObjectsOf
targetObjectsOf = graph.objects(shape, SH.targetObjectsOf)
too_clauses = [
f"?ignore {too.n3()} ?target .\n" for too in targetObjectsOf # type: ignore
]
clauses += " UNION ".join(too_clauses)

# handle targetNode
targetNode = list(graph.objects(shape, SH.targetNode))
if len(targetNode) == 1:
clauses += f"BIND({targetNode[0].n3()} AS ?target) .\n"
elif len(targetNode) > 1:
raise ValueError(
"More than one targetNode found. This is not currently supported"
)
clauses += _clauses_on_nodeshape(graph, shape, root_var)

# find all of the non-qualified property shapes. All of these will use the same variable
# for all uses of the same sh:path value
pshapes_by_path: Dict[Node, List[Node]] = defaultdict(list)
qualified_pshapes: Set[Node] = set()
for pshape in graph.objects(shape, SH.property):
path = _sh_path_to_path(graph, graph.value(pshape, SH.path))
path = shacl_path_to_sparql_path(graph, graph.value(pshape, SH.path))
if not graph.value(pshape, SH.qualifiedValueShape):
pshapes_by_path[path].append(pshape) # type: ignore
else:
qualified_pshapes.add(pshape)

# look at pshapes implicitly defined by sh:path
for pshape in graph.subjects(predicate=SH.path):
if (
pshape == shape
): # skip the input 'shape', otherwise this will infinitely recurse
continue
path = shacl_path_to_sparql_path(graph, graph.value(pshape, SH.path))
if not graph.value(pshape, SH.qualifiedValueShape):
pshapes_by_path[path].append(pshape) # type: ignore
else:
qualified_pshapes.add(pshape)

for dep_shape in graph.objects(shape, SH.node):
dep_clause, dep_project = _shape_to_where(graph, dep_shape)
dep_clause, dep_project = _shape_to_where(graph, dep_shape, root_var)
clauses += dep_clause
project.update(dep_project)

for or_clause in graph.objects(shape, SH["or"]):
items = list(graph.objects(or_clause, (RDF.rest * ZeroOrMore) / RDF.first)) # type: ignore
or_parts = []
for item in items:
or_body, or_project = _shape_to_where(graph, item)
or_body, or_project = _shape_to_where(graph, item, root_var)
or_parts.append(or_body)
project.update(or_project)
clauses += " UNION ".join(f"{{ {or_body} }}" for or_body in or_parts)

# 'pshapes_by_path' maps a path to all of the property shapes that use that path on the target
# assign a unique variable for each sh:path w/o a qualified shape
pshape_vars: Dict[Node, str] = {}
for pshape_list in pshapes_by_path.values():
varname = f"?{gensym()}"
# get name if it exists, otherwise generate a new one
pshape_name = graph.value(pshape_list[0], SH.name | RDFS.label) or gensym()
varname = f"?{pshape_name}"
for pshape in pshape_list:
pshape_vars[pshape] = varname

Expand All @@ -378,16 +415,17 @@ def gensym():
# or generate a new one. When generating a name, use the SH.name field
# in the PropertyShape or generate a unique one
name = pshape_vars.get(
pshape, f"?{graph.value(pshape, SH.name) or gensym()}".replace(" ", "_")
pshape,
f"?{graph.value(pshape, SH.name|RDFS.label) or gensym()}".replace(" ", "_"),
)
path = _sh_path_to_path(graph, graph.value(pshape, SH.path))
path = shacl_path_to_sparql_path(graph, graph.value(pshape, SH.path))
qMinCount = graph.value(pshape, SH.qualifiedMinCount) or 0

pclass = graph.value(
pshape, (SH["qualifiedValueShape"] * ZeroOrOne / SH["class"]) # type: ignore
)
if pclass:
clause = f"?target {path} {name} .\n {name} rdf:type/rdfs:subClassOf* {pclass.n3()} .\n"
clause = f"{root_var} {path} {name} .\n {name} rdf:type/rdfs:subClassOf* {pclass.n3()} .\n"
if qMinCount == 0:
clause = f"OPTIONAL {{ {clause} }} .\n"
clauses += clause
Expand All @@ -397,29 +435,38 @@ def gensym():
pshape, (SH["qualifiedValueShape"] * ZeroOrOne / SH["node"]) # type: ignore
)
if pnode:
node_clauses, node_project = _shape_to_where(graph, pnode)
clause = f"?target {path} {name} .\n"
clause += node_clauses.replace("?target", name)
node_clauses, node_project = _shape_to_where(graph, pnode, root_var)
clause = f"{root_var} {path} {name} .\n"
clause += node_clauses.replace(root_var, name)
if qMinCount == 0:
clause = f"OPTIONAL {{ {clause} }}"
clauses += clause
project.update({p.replace("?target", name) for p in node_project})
project.update({p.replace(root_var, name) for p in node_project})

or_values = graph.value(
pshape, (SH["qualifiedValueShape"] * ZeroOrOne / SH["or"])
)
if or_values:
# or clauses share the variable name. Get the variablen name from the SH.name
# or RDFS.label for the current pshape, or generate a new one
or_var = graph.value(pshape, SH.name | RDFS.label) or gensym()
or_var = f"?{or_var}".replace(" ", "_")
# connect ?target to the variable that will be used in the OR clauses
clauses += f"{root_var} {path} {or_var} .\n"
items = list(graph.objects(or_values, (RDF.rest * ZeroOrMore) / RDF.first))
or_parts = []
for item in items:
or_body, or_project = _shape_to_where(graph, item)
or_body, or_project = _shape_to_where(graph, item, or_var)
or_parts.append(or_body)
project.update(or_project)
clauses += " UNION ".join(f"{{ {or_body} }}" for or_body in or_parts)

pvalue = graph.value(pshape, SH.hasValue)
if pvalue:
clauses += f"?target {path} {pvalue.n3()} .\n"
clauses += f"{root_var} {path} {pvalue.n3()} .\n"

if not pclass and not pnode and not or_values and not pvalue:
clauses += f"{root_var} {path} {name} .\n"

return clauses, list(project)

Expand Down
Loading

0 comments on commit dd5b048

Please sign in to comment.