Skip to content

Commit

Permalink
Support idempotent terminator creation. Fixes #450
Browse files Browse the repository at this point in the history
Use consistent keys for a given service listener, so that repeated requests will work. Provide a listener id, so that the router can correlate requests.
  • Loading branch information
plorenz committed Nov 8, 2023
1 parent 195164f commit e14acba
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 10 deletions.
3 changes: 3 additions & 0 deletions ziti/edge/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package edge
import (
"fmt"
"github.com/openziti/edge-api/rest_model"
"github.com/openziti/secretstream/kx"
"io"
"net"
"os"
Expand Down Expand Up @@ -219,6 +220,8 @@ type ListenOptions struct {
IdentitySecret string
BindUsingEdgeIdentity bool
ManualStart bool
ListenerId string
KeyPair *kx.KeyPair
}

func (options *ListenOptions) GetConnectTimeout() time.Duration {
Expand Down
7 changes: 6 additions & 1 deletion ziti/edge/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ const (
TraceHopIdHeader = 1018
TraceSourceRequestIdHeader = 1019
TraceError = 1020
SdkProvidedTerminatorId = 1021
ListenerId = 1021

ErrorCodeInternal = 1
ErrorCodeInvalidApiSession = 2
Expand Down Expand Up @@ -218,6 +218,11 @@ func NewBindMsg(connId uint32, token string, pubKey []byte, options *ListenOptio
msg.Headers[TerminatorIdentitySecretHeader] = []byte(options.IdentitySecret)
}
}

if options.ListenerId != "" {
msg.Headers[ListenerId] = []byte(options.ListenerId)
}

msg.PutBoolHeader(RouterProvidedConnId, true)
return msg
}
Expand Down
26 changes: 23 additions & 3 deletions ziti/edge/network/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (conn *routerConn) BindChannel(binding channel.Binding) error {
return nil
}

func (conn *routerConn) NewConn(service *rest_model.ServiceDetail, connType ConnType) *edgeConn {
func (conn *routerConn) NewDialConn(service *rest_model.ServiceDetail, connType ConnType) *edgeConn {
id := conn.msgMux.GetNextId()

edgeCh := &edgeConn{
Expand All @@ -103,8 +103,28 @@ func (conn *routerConn) NewConn(service *rest_model.ServiceDetail, connType Conn
return edgeCh
}

func (conn *routerConn) NewListenConn(service *rest_model.ServiceDetail, connType ConnType, keyPair *kx.KeyPair) *edgeConn {
id := conn.msgMux.GetNextId()

edgeCh := &edgeConn{
MsgChannel: *edge.NewEdgeMsgChannel(conn.ch, id),
readQ: NewNoopSequencer[*channel.Message](4),
msgMux: conn.msgMux,
serviceId: *service.Name,
connType: connType,
keyPair: keyPair,
crypto: keyPair != nil,
}

// duplicate errors only happen on the server side, since client controls ids
if err := conn.msgMux.AddMsgSink(edgeCh); err != nil {
pfxlog.Logger().Warnf("error adding message sink %s[%d]: %v", *service.Name, id, err)
}
return edgeCh
}

func (conn *routerConn) Connect(service *rest_model.ServiceDetail, session *rest_model.SessionDetail, options *edge.DialOptions) (edge.Conn, error) {
ec := conn.NewConn(service, ConnTypeDial)
ec := conn.NewDialConn(service, ConnTypeDial)
dialConn, err := ec.Connect(session, options)
if err != nil {
if err2 := ec.Close(); err2 != nil {
Expand All @@ -115,7 +135,7 @@ func (conn *routerConn) Connect(service *rest_model.ServiceDetail, session *rest
}

func (conn *routerConn) Listen(service *rest_model.ServiceDetail, session *rest_model.SessionDetail, options *edge.ListenOptions) (edge.Listener, error) {
ec := conn.NewConn(service, ConnTypeBind)
ec := conn.NewListenConn(service, ConnTypeBind, options.KeyPair)
listener, err := ec.Listen(session, service, options)
if err != nil {
if err2 := ec.Close(); err2 != nil {
Expand Down
29 changes: 23 additions & 6 deletions ziti/ziti.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"encoding/json"
"fmt"
"github.com/go-openapi/strfmt"
"github.com/google/uuid"
"github.com/kataras/go-events"
"github.com/openziti/edge-api/rest_client_api_client/authentication"
"github.com/openziti/edge-api/rest_client_api_client/service"
rest_session "github.com/openziti/edge-api/rest_client_api_client/session"
apis "github.com/openziti/sdk-golang/edge-apis"
"github.com/openziti/secretstream/kx"
"math"
"net"
"reflect"
Expand Down Expand Up @@ -1001,12 +1003,12 @@ func (context *ContextImpl) ListenWithOptions(serviceName string, options *Liste
}

if s, ok := context.GetService(serviceName); ok {
return context.listenSession(s, options), nil
return context.listenSession(s, options)
}
return nil, errors.Errorf("service '%s' not found in ZT", serviceName)
}

func (context *ContextImpl) listenSession(service *rest_model.ServiceDetail, options *ListenOptions) edge.Listener {
func (context *ContextImpl) listenSession(service *rest_model.ServiceDetail, options *ListenOptions) (edge.Listener, error) {
edgeListenOptions := &edge.ListenOptions{
Cost: options.Cost,
Precedence: edge.Precedence(options.Precedence),
Expand All @@ -1025,8 +1027,11 @@ func (context *ContextImpl) listenSession(service *rest_model.ServiceDetail, opt
edgeListenOptions.MaxConnections = 1
}

listenerMgr := newListenerManager(service, context, edgeListenOptions)
return listenerMgr.listener
if listenerMgr, err := newListenerManager(service, context, edgeListenOptions); err != nil {
return nil, err
} else {
return listenerMgr.listener, nil
}
}

func (context *ContextImpl) getEdgeRouterConn(session *rest_model.SessionDetail, options edge.ConnOptions) (edge.RouterConn, error) {
Expand Down Expand Up @@ -1411,9 +1416,21 @@ func (context *ContextImpl) RemoveZitiMfa(code string) error {
return context.CtrlClt.RemoveMfa(code)
}

func newListenerManager(service *rest_model.ServiceDetail, context *ContextImpl, options *edge.ListenOptions) *listenerManager {
func newListenerManager(service *rest_model.ServiceDetail, context *ContextImpl, options *edge.ListenOptions) (*listenerManager, error) {
now := time.Now()

var keyPair *kx.KeyPair
if service.EncryptionRequired != nil && *service.EncryptionRequired {
var err error
keyPair, err = kx.NewKeyPair()
if err != nil {
return nil, errors.Wrapf(err, "unable to create end-to-end encrytpion key-pair while hosting service '%s'", *service.Name)
}
}

options.KeyPair = keyPair
options.ListenerId = uuid.NewString()

listenerMgr := &listenerManager{
service: service,
context: context,
Expand All @@ -1429,7 +1446,7 @@ func newListenerManager(service *rest_model.ServiceDetail, context *ContextImpl,

go listenerMgr.run()

return listenerMgr
return listenerMgr, nil
}

type listenerManager struct {
Expand Down

0 comments on commit e14acba

Please sign in to comment.