Skip to content

Commit

Permalink
feat: parallelize listener calls
Browse files Browse the repository at this point in the history
  • Loading branch information
yannickkirschen committed Jun 11, 2024
1 parent 430fa3c commit 42506de
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 64 deletions.
11 changes: 6 additions & 5 deletions example/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ type MySpec struct {
Message string `yaml:"message" json:"message"`
}

func MyListener(action manifesto.Action, manifest *manifesto.Manifest) error {
func MyListener(_ *manifesto.Pool, action manifesto.Action, manifest manifesto.Manifest) {
if manifest.ApiVersion != "example.com/v1alpha1" || manifest.Kind != "MyManifest" {
log.Printf("Unknown API Version and kind: %s/%s", manifest.ApiVersion, manifest.Kind)
return nil
return
}

spec := manifest.Spec.(*MySpec)
Expand All @@ -27,18 +27,19 @@ func MyListener(action manifesto.Action, manifest *manifesto.Manifest) error {
case manifesto.Deleted:
fmt.Println("Deleted:", spec.Message)
}

return nil
}

func main() {
m1 := manifesto.ParseFile("example/my-manifest-1.yaml", &MySpec{}, &MySpec{})

pool := manifesto.CreatePool()
pool.Listen(MyListener)
pool.Apply(m1)
pool.Apply(*m1)
pool.Apply(*m1)
pool.ApplyPartial(MyListener, *m1)

m3, _ := pool.GetByKey(m1.CreateKey())
m3.Error("Houston, we have a problem!")
pool.Delete(m3.CreateKey())
pool.Wait()
}
101 changes: 74 additions & 27 deletions manifesto.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package manifesto

import (
"fmt"
"io"
"log"
"os"
"sync"

"gopkg.in/yaml.v3"
)
Expand Down Expand Up @@ -52,8 +54,8 @@ type Manifest struct {
}

// CreateKey created a new ManifestKey based on the ApiVersion and Kind.
func (manifest *Manifest) CreateKey() ManifestKey {
return ManifestKey{manifest.ApiVersion, manifest.Kind, manifest.Metadata.Name}
func (manifest *Manifest) CreateKey() *ManifestKey {
return &ManifestKey{manifest.ApiVersion, manifest.Kind, manifest.Metadata.Name}
}

// Error adds an error message to the list of errors.
Expand Down Expand Up @@ -83,69 +85,109 @@ type Metadata struct {
}

// Listener is a function that is called when a manifest has been changed.
type Listener func(Action, *Manifest) error
type Listener func(*Pool, Action, Manifest)

// Pool holds all manifests and listeners.
type Pool struct {
manifests map[ManifestKey]*Manifest
manifests map[ManifestKey]Manifest
listeners []Listener
wg sync.WaitGroup
}

// CreatePool creates an empty Pool.
func CreatePool() *Pool {
return &Pool{make(map[ManifestKey]*Manifest), make([]Listener, 0)}
return &Pool{
manifests: make(map[ManifestKey]Manifest),
listeners: make([]Listener, 0),
}
}

// Listen add a listener to the pool.
func (pool *Pool) Listen(listener Listener) {
pool.listeners = append(pool.listeners, listener)
}

// Apply add or updates the manifest to or in the pool and calls all listeners.
func (pool *Pool) Apply(manifest *Manifest) []error {
// Apply adds or updates the manifest to or in the pool and calls all listeners.
// The manifest is transferred as value, not as reference. By doing so, we
// prevent race conditions.
func (pool *Pool) Apply(manifest Manifest) {
key := manifest.CreateKey()

errors := make([]error, 0)
if _, ok := pool.manifests[key]; ok {
pool.manifests[key] = manifest
if _, ok := pool.manifests[*key]; ok {
pool.manifests[*key] = manifest
for _, listener := range pool.listeners {
err := listener(Updated, manifest)
if err != nil {
errors = append(errors, err)
}
pool.apply(listener, Updated, &manifest)
}
} else {
pool.manifests[key] = manifest
pool.manifests[*key] = manifest
for _, listener := range pool.listeners {
pool.apply(listener, Created, &manifest)
}
}
}

// ApplyPartial adds or updates the manifest to or in the pool and calls all
// listeners except the specified one. This is meant to be used when a listener
// changes a manifest and should not be called again for that change (that could
// result in an endless loop). The manifest is transferred as value, not as
// reference. By doing so, we prevent race conditions.
func (pool *Pool) ApplyPartial(except Listener, manifest Manifest) {
key := manifest.CreateKey()
exceptName := fmt.Sprintf("%v", except)

if _, ok := pool.manifests[*key]; ok {
pool.manifests[*key] = manifest
for _, listener := range pool.listeners {
err := listener(Created, manifest)
if err != nil {
errors = append(errors, err)
if fmt.Sprintf("%v", listener) != exceptName {
pool.apply(listener, Updated, &manifest)
}
}
} else {
pool.manifests[*key] = manifest
for _, listener := range pool.listeners {
pool.apply(listener, Created, &manifest)
}
}
}

return errors
// ApplySilent adds or updates the manifest to or in the pool WITHOUT calling
// the listeners. This function is especially useful when using Manifesto just
// as a simple database without listeners.
func (pool *Pool) ApplySilent(manifest Manifest) {
pool.manifests[*manifest.CreateKey()] = manifest
}

func (pool *Pool) apply(listener Listener, action Action, manifest *Manifest) {
pool.wg.Add(1)
go func(listener Listener) {
defer pool.wg.Done()
listener(pool, action, *manifest)
}(listener)
}

// Delete deletes a manifest from the pool.
func (pool *Pool) Delete(key ManifestKey) {
if manifest, ok := pool.manifests[key]; ok {
delete(pool.manifests, key)
func (pool *Pool) Delete(key *ManifestKey) {
if manifest, ok := pool.manifests[*key]; ok {
delete(pool.manifests, *key)
for _, listener := range pool.listeners {
listener(Deleted, manifest)
pool.wg.Add(1)
go func(listener Listener) {
defer pool.wg.Done()
listener(pool, Deleted, manifest)
}(listener)
}
}
}

// GetByKey searches for a manifest and returns it.
func (pool *Pool) GetByKey(key ManifestKey) (*Manifest, bool) {
manifest, ok := pool.manifests[key]
func (pool *Pool) GetByKey(key *ManifestKey) (Manifest, bool) {
manifest, ok := pool.manifests[*key]
return manifest, ok
}

// Find goes through all existing manifests and filters for a testing function.
func (pool *Pool) Find(test func(*Manifest) bool) []*Manifest {
manifests := make([]*Manifest, 0)
func (pool *Pool) Find(test func(Manifest) bool) []Manifest {
manifests := make([]Manifest, 0)
for _, manifest := range pool.manifests {
if test(manifest) {
manifests = append(manifests, manifest)
Expand All @@ -154,6 +196,11 @@ func (pool *Pool) Find(test func(*Manifest) bool) []*Manifest {
return manifests
}

// Waits till all listeners have completed their work.
func (pool *Pool) Wait() {
pool.wg.Wait()
}

// ParseFile reads a JSON/YAML file and returns the parsed Manifest.
func ParseFile(filename string, spec any, status any) *Manifest {
content, err := os.ReadFile(filename)
Expand Down
62 changes: 30 additions & 32 deletions manifesto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,48 @@ func wantMyManifest(manifest *Manifest) bool {
return manifest.Metadata.Name == "my-manifest-1" && spec.Message == "hello, world"
}

func checkMyManifest(t *testing.T, manifest *Manifest, from string) {
if !wantMyManifest(manifest) {
t.Fatalf("Unable to parse manifest from %s", from)
}
}

type MySpec struct {
Message string `yaml:"message" json:"message"`
}

func TestParseFile(t *testing.T) {
manifest := ParseFile("example/my-manifest-1.yaml", &MySpec{}, &MySpec{})

if !wantMyManifest(manifest) {
t.Fatalf("Unable to parse manifest from file")
}
checkMyManifest(t, manifest, "file")
}

func TestParseReader(t *testing.T) {
r := io.NopCloser(strings.NewReader(myManifest))
manifest := ParseReader(r, &MySpec{}, &MySpec{})

if !wantMyManifest(manifest) {
t.Fatalf("Unable to parse manifest from reader")
}
checkMyManifest(t, manifest, "reader")
}

func TestParseString(t *testing.T) {
manifest := ParseString(myManifest, &MySpec{}, &MySpec{})

if !wantMyManifest(manifest) {
t.Fatalf("Unable to parse manifest from string")
}
checkMyManifest(t, manifest, "string")
}

func TestParseBytes(t *testing.T) {
b := []byte(myManifest)
manifest := ParseBytes(b, &MySpec{}, &MySpec{})

if !wantMyManifest(manifest) {
t.Fatalf("Unable to parse manifest from bytes")
}
checkMyManifest(t, manifest, "bytes")
}

func TestFind(t *testing.T) {
m1 := ParseFile("example/my-manifest-1.yaml", &MySpec{}, &MySpec{})
m2 := ParseFile("example/my-manifest-2.yaml", &MySpec{}, &MySpec{})

pool := CreatePool()
err1 := pool.Apply(m1)
err2 := pool.Apply(m2)

if len(err1) != 0 || len(err2) != 0 {
t.Fatalf("Unable to apply manifests: %s, %s", err1, err2)
}
pool.Apply(*m1)
pool.Apply(*m2)

manifests := pool.Find(
func(m *Manifest) bool {
func(m Manifest) bool {
spec := m.Spec.(*MySpec)
return m.ApiVersion == "example.com/v1alpha1" && m.Kind == "MyManifest" && strings.Contains(spec.Message, "world")
})
Expand All @@ -82,17 +72,10 @@ func TestFind(t *testing.T) {

func TestDelete(t *testing.T) {
manifest := ParseString(myManifest, &MySpec{}, &MySpec{})

if !wantMyManifest(manifest) {
t.Fatalf("Unable to parse manifest from string")
}
checkMyManifest(t, manifest, "string")

pool := CreatePool()
err := pool.Apply(manifest)

if len(err) != 0 {
t.Fatalf("Unable to apply manifest: %s", err)
}
pool.Apply(*manifest)

key := manifest.CreateKey()
_, ok := pool.GetByKey(key)
Expand All @@ -106,3 +89,18 @@ func TestDelete(t *testing.T) {
t.Fatalf("Manifest does exist after deletion")
}
}

func TestReferences(t *testing.T) {
m1 := ParseString(myManifest, &MySpec{}, &MySpec{})
checkMyManifest(t, m1, "string")

pool := CreatePool()
pool.Apply(*m1)

m2, _ := pool.GetByKey(m1.CreateKey())
m1.Metadata.Name = "new-name"

if m2.Metadata.Name != "my-manifest-1" {
t.Fatalf("Manifest name changed")
}
}

0 comments on commit 42506de

Please sign in to comment.