Skip to content

Commit

Permalink
Now supporting subscribers with tls connection
Browse files Browse the repository at this point in the history
  • Loading branch information
kpachhai committed Dec 2, 2024
1 parent c936a17 commit ab62959
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 2 deletions.
116 changes: 116 additions & 0 deletions extension/externalsubscriber/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (C) 2024, Nuklai. All rights reserved.
// See the file LICENSE for licensing terms.

package externalsubscriber

import (
"context"
"crypto/tls"
"fmt"
"net"
"strings"

"github.com/ava-labs/avalanchego/utils/logging"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/event"

pb "github.com/ava-labs/hypersdk/proto/pb/externalsubscriber"
)

var _ event.Subscription[*chain.ExecutedBlock] = (*ExternalSubscriberClient)(nil)

type ExternalSubscriberClient struct {
conn *grpc.ClientConn
client pb.ExternalSubscriberClient
log logging.Logger
}

func NewExternalSubscriberClient(
ctx context.Context,
log logging.Logger,
serverAddr string,
genesisBytes []byte,
) (*ExternalSubscriberClient, error) {
// Normalize server address by removing "https://" if present
serverAddr, useTLS := normalizeServerAddress(serverAddr)

// Setup connection options
var opts []grpc.DialOption
if useTLS {
// Use default TLS credentials
creds := credentials.NewTLS(&tls.Config{})
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
// Use insecure credentials for plaintext communication
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

// Establish connection to server
conn, err := grpc.Dial(serverAddr, opts...)
if err != nil {
return nil, fmt.Errorf("failed to dial external subscriber server: %w", err)
}
client := pb.NewExternalSubscriberClient(conn)

// Initialize the connection with the external subscriber server
_, err = client.Initialize(ctx, &pb.InitializeRequest{
Genesis: genesisBytes,
})
if err != nil {
conn.Close() // Close connection on initialization failure
return nil, fmt.Errorf("failed to initialize external subscriber client: %w", err)
}

log.Debug("connected to external subscriber server", zap.String("address", serverAddr), zap.Bool("useTLS", useTLS))
return &ExternalSubscriberClient{
conn: conn,
client: client,
log: log,
}, nil
}

func (e *ExternalSubscriberClient) Accept(blk *chain.ExecutedBlock) error {
blockBytes, err := blk.Marshal()
if err != nil {
return err
}

req := &pb.BlockRequest{
BlockData: blockBytes,
}
e.log.Debug("sending accepted block to server",
zap.Stringer("blockID", blk.BlockID),
zap.Uint64("blockHeight", blk.Block.Hght),
)
_, err = e.client.AcceptBlock(context.TODO(), req)
return err
}

func (e *ExternalSubscriberClient) Close() error {
return e.conn.Close()
}

// Helper function to determine if TLS should be used and normalize the server address
func normalizeServerAddress(serverAddr string) (string, bool) {
useTLS := false

// Remove "https://" prefix if present and set useTLS to true
if strings.HasPrefix(serverAddr, "https://") {
serverAddr = strings.TrimPrefix(serverAddr, "https://")
useTLS = true
}

// Check if the port is 443
host, port, err := net.SplitHostPort(serverAddr)
if err == nil && port == "443" {
useTLS = true
serverAddr = net.JoinHostPort(host, port)
}

return serverAddr, useTLS
}
37 changes: 37 additions & 0 deletions extension/externalsubscriber/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package externalsubscriber

import (
"context"

"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/event"
es "github.com/ava-labs/hypersdk/extension/externalsubscriber"
"github.com/ava-labs/hypersdk/vm"
)

func OptionFunc(v *vm.VM, config es.Config) error {
if !config.Enabled {
return nil
}
server, err := NewExternalSubscriberClient(
context.TODO(),
v.Logger(),
config.ServerAddress,
v.GenesisBytes,
)
if err != nil {
return err
}

blockSubscription := event.SubscriptionFuncFactory[*chain.ExecutedBlock]{
AcceptF: func(blk *chain.ExecutedBlock) error {
return server.Accept(blk)
},
}

vm.WithBlockSubscriptions(blockSubscription)(v)
return nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.62.0
gopkg.in/yaml.v2 v2.4.0
)

Expand Down Expand Up @@ -147,7 +148,6 @@ require (
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/grpc v1.62.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
Expand Down
4 changes: 3 additions & 1 deletion vm/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/ava-labs/hypersdk/extension/externalsubscriber"
"github.com/ava-labs/hypersdk/vm"
"github.com/ava-labs/hypersdk/x/contracts/runtime"

nuklaivmES "github.com/nuklai/nuklaivm/extension/externalsubscriber"
)

const (
Expand Down Expand Up @@ -50,7 +52,7 @@ func WithExternalSubscriber(cfg config.Config) vm.Option {
return vm.NewOption(externalsubscriber.Namespace, externalsubscriber.Config{
Enabled: true,
ServerAddress: cfg.ExternalSubscriberAddr,
}, externalsubscriber.OptionFunc)
}, nuklaivmES.OptionFunc)
}
return vm.NewOption(externalsubscriber.Namespace, externalsubscriber.Config{}, externalsubscriber.OptionFunc)
}
Expand Down

0 comments on commit ab62959

Please sign in to comment.