Skip to content

Commit

Permalink
rework DNS to support direct DNS forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
mandelsoft committed Jun 13, 2020
1 parent a188220 commit d72f6c2
Show file tree
Hide file tree
Showing 22 changed files with 749 additions and 346 deletions.
124 changes: 89 additions & 35 deletions README.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions examples/kubelink1/32-broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ spec:
- --ifce-name=kubelink
- --service-cidr=100.64.0.0/20
- --ipip=shared
- --dns-advertisement
- --dns-propagation=dns
- --coredns-configure
- --cluster-name=kubelink1
securityContext:
privileged: true
livenessProbe:
Expand Down
5 changes: 5 additions & 0 deletions examples/kubelink2/32-broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ spec:
- --ifce-name=kubelink
- --service-cidr=100.64.16.0/20
- --ipip=shared
- --dns-advertisement
- --dns-propagation=dns
- --coredns-configure
- --cluster-name=kubelink2

securityContext:
privileged: true
livenessProbe:
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/kubelink/crds/kubelink.mandelsoft.org_kubelinks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ spec:
type: string
clusterAddress:
type: string
dns:
properties:
baseDomain:
type: string
dnsIP:
type: string
omitDNSPropagation:
type: boolean
type: object
egress:
items:
type: string
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/kubelink/crds/zz_generated_crds.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ spec:
type: string
clusterAddress:
type: string
dns:
properties:
baseDomain:
type: string
dnsIP:
type: string
omitDNSPropagation:
type: boolean
type: object
egress:
items:
type: string
Expand Down
12 changes: 12 additions & 0 deletions pkg/apis/kubelink/v1alpha1/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ type KubeLinkSpec struct {

// +optional
APIAccess *core.SecretReference `json:"apiAccess,omitempty"`

// +optional
DNS *KubeLinkDNS `json:"dns,omitempty"`
}

type KubeLinkDNS struct {
// +optional
OmitDNSPropagation *bool `json:"omitDNSPropagation,omitempty"`
// +optional
DNSIP string `json:"dnsIP,omitempty"`
// +optional
BaseDomain string `json:"baseDomain,omitempty"`
}

type KubeLinkStatus struct {
Expand Down
26 changes: 26 additions & 0 deletions pkg/apis/kubelink/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 49 additions & 25 deletions pkg/controllers/broker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/mandelsoft/kubelink/pkg/apis/kubelink/v1alpha1"
"github.com/mandelsoft/kubelink/pkg/controllers"
"github.com/mandelsoft/kubelink/pkg/kubelink"
"github.com/mandelsoft/kubelink/pkg/tcp"
kutils "github.com/mandelsoft/kubelink/pkg/utils"
)

Expand All @@ -46,7 +47,7 @@ type Config struct {
service string
responsible string

ClusterAddress net.IP
ClusterAddress *net.IPNet
ClusterCIDR *net.IPNet
ClusterName string

Expand All @@ -65,18 +66,22 @@ type Config struct {
DNSName string
Service string
Interface string
MeshDomain string

coreServiceAccount string
CoreServiceAccount resources.ObjectName
MeshDomain string
coreDNSServiceIP string
CoreDNSServiceIP net.IP
CoreDNSDeployment string
CoreDNSSecret string
DNSPropagation bool
DNSAdvertisement bool
CoreDNSConfigure bool
CoreDNSMode string
serviceAccount string
ServiceAccount resources.ObjectName
DNSAdvertisement bool

DNSPropagation string
coreDNSServiceIP string
CoreDNSServiceIP net.IP
CoreDNSDeployment string
CoreDNSSecret string
CoreDNSConfigure bool

dnsServiceIP string
DNSServiceIP net.IP
ClusterDomain string

AutoConnect bool
DisableBridge bool
Expand All @@ -99,15 +104,19 @@ func (this *Config) AddOptionsToSet(set config.OptionSet) {
set.AddStringOption(&this.DNSName, "dns-name", "", "", "DNS Name for managed certificate")
set.AddStringOption(&this.Service, "service", "", "", "Service name for managed certificate")
set.AddStringOption(&this.Interface, "ifce-name", "", "", "Name of the tun interface")
set.AddStringOption(&this.coreServiceAccount, "coredns-service-account", "", "", "Service Account to use for CoreDNS API Server Access")
set.AddStringOption(&this.MeshDomain, "mesh-domain", "", "kubelink", "Base domain for cluster mesh services")

set.AddStringOption(&this.serviceAccount, "service-account", "", "", "Service Account for API Access propagation")

set.AddBoolOption(&this.DNSAdvertisement, "dns-advertisement", "", false, "Enable automatic advertisement of DNS access info")
set.AddStringOption(&this.dnsServiceIP, "dns-service-ip", "", "", "IP of Cluster DNS Service (for DNS Info Propagation)")
set.AddStringOption(&this.ClusterDomain, "cluster-domain", "", "cluster.local", "Cluster Domain of Cluster DNS Service (for DNS Info Propagation)")

set.AddStringOption(&this.DNSPropagation, "dns-propagation", "", "none", "Mode for accessing foreign DNS information (none, dns or kubernetes)")
set.AddStringOption(&this.coreDNSServiceIP, "coredns-service-ip", "", "", "Service IP of coredns deployment used by kubelink")
set.AddStringOption(&this.CoreDNSDeployment, "coredns-deployment", "", "kubelink-coredns", "Name of coredns deployment used by kubelink")
set.AddStringOption(&this.CoreDNSSecret, "coredns-secret", "", "kubelink-coredns", "Name of dns secret used by kubelink")
set.AddBoolOption(&this.DNSPropagation, "dns-propagation", "", false, "Enable DNS Record propagation for Services")
set.AddBoolOption(&this.DNSAdvertisement, "dns-advertisement", "", false, "Enable automatic advertisement of DNS access info")
set.AddBoolOption(&this.CoreDNSConfigure, "coredns-configure", "", false, "Enable automatic configuration of cluster DNS (coredns)")
set.AddStringOption(&this.CoreDNSMode, "coredns-mode", "", "kubernetes", "Mode for accessing foreign DNS information (dns or kubernetes)")
set.AddBoolOption(&this.AutoConnect, "auto-connect", "", false, "Automatically register cluster for authenticated incoming requests")
}

Expand All @@ -117,10 +126,12 @@ func (this *Config) Prepare() error {
return err
}

this.ClusterAddress, this.ClusterCIDR, err = this.RequireCIDR(this.address, "link-address")
ip, cidr, err := this.RequireCIDR(this.address, "link-address")
if err != nil {
return err
}
this.ClusterCIDR = cidr
this.ClusterAddress = tcp.CIDRIP(cidr, ip)

_, this.ServiceCIDR, err = this.OptionalCIDR(this.service, "service-cidr")
if err != nil {
Expand Down Expand Up @@ -173,29 +184,42 @@ func (this *Config) Prepare() error {
}
}

if this.coreServiceAccount != "" {
names := strings.Split(this.coreServiceAccount, "/")
if this.serviceAccount != "" {
names := strings.Split(this.serviceAccount, "/")
if len(names) > 2 {
return fmt.Errorf("invalid service account name")
}
if len(names) == 2 {
this.CoreServiceAccount = resources.NewObjectName(names...)
this.ServiceAccount = resources.NewObjectName(names...)
} else {
this.CoreServiceAccount = resources.NewObjectName("kube-system", names[0])
this.ServiceAccount = resources.NewObjectName("kube-system", names[0])
}
}

if this.coreDNSServiceIP != "" {
this.CoreDNSServiceIP = net.ParseIP(this.coreDNSServiceIP)
if this.CoreDNSServiceIP == nil {
return fmt.Errorf("invalid ip of coredns service: %s", this.coreDNSServiceIP)
}
}

this.CoreDNSMode = strings.ToLower(this.CoreDNSMode)
switch this.CoreDNSMode {
case DNSMODE_KUBERNETES, DNSMODE_DNS:
if this.dnsServiceIP != "" {
this.DNSServiceIP = net.ParseIP(this.dnsServiceIP)
if this.DNSServiceIP == nil {
return fmt.Errorf("invalid ip of coredns service: %s", this.coreDNSServiceIP)
}
}
if this.DNSServiceIP == nil {
if this.ServiceCIDR != nil {
this.DNSServiceIP = tcp.SubIP(this.ServiceCIDR, CLUSTER_DNS_IP)
}
}

this.DNSPropagation = strings.ToLower(this.DNSPropagation)
switch this.DNSPropagation {
case DNSMODE_KUBERNETES, DNSMODE_DNS, DNSMODE_NONE:
default:
return fmt.Errorf("invalid dns mode: %s", this.CoreDNSMode)
return fmt.Errorf("invalid dns mode: %s", this.DNSPropagation)
}
return nil
}
Expand Down
65 changes: 49 additions & 16 deletions pkg/controllers/broker/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ import (

const BufferSize = 17000

// Packet types:
// 0: Normal data payload
// 1: Hello message
// More types planned for intermediate transfer of meta information
// Unknown packets have to be skipped and returned with reject bit set

const PACKET_TYPE_DATA = 0
const PACKET_TYPE_HELLO = 1

////////////////////////////////////////////////////////////////////////////////

type ConnectionFailHandler interface {
Expand All @@ -51,6 +60,9 @@ type TunnelConnection struct {
clusterCIDR *net.IPNet
remoteAddress string
handlers []ConnectionFailHandler

wlock sync.Mutex
rlock sync.Mutex
}

func NewTunnelConnection(mux *Mux, conn net.Conn, link *kubelink.Link, handlers ...ConnectionFailHandler) (*TunnelConnection, *ConnectionHello, error) {
Expand Down Expand Up @@ -87,6 +99,7 @@ func NewTunnelConnection(mux *Mux, conn net.Conn, link *kubelink.Link, handlers
}
}
if mux.connectionHandler != nil {
t.Infof("start hello handling....")
go mux.connectionHandler.UpdateAccess(hello)
}
}
Expand Down Expand Up @@ -143,22 +156,34 @@ func (this *TunnelConnection) Close() error {

func (this *TunnelConnection) writeHello(hello *ConnectionHello) error {
data := hello.Data()
return this.write(this.conn, data)
return this.WritePacket(PACKET_TYPE_HELLO, data)
}

func (this *TunnelConnection) readHello() (*ConnectionHello, error) {
var header ConnectionHelloHeader
err := this.read(this.conn, header[:])
var buffer [BufferSize]byte
n, ty, err := this.ReadPacket(buffer[:])
if err != nil {
return nil, err
}
len := header.GetExtensionLength()
buf := make([]byte, len)
err = this.read(this.conn, buf)
if ty != PACKET_TYPE_HELLO {
return nil, fmt.Errorf("unexpected packet %d instead of hello handshake", ty)
}
return this.parseHelloPacket(buffer[:n])
}

func (this *TunnelConnection) parseHelloPacket(data []byte) (*ConnectionHello, error) {
var header ConnectionHelloHeader
if len(data) < len(header) {
return nil, fmt.Errorf("hello packet too short (%d expected %d)", len(data), len(header))
}
copy(header[:], data)
hello, err := ParseConnectionHello(this.mux, &header, data[len(header):])
if err != nil {
this.Errorf("invalid hello packet: %s", err)
return nil, err
}
return ParseConnectionHello(this.mux, &header, buf)
this.Infof("hello packet with %d extensions", len(hello.Extensions))
return hello, nil
}

func (this *TunnelConnection) createHello() *ConnectionHello {
Expand Down Expand Up @@ -208,7 +233,7 @@ func (this *TunnelConnection) Serve() error {
func (this *TunnelConnection) serve() error {
var buffer [BufferSize]byte
for {
n, err := this.ReadPacket(buffer[:])
n, ty, err := this.ReadPacket(buffer[:])
if n < 0 || err != nil {
this.Infof("connection aborted: %d bytes, err=%s", n, err)
if n <= 0 {
Expand All @@ -220,6 +245,10 @@ func (this *TunnelConnection) serve() error {
continue
}
packet := buffer[:n]
if ty != PACKET_TYPE_DATA {
this.Infof("got packet of unknown type %x", ty)
continue
}
vers := int(packet[0]) >> 4
if vers == ipv4.Version {
header, err := ipv4.ParseHeader(packet)
Expand Down Expand Up @@ -298,27 +327,31 @@ func (this *TunnelConnection) write(w io.Writer, data []byte) error {
return nil
}

func (this *TunnelConnection) ReadPacket(data []byte) (int, error) {
lbuf := [2]byte{}
func (this *TunnelConnection) ReadPacket(data []byte) (int, byte, error) {
this.rlock.Lock()
defer this.rlock.Unlock()
lbuf := [3]byte{}
err := this.read(this.conn, lbuf[:])

if err != nil {
return 0, err
return 0, 0, err
}

length := tcp.NtoHs(lbuf[:])
length := tcp.NtoHs(lbuf[:2])
if int(length) > len(data) {
return 0, fmt.Errorf("buffer too small (%d): packet size is %d", len(data), length)
return 0, 0, fmt.Errorf("buffer too small (%d): packet size is %d", len(data), length)
}
return int(length), this.read(this.conn, data[0:length])
return int(length), lbuf[2], this.read(this.conn, data[0:length])
}

func (this *TunnelConnection) WritePacket(data []byte) error {
func (this *TunnelConnection) WritePacket(ty byte, data []byte) error {
if len(data) > 65535 {
return fmt.Errorf("packet too large (%d)", len(data))
}
lbuf := tcp.HtoNs(uint16(len(data)))
err := this.write(this.conn, lbuf)
this.wlock.Lock()
defer this.wlock.Unlock()
err := this.write(this.conn, append(lbuf, ty))
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit d72f6c2

Please sign in to comment.