Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrating drand interfaces into drand-cli codebase #5

Merged
merged 12 commits into from
Aug 13, 2024
26 changes: 13 additions & 13 deletions client/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"sync"
"time"

"github.com/drand/drand/v2/common/client"
"github.com/drand/drand/v2/common/log"
"github.com/drand/go-clients/drand"
)

const (
Expand All @@ -24,7 +24,7 @@ const (
// is passed, a `watch` will be run on the watch client in the absence of external watchers,
// which will swap watching over to the main client. If no watch client is set and autowatch is off
// then a single watch will only run when an external watch is requested.
func newWatchAggregator(l log.Logger, c, wc client.Client, autoWatch bool, autoWatchRetry time.Duration) *watchAggregator {
func newWatchAggregator(l log.Logger, c, wc drand.Client, autoWatch bool, autoWatchRetry time.Duration) *watchAggregator {
if autoWatchRetry == 0 {
autoWatchRetry = defaultAutoWatchRetry
}
Expand All @@ -41,12 +41,12 @@ func newWatchAggregator(l log.Logger, c, wc client.Client, autoWatch bool, autoW

type subscriber struct {
ctx context.Context
c chan client.Result
c chan drand.Result
}

type watchAggregator struct {
client.Client
passiveClient client.Client
drand.Client
passiveClient drand.Client
autoWatch bool
autoWatchRetry time.Duration
log log.Logger
Expand Down Expand Up @@ -82,7 +82,7 @@ func (c *watchAggregator) startAutoWatch(full bool) {
c.cancelAutoWatch = cancel
go func() {
for {
var results <-chan client.Result
var results <-chan drand.Result
if full {
results = c.Watch(ctx)
} else if c.passiveClient != nil {
Expand Down Expand Up @@ -118,7 +118,7 @@ func (c *watchAggregator) startAutoWatch(full bool) {

// passiveWatch is a degraded form of watch, where watch only hits the 'passive client'
// unless distribution is actually needed.
func (c *watchAggregator) passiveWatch(ctx context.Context) <-chan client.Result {
func (c *watchAggregator) passiveWatch(ctx context.Context) <-chan drand.Result {
c.subscriberLock.Lock()
defer c.subscriberLock.Unlock()

Expand All @@ -127,7 +127,7 @@ func (c *watchAggregator) passiveWatch(ctx context.Context) <-chan client.Result
return nil
}

wc := make(chan client.Result)
wc := make(chan drand.Result)
if len(c.subscribers) == 0 {
ctx, cancel := context.WithCancel(ctx)
c.cancelPassive = cancel
Expand All @@ -139,11 +139,11 @@ func (c *watchAggregator) passiveWatch(ctx context.Context) <-chan client.Result
return wc
}

func (c *watchAggregator) Watch(ctx context.Context) <-chan client.Result {
func (c *watchAggregator) Watch(ctx context.Context) <-chan drand.Result {
c.subscriberLock.Lock()
defer c.subscriberLock.Unlock()

sub := subscriber{ctx, make(chan client.Result, aggregatorWatchBuffer)}
sub := subscriber{ctx, make(chan drand.Result, aggregatorWatchBuffer)}
c.subscribers = append(c.subscribers, sub)

if len(c.subscribers) == 1 {
Expand All @@ -157,14 +157,14 @@ func (c *watchAggregator) Watch(ctx context.Context) <-chan client.Result {
return sub.c
}

func (c *watchAggregator) sink(in <-chan client.Result, out chan client.Result) {
func (c *watchAggregator) sink(in <-chan drand.Result, out chan drand.Result) {
defer close(out)
for range in {
continue
}
}

func (c *watchAggregator) distribute(in <-chan client.Result, cancel context.CancelFunc) {
func (c *watchAggregator) distribute(in <-chan drand.Result, cancel context.CancelFunc) {
defer cancel()
for {
c.subscriberLock.Lock()
Expand All @@ -176,7 +176,7 @@ func (c *watchAggregator) distribute(in <-chan client.Result, cancel context.Can
aCtx := c.subscribers[0].ctx
c.subscriberLock.Unlock()

var m client.Result
var m drand.Result
var ok bool

select {
Expand Down
8 changes: 4 additions & 4 deletions client/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ import (
"testing"
"time"

"github.com/drand/drand/v2/common/client"
"github.com/drand/drand/v2/common/log"
clientMock "github.com/drand/go-clients/client/mock"
"github.com/drand/go-clients/client/test/result/mock"
"github.com/drand/go-clients/drand"
)

func TestAggregatorClose(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)

c := &clientMock.Client{
WatchCh: make(chan client.Result),
WatchCh: make(chan drand.Result),
CloseF: func() error {
wg.Done()
return nil
Expand All @@ -38,15 +38,15 @@ func TestAggregatorPassive(t *testing.T) {
wg.Add(1)

c := &clientMock.Client{
WatchCh: make(chan client.Result, 1),
WatchCh: make(chan drand.Result, 1),
CloseF: func() error {
wg.Done()
return nil
},
}

wc := &clientMock.Client{
WatchCh: make(chan client.Result, 1),
WatchCh: make(chan drand.Result, 1),
CloseF: func() error {
return nil
},
Expand Down
27 changes: 14 additions & 13 deletions client/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ import (

lru "github.com/hashicorp/golang-lru"

"github.com/drand/drand/v2/common/client"
"github.com/drand/go-clients/drand"

"github.com/drand/drand/v2/common/log"
)

// Cache provides a mechanism to check for rounds in the cache.
type Cache interface {
// TryGet provides a round beacon or nil if it is not cached.
TryGet(round uint64) client.Result
TryGet(round uint64) drand.Result
// Add adds an item to the cache
Add(uint64, client.Result)
Add(uint64, drand.Result)
}

// makeCache creates a cache of a given size
Expand All @@ -36,14 +37,14 @@ type typedCache struct {
}

// Add a result to the cache
func (t *typedCache) Add(round uint64, result client.Result) {
func (t *typedCache) Add(round uint64, result drand.Result) {
t.ARCCache.Add(round, result)
}

// TryGet attempts to get a result from the cache
func (t *typedCache) TryGet(round uint64) client.Result {
func (t *typedCache) TryGet(round uint64) drand.Result {
if val, ok := t.ARCCache.Get(round); ok {
return val.(client.Result)
return val.(drand.Result)
}
return nil
}
Expand All @@ -52,17 +53,17 @@ func (t *typedCache) TryGet(round uint64) client.Result {
type nilCache struct{}

// Add a result to the cache
func (*nilCache) Add(_ uint64, _ client.Result) {
func (*nilCache) Add(_ uint64, _ drand.Result) {
}

// TryGet attempts to get ar esult from the cache
func (*nilCache) TryGet(_ uint64) client.Result {
func (*nilCache) TryGet(_ uint64) drand.Result {
return nil
}

// NewCachingClient is a meta client that stores an LRU cache of
// recently fetched random values.
func NewCachingClient(l log.Logger, c client.Client, cache Cache) (client.Client, error) {
func NewCachingClient(l log.Logger, c drand.Client, cache Cache) (drand.Client, error) {
return &cachingClient{
Client: c,
cache: cache,
Expand All @@ -71,7 +72,7 @@ func NewCachingClient(l log.Logger, c client.Client, cache Cache) (client.Client
}

type cachingClient struct {
client.Client
drand.Client

cache Cache
log log.Logger
Expand All @@ -91,7 +92,7 @@ func (c *cachingClient) String() string {
}

// Get returns the randomness at `round` or an error.
func (c *cachingClient) Get(ctx context.Context, round uint64) (res client.Result, err error) {
func (c *cachingClient) Get(ctx context.Context, round uint64) (res drand.Result, err error) {
if val := c.cache.TryGet(round); val != nil {
return val, nil
}
Expand All @@ -102,9 +103,9 @@ func (c *cachingClient) Get(ctx context.Context, round uint64) (res client.Resul
return val, err
}

func (c *cachingClient) Watch(ctx context.Context) <-chan client.Result {
func (c *cachingClient) Watch(ctx context.Context) <-chan drand.Result {
in := c.Client.Watch(ctx)
out := make(chan client.Result)
out := make(chan drand.Result)
go func() {
for result := range in {
if ctx.Err() != nil {
Expand Down
6 changes: 3 additions & 3 deletions client/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"sync"
"testing"

"github.com/drand/drand/v2/common/client"
"github.com/drand/drand/v2/common/log"
clientMock "github.com/drand/go-clients/client/mock"
"github.com/drand/go-clients/client/test/result/mock"
"github.com/drand/go-clients/drand"
)

func TestCacheGet(t *testing.T) {
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestCacheGetLatest(t *testing.T) {

func TestCacheWatch(t *testing.T) {
m := clientMock.ClientWithResults(2, 6)
rc := make(chan client.Result, 1)
rc := make(chan drand.Result, 1)
m.WatchCh = rc
arcCache, err := makeCache(3)
if err != nil {
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestCacheClose(t *testing.T) {
wg.Add(1)

c := &clientMock.Client{
WatchCh: make(chan client.Result),
WatchCh: make(chan drand.Result),
CloseF: func() error {
wg.Done()
return nil
Expand Down
Loading