diff --git a/.golangci.yml b/.golangci.yml index ab2547f..2d92469 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -26,14 +26,9 @@ issues: - path: internal/lib linters: - forbidigo # we use Println in our UX - - path: internal/drand-cli - linters: - - forbidigo # we use Println in our UX - - goconst # we re-use some strings in our flags - path: client/http text: "unexported-return" -run: - skip-dirs: + exclude-dirs: - demo - test diff --git a/Makefile b/Makefile index 7134cea..5191f9e 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,10 @@ +.PHONY: drand-relay-gossip client-tool build clean + +build: drand-relay-gossip client-tool + +clean: + rm -f ./drand-relay-gossip ./drand-cli + drand-relay-gossip: go build -o drand-relay-gossip ./gossip-relay/main.go diff --git a/README.md b/README.md index 4a4af58..2d29eea 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,37 @@ This repo contains most notably: - a client CLI tool to fetch and verify drand beacons from the various available sources in your terminal - a gossipsub relay to relay drand beacons on gossipsub +# Migration from drand/drand + +Prior to drand V2 release, the drand client code lived in the drand/drand repo. Since its V2 release, the drand daemon code aims at being more minimalist and having as few dependencies as possible. +Most notably this meant removing the libp2p code that was exclusively used for Gossip relays and Gossip client, and also trimming down the amount of HTTP-related code. + +From now on, this repo is meant to provide the Go Client code to interact with drand, query drand beacons and verify them through either the HTTP or the Gossip relays. + +Note that drand does not provide public gRPC endpoints since ~2020, therefore the gRPC client code has been moved to the internal package of the relays (to allow relays to directly interface with a working daemon using gRPC). + +There are relatively few changes to the public APIs of the client code and simply using the `drand/go-clients/http` packages should be enough. +We recommend using `go-doc` to see the usage documentation and examples. + +## Most notable changes from the drand/drand V1 APIs + +The `Result` interface now follows the Protobuf getter format: +``` +Result.Round() -> Result.GetRound() +Result.Randomness() -> Result.GetRandomness() +Result.Signature() -> Result.GetSignature() +Result.PreviousSignature() -> Result.GetPreviousSignature() +``` +meaning `PublicRandResponse` now satisfies directly the `Result` interface. + +The HTTP client now returns a concrete type and doesn't need to be cast to a HTTP client to use e.g. `SetUserAgent`. + +The client option `WithVerifiedResult` was renamed `WithTrustedResult`, to properly convey its function. + +Note also that among other packages you might be using in the `github.com/drand/drand/v2` packages, +the `crypto.GetSchemeByIDWithDefault` function was renamed `crypto.GetSchemeByID`; +and the `Beacon` struct now lives in the `github.com/drand/drand/v2/common` package rather than in the `chain` one. + --- ### License diff --git a/client/aggregator.go b/client/aggregator.go index f4cbc5c..1e03014 100644 --- a/client/aggregator.go +++ b/client/aggregator.go @@ -6,8 +6,8 @@ import ( "sync" "time" - "github.com/drand/drand/v2/common/client" "github.com/drand/drand/v2/common/log" + "github.com/drand/go-clients/drand" ) const ( @@ -24,7 +24,7 @@ const ( // is passed, a `watch` will be run on the watch client in the absence of external watchers, // which will swap watching over to the main client. If no watch client is set and autowatch is off // then a single watch will only run when an external watch is requested. -func newWatchAggregator(l log.Logger, c, wc client.Client, autoWatch bool, autoWatchRetry time.Duration) *watchAggregator { +func newWatchAggregator(l log.Logger, c, wc drand.Client, autoWatch bool, autoWatchRetry time.Duration) *watchAggregator { if autoWatchRetry == 0 { autoWatchRetry = defaultAutoWatchRetry } @@ -41,12 +41,12 @@ func newWatchAggregator(l log.Logger, c, wc client.Client, autoWatch bool, autoW type subscriber struct { ctx context.Context - c chan client.Result + c chan drand.Result } type watchAggregator struct { - client.Client - passiveClient client.Client + drand.Client + passiveClient drand.Client autoWatch bool autoWatchRetry time.Duration log log.Logger @@ -82,7 +82,7 @@ func (c *watchAggregator) startAutoWatch(full bool) { c.cancelAutoWatch = cancel go func() { for { - var results <-chan client.Result + var results <-chan drand.Result if full { results = c.Watch(ctx) } else if c.passiveClient != nil { @@ -118,7 +118,7 @@ func (c *watchAggregator) startAutoWatch(full bool) { // passiveWatch is a degraded form of watch, where watch only hits the 'passive client' // unless distribution is actually needed. -func (c *watchAggregator) passiveWatch(ctx context.Context) <-chan client.Result { +func (c *watchAggregator) passiveWatch(ctx context.Context) <-chan drand.Result { c.subscriberLock.Lock() defer c.subscriberLock.Unlock() @@ -127,7 +127,7 @@ func (c *watchAggregator) passiveWatch(ctx context.Context) <-chan client.Result return nil } - wc := make(chan client.Result) + wc := make(chan drand.Result) if len(c.subscribers) == 0 { ctx, cancel := context.WithCancel(ctx) c.cancelPassive = cancel @@ -139,11 +139,11 @@ func (c *watchAggregator) passiveWatch(ctx context.Context) <-chan client.Result return wc } -func (c *watchAggregator) Watch(ctx context.Context) <-chan client.Result { +func (c *watchAggregator) Watch(ctx context.Context) <-chan drand.Result { c.subscriberLock.Lock() defer c.subscriberLock.Unlock() - sub := subscriber{ctx, make(chan client.Result, aggregatorWatchBuffer)} + sub := subscriber{ctx, make(chan drand.Result, aggregatorWatchBuffer)} c.subscribers = append(c.subscribers, sub) if len(c.subscribers) == 1 { @@ -157,14 +157,14 @@ func (c *watchAggregator) Watch(ctx context.Context) <-chan client.Result { return sub.c } -func (c *watchAggregator) sink(in <-chan client.Result, out chan client.Result) { +func (c *watchAggregator) sink(in <-chan drand.Result, out chan drand.Result) { defer close(out) for range in { continue } } -func (c *watchAggregator) distribute(in <-chan client.Result, cancel context.CancelFunc) { +func (c *watchAggregator) distribute(in <-chan drand.Result, cancel context.CancelFunc) { defer cancel() for { c.subscriberLock.Lock() @@ -176,7 +176,7 @@ func (c *watchAggregator) distribute(in <-chan client.Result, cancel context.Can aCtx := c.subscribers[0].ctx c.subscriberLock.Unlock() - var m client.Result + var m drand.Result var ok bool select { diff --git a/client/aggregator_test.go b/client/aggregator_test.go index 7853240..fbf0f0f 100644 --- a/client/aggregator_test.go +++ b/client/aggregator_test.go @@ -5,10 +5,10 @@ import ( "testing" "time" - "github.com/drand/drand/v2/common/client" "github.com/drand/drand/v2/common/log" clientMock "github.com/drand/go-clients/client/mock" "github.com/drand/go-clients/client/test/result/mock" + "github.com/drand/go-clients/drand" ) func TestAggregatorClose(t *testing.T) { @@ -16,7 +16,7 @@ func TestAggregatorClose(t *testing.T) { wg.Add(1) c := &clientMock.Client{ - WatchCh: make(chan client.Result), + WatchCh: make(chan drand.Result), CloseF: func() error { wg.Done() return nil @@ -38,7 +38,7 @@ func TestAggregatorPassive(t *testing.T) { wg.Add(1) c := &clientMock.Client{ - WatchCh: make(chan client.Result, 1), + WatchCh: make(chan drand.Result, 1), CloseF: func() error { wg.Done() return nil @@ -46,7 +46,7 @@ func TestAggregatorPassive(t *testing.T) { } wc := &clientMock.Client{ - WatchCh: make(chan client.Result, 1), + WatchCh: make(chan drand.Result, 1), CloseF: func() error { return nil }, diff --git a/client/cache.go b/client/cache.go index 4e04993..e001758 100644 --- a/client/cache.go +++ b/client/cache.go @@ -6,16 +6,17 @@ import ( lru "github.com/hashicorp/golang-lru" - "github.com/drand/drand/v2/common/client" + "github.com/drand/go-clients/drand" + "github.com/drand/drand/v2/common/log" ) // Cache provides a mechanism to check for rounds in the cache. type Cache interface { // TryGet provides a round beacon or nil if it is not cached. - TryGet(round uint64) client.Result + TryGet(round uint64) drand.Result // Add adds an item to the cache - Add(uint64, client.Result) + Add(uint64, drand.Result) } // makeCache creates a cache of a given size @@ -36,14 +37,14 @@ type typedCache struct { } // Add a result to the cache -func (t *typedCache) Add(round uint64, result client.Result) { +func (t *typedCache) Add(round uint64, result drand.Result) { t.ARCCache.Add(round, result) } // TryGet attempts to get a result from the cache -func (t *typedCache) TryGet(round uint64) client.Result { +func (t *typedCache) TryGet(round uint64) drand.Result { if val, ok := t.ARCCache.Get(round); ok { - return val.(client.Result) + return val.(drand.Result) } return nil } @@ -52,17 +53,17 @@ func (t *typedCache) TryGet(round uint64) client.Result { type nilCache struct{} // Add a result to the cache -func (*nilCache) Add(_ uint64, _ client.Result) { +func (*nilCache) Add(_ uint64, _ drand.Result) { } // TryGet attempts to get ar esult from the cache -func (*nilCache) TryGet(_ uint64) client.Result { +func (*nilCache) TryGet(_ uint64) drand.Result { return nil } // NewCachingClient is a meta client that stores an LRU cache of // recently fetched random values. -func NewCachingClient(l log.Logger, c client.Client, cache Cache) (client.Client, error) { +func NewCachingClient(l log.Logger, c drand.Client, cache Cache) (drand.Client, error) { return &cachingClient{ Client: c, cache: cache, @@ -71,7 +72,7 @@ func NewCachingClient(l log.Logger, c client.Client, cache Cache) (client.Client } type cachingClient struct { - client.Client + drand.Client cache Cache log log.Logger @@ -91,7 +92,7 @@ func (c *cachingClient) String() string { } // Get returns the randomness at `round` or an error. -func (c *cachingClient) Get(ctx context.Context, round uint64) (res client.Result, err error) { +func (c *cachingClient) Get(ctx context.Context, round uint64) (res drand.Result, err error) { if val := c.cache.TryGet(round); val != nil { return val, nil } @@ -102,9 +103,9 @@ func (c *cachingClient) Get(ctx context.Context, round uint64) (res client.Resul return val, err } -func (c *cachingClient) Watch(ctx context.Context) <-chan client.Result { +func (c *cachingClient) Watch(ctx context.Context) <-chan drand.Result { in := c.Client.Watch(ctx) - out := make(chan client.Result) + out := make(chan drand.Result) go func() { for result := range in { if ctx.Err() != nil { diff --git a/client/cache_test.go b/client/cache_test.go index bbf9a23..d7c269c 100644 --- a/client/cache_test.go +++ b/client/cache_test.go @@ -5,10 +5,10 @@ import ( "sync" "testing" - "github.com/drand/drand/v2/common/client" "github.com/drand/drand/v2/common/log" clientMock "github.com/drand/go-clients/client/mock" "github.com/drand/go-clients/client/test/result/mock" + "github.com/drand/go-clients/drand" ) func TestCacheGet(t *testing.T) { @@ -79,7 +79,7 @@ func TestCacheGetLatest(t *testing.T) { func TestCacheWatch(t *testing.T) { m := clientMock.ClientWithResults(2, 6) - rc := make(chan client.Result, 1) + rc := make(chan drand.Result, 1) m.WatchCh = rc arcCache, err := makeCache(3) if err != nil { @@ -111,7 +111,7 @@ func TestCacheClose(t *testing.T) { wg.Add(1) c := &clientMock.Client{ - WatchCh: make(chan client.Result), + WatchCh: make(chan drand.Result), CloseF: func() error { wg.Done() return nil diff --git a/client/client.go b/client/client.go index 8e37187..211b79f 100644 --- a/client/client.go +++ b/client/client.go @@ -9,19 +9,22 @@ import ( "github.com/prometheus/client_golang/prometheus" + "github.com/drand/go-clients/drand" + "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" "github.com/drand/drand/v2/common/log" "github.com/drand/drand/v2/crypto" ) -const clientStartupTimeoutDefault = time.Second * 5 +const ClientStartupTimeout = time.Second * 5 -// New creates a client with specified configuration. -func New(ctx context.Context, l log.Logger, options ...Option) (client.Client, error) { +// New creates a watcher, verifying, optimizing client with the specified options. +// It expects to be provided at least 1 valid client, or more using From(). +// If not specified, a default context with a timeout of ClientStartupTimeout +// will be used when fetching chain information during client setup. +func New(options ...Option) (drand.Client, error) { cfg := clientConfig{ cacheSize: 32, - log: l, } for _, opt := range options { @@ -29,23 +32,33 @@ func New(ctx context.Context, l log.Logger, options ...Option) (client.Client, e return nil, err } } - return makeClient(ctx, l, &cfg) + if cfg.log == nil { + cfg.log = log.DefaultLogger() + } + if cfg.setupCtx == nil { + ctx, cancel := context.WithTimeout(context.Background(), ClientStartupTimeout) + cfg.setupCtx = ctx + defer cancel() + } + return makeClient(&cfg) } // Wrap provides a single entrypoint for wrapping a concrete client -// implementation with configured aggregation, caching, and retry logic -func Wrap(ctx context.Context, l log.Logger, clients []client.Client, options ...Option) (client.Client, error) { - return New(ctx, l, append(options, From(clients...))...) +// implementation with configured aggregation, caching, and retry logic. +// It calls New and has the same expectations. +func Wrap(clients []drand.Client, options ...Option) (drand.Client, error) { + return New(append(options, From(clients...))...) } -func trySetLog(c client.Client, l log.Logger) { - if lc, ok := c.(client.LoggingClient); ok { +func trySetLog(c any, l log.Logger) { + if lc, ok := c.(drand.LoggingClient); ok { lc.SetLog(l) } } -// makeClient creates a client from a configuration. -func makeClient(ctx context.Context, l log.Logger, cfg *clientConfig) (client.Client, error) { +// makeClient creates a watching verifying optimizing client from a configuration. +func makeClient(cfg *clientConfig) (drand.Client, error) { + l := cfg.log if !cfg.insecure && cfg.chainHash == nil && cfg.chainInfo == nil { l.Errorw("no root of trust specified") return nil, errors.New("no root of trust specified") @@ -64,12 +77,12 @@ func makeClient(ctx context.Context, l log.Logger, cfg *clientConfig) (client.Cl } // try to populate chain info - if err := cfg.tryPopulateInfo(ctx, cfg.clients...); err != nil { + if err := cfg.tryPopulateInfo(cfg.setupCtx, cfg.clients...); err != nil { return nil, err } // provision watcher client - var wc client.Client + var wc drand.Client if cfg.watcher != nil { wc, err = makeWatcherClient(cfg, cache) if err != nil { @@ -82,9 +95,9 @@ func makeClient(ctx context.Context, l log.Logger, cfg *clientConfig) (client.Cl trySetLog(c, cfg.log) } - var c client.Client + var c drand.Client - verifiers := make([]client.Client, 0, len(cfg.clients)) + verifiers := make([]drand.Client, 0, len(cfg.clients)) for _, source := range cfg.clients { sch, err := crypto.GetSchemeByID(cfg.chainInfo.Scheme) if err != nil { @@ -113,7 +126,7 @@ func makeClient(ctx context.Context, l log.Logger, cfg *clientConfig) (client.Cl } //nolint:lll // This function has nicely named parameters, so it's long. -func makeOptimizingClient(l log.Logger, cfg *clientConfig, verifiers []client.Client, watcher client.Client, cache Cache) (client.Client, error) { +func makeOptimizingClient(l log.Logger, cfg *clientConfig, verifiers []drand.Client, watcher drand.Client, cache Cache) (drand.Client, error) { oc, err := newOptimizingClient(l, verifiers, 0, 0, 0, 0) if err != nil { return nil, err @@ -121,7 +134,7 @@ func makeOptimizingClient(l log.Logger, cfg *clientConfig, verifiers []client.Cl if watcher != nil { oc.MarkPassive(watcher) } - c := client.Client(oc) + c := drand.Client(oc) trySetLog(c, cfg.log) if cfg.cacheSize > 0 { @@ -140,12 +153,12 @@ func makeOptimizingClient(l log.Logger, cfg *clientConfig, verifiers []client.Cl return c, nil } -func makeWatcherClient(cfg *clientConfig, cache Cache) (client.Client, error) { +func makeWatcherClient(cfg *clientConfig, cache Cache) (drand.Client, error) { if cfg.chainInfo == nil { return nil, fmt.Errorf("chain info cannot be nil") } - w, err := cfg.watcher(cfg.chainInfo, cache) + w, err := cfg.watcher(cfg.log, cfg.chainInfo, cache) if err != nil { return nil, err } @@ -155,7 +168,7 @@ func makeWatcherClient(cfg *clientConfig, cache Cache) (client.Client, error) { type clientConfig struct { // clients is the set of options for fetching randomness - clients []client.Client + clients []drand.Client // watcher is a constructor function for generating a new partial client of randomness watcher WatcherCtor // from `chainInfo.Hash()` - serves as a root of trust for a given @@ -164,7 +177,7 @@ type clientConfig struct { // Full chain information - serves as a root of trust. chainInfo *chain.Info // A previously fetched result serving as a verification checkpoint if one exists. - previousResult client.Result + previousResult drand.Result // chain signature verification back to the 1st round, or to a know result to ensure // determinism in the event of a compromised chain. fullVerify bool @@ -178,6 +191,9 @@ type clientConfig struct { // customized client log. log log.Logger + // only used during setup to try and fetch chain info if chain info is nil + setupCtx context.Context + // autoWatchRetry specifies the time after which the watch channel // created by the autoWatch is re-opened when no context error occurred. autoWatchRetry time.Duration @@ -185,20 +201,16 @@ type clientConfig struct { prometheus prometheus.Registerer } -func (c *clientConfig) tryPopulateInfo(ctx context.Context, clients ...client.Client) (err error) { +func (c *clientConfig) tryPopulateInfo(ctx context.Context, clients ...drand.Client) (err error) { if c.chainInfo == nil { - ctx, cancel := context.WithTimeout(ctx, clientStartupTimeoutDefault) - defer cancel() - + var cerr error for _, cli := range clients { - c.chainInfo, err = cli.Info(ctx) - if err == nil { + c.chainInfo, cerr = cli.Info(ctx) + if cerr == nil { return } - - if ctx.Err() != nil { - return ctx.Err() - } + // we accumulate errors to try all clients even if the first one fails + err = errors.Join(err, cerr, ctx.Err()) } } return @@ -208,7 +220,7 @@ func (c *clientConfig) tryPopulateInfo(ctx context.Context, clients ...client.Cl type Option func(cfg *clientConfig) error // From constructs the client from a set of clients providing randomness -func From(c ...client.Client) Option { +func From(c ...drand.Client) Option { return func(cfg *clientConfig) error { cfg.clients = c return nil @@ -246,7 +258,8 @@ func WithChainHash(chainHash []byte) Option { } // WithChainInfo configures the client to root trust in the given randomness -// chain information +// chain information, this prevents the setup of the client from attempting to +// fetch the chain info through the clients from the remotes. func WithChainInfo(chainInfo *chain.Info) Option { return func(cfg *clientConfig) error { if cfg.chainHash != nil && !bytes.Equal(cfg.chainHash, chainInfo.Hash()) { @@ -257,10 +270,32 @@ func WithChainInfo(chainInfo *chain.Info) Option { } } -// WithVerifiedResult provides a checkpoint of randomness verified at a given round. +// WithLogger overrides the logging options for the client, +// allowing specification of additional tags, or redirection / configuration +// of logging level and output. If it is not used to set a specific logger, +// the client's logger will be used. This only works for clients that satisfy +// the drand.LoggingClient interface. +func WithLogger(l log.Logger) Option { + return func(cfg *clientConfig) error { + cfg.log = l + return nil + } +} + +// WithSetupCtx allows you to provide a custom setup context that will be used +// if WithChainInfo isn't used and the client setup has to try and fetch the +// ChainInfo from the remotes. +func WithSetupCtx(ctx context.Context) Option { + return func(cfg *clientConfig) error { + cfg.setupCtx = ctx + return nil + } +} + +// WithTrustedResult provides a checkpoint of randomness verified at a given round. // Used in combination with `VerifyFullChain`, this allows for catching up only on -// previously not-yet-verified results. -func WithVerifiedResult(result client.Result) Option { +// previously not-yet-verified results. Note that in general this is not something you need. +func WithTrustedResult(result drand.Result) Option { return func(cfg *clientConfig) error { if cfg.previousResult != nil && cfg.previousResult.GetRound() > result.GetRound() { return errors.New("refusing to override verified result with an earlier result") @@ -270,12 +305,13 @@ func WithVerifiedResult(result client.Result) Option { } } -// WithFullChainVerification validates random beacons not just as being generated correctly -// from the group signature, but ensures that the full chain is deterministic by making sure +// WithFullChainVerification validates random beacons using the chained schemes are +// not just as being generated correctly from the group signature, +// but ensures that the full chain is deterministic by making sure // each round is derived correctly from the previous one. In cases of compromise where // a single party learns sufficient shares to derive the full key, malicious randomness // could otherwise be generated that is signed, but not properly derived from previous rounds -// according to protocol. +// according to protocol. Note that in general this is not something you need. func WithFullChainVerification() Option { return func(cfg *clientConfig) error { cfg.fullVerify = true @@ -285,11 +321,11 @@ func WithFullChainVerification() Option { // Watcher supplies the `Watch` portion of the drand client interface. type Watcher interface { - Watch(ctx context.Context) <-chan client.Result + Watch(ctx context.Context) <-chan drand.Result } // WatcherCtor creates a Watcher once chain info is known. -type WatcherCtor func(chainInfo *chain.Info, cache Cache) (Watcher, error) +type WatcherCtor func(l log.Logger, chainInfo *chain.Info, cache Cache) (Watcher, error) // WithWatcher specifies a channel that can provide notifications of new // randomness bootstrappeed from the chain info. diff --git a/client/client_test.go b/client/client_test.go index 330703a..2ce96c0 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -6,34 +6,32 @@ import ( "testing" "time" - "github.com/drand/drand/v2/common/key" - "github.com/drand/drand/v2/common/log" - clock "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" + "github.com/drand/drand/v2/common/key" + "github.com/drand/drand/v2/common/log" + clientMock "github.com/drand/go-clients/client/mock" + "github.com/drand/go-clients/client/test/result/mock" + "github.com/drand/go-clients/drand" + "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" "github.com/drand/drand/v2/crypto" - client2 "github.com/drand/go-clients/client" + "github.com/drand/go-clients/client" "github.com/drand/go-clients/client/http" - clientMock "github.com/drand/go-clients/client/mock" httpmock "github.com/drand/go-clients/client/test/http/mock" - "github.com/drand/go-clients/client/test/result/mock" ) func TestClientConstraints(t *testing.T) { - ctx := context.Background() - lg := log.New(nil, log.DebugLevel, true) - if _, e := client2.New(ctx, lg); e == nil { + if _, e := client.New(); e == nil { t.Fatal("client can't be created without root of trust") } - if _, e := client2.New(ctx, lg, client2.WithChainHash([]byte{0})); e == nil { + if _, e := client.New(client.WithChainHash([]byte{0})); e == nil { t.Fatal("Client needs URLs if only a chain hash is specified") } - if _, e := client2.New(ctx, lg, client2.From(clientMock.ClientWithResults(0, 5))); e == nil { + if _, e := client.New(client.From(clientMock.ClientWithResults(0, 5))); e == nil { t.Fatal("Client needs root of trust unless insecure specified explicitly") } @@ -41,7 +39,7 @@ func TestClientConstraints(t *testing.T) { // As we will run is insecurely, we will set chain info so client can fetch it c.OptionalInfo = fakeChainInfo(t) - if _, e := client2.New(ctx, lg, client2.From(c), client2.Insecurely()); e != nil { + if _, e := client.New(client.From(c), client.Insecurely()); e != nil { t.Fatal(e) } } @@ -69,12 +67,7 @@ func TestClientMultiple(t *testing.T) { return } - var c client.Client - var e error - c, e = client2.New(ctx, - lg, - client2.From(httpClients...), - client2.WithChainHash(chainInfo.Hash())) + c, e := client.New(client.From(httpClients...), client.WithChainHash(chainInfo.Hash())) if e != nil { t.Fatal(e) @@ -99,8 +92,7 @@ func TestClientWithChainInfo(t *testing.T) { lg := log.New(nil, log.DebugLevel, true) hc, err := http.NewWithInfo(lg, "http://nxdomain.local/", chainInfo, nil) require.NoError(t, err) - c, err := client2.New(ctx, lg, client2.WithChainInfo(chainInfo), - client2.From(hc)) + c, err := client.New(client.WithChainInfo(chainInfo), client.From(hc)) if err != nil { t.Fatal("existing group creation shouldn't do additional validaiton.") } @@ -126,10 +118,11 @@ func TestClientCache(t *testing.T) { return } - var c client.Client - var e error - c, e = client2.New(ctx, lg, client2.From(httpClients...), - client2.WithChainHash(chainInfo.Hash()), client2.WithCacheSize(1)) + c, e := client.New( + client.From(httpClients...), + client.WithChainHash(chainInfo.Hash()), + client.WithCacheSize(1), + ) if e != nil { t.Fatal(e) @@ -166,26 +159,19 @@ func TestClientWithoutCache(t *testing.T) { return } - var c client.Client - var e error - c, e = client2.New(ctx, - lg, - client2.From(httpClients...), - client2.WithChainHash(chainInfo.Hash()), - client2.WithCacheSize(0)) + c, err := client.New( + client.From(httpClients...), + client.WithChainHash(chainInfo.Hash()), + client.WithCacheSize(0)) - if e != nil { - t.Fatal(e) - } - _, e = c.Get(ctx, 0) - if e != nil { - t.Fatal(e) - } + require.NoError(t, err) + + _, err = c.Get(ctx, 0) + require.NoError(t, err) cancel() - _, e = c.Get(ctx, 0) - if e == nil { - t.Fatal("cache should be disabled.") - } + _, err = c.Get(ctx, 0) + require.Error(t, err) + _ = c.Close() } @@ -196,21 +182,20 @@ func TestClientWithWatcher(t *testing.T) { require.NoError(t, err) info, results := mock.VerifiableResults(2, sch) - ch := make(chan client.Result, len(results)) + ch := make(chan drand.Result, len(results)) for i := range results { ch <- &results[i] } close(ch) - watcherCtor := func(chainInfo *chain.Info, _ client2.Cache) (client2.Watcher, error) { + watcherCtor := func(l log.Logger, chainInfo *chain.Info, _ client.Cache) (client.Watcher, error) { return &clientMock.Client{WatchCh: ch}, nil } - var c client.Client - c, err = client2.New(ctx, - lg, - client2.WithChainInfo(info), - client2.WithWatcher(watcherCtor), + var c drand.Client + c, err = client.New(client.WithLogger(lg), + client.WithChainInfo(info), + client.WithWatcher(watcherCtor), ) require.NoError(t, err) @@ -227,18 +212,15 @@ func TestClientWithWatcher(t *testing.T) { } func TestClientWithWatcherCtorError(t *testing.T) { - ctx := context.Background() - lg := log.New(nil, log.DebugLevel, true) watcherErr := errors.New("boom") - watcherCtor := func(chainInfo *chain.Info, _ client2.Cache) (client2.Watcher, error) { + watcherCtor := func(l log.Logger, chainInfo *chain.Info, _ client.Cache) (client.Watcher, error) { return nil, watcherErr } // constructor should return error returned by watcherCtor - _, err := client2.New(ctx, - lg, - client2.WithChainInfo(fakeChainInfo(t)), - client2.WithWatcher(watcherCtor), + _, err := client.New( + client.WithChainInfo(fakeChainInfo(t)), + client.WithWatcher(watcherCtor), ) if !errors.Is(err, watcherErr) { t.Fatal(err) @@ -246,15 +228,13 @@ func TestClientWithWatcherCtorError(t *testing.T) { } func TestClientChainHashOverrideError(t *testing.T) { - ctx := context.Background() lg := log.New(nil, log.DebugLevel, true) chainInfo := fakeChainInfo(t) - _, err := client2.Wrap( - ctx, - lg, - []client.Client{client2.EmptyClientWithInfo(chainInfo)}, - client2.WithChainInfo(chainInfo), - client2.WithChainHash(fakeChainInfo(t).Hash()), + _, err := client.Wrap( + []drand.Client{client.EmptyClientWithInfo(chainInfo)}, + client.WithChainInfo(chainInfo), + client.WithChainHash(fakeChainInfo(t).Hash()), + client.WithLogger(lg), ) if err == nil { t.Fatal("expected error, received no error") @@ -265,15 +245,13 @@ func TestClientChainHashOverrideError(t *testing.T) { } func TestClientChainInfoOverrideError(t *testing.T) { - ctx := context.Background() lg := log.New(nil, log.DebugLevel, true) chainInfo := fakeChainInfo(t) - _, err := client2.Wrap( - ctx, - lg, - []client.Client{client2.EmptyClientWithInfo(chainInfo)}, - client2.WithChainHash(chainInfo.Hash()), - client2.WithChainInfo(fakeChainInfo(t)), + _, err := client.Wrap( + []drand.Client{client.EmptyClientWithInfo(chainInfo)}, + client.WithChainHash(chainInfo.Hash()), + client.WithChainInfo(fakeChainInfo(t)), + client.WithLogger(lg), ) if err == nil { t.Fatal("expected error, received no error") @@ -300,25 +278,24 @@ func TestClientAutoWatch(t *testing.T) { r1, _ := httpClient[0].Get(ctx, 1) r2, _ := httpClient[0].Get(ctx, 2) - results := []client.Result{r1, r2} + results := []drand.Result{r1, r2} - ch := make(chan client.Result, len(results)) + ch := make(chan drand.Result, len(results)) for i := range results { ch <- results[i] } close(ch) - watcherCtor := func(chainInfo *chain.Info, _ client2.Cache) (client2.Watcher, error) { + watcherCtor := func(l log.Logger, chainInfo *chain.Info, _ client.Cache) (client.Watcher, error) { return &clientMock.Client{WatchCh: ch}, nil } - var c client.Client - c, err = client2.New(ctx, - lg, - client2.From(clientMock.ClientWithInfo(chainInfo)), - client2.WithChainHash(chainInfo.Hash()), - client2.WithWatcher(watcherCtor), - client2.WithAutoWatch(), + var c drand.Client + c, err = client.New( + client.From(clientMock.ClientWithInfo(chainInfo)), + client.WithChainHash(chainInfo.Hash()), + client.WithWatcher(watcherCtor), + client.WithAutoWatch(), ) if err != nil { @@ -337,19 +314,18 @@ func TestClientAutoWatch(t *testing.T) { func TestClientAutoWatchRetry(t *testing.T) { ctx := context.Background() - lg := log.New(nil, log.DebugLevel, true) sch, err := crypto.GetSchemeFromEnv() require.NoError(t, err) info, results := mock.VerifiableResults(5, sch) - resC := make(chan client.Result) + resC := make(chan drand.Result) defer close(resC) // done is closed after all resuls have been written to resC done := make(chan struct{}) // Returns a channel that yields the verifiable results above - watchF := func(ctx context.Context) <-chan client.Result { + watchF := func(ctx context.Context) <-chan drand.Result { go func() { for i := 0; i < len(results); i++ { select { @@ -366,9 +342,9 @@ func TestClientAutoWatchRetry(t *testing.T) { var failer clientMock.Client failer = clientMock.Client{ - WatchF: func(ctx context.Context) <-chan client.Result { + WatchF: func(ctx context.Context) <-chan drand.Result { // First call returns a closed channel - ch := make(chan client.Result) + ch := make(chan drand.Result) close(ch) // Second call returns a channel that writes results failer.WatchF = watchF @@ -376,14 +352,13 @@ func TestClientAutoWatchRetry(t *testing.T) { }, } - var c client.Client - c, err = client2.New(ctx, - lg, - client2.From(&failer, clientMock.ClientWithInfo(info)), - client2.WithChainInfo(info), - client2.WithAutoWatch(), - client2.WithAutoWatchRetry(time.Second), - client2.WithCacheSize(len(results)), + var c drand.Client + c, err = client.New( + client.From(&failer, clientMock.ClientWithInfo(info)), + client.WithChainInfo(info), + client.WithAutoWatch(), + client.WithAutoWatchRetry(time.Second), + client.WithCacheSize(len(results)), ) if err != nil { @@ -409,7 +384,7 @@ func TestClientAutoWatchRetry(t *testing.T) { } // compareResults asserts that two results are the same. -func compareResults(t *testing.T, expected, actual client.Result) { +func compareResults(t *testing.T, expected, actual drand.Result) { t.Helper() require.NotNil(t, expected) diff --git a/client/doc.go b/client/doc.go index c03aca0..801b8f8 100644 --- a/client/doc.go +++ b/client/doc.go @@ -3,36 +3,6 @@ Package client provides transport-agnostic logic to retrieve and verify randomness from drand, including retry, validation, caching and optimization features. -Example: - - package main - - import ( - "context" - "encoding/hex" - "fmt" - - "github.com/drand/drand/v2/client" - "github.com/drand/drand/v2/common/log" - ) - - var chainHash, _ = hex.DecodeString("8990e7a9aaed2ffed73dbd7092123d6f289930540d7651336225dc172e51b2ce") - - func main() { - ctx := context.Background() - lg := log.New(nil, log.DebugLevel, true) - - c, err := client.New(ctx, lg, - client.From("..."), // see concrete client implementations - client.WithChainHash(chainHash), - ) - - // e.g. use the client to get the latest randomness round: - r, err := c.Get(ctx, 0) - - fmt.Println(r.Round(), r.Randomness()) - } - The "From" option allows you to specify clients that work over particular transports. HTTP, gRPC and libp2p PubSub clients are provided as subpackages https://pkg.go.dev/github.com/drand/go-clients/internal/client/http, diff --git a/client/empty.go b/client/empty.go index a920d14..d2ec0d3 100644 --- a/client/empty.go +++ b/client/empty.go @@ -5,26 +5,26 @@ import ( "time" "github.com/drand/drand/v2/common" - chain2 "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" + "github.com/drand/drand/v2/common/chain" + "github.com/drand/go-clients/drand" ) const emptyClientStringerValue = "EmptyClient" // EmptyClientWithInfo makes a client that returns the given info but no randomness -func EmptyClientWithInfo(info *chain2.Info) client.Client { +func EmptyClientWithInfo(info *chain.Info) drand.Client { return &emptyClient{info} } type emptyClient struct { - i *chain2.Info + i *chain.Info } func (m *emptyClient) String() string { return emptyClientStringerValue } -func (m *emptyClient) Info(_ context.Context) (*chain2.Info, error) { +func (m *emptyClient) Info(_ context.Context) (*chain.Info, error) { return m.i, nil } @@ -32,12 +32,12 @@ func (m *emptyClient) RoundAt(t time.Time) uint64 { return common.CurrentRound(t.Unix(), m.i.Period, m.i.GenesisTime) } -func (m *emptyClient) Get(_ context.Context, _ uint64) (client.Result, error) { - return nil, common.ErrEmptyClientUnsupportedGet +func (m *emptyClient) Get(_ context.Context, _ uint64) (drand.Result, error) { + return nil, drand.ErrEmptyClientUnsupportedGet } -func (m *emptyClient) Watch(_ context.Context) <-chan client.Result { - ch := make(chan client.Result, 1) +func (m *emptyClient) Watch(_ context.Context) <-chan drand.Result { + ch := make(chan drand.Result, 1) close(ch) return ch } diff --git a/client/empty_test.go b/client/empty_test.go index 8b672a8..9d77f8f 100644 --- a/client/empty_test.go +++ b/client/empty_test.go @@ -2,13 +2,13 @@ package client import ( "context" + "errors" "fmt" "testing" "time" commonutils "github.com/drand/drand/v2/common" - - chain2 "github.com/drand/drand/v2/common/client" + "github.com/drand/go-clients/drand" ) func TestEmptyClient(t *testing.T) { @@ -45,7 +45,7 @@ func TestEmptyClient(t *testing.T) { if err == nil { t.Fatal("expected an error") } - if err.Error() != "not supported" { + if !errors.Is(err, drand.ErrEmptyClientUnsupportedGet) { t.Fatal("unexpected error from Get", err) } @@ -55,7 +55,7 @@ func TestEmptyClient(t *testing.T) { ch := c.Watch(ctx) //nolint - var rs []chain2.Result + var rs []drand.Result for r := range ch { rs = append(rs, r) } diff --git a/client/http/doc.go b/client/http/doc.go index e246c41..b543439 100644 --- a/client/http/doc.go +++ b/client/http/doc.go @@ -5,36 +5,6 @@ The HTTP client uses drand's JSON HTTP API (https://drand.love/developer/http-api/) to fetch randomness. Watching is implemented by polling the endpoint at the expected round time. -Example: - - package main - - import ( - "context" - "encoding/hex" - - "github.com/drand/drand/v2/client" - "github.com/drand/drand/v2/client/http" - "github.com/drand/drand/v2/common/log" - ) - - var urls = []string{ - "https://api.drand.sh", - "https://drand.cloudflare.com", - } - - var chainHash, _ = hex.DecodeString("8990e7a9aaed2ffed73dbd7092123d6f289930540d7651336225dc172e51b2ce") - - func main() { - ctx := context.Background() - lg := log.New(nil, log.DebugLevel, true) - - c, err := client.New(ctx, lg, - client.From(http.ForURLs(ctx, lg, urls, chainHash)...), - client.WithChainHash(chainHash), - ) - } - The "ForURLs" helper creates multiple HTTP clients from a list of URLs. Alternatively you can use the "New" or "NewWithInfo" constructor to create clients. diff --git a/client/http/example_test.go b/client/http/example_test.go index fd17bfb..513b3f1 100644 --- a/client/http/example_test.go +++ b/client/http/example_test.go @@ -4,9 +4,11 @@ import ( "context" "encoding/hex" "fmt" + "time" + "github.com/drand/drand/v2/common/log" "github.com/drand/drand/v2/crypto" - client2 "github.com/drand/go-clients/client" + "github.com/drand/go-clients/client" "github.com/drand/go-clients/client/http" ) @@ -17,17 +19,17 @@ func Example_http_New() { panic(err) } - client, err := http.New(context.Background(), nil, "http://api.drand.sh", chainhash, nil) + c, err := http.New(context.Background(), nil, "http://api.drand.sh", chainhash, nil) if err != nil { panic(err) } - result, err := client.Get(context.Background(), 1234) + result, err := c.Get(context.Background(), 1234) if err != nil { panic(err) } - info, err := client.Info(context.Background()) + info, err := c.Info(context.Background()) if err != nil { panic(err) } @@ -38,15 +40,40 @@ func Example_http_New() { } // make sure to verify the beacons when using the raw http client without a verifying client - err = scheme.VerifyBeacon(&client2.RandomData{ - Rnd: result.GetRound(), - Sig: result.GetSignature(), - }, info.PublicKey) + err = scheme.VerifyBeacon(result, info.PublicKey) if err != nil { panic(err) } fmt.Printf("got beacon: round=%d; randomness=%x\n", result.GetRound(), result.GetRandomness()) - //output: got beacon: round=1234; randomness=9ead58abb451d8f521338c43ba5595610642a0c07d0e9babeaae6a98787629de } + +func Example_http_New_with_chainhash() { + var urls = []string{ + "https://api.drand.sh", + "https://drand.cloudflare.com", + } + + var chainHash, _ = hex.DecodeString("52db9ba70e0cc0f6eaf7803dd07447a1f5477735fd3f661792ba94600c84e971") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + lg := log.New(nil, log.DebugLevel, true) + + c, err := client.New(client.From(http.ForURLs(ctx, lg, urls, chainHash)...), + client.WithChainHash(chainHash), + client.WithLogger(lg), + ) + if err != nil { + panic(err) + } + cancel() + + info, err := c.Info(context.Background()) + if err != nil { + panic(err) + } + + fmt.Println(info.GetSchemeName()) + //output: bls-unchained-g1-rfc9380 +} diff --git a/client/http/http.go b/client/http/http.go index 56a5d11..2b07108 100644 --- a/client/http/http.go +++ b/client/http/http.go @@ -12,17 +12,18 @@ import ( "time" "github.com/drand/drand/v2/crypto" - client2 "github.com/drand/go-clients/client" + "github.com/drand/go-clients/client" + "github.com/drand/go-clients/drand" json "github.com/nikkolasg/hexjson" "github.com/drand/drand/v2/common" chain2 "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" "github.com/drand/drand/v2/common/log" ) -var _ client.Client = &httpClient{} +var _ drand.Client = &httpClient{} +var _ drand.LoggingClient = &httpClient{} var errClientClosed = fmt.Errorf("client closed") @@ -33,6 +34,16 @@ const httpWaitMaxCounter = 20 const httpWaitInterval = 2 * time.Second const maxTimeoutHTTPRequest = 5 * time.Second +// NewSimpleClient creates a client using the default logger, default transport and a background context +// to instantiate a new Client for that remote host for that specific chainhash. +func NewSimpleClient(host, chainhash string) (*httpClient, error) { + chb, err := hex.DecodeString(chainhash) + if err != nil { + return nil, fmt.Errorf("unable to create basic HTTP client for url %q and chainhash %q: %w", host, chainhash, err) + } + return New(context.Background(), nil, host, chb, nil) +} + // New creates a new client pointing to an HTTP endpoint func New(ctx context.Context, l log.Logger, url string, chainHash []byte, transport nhttp.RoundTripper) (*httpClient, error) { if l == nil { @@ -68,6 +79,9 @@ func New(ctx context.Context, l log.Logger, url string, chainHash []byte, transp // NewWithInfo constructs an http client when the group parameters are already known. func NewWithInfo(l log.Logger, url string, info *chain2.Info, transport nhttp.RoundTripper) (*httpClient, error) { + if l == nil { + l = log.DefaultLogger() + } if transport == nil { transport = nhttp.DefaultTransport } @@ -92,8 +106,8 @@ func NewWithInfo(l log.Logger, url string, info *chain2.Info, transport nhttp.Ro } // ForURLs provides a shortcut for creating a set of HTTP clients for a set of URLs. -func ForURLs(ctx context.Context, l log.Logger, urls []string, chainHash []byte) []client.Client { - clients := make([]client.Client, 0) +func ForURLs(ctx context.Context, l log.Logger, urls []string, chainHash []byte) []drand.Client { + clients := make([]drand.Client, 0) var info *chain2.Info var skipped []string for _, u := range urls { @@ -280,12 +294,12 @@ func (h *httpClient) FetchChainInfo(ctx context.Context, chainHash []byte) (*cha } type httpGetResponse struct { - result client.Result + result drand.Result err error } // Get returns the randomness at `round` or an error. -func (h *httpClient) Get(ctx context.Context, round uint64) (client.Result, error) { +func (h *httpClient) Get(ctx context.Context, round uint64) (drand.Result, error) { var url string if round == 0 { url = fmt.Sprintf("%s%x/public/latest", h.root, h.chainInfo.Hash()) @@ -312,7 +326,7 @@ func (h *httpClient) Get(ctx context.Context, round uint64) (client.Result, erro } defer randResponse.Body.Close() - randResp := client2.RandomData{} + randResp := client.RandomData{} if err := json.NewDecoder(randResponse.Body).Decode(&randResp); err != nil { resC <- httpGetResponse{nil, fmt.Errorf("decoding response: %w", err)} return @@ -340,14 +354,14 @@ func (h *httpClient) Get(ctx context.Context, round uint64) (client.Result, erro } // Watch returns new randomness as it becomes available. -func (h *httpClient) Watch(ctx context.Context) <-chan client.Result { - out := make(chan client.Result) +func (h *httpClient) Watch(ctx context.Context) <-chan drand.Result { + out := make(chan drand.Result) go func() { ctx, cancel := context.WithCancel(ctx) defer cancel() defer close(out) - in := client2.PollingWatcher(ctx, h, h.chainInfo, h.l) + in := client.PollingWatcher(ctx, h, h.chainInfo, h.l) for { select { case res, ok := <-in: diff --git a/client/http/metric.go b/client/http/metric.go index 82fabd9..09caea2 100644 --- a/client/http/metric.go +++ b/client/http/metric.go @@ -6,14 +6,14 @@ import ( "github.com/prometheus/client_golang/prometheus" + "github.com/drand/go-clients/drand" + "github.com/drand/drand/v2/common" "github.com/drand/go-clients/internal/metrics" - - chain2 "github.com/drand/drand/v2/common/client" ) // MeasureHeartbeats periodically tracks latency observed on a set of HTTP clients -func MeasureHeartbeats(ctx context.Context, c []chain2.Client) *HealthMetrics { +func MeasureHeartbeats(ctx context.Context, c []drand.Client) *HealthMetrics { m := &HealthMetrics{ next: 0, clients: c, @@ -27,7 +27,7 @@ func MeasureHeartbeats(ctx context.Context, c []chain2.Client) *HealthMetrics { // HealthMetrics is a measurement task around HTTP clients type HealthMetrics struct { next int - clients []chain2.Client + clients []drand.Client } // HeartbeatInterval is the duration between liveness heartbeats sent to an HTTP API. diff --git a/client/lp2p/client.go b/client/lp2p/client.go index 82ea9b4..a87d522 100644 --- a/client/lp2p/client.go +++ b/client/lp2p/client.go @@ -6,7 +6,6 @@ import ( "fmt" "sync" - clock "github.com/jonboulle/clockwork" "github.com/libp2p/go-libp2p" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" @@ -16,20 +15,25 @@ import ( "google.golang.org/protobuf/proto" "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" "github.com/drand/drand/v2/common/log" "github.com/drand/drand/v2/crypto" "github.com/drand/drand/v2/protobuf/drand" - client2 "github.com/drand/go-clients/client" + "github.com/drand/go-clients/client" + drandi "github.com/drand/go-clients/drand" ) +var _ drandi.LoggingClient = &Client{} + +// WatchBufferSize controls how many incoming messages can be in-flight until they start +// to be dropped by the library when using Client.Watch +var WatchBufferSize = 100 + // Client is a concrete pubsub client implementation type Client struct { - cancel func() - latest uint64 - cache client2.Cache - bufferSize int - log log.Logger + cancel func() + latest uint64 + cache client.Cache + log log.Logger subs struct { sync.Mutex @@ -37,10 +41,6 @@ type Client struct { } } -// DefaultBufferSize controls how many incoming messages can be in-flight until they start -// to be dropped by the library -const DefaultBufferSize = 100 - // SetLog configures the client log output func (c *Client) SetLog(l log.Logger) { c.log = l @@ -48,9 +48,9 @@ func (c *Client) SetLog(l log.Logger) { // WithPubsub provides an option for integrating pubsub notification // into a drand client. -func WithPubsub(l log.Logger, ps *pubsub.PubSub, clk clock.Clock, bufferSize int) client2.Option { - return client2.WithWatcher(func(info *chain.Info, cache client2.Cache) (client2.Watcher, error) { - c, err := NewWithPubsub(l, ps, info, cache, clk, bufferSize) +func WithPubsub(ps *pubsub.PubSub) client.Option { + return client.WithWatcher(func(l log.Logger, info *chain.Info, cache client.Cache) (client.Watcher, error) { + c, err := NewWithPubsub(l, ps, info, cache) if err != nil { return nil, err } @@ -67,7 +67,7 @@ func PubSubTopic(h string) string { // a default Logger, // //nolint:funlen,lll,gocyclo // This is a long line -func NewWithPubsub(l log.Logger, ps *pubsub.PubSub, info *chain.Info, cache client2.Cache, clk clock.Clock, bufferSize int) (*Client, error) { +func NewWithPubsub(l log.Logger, ps *pubsub.PubSub, info *chain.Info, cache client.Cache) (*Client, error) { if info == nil { return nil, fmt.Errorf("no chain supplied for joining") } @@ -85,15 +85,14 @@ func NewWithPubsub(l log.Logger, ps *pubsub.PubSub, info *chain.Info, cache clie ctx, cancel := context.WithCancel(context.Background()) c := &Client{ - cancel: cancel, - cache: cache, - bufferSize: bufferSize, - log: l, + cancel: cancel, + cache: cache, + log: l, } chainHash := hex.EncodeToString(info.Hash()) topic := PubSubTopic(chainHash) - if err := ps.RegisterTopicValidator(topic, randomnessValidator(info, cache, c, clk)); err != nil { + if err := ps.RegisterTopicValidator(topic, randomnessValidator(info, cache, c)); err != nil { cancel() return nil, fmt.Errorf("creating topic: %w", err) } @@ -197,9 +196,9 @@ func (c *Client) Sub(ch chan drand.PublicRandResponse) UnsubFunc { } // Watch implements the client.Watcher interface -func (c *Client) Watch(ctx context.Context) <-chan client.Result { - innerCh := make(chan drand.PublicRandResponse, c.bufferSize) - outerCh := make(chan client.Result, c.bufferSize) +func (c *Client) Watch(ctx context.Context) <-chan drandi.Result { + innerCh := make(chan drand.PublicRandResponse, WatchBufferSize) + outerCh := make(chan drandi.Result, WatchBufferSize) end := c.Sub(innerCh) w := sync.WaitGroup{} @@ -212,12 +211,12 @@ func (c *Client) Watch(ctx context.Context) <-chan client.Result { for { select { - // TODO: do not copy by assignment any drand.PublicRandResponse case resp, ok := <-innerCh: //nolint:govet if !ok { + c.log.Debugw("innerCh closed") return } - dat := &client2.RandomData{ + dat := &client.RandomData{ Rnd: resp.GetRound(), Random: crypto.RandomnessFromSignature(resp.GetSignature()), Sig: resp.GetSignature(), diff --git a/client/lp2p/client_test.go b/client/lp2p/client_test.go index ec67420..a2c8d6f 100644 --- a/client/lp2p/client_test.go +++ b/client/lp2p/client_test.go @@ -16,8 +16,11 @@ import ( ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" - chain2 "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" + "github.com/drand/drand/v2/test/mock" + "github.com/drand/go-clients/drand" + "github.com/drand/go-clients/internal/grpc" + + "github.com/drand/drand/v2/common/chain" "github.com/drand/drand/v2/common/log" "github.com/drand/drand/v2/crypto" dhttp "github.com/drand/go-clients/client/http" @@ -25,87 +28,87 @@ import ( "github.com/drand/go-clients/internal/lp2p" ) -// -// func TestGRPCClientTestFunc(t *testing.T) { -// lg := log.New(nil, log.DebugLevel, true) -// // start mock drand node -// sch, err := crypto.GetSchemeFromEnv() -// require.NoError(t, err) -// -// clk := clock.NewFakeClockAt(time.Now()) -// -// grpcLis, svc := mock.NewMockGRPCPublicServer(t, lg, "127.0.0.1:0", false, sch, clk) -// grpcAddr := grpcLis.Addr() -// go grpcLis.Start() -// defer grpcLis.Stop(context.Background()) -// -// dataDir := t.TempDir() -// identityDir := t.TempDir() -// -// infoProto, err := svc.ChainInfo(context.Background(), nil) -// require.NoError(t, err) -// -// info, err := chain2.InfoFromProto(infoProto) -// require.NoError(t, err) -// -// // start mock relay-node -// grpcClient, err := grpc.New(lg, grpcAddr, "", true, []byte("")) -// require.NoError(t, err) -// -// cfg := &lp2p.GossipRelayConfig{ -// ChainHash: info.HashString(), -// PeerWith: nil, -// Addr: "/ip4/127.0.0.1/tcp/" + test.FreePort(), -// DataDir: dataDir, -// IdentityPath: path.Join(identityDir, "identity.key"), -// Client: grpcClient, -// } -// g, err := lp2p.NewGossipRelayNode(lg, cfg) -// require.NoError(t, err, "gossip relay node") -// -// defer g.Shutdown() -// -// // start client -// c, err := newTestClient(t, g.Multiaddrs(), info, clk) -// require.NoError(t, err) -// defer func() { -// err := c.Close() -// require.NoError(t, err) -// }() -// -// // test client -// ctx, cancel := context.WithCancel(context.Background()) -// ch := c.Watch(ctx) -// -// baseRound := uint64(1969) -// -// mockService := svc.(mock.Service) -// // pub sub polls every 200ms -// wait := 250 * time.Millisecond -// for i := uint64(0); i < 3; i++ { -// time.Sleep(wait) -// mockService.EmitRand(false) -// t.Logf("round %d emitted\n", baseRound+i) -// -// select { -// case r, ok := <-ch: -// require.True(t, ok, "expected randomness, watch outer channel was closed instead") -// t.Logf("received round %d\n", r.Round()) -// require.Equal(t, baseRound+i, r.Round()) -// // the period of the mock servers is 1 second -// case <-time.After(5 * time.Second): -// t.Fatal("timeout.") -// } -// } -// -// time.Sleep(wait) -// mockService.EmitRand(true) -// cancel() -// -// drain(t, ch, 5*time.Second) -//} - -func drain(t *testing.T, ch <-chan client.Result, timeout time.Duration) { +func TestGRPCClientTestFunc(t *testing.T) { + lg := log.New(nil, log.DebugLevel, true) + // start mock drand node + sch, err := crypto.GetSchemeFromEnv() + require.NoError(t, err) + + clk := clock.NewFakeClockAt(time.Now()) + + grpcLis, svc := mock.NewMockGRPCPublicServer(t, lg, "127.0.0.1:0", false, sch, clk) + grpcAddr := grpcLis.Addr() + go grpcLis.Start() + defer grpcLis.Stop(context.Background()) + + dataDir := t.TempDir() + identityDir := t.TempDir() + + infoProto, err := svc.ChainInfo(context.Background(), nil) + require.NoError(t, err) + + info, err := chain.InfoFromProto(infoProto) + require.NoError(t, err) + + // start mock relay-node + grpcClient, err := grpc.New(grpcAddr, true, []byte("")) + require.NoError(t, err) + + cfg := &lp2p.GossipRelayConfig{ + ChainHash: info.HashString(), + PeerWith: nil, + Addr: "/ip4/127.0.0.1/tcp/0", + DataDir: dataDir, + IdentityPath: path.Join(identityDir, "identity.key"), + Client: grpcClient, + } + g, err := lp2p.NewGossipRelayNode(lg, cfg) + require.NoError(t, err, "gossip relay node") + + defer g.Shutdown() + + // start client + c, err := newTestClient(t, g.Multiaddrs(), info, clk) + require.NoError(t, err) + defer func() { + err := c.Close() + require.NoError(t, err) + }() + + // test client + ctx, cancel := context.WithCancel(context.Background()) + ch := c.Watch(ctx) + + baseRound := uint64(1969) + + mockService := svc.(mock.Service) + // pub sub polls every 200ms + wait := 250 * time.Millisecond + for i := uint64(0); i < 3; i++ { + time.Sleep(wait) + mockService.EmitRand(false) + t.Logf("round %d emitted\n", baseRound+i) + + select { + case r, ok := <-ch: + require.True(t, ok, "expected randomness, watch outer channel was closed instead") + t.Logf("received round %d\n", r.GetRound()) + require.Equal(t, baseRound+i, r.GetRound()) + // the period of the mock servers is 1 second + case <-time.After(5 * time.Second): + t.Fatal("timeout.") + } + } + + time.Sleep(wait) + mockService.EmitRand(true) + cancel() + + drain(t, ch, 5*time.Second) +} + +func drain(t *testing.T, ch <-chan drand.Result, timeout time.Duration) { + t.Helper() for { select { case _, ok := <-ch: @@ -178,7 +181,7 @@ func TestHTTPClientTestFunc(t *testing.T) { drain(t, ch, 5*time.Second) } -func newTestClient(t *testing.T, relayMultiaddr []ma.Multiaddr, info *chain2.Info, clk clock.Clock) (*Client, error) { +func newTestClient(t *testing.T, relayMultiaddr []ma.Multiaddr, info *chain.Info, clk clock.Clock) (*Client, error) { identityDir := t.TempDir() lg := log.New(nil, log.DebugLevel, true) @@ -186,7 +189,7 @@ func newTestClient(t *testing.T, relayMultiaddr []ma.Multiaddr, info *chain2.Inf if err != nil { return nil, err } - h, ps, err := lp2p.ConstructHost(priv, "/ip4/0.0.0.0/tcp/"+strconv.Itoa(freeport.GetOne(t)), relayMultiaddr, lg) + h, ps, err := lp2p.ConstructHost(priv, "/ip4/0.0.0.0/tcp/0", relayMultiaddr, lg) if err != nil { return nil, err } @@ -198,7 +201,7 @@ func newTestClient(t *testing.T, relayMultiaddr []ma.Multiaddr, info *chain2.Inf if err != nil { return nil, err } - c, err := NewWithPubsub(lg, ps, info, nil, clk, 100) + c, err := NewWithPubsub(lg, ps, info, nil) if err != nil { return nil, err } diff --git a/client/lp2p/example_test.go b/client/lp2p/example_test.go index 676b938..9362717 100644 --- a/client/lp2p/example_test.go +++ b/client/lp2p/example_test.go @@ -6,8 +6,6 @@ import ( "fmt" "time" - clock "github.com/jonboulle/clockwork" - "github.com/drand/drand/v2/common" "github.com/drand/drand/v2/common/chain" "github.com/drand/drand/v2/common/log" @@ -51,7 +49,7 @@ func ExampleNewPubsub() { } // NewWithPubSub will automatically register the topic for the chainhash you're interested in - c, err := gclient.NewWithPubsub(log.DefaultLogger(), ps, info, nil, clock.NewRealClock(), gclient.DefaultBufferSize) + c, err := gclient.NewWithPubsub(log.DefaultLogger(), ps, info, nil) if err != nil { panic(err) } diff --git a/client/lp2p/validator.go b/client/lp2p/validator.go index db4119e..80f31cc 100644 --- a/client/lp2p/validator.go +++ b/client/lp2p/validator.go @@ -8,7 +8,6 @@ import ( commonutils "github.com/drand/drand/v2/common" "github.com/drand/go-clients/client" - clock "github.com/jonboulle/clockwork" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" "google.golang.org/protobuf/proto" @@ -18,7 +17,7 @@ import ( "github.com/drand/drand/v2/protobuf/drand" ) -func randomnessValidator(info *chain2.Info, cache client.Cache, c *Client, clk clock.Clock) pubsub.ValidatorEx { +func randomnessValidator(info *chain2.Info, cache client.Cache, c *Client) pubsub.ValidatorEx { var scheme *crypto.Scheme if info != nil { scheme, _ = crypto.GetSchemeByID(info.Scheme) @@ -39,7 +38,7 @@ func randomnessValidator(info *chain2.Info, cache client.Cache, c *Client, clk c } // Unwilling to relay beacons in the future. - timeNow := clk.Now() + timeNow := time.Now() timeOfRound := commonutils.TimeOfRound(info.Period, info.GenesisTime, rand.GetRound()) if time.Unix(timeOfRound, 0).After(timeNow) { c.log.Warnw("", diff --git a/client/lp2p/validator_test.go b/client/lp2p/validator_test.go index 109fa42..08cec65 100644 --- a/client/lp2p/validator_test.go +++ b/client/lp2p/validator_test.go @@ -27,22 +27,6 @@ import ( "github.com/drand/go-clients/client/test/cache" ) -type randomDataWrapper struct { - data client.RandomData -} - -func (r *randomDataWrapper) GetRound() uint64 { - return r.data.Rnd -} - -func (r *randomDataWrapper) GetSignature() []byte { - return r.data.Sig -} - -func (r *randomDataWrapper) GetRandomness() []byte { - return r.data.Random -} - func randomPeerID(t *testing.T) peer.ID { priv, _, err := crypto.GenerateEd25519Key(rand.Reader) if err != nil { @@ -91,8 +75,7 @@ func fakeChainInfo() *chain2.Info { func TestRejectsUnmarshalBeaconFailure(t *testing.T) { c := Client{log: log.New(nil, log.DebugLevel, true)} - clk := clock.NewFakeClock() - validate := randomnessValidator(fakeChainInfo(), nil, &c, clk) + validate := randomnessValidator(fakeChainInfo(), nil, &c) msg := pubsub.Message{Message: &pb.Message{}} res := validate(context.Background(), randomPeerID(t), &msg) @@ -104,8 +87,7 @@ func TestRejectsUnmarshalBeaconFailure(t *testing.T) { func TestAcceptsWithoutTrustRoot(t *testing.T) { c := Client{log: log.New(nil, log.DebugLevel, true)} - clk := clock.NewFakeClock() - validate := randomnessValidator(nil, nil, &c, clk) + validate := randomnessValidator(nil, nil, &c) resp := drand.PublicRandResponse{} data, err := proto.Marshal(&resp) @@ -123,8 +105,7 @@ func TestAcceptsWithoutTrustRoot(t *testing.T) { func TestRejectsFutureBeacons(t *testing.T) { info := fakeChainInfo() c := Client{log: log.New(nil, log.DebugLevel, true)} - clk := clock.NewFakeClock() - validate := randomnessValidator(info, nil, &c, clk) + validate := randomnessValidator(info, nil, &c) resp := drand.PublicRandResponse{ Round: common.CurrentRound(time.Now().Unix(), info.Period, info.GenesisTime) + 5, @@ -144,8 +125,7 @@ func TestRejectsFutureBeacons(t *testing.T) { func TestRejectsVerifyBeaconFailure(t *testing.T) { info := fakeChainInfo() c := Client{log: log.New(nil, log.DebugLevel, true)} - clk := clock.NewFakeClock() - validate := randomnessValidator(info, nil, &c, clk) + validate := randomnessValidator(info, nil, &c) resp := drand.PublicRandResponse{ Round: common.CurrentRound(time.Now().Unix(), info.Period, info.GenesisTime), @@ -168,7 +148,7 @@ func TestIgnoresCachedEqualBeacon(t *testing.T) { ca := cache.NewMapCache() c := Client{log: log.New(nil, log.DebugLevel, true)} clk := clock.NewFakeClockAt(time.Now()) - validate := randomnessValidator(info, ca, &c, clk) + validate := randomnessValidator(info, ca, &c) rdata := fakeRandomData(info, clk) ca.Add(rdata.Rnd, &rdata) @@ -196,7 +176,7 @@ func TestRejectsCachedUnequalBeacon(t *testing.T) { ca := cache.NewMapCache() c := Client{log: log.New(nil, log.DebugLevel, true)} clk := clock.NewFakeClock() - validate := randomnessValidator(info, ca, &c, clk) + validate := randomnessValidator(info, ca, &c) rdata := fakeRandomData(info, clk) ca.Add(rdata.Rnd, &rdata) @@ -227,15 +207,15 @@ func TestIgnoresCachedEqualNonRandomDataBeacon(t *testing.T) { ca := cache.NewMapCache() c := Client{log: log.New(nil, log.DebugLevel, true)} clk := clock.NewFakeClockAt(time.Now()) - validate := randomnessValidator(info, ca, &c, clk) - rdata := randomDataWrapper{fakeRandomData(info, clk)} + validate := randomnessValidator(info, ca, &c) + rdata := fakeRandomData(info, clk) ca.Add(rdata.GetRound(), &rdata) resp := drand.PublicRandResponse{ Round: rdata.GetRound(), Signature: rdata.GetSignature(), - PreviousSignature: rdata.data.PreviousSignature, + PreviousSignature: rdata.GetPreviousSignature(), Randomness: rdata.GetRandomness(), } data, err := proto.Marshal(&resp) @@ -255,8 +235,8 @@ func TestRejectsCachedEqualNonRandomDataBeacon(t *testing.T) { ca := cache.NewMapCache() c := Client{log: log.New(nil, log.DebugLevel, true)} clk := clock.NewFakeClock() - validate := randomnessValidator(info, ca, &c, clk) - rdata := randomDataWrapper{fakeRandomData(info, clk)} + validate := randomnessValidator(info, ca, &c) + rdata := fakeRandomData(info, clk) ca.Add(rdata.GetRound(), &rdata) @@ -266,7 +246,7 @@ func TestRejectsCachedEqualNonRandomDataBeacon(t *testing.T) { resp := drand.PublicRandResponse{ Round: rdata.GetRound(), Signature: sig, // incoming message has incorrect sig - PreviousSignature: rdata.data.PreviousSignature, + PreviousSignature: rdata.GetPreviousSignature(), Randomness: rdata.GetRandomness(), } data, err := proto.Marshal(&resp) diff --git a/client/mock/mock.go b/client/mock/mock.go index e09fd12..2dafa88 100644 --- a/client/mock/mock.go +++ b/client/mock/mock.go @@ -7,21 +7,19 @@ import ( "time" commonutils "github.com/drand/drand/v2/common" - chain2 "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" + "github.com/drand/drand/v2/common/chain" "github.com/drand/go-clients/client/test/result/mock" + "github.com/drand/go-clients/drand" ) -var _ client.Client = &Client{} - // Client provide a mocked client interface // //nolint:gocritic type Client struct { sync.Mutex - OptionalInfo *chain2.Info - WatchCh chan client.Result - WatchF func(context.Context) <-chan client.Result + OptionalInfo *chain.Info + WatchCh chan drand.Result + WatchF func(context.Context) <-chan drand.Result Results []mock.Result // Delay causes results to be delivered after this period of time has // passed. Note that if the context is canceled a result is still consumed @@ -41,7 +39,7 @@ func (m *Client) String() string { } // Get returns the randomness at `round` or an error. -func (m *Client) Get(ctx context.Context, round uint64) (client.Result, error) { +func (m *Client) Get(ctx context.Context, round uint64) (drand.Result, error) { m.Lock() if len(m.Results) == 0 { m.Unlock() @@ -73,14 +71,14 @@ func (m *Client) Get(ctx context.Context, round uint64) (client.Result, error) { } // Watch returns new randomness as it becomes available. -func (m *Client) Watch(ctx context.Context) <-chan client.Result { +func (m *Client) Watch(ctx context.Context) <-chan drand.Result { if m.WatchCh != nil { return m.WatchCh } if m.WatchF != nil { return m.WatchF(ctx) } - ch := make(chan client.Result, 1) + ch := make(chan drand.Result, 1) r, err := m.Get(ctx, 0) if err == nil { ch <- r @@ -89,7 +87,7 @@ func (m *Client) Watch(ctx context.Context) <-chan client.Result { return ch } -func (m *Client) Info(_ context.Context) (*chain2.Info, error) { +func (m *Client) Info(_ context.Context) (*chain.Info, error) { if m.OptionalInfo != nil { return m.OptionalInfo, nil } @@ -119,19 +117,19 @@ func ClientWithResults(n, m uint64) *Client { } // ClientWithInfo makes a client that returns the given info but no randomness -func ClientWithInfo(info *chain2.Info) client.Client { +func ClientWithInfo(info *chain.Info) *InfoClient { return &InfoClient{info} } type InfoClient struct { - i *chain2.Info + i *chain.Info } func (m *InfoClient) String() string { return "MockInfo" } -func (m *InfoClient) Info(_ context.Context) (*chain2.Info, error) { +func (m *InfoClient) Info(_ context.Context) (*chain.Info, error) { return m.i, nil } @@ -139,12 +137,12 @@ func (m *InfoClient) RoundAt(t time.Time) uint64 { return commonutils.CurrentRound(t.Unix(), m.i.Period, m.i.GenesisTime) } -func (m *InfoClient) Get(_ context.Context, _ uint64) (client.Result, error) { +func (m *InfoClient) Get(_ context.Context, _ uint64) (drand.Result, error) { return nil, errors.New("not supported (mock info client get)") } -func (m *InfoClient) Watch(_ context.Context) <-chan client.Result { - ch := make(chan client.Result, 1) +func (m *InfoClient) Watch(_ context.Context) <-chan drand.Result { + ch := make(chan drand.Result, 1) close(ch) return ch } diff --git a/client/optimizing.go b/client/optimizing.go index 82e24b3..268d28e 100644 --- a/client/optimizing.go +++ b/client/optimizing.go @@ -12,9 +12,10 @@ import ( "github.com/hashicorp/go-multierror" + "github.com/drand/go-clients/drand" + "github.com/drand/drand/v2/common" "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" "github.com/drand/drand/v2/common/log" ) @@ -37,8 +38,8 @@ const ( type optimizingClient struct { sync.RWMutex - clients []client.Client - passiveClients []client.Client + clients []drand.Client + passiveClients []drand.Client stats []*requestStat requestTimeout time.Duration requestConcurrency int @@ -68,7 +69,7 @@ type optimizingClient struct { // ensure no unbounded blocking occurs. func newOptimizingClient( l log.Logger, - clients []client.Client, + clients []drand.Client, requestTimeout time.Duration, requestConcurrency int, speedTestInterval, @@ -122,7 +123,7 @@ func (oc *optimizingClient) Start() { // Note: if a client marked as passive closes its results channel from a `watch` call, the // optimizing client will not re-open it, as would be attempted with non-passive clients. // MarkPassive must tag clients as passive before `Start` is run. -func (oc *optimizingClient) MarkPassive(c client.Client) { +func (oc *optimizingClient) MarkPassive(c drand.Client) { oc.passiveClients = append(oc.passiveClients, c) // push passive clients to the back of the list for `Get`s for _, s := range oc.stats { @@ -144,7 +145,7 @@ func (oc *optimizingClient) String() string { type requestStat struct { // client is the client used to make the request. - client client.Client + client drand.Client // rtt is the time it took to make the request. rtt time.Duration // startTime is the time at which the request was started. @@ -153,9 +154,9 @@ type requestStat struct { type requestResult struct { // client is the client used to make the request. - client client.Client + client drand.Client // result is the return value from the call to Get. - result client.Result + result drand.Result // err is the error that occurred from a call to Get (not including context error). err error // stat is stats from the call to Get. @@ -163,7 +164,7 @@ type requestResult struct { } // markedPassive checks if a client should be treated as passive -func (oc *optimizingClient) markedPassive(c client.Client) bool { +func (oc *optimizingClient) markedPassive(c drand.Client) bool { for _, p := range oc.passiveClients { if p == c { return true @@ -173,7 +174,7 @@ func (oc *optimizingClient) markedPassive(c client.Client) bool { } func (oc *optimizingClient) testSpeed() { - clients := make([]client.Client, 0, len(oc.clients)) + clients := make([]drand.Client, 0, len(oc.clients)) for _, c := range oc.clients { if !oc.markedPassive(c) { clients = append(clients, c) @@ -221,11 +222,11 @@ func (oc *optimizingClient) SetLog(l log.Logger) { } // fastestClients returns a ordered slice of clients - fastest first. -func (oc *optimizingClient) fastestClients() []client.Client { +func (oc *optimizingClient) fastestClients() []drand.Client { oc.RLock() defer oc.RUnlock() // copy the current ordered client list so we iterate over a stable slice - clients := make([]client.Client, 0, len(oc.stats)) + clients := make([]drand.Client, 0, len(oc.stats)) for _, s := range oc.stats { clients = append(clients, s.client) } @@ -233,7 +234,7 @@ func (oc *optimizingClient) fastestClients() []client.Client { } // Get returns the randomness at `round` or an error. -func (oc *optimizingClient) Get(ctx context.Context, round uint64) (res client.Result, err error) { +func (oc *optimizingClient) Get(ctx context.Context, round uint64) (res drand.Result, err error) { clients := oc.fastestClients() // no need to race clients when we have only one if len(clients) == 1 { @@ -252,7 +253,7 @@ LOOP: } stats = append(stats, rr.stat) res = rr.result - if rr.err != nil && !errors.Is(rr.err, common.ErrEmptyClientUnsupportedGet) { + if rr.err != nil && !errors.Is(rr.err, drand.ErrEmptyClientUnsupportedGet) { err = errors.Join(err, rr.err) } else if rr.err == nil { err = nil @@ -271,7 +272,7 @@ LOOP: } // get calls Get on the passed client and returns a requestResult or nil if the context was canceled. -func get(ctx context.Context, c client.Client, round uint64) *requestResult { +func get(ctx context.Context, c drand.Client, round uint64) *requestResult { start := time.Now() res, err := c.Get(ctx, round) rtt := time.Since(start) @@ -291,7 +292,7 @@ func get(ctx context.Context, c client.Client, round uint64) *requestResult { return &requestResult{c, res, err, &stat} } -func raceGet(ctx context.Context, clients []client.Client, round uint64, timeout time.Duration, concurrency int) <-chan *requestResult { +func raceGet(ctx context.Context, clients []drand.Client, round uint64, timeout time.Duration, concurrency int) <-chan *requestResult { results := make(chan *requestResult, len(clients)) go func() { @@ -319,7 +320,7 @@ func raceGet(ctx context.Context, clients []client.Client, round uint64, timeout return results } -func parallelGet(ctx context.Context, clients []client.Client, round uint64, timeout time.Duration, concurrency int) <-chan *requestResult { +func parallelGet(ctx context.Context, clients []drand.Client, round uint64, timeout time.Duration, concurrency int) <-chan *requestResult { results := make(chan *requestResult, len(clients)) token := make(chan struct{}, concurrency) @@ -335,7 +336,7 @@ func parallelGet(ctx context.Context, clients []client.Client, round uint64, tim select { case <-token: wg.Add(1) - go func(c client.Client) { + go func(c drand.Client) { gctx, cancel := context.WithTimeout(ctx, timeout) rr := get(gctx, c, round) cancel() @@ -380,11 +381,11 @@ func (oc *optimizingClient) updateStats(stats []*requestStat) { } type watchResult struct { - client.Result - client.Client + drand.Result + drand.Client } -func (oc *optimizingClient) trackWatchResults(info *chain.Info, in chan watchResult, out chan client.Result) { +func (oc *optimizingClient) trackWatchResults(info *chain.Info, in chan watchResult, out chan drand.Result) { defer close(out) latest := uint64(0) @@ -405,8 +406,8 @@ func (oc *optimizingClient) trackWatchResults(info *chain.Info, in chan watchRes } // Watch returns new randomness as it becomes available. -func (oc *optimizingClient) Watch(ctx context.Context) <-chan client.Result { - outChan := make(chan client.Result, defaultChannelBuffer) +func (oc *optimizingClient) Watch(ctx context.Context) <-chan drand.Result { + outChan := make(chan drand.Result, defaultChannelBuffer) inChan := make(chan watchResult, defaultChannelBuffer) info, err := oc.Info(ctx) @@ -425,7 +426,7 @@ func (oc *optimizingClient) Watch(ctx context.Context) <-chan client.Result { retryInterval: oc.watchRetryInterval, } - closingClients := make(chan client.Client, 1) + closingClients := make(chan drand.Client, 1) for _, c := range oc.passiveClients { c := c go state.watchNext(ctx, c, inChan, closingClients) @@ -438,12 +439,12 @@ func (oc *optimizingClient) Watch(ctx context.Context) <-chan client.Result { } type watchingClient struct { - client.Client + drand.Client context.CancelFunc } type failedClient struct { - client.Client + drand.Client backoffUntil time.Time } @@ -456,7 +457,7 @@ type watchState struct { retryInterval time.Duration } -func (ws *watchState) dispatchWatchingClients(resultChan chan watchResult, closingClients chan client.Client) { +func (ws *watchState) dispatchWatchingClients(resultChan chan watchResult, closingClients chan drand.Client) { defer close(resultChan) // spin up initial watcher(s) @@ -495,7 +496,7 @@ func (ws *watchState) dispatchWatchingClients(resultChan chan watchResult, closi } } -func (ws *watchState) tryRepopulate(results chan watchResult, done chan client.Client) { +func (ws *watchState) tryRepopulate(results chan watchResult, done chan drand.Client) { ws.clean() for { @@ -514,7 +515,7 @@ func (ws *watchState) tryRepopulate(results chan watchResult, done chan client.C } } -func (ws *watchState) watchNext(ctx context.Context, c client.Client, out chan watchResult, done chan client.Client) { +func (ws *watchState) watchNext(ctx context.Context, c drand.Client, out chan watchResult, done chan drand.Client) { defer func() { done <- c }() resultStream := c.Watch(ctx) @@ -541,7 +542,7 @@ func (ws *watchState) close(clientIdx int) { ws.active = ws.active[:len(ws.active)-1] } -func (ws *watchState) done(c client.Client) { +func (ws *watchState) done(c drand.Client) { idx := ws.hasActive(c) if idx > -1 { ws.close(idx) @@ -555,7 +556,7 @@ func (ws *watchState) done(c client.Client) { // this happens when the optimizing client has closed it via `closeSlowest` } -func (ws *watchState) hasActive(c client.Client) int { +func (ws *watchState) hasActive(c drand.Client) int { for i, a := range ws.active { if a.Client == c { return i @@ -564,7 +565,7 @@ func (ws *watchState) hasActive(c client.Client) int { return -1 } -func (ws *watchState) hasProtected(c client.Client) int { +func (ws *watchState) hasProtected(c drand.Client) int { for i, p := range ws.protected { if p.Client == c { return i @@ -587,7 +588,7 @@ func (ws *watchState) closeSlowest() { ws.close(idxs[len(idxs)-1]) } -func (ws *watchState) nextUnwatched() client.Client { +func (ws *watchState) nextUnwatched() drand.Client { clients := ws.optimizer.fastestClients() ClientLoop: for _, c := range clients { diff --git a/client/optimizing_test.go b/client/optimizing_test.go index e16a16f..7c59c3a 100644 --- a/client/optimizing_test.go +++ b/client/optimizing_test.go @@ -6,14 +6,14 @@ import ( "testing" "time" - "github.com/drand/drand/v2/common/client" "github.com/drand/drand/v2/common/log" clientMock "github.com/drand/go-clients/client/mock" "github.com/drand/go-clients/client/test/result/mock" + "github.com/drand/go-clients/drand" ) // waitForSpeedTest waits until all clients have had their initial speed test. -func waitForSpeedTest(t *testing.T, c client.Client, timeout time.Duration) { +func waitForSpeedTest(t *testing.T, c drand.Client, timeout time.Duration) { t.Helper() oc, ok := c.(*optimizingClient) if !ok { @@ -49,14 +49,14 @@ func waitForSpeedTest(t *testing.T, c client.Client, timeout time.Duration) { } } -func expectRound(t *testing.T, res client.Result, r uint64) { +func expectRound(t *testing.T, res drand.Result, r uint64) { t.Helper() if res.GetRound() != r { t.Fatalf("expected round %v, got %v", r, res.GetRound()) } } -func closeClient(t *testing.T, c client.Client) { +func closeClient(t *testing.T, c drand.Client) { t.Helper() err := c.Close() if err != nil { @@ -72,7 +72,7 @@ func TestOptimizingGet(t *testing.T) { c1.Delay = time.Millisecond lg := log.New(nil, log.DebugLevel, true) - oc, err := newOptimizingClient(lg, []client.Client{c0, c1}, time.Second*5, 2, time.Minute*5, 0) + oc, err := newOptimizingClient(lg, []drand.Client{c0, c1}, time.Second*5, 2, time.Minute*5, 0) if err != nil { t.Fatal(err) } @@ -97,13 +97,13 @@ func TestOptimizingWatch(t *testing.T) { c1 := clientMock.ClientWithResults(5, 8) c2 := clientMock.ClientWithInfo(fakeChainInfo(t)) - wc1 := make(chan client.Result, 5) + wc1 := make(chan drand.Result, 5) c1.WatchCh = wc1 c0.Delay = time.Millisecond lg := log.New(nil, log.DebugLevel, true) - oc, err := newOptimizingClient(lg, []client.Client{c0, c1, c2}, time.Second*5, 2, time.Minute*5, 0) + oc, err := newOptimizingClient(lg, []drand.Client{c0, c1, c2}, time.Second*5, 2, time.Minute*5, 0) if err != nil { t.Fatal(err) } @@ -135,8 +135,8 @@ func TestOptimizingWatchRetryOnClose(t *testing.T) { // a single result for the speed test Results: []mock.Result{mock.NewMockResult(0)}, // return a watch channel that yields one result then closes - WatchF: func(context.Context) <-chan client.Result { - ch := make(chan client.Result, 1) + WatchF: func(context.Context) <-chan drand.Result { + ch := make(chan drand.Result, 1) r := mock.NewMockResult(rnd) rnd++ ch <- &r @@ -146,7 +146,7 @@ func TestOptimizingWatchRetryOnClose(t *testing.T) { } lg := log.New(nil, log.DebugLevel, true) - oc, err := newOptimizingClient(lg, []client.Client{c}, 0, 0, 0, time.Millisecond) + oc, err := newOptimizingClient(lg, []drand.Client{c}, 0, 0, 0, time.Millisecond) if err != nil { t.Fatal(err) } @@ -176,10 +176,10 @@ func TestOptimizingWatchFailover(t *testing.T) { var rndlk sync.Mutex var rnd uint64 = 1 - wf := func(context.Context) <-chan client.Result { + wf := func(context.Context) <-chan drand.Result { rndlk.Lock() defer rndlk.Unlock() - ch := make(chan client.Result, 1) + ch := make(chan drand.Result, 1) r := mock.NewMockResult(rnd) rnd++ if rnd < 5 { @@ -198,7 +198,7 @@ func TestOptimizingWatchFailover(t *testing.T) { } lg := log.New(nil, log.DebugLevel, true) - oc, err := newOptimizingClient(lg, []client.Client{clientMock.ClientWithInfo(chainInfo), c1, c2}, 0, 0, 0, time.Millisecond) + oc, err := newOptimizingClient(lg, []drand.Client{clientMock.ClientWithInfo(chainInfo), c1, c2}, 0, 0, 0, time.Millisecond) if err != nil { t.Fatal(err) } @@ -226,7 +226,7 @@ func TestOptimizingWatchFailover(t *testing.T) { func TestOptimizingRequiresClients(t *testing.T) { lg := log.New(nil, log.DebugLevel, true) - _, err := newOptimizingClient(lg, []client.Client{}, 0, 0, 0, 0) + _, err := newOptimizingClient(lg, []drand.Client{}, 0, 0, 0, 0) if err == nil { t.Fatal("expected err is nil but it shouldn't be") } @@ -237,7 +237,7 @@ func TestOptimizingRequiresClients(t *testing.T) { func TestOptimizingIsLogging(t *testing.T) { lg := log.New(nil, log.DebugLevel, true) - oc, err := newOptimizingClient(lg, []client.Client{&clientMock.Client{}}, 0, 0, 0, 0) + oc, err := newOptimizingClient(lg, []drand.Client{&clientMock.Client{}}, 0, 0, 0, 0) if err != nil { t.Fatal(err) } @@ -246,7 +246,7 @@ func TestOptimizingIsLogging(t *testing.T) { func TestOptimizingIsCloser(t *testing.T) { lg := log.New(nil, log.DebugLevel, true) - oc, err := newOptimizingClient(lg, []client.Client{&clientMock.Client{}}, 0, 0, 0, 0) + oc, err := newOptimizingClient(lg, []drand.Client{&clientMock.Client{}}, 0, 0, 0, 0) if err != nil { t.Fatal(err) } @@ -260,7 +260,7 @@ func TestOptimizingIsCloser(t *testing.T) { func TestOptimizingInfo(t *testing.T) { lg := log.New(nil, log.DebugLevel, true) chainInfo := fakeChainInfo(t) - oc, err := newOptimizingClient(lg, []client.Client{clientMock.ClientWithInfo(chainInfo)}, 0, 0, 0, 0) + oc, err := newOptimizingClient(lg, []drand.Client{clientMock.ClientWithInfo(chainInfo)}, 0, 0, 0, 0) if err != nil { t.Fatal(err) } @@ -276,7 +276,7 @@ func TestOptimizingInfo(t *testing.T) { func TestOptimizingRoundAt(t *testing.T) { lg := log.New(nil, log.DebugLevel, true) - oc, err := newOptimizingClient(lg, []client.Client{&clientMock.Client{}}, 0, 0, 0, 0) + oc, err := newOptimizingClient(lg, []drand.Client{&clientMock.Client{}}, 0, 0, 0, 0) if err != nil { t.Fatal(err) } @@ -295,9 +295,9 @@ func TestOptimizingClose(t *testing.T) { return nil } - clients := []client.Client{ - &clientMock.Client{WatchCh: make(chan client.Result), CloseF: closeF}, - &clientMock.Client{WatchCh: make(chan client.Result), CloseF: closeF}, + clients := []drand.Client{ + &clientMock.Client{WatchCh: make(chan drand.Result), CloseF: closeF}, + &clientMock.Client{WatchCh: make(chan drand.Result), CloseF: closeF}, } wg.Add(len(clients)) diff --git a/client/poll.go b/client/poll.go index d9a90d5..583a202 100644 --- a/client/poll.go +++ b/client/poll.go @@ -5,15 +5,15 @@ import ( "time" commonutils "github.com/drand/drand/v2/common" - chain2 "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" + "github.com/drand/drand/v2/common/chain" "github.com/drand/drand/v2/common/log" + "github.com/drand/go-clients/drand" ) // PollingWatcher generalizes the `Watch` interface for clients which learn new values // by asking for them once each group period. -func PollingWatcher(ctx context.Context, c client.Client, chainInfo *chain2.Info, l log.Logger) <-chan client.Result { - ch := make(chan client.Result, 1) +func PollingWatcher(ctx context.Context, c drand.Client, chainInfo *chain.Info, l log.Logger) <-chan drand.Result { + ch := make(chan drand.Result, 1) r := c.RoundAt(time.Now()) val, err := c.Get(ctx, r) if err != nil { diff --git a/client/test/cache/cache.go b/client/test/cache/cache.go index 8104126..4da033e 100644 --- a/client/test/cache/cache.go +++ b/client/test/cache/cache.go @@ -3,22 +3,22 @@ package cache import ( "sync" - "github.com/drand/drand/v2/common/client" + "github.com/drand/go-clients/drand" ) // MapCache is a simple cache that stores data in memory. type MapCache struct { sync.RWMutex - data map[uint64]client.Result + data map[uint64]drand.Result } // NewMapCache creates a new in memory cache backed by a map. func NewMapCache() *MapCache { - return &MapCache{data: make(map[uint64]client.Result)} + return &MapCache{data: make(map[uint64]drand.Result)} } // TryGet provides a round beacon or nil if it is not cached. -func (mc *MapCache) TryGet(round uint64) client.Result { +func (mc *MapCache) TryGet(round uint64) drand.Result { mc.RLock() defer mc.RUnlock() r, ok := mc.data[round] @@ -29,7 +29,7 @@ func (mc *MapCache) TryGet(round uint64) client.Result { } // Add adds an item to the cache -func (mc *MapCache) Add(round uint64, result client.Result) { +func (mc *MapCache) Add(round uint64, result drand.Result) { mc.Lock() mc.data[round] = result mc.Unlock() diff --git a/client/test/http/mock/httpserver.go b/client/test/http/mock/httpserver.go index 4372b4b..6bb40ff 100644 --- a/client/test/http/mock/httpserver.go +++ b/client/test/http/mock/httpserver.go @@ -7,17 +7,18 @@ import ( "testing" "time" + "github.com/drand/go-clients/drand" clock "github.com/jonboulle/clockwork" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "github.com/drand/drand/v2/common" "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" + old "github.com/drand/drand/v2/common/client" dhttp "github.com/drand/drand/v2/handler/http" - "github.com/drand/drand/v2/protobuf/drand" + proto "github.com/drand/drand/v2/protobuf/drand" "github.com/drand/drand/v2/test/mock" - localClient "github.com/drand/go-clients/client" + "github.com/drand/go-clients/client" "github.com/drand/drand/v2/crypto" ) @@ -27,7 +28,7 @@ func NewMockHTTPPublicServer(t *testing.T, badSecondRound bool, sch *crypto.Sche t.Helper() server := mock.NewMockServer(t, badSecondRound, sch, clk) - client := Proxy(server) + c := Proxy(server) ctx, cancel := context.WithCancel(context.Background()) @@ -38,7 +39,7 @@ func NewMockHTTPPublicServer(t *testing.T, badSecondRound bool, sch *crypto.Sche var chainInfo *chain.Info for i := 0; i < 3; i++ { - protoInfo, err := server.ChainInfo(ctx, &drand.ChainInfoRequest{}) + protoInfo, err := server.ChainInfo(ctx, &proto.ChainInfoRequest{}) if err != nil { t.Error("MockServer.ChainInfo error:", err) time.Sleep(10 * time.Millisecond) @@ -59,7 +60,7 @@ func NewMockHTTPPublicServer(t *testing.T, badSecondRound bool, sch *crypto.Sche t.Log("MockServer.ChainInfo:", chainInfo) - handler.RegisterDefaultBeaconHandler(handler.RegisterNewBeaconHandler(client, chainInfo.HashString())) + handler.RegisterDefaultBeaconHandler(handler.RegisterNewBeaconHandler(c, chainInfo.HashString())) listener, err := net.Listen("tcp", ":0") if err != nil { @@ -78,12 +79,13 @@ func NewMockHTTPPublicServer(t *testing.T, badSecondRound bool, sch *crypto.Sche // drandProxy is used as a proxy between a Public service (e.g. the node as a server) // and a Public Client (the client consumed by the HTTP API) type drandProxy struct { - r drand.PublicServer + r proto.PublicServer + proxyChan chan old.Result } -// Proxy wraps a server interface into a client interface so it can be queried -func Proxy(s drand.PublicServer) client.Client { - return &drandProxy{s} +// Proxy wraps a server interface into an old client interface so it can be queried +func Proxy(s proto.PublicServer) old.Client { + return &drandProxy{s, nil} } // String returns the name of this proxy. @@ -92,12 +94,12 @@ func (d *drandProxy) String() string { } // Get returns randomness at a requested round -func (d *drandProxy) Get(ctx context.Context, round uint64) (client.Result, error) { - resp, err := d.r.PublicRand(ctx, &drand.PublicRandRequest{Round: round}) +func (d *drandProxy) Get(ctx context.Context, round uint64) (old.Result, error) { + resp, err := d.r.PublicRand(ctx, &proto.PublicRandRequest{Round: round}) if err != nil { return nil, err } - return &localClient.RandomData{ + return &client.RandomData{ Rnd: resp.GetRound(), Random: crypto.RandomnessFromSignature(resp.GetSignature()), Sig: resp.GetSignature(), @@ -106,21 +108,36 @@ func (d *drandProxy) Get(ctx context.Context, round uint64) (client.Result, erro } // Watch returns new randomness as it becomes available. -func (d *drandProxy) Watch(ctx context.Context) <-chan client.Result { +func (d *drandProxy) Watch(ctx context.Context) <-chan old.Result { proxy := newStreamProxy(ctx) go func() { - err := d.r.PublicRandStream(&drand.PublicRandRequest{}, proxy) + err := d.r.PublicRandStream(&proto.PublicRandRequest{}, proxy) if err != nil { proxy.Close() } }() - return proxy.outgoing + + ch := make(chan old.Result, 1) + go func() { + for { + select { + case <-ctx.Done(): + close(ch) + return + case in := <-proxy.outgoing: + ch <- old.Result(in) + } + } + + }() + d.proxyChan = ch + return ch } // Info returns the parameters of the chain this client is connected to. // The public key, when it started, and how frequently it updates. func (d *drandProxy) Info(ctx context.Context) (*chain.Info, error) { - info, err := d.r.ChainInfo(ctx, &drand.ChainInfoRequest{}) + info, err := d.r.ChainInfo(ctx, &proto.ChainInfoRequest{}) if err != nil { return nil, err } @@ -147,7 +164,7 @@ func (d *drandProxy) Close() error { type streamProxy struct { ctx context.Context cancel context.CancelFunc - outgoing chan client.Result + outgoing chan drand.Result } func newStreamProxy(ctx context.Context) *streamProxy { @@ -155,12 +172,12 @@ func newStreamProxy(ctx context.Context) *streamProxy { s := streamProxy{ ctx: ctx, cancel: cancel, - outgoing: make(chan client.Result, 1), + outgoing: make(chan drand.Result, 1), } return &s } -func (s *streamProxy) Send(next *drand.PublicRandResponse) error { +func (s *streamProxy) Send(next *proto.PublicRandResponse) error { d := common.Beacon{ Round: next.Round, Signature: next.Signature, diff --git a/client/utils_test.go b/client/utils_test.go index 8ee8631..1789613 100644 --- a/client/utils_test.go +++ b/client/utils_test.go @@ -8,8 +8,9 @@ import ( "github.com/stretchr/testify/require" + "github.com/drand/go-clients/drand" + "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" "github.com/drand/drand/v2/common/key" "github.com/drand/drand/v2/crypto" ) @@ -30,7 +31,7 @@ func fakeChainInfo(t *testing.T) *chain.Info { } } -func latestResult(t *testing.T, c client.Client) client.Result { +func latestResult(t *testing.T, c drand.Client) drand.Result { t.Helper() r, err := c.Get(context.Background(), 0) if err != nil { @@ -40,7 +41,7 @@ func latestResult(t *testing.T, c client.Client) client.Result { } // nextResult reads the next result from the channel and fails the test if it closes before a value is read. -func nextResult(t *testing.T, ch <-chan client.Result) client.Result { +func nextResult(t *testing.T, ch <-chan drand.Result) drand.Result { t.Helper() select { @@ -56,7 +57,7 @@ func nextResult(t *testing.T, ch <-chan client.Result) client.Result { } // compareResults asserts that two results are the same. -func compareResults(t *testing.T, a, b client.Result) { +func compareResults(t *testing.T, a, b drand.Result) { t.Helper() if a.GetRound() != b.GetRound() { diff --git a/client/verify.go b/client/verify.go index d550deb..c2fce1a 100644 --- a/client/verify.go +++ b/client/verify.go @@ -7,20 +7,20 @@ import ( "github.com/drand/drand/v2/common" chain2 "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" "github.com/drand/drand/v2/common/log" "github.com/drand/drand/v2/crypto" + "github.com/drand/go-clients/drand" ) type verifyingClient struct { // Client is the wrapped client. calls to `get` and `watch` return results proxied from this client's fetch - client.Client + drand.Client // indirectClient is used to fetch other rounds of randomness needed for verification. // it is separated so that it can provide a cache or shared pool that the direct client may not. - indirectClient client.Client + indirectClient drand.Client - pointOfTrust client.Result + pointOfTrust drand.Result potLk sync.Mutex strict bool @@ -29,7 +29,7 @@ type verifyingClient struct { } // newVerifyingClient wraps a client to perform `chain.Verify` on emitted results. -func newVerifyingClient(c client.Client, previousResult client.Result, strict bool, sch *crypto.Scheme) client.Client { +func newVerifyingClient(c drand.Client, previousResult drand.Result, strict bool, sch *crypto.Scheme) drand.Client { return &verifyingClient{ Client: c, indirectClient: c, @@ -46,7 +46,7 @@ func (v *verifyingClient) SetLog(l log.Logger) { } // Get returns a requested round of randomness -func (v *verifyingClient) Get(ctx context.Context, round uint64) (client.Result, error) { +func (v *verifyingClient) Get(ctx context.Context, round uint64) (drand.Result, error) { info, err := v.indirectClient.Info(ctx) if err != nil { return nil, err @@ -66,8 +66,8 @@ func (v *verifyingClient) Get(ctx context.Context, round uint64) (client.Result, } // Watch returns new randomness as it becomes available. -func (v *verifyingClient) Watch(ctx context.Context) <-chan client.Result { - outCh := make(chan client.Result, 1) +func (v *verifyingClient) Watch(ctx context.Context) <-chan drand.Result { + outCh := make(chan drand.Result, 1) info, err := v.indirectClient.Info(ctx) if err != nil { @@ -95,7 +95,7 @@ type resultWithPreviousSignature interface { GetPreviousSignature() []byte } -func asRandomData(r client.Result) *RandomData { +func asRandomData(r drand.Result) *RandomData { rd, ok := r.(*RandomData) if ok { rd.Random = crypto.RandomnessFromSignature(rd.GetSignature()) @@ -142,7 +142,7 @@ func (v *verifyingClient) getTrustedPreviousSignature(ctx context.Context, round } initialTrustRound := trustRound - var next client.Result + var next drand.Result for trustRound < round-1 { trustRound++ v.log.Warnw("", "verifying_client", "loading round to verify", "round", trustRound) diff --git a/client/verify_test.go b/client/verify_test.go index 500399d..eee4293 100644 --- a/client/verify_test.go +++ b/client/verify_test.go @@ -7,15 +7,16 @@ import ( "github.com/stretchr/testify/require" - "github.com/drand/drand/v2/common/client" + "github.com/drand/go-clients/drand" + "github.com/drand/drand/v2/common/log" "github.com/drand/drand/v2/crypto" - client2 "github.com/drand/go-clients/client" + "github.com/drand/go-clients/client" clientMock "github.com/drand/go-clients/client/mock" "github.com/drand/go-clients/client/test/result/mock" ) -func mockClientWithVerifiableResults(ctx context.Context, t *testing.T, l log.Logger, n int, strictRounds bool) (client.Client, []mock.Result) { +func mockClientWithVerifiableResults(ctx context.Context, t *testing.T, l log.Logger, n int, strictRounds bool) (drand.Client, []mock.Result) { t.Helper() sch, err := crypto.GetSchemeFromEnv() require.NoError(t, err) @@ -23,15 +24,13 @@ func mockClientWithVerifiableResults(ctx context.Context, t *testing.T, l log.Lo info, results := mock.VerifiableResults(n, sch) mc := clientMock.Client{Results: results, StrictRounds: strictRounds, OptionalInfo: info} - var c client.Client + var c drand.Client - c, err = client2.Wrap( - ctx, - l, - []client.Client{clientMock.ClientWithInfo(info), &mc}, - client2.WithChainInfo(info), - client2.WithVerifiedResult(&results[0]), - client2.WithFullChainVerification(), + c, err = client.Wrap( + []drand.Client{clientMock.ClientWithInfo(info), &mc}, + client.WithChainInfo(info), + client.WithTrustedResult(&results[0]), + client.WithFullChainVerification(), ) require.NoError(t, err) diff --git a/client/watcher.go b/client/watcher.go index 3c6a96d..8923117 100644 --- a/client/watcher.go +++ b/client/watcher.go @@ -7,15 +7,15 @@ import ( "github.com/hashicorp/go-multierror" - "github.com/drand/drand/v2/common/client" + "github.com/drand/go-clients/drand" ) type watcherClient struct { - client.Client + drand.Client watcher Watcher } -func (c *watcherClient) Watch(ctx context.Context) <-chan client.Result { +func (c *watcherClient) Watch(ctx context.Context) <-chan drand.Result { return c.watcher.Watch(ctx) } diff --git a/client/watcher_test.go b/client/watcher_test.go index d6cd6a2..5d9a2fb 100644 --- a/client/watcher_test.go +++ b/client/watcher_test.go @@ -6,9 +6,9 @@ import ( "testing" "time" - "github.com/drand/drand/v2/common/client" clientMock "github.com/drand/go-clients/client/mock" "github.com/drand/go-clients/client/test/result/mock" + "github.com/drand/go-clients/drand" ) func TestWatcherWatch(t *testing.T) { @@ -17,7 +17,7 @@ func TestWatcherWatch(t *testing.T) { {Rnd: 2, Rand: []byte{2}}, } - ch := make(chan client.Result, len(results)) + ch := make(chan drand.Result, len(results)) for i := range results { ch <- &results[i] } diff --git a/drand/errors.go b/drand/errors.go new file mode 100644 index 0000000..4c4a951 --- /dev/null +++ b/drand/errors.go @@ -0,0 +1,11 @@ +package drand + +import ( + "errors" +) + +// ErrInvalidChainHash means there was an error or a mismatch with the chain hash +var ErrInvalidChainHash = errors.New("incorrect chain hash") + +// ErrEmptyClientUnsupportedGet means this client does not support Get +var ErrEmptyClientUnsupportedGet = errors.New("unsupported method Get was used") diff --git a/drand/interfaces.go b/drand/interfaces.go new file mode 100644 index 0000000..94b5e46 --- /dev/null +++ b/drand/interfaces.go @@ -0,0 +1,47 @@ +package drand + +import ( + "context" + "io" + "time" + + "github.com/drand/drand/v2/common/chain" + "github.com/drand/drand/v2/common/log" +) + +// Client represents the drand Client interface. +type Client interface { + // Get returns the randomness at `round` or an error. + // Requesting round = 0 will return randomness for the most + // recent known round, bounded at a minimum to the `RoundAt(time.Now())` + Get(ctx context.Context, round uint64) (Result, error) + + // Watch returns new randomness as it becomes available. + Watch(ctx context.Context) <-chan Result + + // Info returns the parameters of the chain this client is connected to. + // The public key, when it started, and how frequently it updates. + Info(ctx context.Context) (*chain.Info, error) + + // RoundAt will return the most recent round of randomness that will be available + // at time for the current client. + RoundAt(time time.Time) uint64 + + // Closer means Close() will halt the client, any background processes it runs and any + // in-flight Get, Watch or Info requests. Behavior for usage of the client + // after Close is called is undefined. + io.Closer +} + +// Result represents the randomness for a single drand round. +type Result interface { + GetRound() uint64 + GetRandomness() []byte + GetPreviousSignature() []byte + GetSignature() []byte +} + +// LoggingClient sets the logger for use by clients that support it +type LoggingClient interface { + SetLog(log.Logger) +} diff --git a/examples/doc_test.go b/examples/doc_test.go new file mode 100644 index 0000000..c5761b7 --- /dev/null +++ b/examples/doc_test.go @@ -0,0 +1,37 @@ +package examples_test + +import ( + "context" + "encoding/hex" + "fmt" + + "github.com/drand/drand/v2/common/log" + "github.com/drand/go-clients/client" + "github.com/drand/go-clients/client/http" +) + +var chainHash = "8990e7a9aaed2ffed73dbd7092123d6f289930540d7651336225dc172e51b2ce" + +func Example_Basic_Test() { + lg := log.New(nil, log.DebugLevel, true) + + httpClient, err := http.NewSimpleClient("http://api.drand.sh/", chainHash) + chb, err := hex.DecodeString(chainHash) + + c, err := client.New(client.From(httpClient), // use a concrete client implementations + client.WithChainHash(chb), + client.WithLogger(lg), + ) + if err != nil { + panic(err) + } + + // e.g. use the client to get the latest randomness round: + r, err := c.Get(context.Background(), 1) + if err != nil { + panic(err) + } + + fmt.Println(r.GetRound()) + //output: 1 +} diff --git a/internal/cli.go b/internal/cli.go index cf1bc48..90a4fe7 100644 --- a/internal/cli.go +++ b/internal/cli.go @@ -9,7 +9,7 @@ import ( json "github.com/nikkolasg/hexjson" "github.com/urfave/cli/v2" - client "github.com/drand/drand/v2/common/client" + "github.com/drand/go-clients/drand" "github.com/drand/drand/v2/common" "github.com/drand/go-clients/internal/lib" @@ -91,7 +91,7 @@ func toArray(flags ...cli.Flag) []cli.Flag { return flags } -func instantiateClient(cctx *cli.Context) (client.Client, error) { +func instantiateClient(cctx *cli.Context) (drand.Client, error) { c, err := lib.Create(cctx, false) if err != nil { return nil, fmt.Errorf("constructing client: %w", err) diff --git a/internal/grpc/client.go b/internal/grpc/client.go index 3a731bc..e0d40f8 100644 --- a/internal/grpc/client.go +++ b/internal/grpc/client.go @@ -12,14 +12,15 @@ import ( "google.golang.org/grpc/credentials" grpcInsec "google.golang.org/grpc/credentials/insecure" + "github.com/drand/go-clients/drand" + "github.com/drand/drand/v2/crypto" commonutils "github.com/drand/drand/v2/common" "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" "github.com/drand/drand/v2/common/log" - "github.com/drand/drand/v2/protobuf/drand" - localClient "github.com/drand/go-clients/client" + proto "github.com/drand/drand/v2/protobuf/drand" + "github.com/drand/go-clients/client" ) const grpcDefaultTimeout = 5 * time.Second @@ -27,13 +28,13 @@ const grpcDefaultTimeout = 5 * time.Second type grpcClient struct { address string chainHash []byte - client drand.PublicClient + client proto.PublicClient conn *grpc.ClientConn l log.Logger } // New creates a drand client backed by a GRPC connection. -func New(address string, insecure bool, chainHash []byte) (client.Client, error) { +func New(address string, insecure bool, chainHash []byte) (drand.Client, error) { var opts []grpc.DialOption if insecure { opts = append(opts, grpc.WithTransportCredentials(grpcInsec.NewCredentials())) @@ -49,11 +50,11 @@ func New(address string, insecure bool, chainHash []byte) (client.Client, error) return nil, err } - return &grpcClient{address, chainHash, drand.NewPublicClient(conn), conn, log.DefaultLogger()}, nil + return &grpcClient{address, chainHash, proto.NewPublicClient(conn), conn, log.DefaultLogger()}, nil } -func asRD(r *drand.PublicRandResponse) *localClient.RandomData { - return &localClient.RandomData{ +func asRD(r *proto.PublicRandResponse) *client.RandomData { + return &client.RandomData{ Rnd: r.GetRound(), Random: crypto.RandomnessFromSignature(r.GetSignature()), Sig: r.GetSignature(), @@ -67,8 +68,8 @@ func (g *grpcClient) String() string { } // Get returns a the randomness at `round` or an error. -func (g *grpcClient) Get(ctx context.Context, round uint64) (client.Result, error) { - curr, err := g.client.PublicRand(ctx, &drand.PublicRandRequest{Round: round, Metadata: g.getMetadata()}) +func (g *grpcClient) Get(ctx context.Context, round uint64) (drand.Result, error) { + curr, err := g.client.PublicRand(ctx, &proto.PublicRandRequest{Round: round, Metadata: g.getMetadata()}) if err != nil { return nil, err } @@ -80,9 +81,9 @@ func (g *grpcClient) Get(ctx context.Context, round uint64) (client.Result, erro } // Watch returns new randomness as it becomes available. -func (g *grpcClient) Watch(ctx context.Context) <-chan client.Result { - stream, err := g.client.PublicRandStream(ctx, &drand.PublicRandRequest{Round: 0, Metadata: g.getMetadata()}) - ch := make(chan client.Result, 1) +func (g *grpcClient) Watch(ctx context.Context) <-chan drand.Result { + stream, err := g.client.PublicRandStream(ctx, &proto.PublicRandRequest{Round: 0, Metadata: g.getMetadata()}) + ch := make(chan drand.Result, 1) if err != nil { close(ch) return ch @@ -93,17 +94,17 @@ func (g *grpcClient) Watch(ctx context.Context) <-chan client.Result { // Info returns information about the chain. func (g *grpcClient) Info(ctx context.Context) (*chain.Info, error) { - proto, err := g.client.ChainInfo(ctx, &drand.ChainInfoRequest{Metadata: g.getMetadata()}) + p, err := g.client.ChainInfo(ctx, &proto.ChainInfoRequest{Metadata: g.getMetadata()}) if err != nil { return nil, err } - if proto == nil { + if p == nil { return nil, errors.New("no received group - unexpected gPRC response") } - return chain.InfoFromProto(proto) + return chain.InfoFromProto(p) } -func (g *grpcClient) translate(stream drand.Public_PublicRandStreamClient, out chan<- client.Result) { +func (g *grpcClient) translate(stream proto.Public_PublicRandStreamClient, out chan<- drand.Result) { defer close(out) for { next, err := stream.Recv() @@ -117,19 +118,19 @@ func (g *grpcClient) translate(stream drand.Public_PublicRandStreamClient, out c } } -func (g *grpcClient) getMetadata() *drand.Metadata { - return &drand.Metadata{ChainHash: g.chainHash} +func (g *grpcClient) getMetadata() *proto.Metadata { + return &proto.Metadata{ChainHash: g.chainHash} } func (g *grpcClient) RoundAt(t time.Time) uint64 { ctx, cancel := context.WithTimeout(context.Background(), grpcDefaultTimeout) defer cancel() - info, err := g.client.ChainInfo(ctx, &drand.ChainInfoRequest{Metadata: g.getMetadata()}) + info, err := g.client.ChainInfo(ctx, &proto.ChainInfoRequest{Metadata: g.getMetadata()}) if err != nil { return 0 } - return commonutils.CurrentRound(t.Unix(), time.Second*time.Duration(info.Period), info.GenesisTime) + return commonutils.CurrentRound(t.Unix(), time.Second*time.Duration(info.GetPeriod()), info.GetGenesisTime()) } // SetLog configures the client log output diff --git a/internal/grpc/doc.go b/internal/grpc/doc.go index f61c586..dffefdd 100644 --- a/internal/grpc/doc.go +++ b/internal/grpc/doc.go @@ -5,33 +5,6 @@ The client connects to a drand gRPC endpoint to fetch randomness. The gRPC client has some advantages over the HTTP client - it is more compact on-the-wire and supports streaming and authentication. -Example: - - package main - - import ( - "encoding/hex" - - "github.com/drand/drand/v2/client" - "github.com/drand/drand/v2/client/grpc" - ) - - const ( - grpcAddr = "example.drand.grpc.server:4444" - certPath = "/path/to/drand-grpc.cert" - ) - - var chainHash, _ = hex.DecodeString("8990e7a9aaed2ffed73dbd7092123d6f289930540d7651336225dc172e51b2ce") - - func main() { - gc, err := grpc.New(grpcAddr, certPath, false) - - c, err := client.New( - client.From(gc), - client.WithChainHash(chainHash), - ) - } - A path to a file that holds TLS credentials for the drand server is required to validate server connections. Alternatively set the final parameter to `true` to enable _insecure_ connections (not recommended). diff --git a/internal/lib/cli.go b/internal/lib/cli.go index 36f65f3..1514871 100644 --- a/internal/lib/cli.go +++ b/internal/lib/cli.go @@ -13,18 +13,17 @@ import ( "github.com/BurntSushi/toml" "github.com/google/uuid" - clock "github.com/jonboulle/clockwork" pubsub "github.com/libp2p/go-libp2p-pubsub" ma "github.com/multiformats/go-multiaddr" "github.com/urfave/cli/v2" + "github.com/drand/go-clients/drand" + "github.com/drand/drand/v2/common/key" - commonutils "github.com/drand/drand/v2/common" chainCommon "github.com/drand/drand/v2/common/chain" - "github.com/drand/drand/v2/common/client" "github.com/drand/drand/v2/common/log" - pubClient "github.com/drand/go-clients/client" + "github.com/drand/go-clients/client" http2 "github.com/drand/go-clients/client/http" gclient "github.com/drand/go-clients/client/lp2p" "github.com/drand/go-clients/internal/grpc" @@ -116,9 +115,8 @@ var ClientFlags = []cli.Flag{ // with ClientFlags // //nolint:gocyclo -func Create(c *cli.Context, withInstrumentation bool, opts ...pubClient.Option) (client.Client, error) { - ctx := c.Context - clients := make([]client.Client, 0) +func Create(c *cli.Context, withInstrumentation bool, opts ...client.Option) (drand.Client, error) { + clients := make([]drand.Client, 0) var level int if c.Bool(VerboseFlag.Name) { level = log.DebugLevel @@ -142,7 +140,7 @@ func Create(c *cli.Context, withInstrumentation bool, opts ...pubClient.Option) } l.Debugw("parsing group-conf file, successful") - opts = append(opts, pubClient.WithChainInfo(info)) + opts = append(opts, client.WithChainInfo(info)) } if info != nil { @@ -166,17 +164,17 @@ func Create(c *cli.Context, withInstrumentation bool, opts ...pubClient.Option) if info != nil && !bytes.Equal(hash, info.Hash()) { return nil, fmt.Errorf( "%w for beacon %s %v != %v", - commonutils.ErrInvalidChainHash, + drand.ErrInvalidChainHash, info.ID, c.String(HashFlag.Name), hex.EncodeToString(info.Hash()), ) } - opts = append(opts, pubClient.WithChainHash(hash)) + opts = append(opts, client.WithChainHash(hash)) } if c.Bool(InsecureFlag.Name) { - opts = append(opts, pubClient.Insecurely()) + opts = append(opts, client.Insecurely()) } gc, info, err := buildHTTPClients(c, l, hash, withInstrumentation) @@ -191,7 +189,7 @@ func Create(c *cli.Context, withInstrumentation bool, opts ...pubClient.Option) if info != nil && hash != nil && !bytes.Equal(hash, info.Hash()) { return nil, fmt.Errorf( "%w for beacon %s : expected %v != info %v", - commonutils.ErrInvalidChainHash, + drand.ErrInvalidChainHash, info.ID, hex.EncodeToString(hash), hex.EncodeToString(info.Hash()), @@ -204,10 +202,10 @@ func Create(c *cli.Context, withInstrumentation bool, opts ...pubClient.Option) } opts = append(opts, gopt...) - return pubClient.Wrap(ctx, l, clients, opts...) + return client.Wrap(clients, opts...) } -func buildGrpcClient(c *cli.Context, info *chainCommon.Info) ([]client.Client, *chainCommon.Info, error) { +func buildGrpcClient(c *cli.Context, info *chainCommon.Info) ([]drand.Client, *chainCommon.Info, error) { if !c.IsSet(GRPCConnectFlag.Name) { return nil, info, nil } @@ -238,15 +236,15 @@ func buildGrpcClient(c *cli.Context, info *chainCommon.Info) ([]client.Client, * } } - return []client.Client{gc}, info, nil + return []drand.Client{gc}, info, nil } -func buildHTTPClients(c *cli.Context, l log.Logger, hash []byte, withInstrumentation bool) ([]client.Client, *chainCommon.Info, error) { +func buildHTTPClients(c *cli.Context, l log.Logger, hash []byte, withInstrumentation bool) ([]drand.Client, *chainCommon.Info, error) { ctx := c.Context - clients := make([]client.Client, 0) + clients := make([]drand.Client, 0) var err error var skipped []string - var hc client.Client + var hc drand.Client var info *chainCommon.Info urls := c.StringSlice(URLFlag.Name) @@ -305,7 +303,7 @@ func buildHTTPClients(c *cli.Context, l log.Logger, hash []byte, withInstrumenta return clients, info, nil } -func buildGossipClient(c *cli.Context, l log.Logger) ([]pubClient.Option, error) { +func buildGossipClient(c *cli.Context, l log.Logger) ([]client.Option, error) { if c.IsSet(RelayFlag.Name) { addrs := c.StringSlice(RelayFlag.Name) if len(addrs) > 0 { @@ -321,10 +319,10 @@ func buildGossipClient(c *cli.Context, l log.Logger) ([]pubClient.Option, error) if err != nil { return nil, err } - return []pubClient.Option{gclient.WithPubsub(l, ps, clock.NewRealClock(), gclient.DefaultBufferSize)}, nil + return []client.Option{gclient.WithPubsub(ps)}, nil } } - return []pubClient.Option{}, nil + return []client.Option{}, nil } func buildClientHost(l log.Logger, clientListenAddr string, relayMultiaddr []ma.Multiaddr) (*pubsub.PubSub, error) { diff --git a/internal/lib/cli_test.go b/internal/lib/cli_test.go index 7e1945b..2df830e 100644 --- a/internal/lib/cli_test.go +++ b/internal/lib/cli_test.go @@ -10,11 +10,11 @@ import ( "testing" "time" + "github.com/drand/go-clients/drand" clock "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" "github.com/urfave/cli/v2" - commonutils "github.com/drand/drand/v2/common" "github.com/drand/drand/v2/common/log" "github.com/drand/drand/v2/crypto" "github.com/drand/go-clients/client" @@ -144,7 +144,7 @@ func TestClientLibChainHashOverrideError(t *testing.T) { "--hash", fakeChainHash, }) - if !errors.Is(err, commonutils.ErrInvalidChainHash) { + if !errors.Is(err, drand.ErrInvalidChainHash) { t.Log(fakeChainHash) t.Fatal("expected error from mismatched chain hashes. Got: ", err) } @@ -155,5 +155,5 @@ func groupTOMLPath() string { if !ok { return "" } - return filepath.Join(filepath.Dir(file), "..", "..", "internal", "test", "default.toml") + return filepath.Join(filepath.Dir(file), "..", "..", "internal", "testdata", "default.toml") } diff --git a/internal/lp2p/ctor.go b/internal/lp2p/ctor.go index 2929919..bfe1c74 100644 --- a/internal/lp2p/ctor.go +++ b/internal/lp2p/ctor.go @@ -117,7 +117,7 @@ func ConstructHost(priv crypto.PrivKey, listenAddr string, bootstrap []ma.Multia err := h.Connect(ctx, ai) cancel() if err != nil { - log.Warnw("", "construct_host", "could not bootstrap", "addr", ai) + log.Warnw("[ConstructHost] could not bootstrap", "with_addr", ai) } } }() diff --git a/internal/lp2p/relaynode.go b/internal/lp2p/relaynode.go index cfe3c9e..c79b8df 100644 --- a/internal/lp2p/relaynode.go +++ b/internal/lp2p/relaynode.go @@ -11,10 +11,11 @@ import ( ma "github.com/multiformats/go-multiaddr" "google.golang.org/protobuf/proto" - "github.com/drand/drand/v2/common/client" + "github.com/drand/go-clients/drand" + "github.com/drand/drand/v2/common/log" - "github.com/drand/drand/v2/protobuf/drand" - client2 "github.com/drand/go-clients/client" + protod "github.com/drand/drand/v2/protobuf/drand" + "github.com/drand/go-clients/client" ) // GossipRelayConfig configures a gossip-relay relay node. @@ -27,7 +28,7 @@ type GossipRelayConfig struct { IdentityPath string CertPath string Insecure bool - Client client.Client + Client drand.Client } // GossipRelayNode is a gossip-relay relay runtime. @@ -125,7 +126,7 @@ func ParseMultiaddrSlice(peers []string) ([]ma.Multiaddr, error) { return out, nil } -func (g *GossipRelayNode) background(w client2.Watcher) { +func (g *GossipRelayNode) background(w client.Watcher) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() for { @@ -139,13 +140,13 @@ func (g *GossipRelayNode) background(w client2.Watcher) { break LOOP } - rd, ok := res.(*client2.RandomData) + rd, ok := res.(*client.RandomData) if !ok { g.l.Errorw("", "relay_node", "unexpected client result type") continue } - randB, err := proto.Marshal(&drand.PublicRandResponse{ + randB, err := proto.Marshal(&protod.PublicRandResponse{ Round: res.GetRound(), Signature: res.GetSignature(), PreviousSignature: rd.GetPreviousSignature(), diff --git a/internal/lp2p/relaynode_test.go b/internal/lp2p/relaynode_test.go index a987183..4aad8d5 100644 --- a/internal/lp2p/relaynode_test.go +++ b/internal/lp2p/relaynode_test.go @@ -11,26 +11,26 @@ import ( "github.com/stretchr/testify/require" + "github.com/drand/drand/v2/common/chain" "github.com/drand/drand/v2/common/key" "github.com/drand/drand/v2/common/log" "github.com/drand/drand/v2/crypto" - "github.com/drand/drand/v2/common/chain" - client2 "github.com/drand/drand/v2/common/client" "github.com/drand/go-clients/client" "github.com/drand/go-clients/client/test/result/mock" + "github.com/drand/go-clients/drand" ) type mockClient struct { chainInfo *chain.Info - watchF func(context.Context) <-chan client2.Result + watchF func(context.Context) <-chan drand.Result } -func (c *mockClient) Get(_ context.Context, _ uint64) (client2.Result, error) { +func (c *mockClient) Get(_ context.Context, _ uint64) (drand.Result, error) { return nil, errors.New("unsupported") } -func (c *mockClient) Watch(ctx context.Context) <-chan client2.Result { +func (c *mockClient) Watch(ctx context.Context) <-chan drand.Result { return c.watchF(ctx) } @@ -85,8 +85,8 @@ func TestWatchRetryOnClose(t *testing.T) { wg.Add(len(results)) // return a channel that writes one result then closes - watchF := func(context.Context) <-chan client2.Result { - ch := make(chan client2.Result, 1) + watchF := func(context.Context) <-chan drand.Result { + ch := make(chan drand.Result, 1) if len(results) > 0 { res := results[0] results = results[1:] diff --git a/internal/test/default.toml b/internal/testdata/default.toml similarity index 100% rename from internal/test/default.toml rename to internal/testdata/default.toml