Skip to content

Commit

Permalink
Tweak topological sort to preserve client-specified order
Browse files Browse the repository at this point in the history
If there are no explicit dependencies between two nodes,
try to keep them in the same order.

By doing this, we can minimize the need for synthetic dependencies.
  • Loading branch information
justinsb committed Jan 30, 2025
1 parent cf278c7 commit b1c232f
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 180 deletions.
28 changes: 16 additions & 12 deletions pkg/graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,17 @@ func (b *Builder) NewResourceGraphDefinition(originalCR *v1alpha1.ResourceGraphD

// we'll also store the resources in a map for easy access later.
resources := make(map[string]*Resource)
for _, rgResource := range rgd.Spec.Resources {
r, err := b.buildRGResource(rgResource, namespacedResources)
for i, rgResource := range rgd.Spec.Resources {
id := rgResource.ID
order := i
r, err := b.buildRGResource(rgResource, namespacedResources, order)
if err != nil {
return nil, fmt.Errorf("failed to build resource '%v': %v", rgResource.ID, err)
return nil, fmt.Errorf("failed to build resource %q: %w", id, err)
}
resources[rgResource.ID] = r
if resources[id] != nil {
return nil, fmt.Errorf("found resources with duplicate id %q", id)
}
resources[id] = r
}

// At this stage we have a superficial understanding of the resources that are
Expand Down Expand Up @@ -246,7 +251,7 @@ func (b *Builder) NewResourceGraphDefinition(originalCR *v1alpha1.ResourceGraphD
// It provides a high-level understanding of the resource, by extracting the
// OpenAPI schema, emualting the resource and extracting the cel expressions
// from the schema.
func (b *Builder) buildRGResource(rgResource *v1alpha1.Resource, namespacedResources map[k8sschema.GroupKind]bool) (*Resource, error) {
func (b *Builder) buildRGResource(rgResource *v1alpha1.Resource, namespacedResources map[k8sschema.GroupKind]bool, order int) (*Resource, error) {
// 1. We need to unmashal the resource into a map[string]interface{} to
// make it easier to work with.
resourceObject := map[string]interface{}{}
Expand Down Expand Up @@ -334,6 +339,7 @@ func (b *Builder) buildRGResource(rgResource *v1alpha1.Resource, namespacedResou
readyWhenExpressions: readyWhen,
includeWhenExpressions: includeWhen,
namespaced: isNamespaced,
order: order,
}, nil
}

Expand Down Expand Up @@ -364,13 +370,13 @@ func (b *Builder) buildDependencyGraph(

directedAcyclicGraph := dag.NewDirectedAcyclicGraph()
// Set the vertices of the graph to be the resources defined in the resource graph definition.
for resourceName := range resources {
if err := directedAcyclicGraph.AddVertex(resourceName); err != nil {
for _, resource := range resources {
if err := directedAcyclicGraph.AddVertex(resource.id, resource.order); err != nil {
return nil, fmt.Errorf("failed to add vertex to graph: %w", err)
}
}

for resourceName, resource := range resources {
for _, resource := range resources {
for _, resourceVariable := range resource.variables {
for _, expression := range resourceVariable.Expressions {
// We need to inspect the expression to understand how it relates to the
Expand All @@ -397,10 +403,8 @@ func (b *Builder) buildDependencyGraph(
resource.addDependencies(resourceDependencies...)
resourceVariable.AddDependencies(resourceDependencies...)
// We need to add the dependencies to the graph.
for _, dependency := range resourceDependencies {
if err := directedAcyclicGraph.AddEdge(resourceName, dependency); err != nil {
return nil, err
}
if err := directedAcyclicGraph.AddDependencies(resource.id, resourceDependencies); err != nil {
return nil, err
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/graph/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func TestGraphBuilder_DependencyValidation(t *testing.T) {
assert.Contains(t, clusterDeps, "subnet2")

// Validate topological order
assert.Equal(t, []string{"clusterpolicy", "clusterrole", "vpc", "subnet1", "subnet2", "cluster"}, g.TopologicalOrder)
assert.Equal(t, []string{"vpc", "clusterpolicy", "clusterrole", "subnet1", "subnet2", "cluster"}, g.TopologicalOrder)
},
},
{
Expand Down Expand Up @@ -571,7 +571,7 @@ func TestGraphBuilder_DependencyValidation(t *testing.T) {
}, nil, nil),
},
wantErr: true,
errMsg: "This would create a cycle",
errMsg: "graph contains a cycle",
},
{
name: "independent pods",
Expand Down Expand Up @@ -726,7 +726,7 @@ func TestGraphBuilder_DependencyValidation(t *testing.T) {
}, nil, nil),
},
wantErr: true,
errMsg: "This would create a cycle",
errMsg: "graph contains a cycle",
},
{
name: "shared infrastructure dependencies",
Expand Down Expand Up @@ -919,17 +919,17 @@ func TestGraphBuilder_DependencyValidation(t *testing.T) {

// Validate topological order
assert.Equal(t, []string{
"policy",
"role",
"vpc",
"subnet1",
"subnet2",
"subnet3",
"secgroup",
"policy",
"role",
"cluster1",
"cluster2",
"cluster3",
"monitor",
"secgroup",
}, g.TopologicalOrder)
},
},
Expand Down
178 changes: 93 additions & 85 deletions pkg/graph/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package dag

import (
"errors"
"fmt"
"sort"
"strings"
Expand All @@ -23,9 +24,22 @@ import (
type Vertex struct {
// ID is a unique identifier for the node
ID string
// Edges stores the IDs of the nodes that this node has an outgoing edge to.
// In kro, this would be the children of a resource.
Edges map[string]struct{}
// Order records the original order, and is used to preserve the original user-provided ordering as far as posible.
Order int
// DependsOn stores the IDs of the nodes that this node depends on.
// If we depend on another vertex, we must appear after that vertex in the topological sort.
DependsOn map[string]struct{}
}

func (v Vertex) String() string {
var dependsOn strings.Builder
for id := range v.DependsOn {
if dependsOn.Len() != 0 {
dependsOn.WriteString(",")
}
dependsOn.WriteString(id)
}
return fmt.Sprintf("Vertex[ID: %s, Order: %d, DependsOn: %s]", v.ID, v.Order, dependsOn.String())
}

// DirectedAcyclicGraph represents a directed acyclic graph
Expand All @@ -42,134 +56,128 @@ func NewDirectedAcyclicGraph() *DirectedAcyclicGraph {
}

// AddVertex adds a new node to the graph.
func (d *DirectedAcyclicGraph) AddVertex(id string) error {
func (d *DirectedAcyclicGraph) AddVertex(id string, order int) error {
if _, exists := d.Vertices[id]; exists {
return fmt.Errorf("node %s already exists", id)
}
d.Vertices[id] = &Vertex{
ID: id,
Edges: make(map[string]struct{}),
ID: id,
Order: order,
DependsOn: make(map[string]struct{}),
}
return nil
}

type CycleError struct {
From, to string
Cycle []string
Cycle []string
}

func (e *CycleError) Error() string {
return fmt.Sprintf("Cannot add edge from %s to %s. This would create a cycle: %s", e.From, e.to, formatCycle(e.Cycle))
return fmt.Sprintf("graph contains a cycle: %s", formatCycle(e.Cycle))
}

func formatCycle(cycle []string) string {
return strings.Join(cycle, " -> ")
}

// AddEdge adds a directed edge from one node to another.
func (d *DirectedAcyclicGraph) AddEdge(from, to string) error {
// AsCycleError returns the (potentially wrapped) CycleError, or nil if it is not a CycleError.
func AsCycleError(err error) *CycleError {
cycleError := &CycleError{}
if errors.As(err, &cycleError) {
return cycleError
}
return nil
}

// AddDependencies adds a set of dependencies to the "from" vertex.
// This indicates that all the vertexes in "dependencies" must occur before "from".
func (d *DirectedAcyclicGraph) AddDependencies(from string, dependencies []string) error {
fromNode, fromExists := d.Vertices[from]
_, toExists := d.Vertices[to]
if !fromExists {
return fmt.Errorf("node %s does not exist", from)
}
if !toExists {
return fmt.Errorf("node %s does not exist", to)
}
if from == to {
return fmt.Errorf("self references are not allowed")
}

fromNode.Edges[to] = struct{}{}
for _, dependency := range dependencies {
_, toExists := d.Vertices[dependency]
if !toExists {
return fmt.Errorf("node %s does not exist", dependency)
}
if from == dependency {
return fmt.Errorf("self references are not allowed")
}
fromNode.DependsOn[dependency] = struct{}{}
}

// Check if the graph is still a DAG
hasCycle, cycle := d.HasCycle()
hasCycle, cycle := d.hasCycle()
if hasCycle {
// Ehmmm, we have a cycle, let's remove the edge we just added
delete(fromNode.Edges, to)
for _, dependency := range dependencies {
delete(fromNode.DependsOn, dependency)
}
return &CycleError{
From: from,
to: to,
Cycle: cycle,
}
}

return nil
}

// TopologicalSort returns the vertexes of the graph, respecting topological ordering first,
// and preserving order of nodes within each "depth" of the topological ordering.
func (d *DirectedAcyclicGraph) TopologicalSort() ([]string, error) {
if cyclic, _ := d.HasCycle(); cyclic {
return nil, fmt.Errorf("graph has a cycle")
}

visited := make(map[string]bool)
var order []string

// Get a sorted list of all vertices
vertices := d.GetVertices()
// Make a list of vertices, sorted by Order
vertices := make([]*Vertex, 0, len(d.Vertices))
for _, vertex := range d.Vertices {
vertices = append(vertices, vertex)
}
sort.Slice(vertices, func(i, j int) bool {
return vertices[i].Order < vertices[j].Order
})

var dfs func(string)
dfs = func(node string) {
visited[node] = true
for len(visited) < len(vertices) {
progress := false

// Sort the neighbors to ensure deterministic order
neighbors := make([]string, 0, len(d.Vertices[node].Edges))
for neighbor := range d.Vertices[node].Edges {
neighbors = append(neighbors, neighbor)
}
sort.Strings(neighbors)
for _, vertex := range vertices {
if visited[vertex.ID] {
continue
}

for _, neighbor := range neighbors {
if !visited[neighbor] {
dfs(neighbor)
allDependenciesReady := true
for dep := range vertex.DependsOn {
if !visited[dep] {
allDependenciesReady = false
break
}
}
if !allDependenciesReady {
continue
}

order = append(order, vertex.ID)
visited[vertex.ID] = true
progress = true
}
order = append(order, node)
}

// Visit nodes in a deterministic order
for _, node := range vertices {
if !visited[node] {
dfs(node)
if !progress {
hasCycle, cycle := d.hasCycle()
if !hasCycle {
// Unexpected!
return nil, &CycleError{}
}
return nil, &CycleError{
Cycle: cycle,
}
}
}

return order, nil
}

// GetVertices returns the nodes in the graph in sorted alphabetical
// order.
func (d *DirectedAcyclicGraph) GetVertices() []string {
nodes := make([]string, 0, len(d.Vertices))
for node := range d.Vertices {
nodes = append(nodes, node)
}

// Ensure deterministic order. This is important for TopologicalSort
// to return a deterministic result.
sort.Strings(nodes)
return nodes
}

// GetEdges returns the edges in the graph in sorted order...
func (d *DirectedAcyclicGraph) GetEdges() [][2]string {
var edges [][2]string
for from, node := range d.Vertices {
for to := range node.Edges {
edges = append(edges, [2]string{from, to})
}
}
sort.Slice(edges, func(i, j int) bool {
// Sort by from node first
if edges[i][0] == edges[j][0] {
return edges[i][1] < edges[j][1]
}
return edges[i][0] < edges[j][0]
})
return edges
}

func (d *DirectedAcyclicGraph) HasCycle() (bool, []string) {
func (d *DirectedAcyclicGraph) hasCycle() (bool, []string) {
visited := make(map[string]bool)
recStack := make(map[string]bool)
var cyclePath []string
Expand All @@ -180,14 +188,14 @@ func (d *DirectedAcyclicGraph) HasCycle() (bool, []string) {
recStack[node] = true
cyclePath = append(cyclePath, node)

for neighbor := range d.Vertices[node].Edges {
if !visited[neighbor] {
if dfs(neighbor) {
for dependency := range d.Vertices[node].DependsOn {
if !visited[dependency] {
if dfs(dependency) {
return true
}
} else if recStack[neighbor] {
} else if recStack[dependency] {
// Found a cycle, add the closing node to complete the cycle
cyclePath = append(cyclePath, neighbor)
cyclePath = append(cyclePath, dependency)
return true
}
}
Expand Down
Loading

0 comments on commit b1c232f

Please sign in to comment.