Skip to content

Commit

Permalink
Merge pull request #145 from loxilb-io/nodelabel
Browse files Browse the repository at this point in the history
Added support to select endpoints based on node labels
  • Loading branch information
TrekkieCoder authored Jun 15, 2024
2 parents eb5c55c + 3a4921b commit 6f38df1
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 24 deletions.
116 changes: 96 additions & 20 deletions pkg/agent/manager/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
endPointSelAnnotation = "loxilb.io/epselect"
zoneSelAnnotation = "loxilb.io/zoneselect"
prefLocalPodAnnotation = "loxilb.io/prefLocalPod"
matchNodeLabelAnnotation = "loxilb.io/nodelabel"
MaxExternalSecondaryIPsNum = 4
)

Expand Down Expand Up @@ -132,6 +133,7 @@ type LbCacheEntry struct {
PrefLocal bool
Addr string
State string
NodeLabel string
ProbeType string
ProbePort uint16
ProbeReq string
Expand Down Expand Up @@ -177,6 +179,17 @@ func GenSPKey(IPString string, Port uint16, Protocol string) string {
return fmt.Sprintf("%s:%v:%s", IPString, Port, Protocol)
}

func genExtIPName(ipStr string) string {
prefix := "llb-"
IP := net.ParseIP(ipStr)
if IP != nil {
if IP.IsUnspecified() {
return "llbanyextip"
}
}
return prefix + ipStr
}

// Create and Init Manager.
// Manager is called by kube-loxilb when k8s service is created & updated.
func NewLoadBalancerManager(
Expand Down Expand Up @@ -352,11 +365,17 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
probeRetries := 0
prefLocal := false
epSelect := api.LbSelRr
matchNodeLabel := ""

if strings.Compare(*lbClassName, m.networkConfig.LoxilbLoadBalancerClass) != 0 && !needPodEP {
return nil
}

// Check for loxilb specific annotations - MatchNodeLabel
if mnl := svc.Annotations[matchNodeLabelAnnotation]; mnl != "" {
matchNodeLabel = mnl
}

// Check for loxilb specific annotations - PreferLocalPodAlways
if plp := svc.Annotations[prefLocalPodAnnotation]; plp != "" {
if plp == "yes" {
Expand Down Expand Up @@ -527,7 +546,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
numSecondarySvc = 0
}

endpointIPs, err := m.getEndpoints(svc, needPodEP, addrType)
endpointIPs, err := m.getEndpoints(svc, needPodEP, addrType, matchNodeLabel)
if err != nil {
klog.Errorf("getEndpoints return error.")
klog.V(4).Infof("endpointIPs: %v", endpointIPs)
Expand All @@ -550,6 +569,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
PrefLocal: prefLocal,
Timeout: timeout,
State: "Added",
NodeLabel: matchNodeLabel,
ProbeType: probeType,
ProbePort: uint16(probePort),
ProbeReq: probeReq,
Expand Down Expand Up @@ -636,6 +656,15 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
klog.Infof("%s: addr-type update", cacheKey)
}

if matchNodeLabel != m.lbCache[cacheKey].NodeLabel {
m.lbCache[cacheKey].NodeLabel = matchNodeLabel
update = true
if added {
needDelete = true
}
klog.Infof("%s: nodelabel update", cacheKey)
}

if timeout != m.lbCache[cacheKey].Timeout {
m.lbCache[cacheKey].Timeout = timeout
update = true
Expand Down Expand Up @@ -856,7 +885,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
sp.LbModelList = append(sp.LbModelList, lbModel)
m.lbCache[cacheKey].LbServicePairs[GenSPKey(sp.ExternalIP, sp.Port, sp.Protocol)] = &sp
if ingSvcPair.InRange || ingSvcPair.StaticIP {
retIngress := corev1.LoadBalancerIngress{Hostname: "llb-" + ingSvcPair.IPString}
retIngress := corev1.LoadBalancerIngress{Hostname: genExtIPName(ingSvcPair.IPString)}
if !m.checkServiceIngressIPExists(svc, retIngress.Hostname) {
svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, retIngress)
}
Expand Down Expand Up @@ -1009,24 +1038,65 @@ func (m *Manager) installLB(c *api.LoxiClient, lb api.LoadBalancerModel, prefLoc
return err
}

// getNodeEndpointsWithLabel returns the IP list of nodes available with match labels
func (m *Manager) getNodeEndpointsWithLabel(addrType string, matchLabel string) ([]string, error) {
klog.Infof("getNodeEndpointsWithLabel: label %s", matchLabel)
req, err := labels.NewRequirement(matchLabel, selection.Exists, []string{})
if err != nil {
klog.Infof("getNodeEndpointsWithLabel: failed to make label requirement. err: %v", err)
return nil, err
}

nodes, err := m.nodeLister.List(labels.NewSelector().Add(*req))
if err != nil {
klog.Infof("getNodeEndpointsWithLabel: failed to get nodeList. err: %v", err)
return nil, err
}

var endpoints []string
for _, node := range nodes {
addr, err := m.getNodeAddress(*node, addrType)
if err != nil {
klog.Errorf(err.Error())
continue
}
klog.Infof("getNodeEndpointsWithLabel: found node %s with label %s", addr, matchLabel)
endpoints = append(endpoints, addr)
}

return endpoints, nil
}

// getEndpoints return LB's endpoints IP list.
// If podEP is true, return multus endpoints list.
// If false, return worker nodes IP list.
func (m *Manager) getEndpoints(svc *corev1.Service, podEP bool, addrType string) ([]string, error) {
func (m *Manager) getEndpoints(svc *corev1.Service, podEP bool, addrType, matchNodeLabel string) ([]string, error) {
if podEP {
//klog.Infof("getEndpoints: Pod end-points")
return m.getMultusEndpoints(svc, addrType)
}

var matchNodeList []string
var err error
if matchNodeLabel != "" {
matchNodeList, err = m.getNodeEndpointsWithLabel(addrType, matchNodeLabel)
if err != nil {
return nil, err
}
if len(matchNodeList) <= 0 {
matchNodeList = append(matchNodeList, "xdeadbeefx")
}
}

if svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal {
//klog.Infof("getEndpoints: Traffic Policy Local")
return k8s.GetServiceLocalEndpoints(m.kubeClient, svc, addrType)
//klog.Infof("getEndpoints: Traffic Policy Local %d", len(matchNodeList))
return k8s.GetServiceLocalEndpoints(m.kubeClient, svc, addrType, matchNodeList)
}
return m.getNodeEndpoints(addrType)
return m.getNodeEndpoints(addrType, matchNodeList)
}

// getNodeEndpoints returns the IP list of nodes available as nodePort service.
func (m *Manager) getNodeEndpoints(addrType string) ([]string, error) {
func (m *Manager) getNodeEndpoints(addrType string, nodeMatchList []string) ([]string, error) {
req, err := labels.NewRequirement("node.kubernetes.io/exclude-from-external-load-balancers", selection.DoesNotExist, []string{})
if err != nil {
klog.Infof("getEndpoints: failed to make label requirement. err: %v", err)
Expand All @@ -1039,7 +1109,7 @@ func (m *Manager) getNodeEndpoints(addrType string) ([]string, error) {
return nil, err
}

return m.getEndpointsForLB(nodes, addrType), nil
return m.getEndpointsForLB(nodes, addrType, nodeMatchList), nil
}

// getLocalEndpoints returns the IP list of the Pods connected to the multus network.
Expand Down Expand Up @@ -1093,14 +1163,17 @@ func (m *Manager) getNodeAddress(node corev1.Node, addrType string) (string, err
return "", errors.New("no address with family found for host")
}

func (m *Manager) getEndpointsForLB(nodes []*corev1.Node, addrType string) []string {
func (m *Manager) getEndpointsForLB(nodes []*corev1.Node, addrType string, nodeMatchList []string) []string {
var endpoints []string
for _, node := range nodes {
addr, err := m.getNodeAddress(*node, addrType)
if err != nil {
klog.Errorf(err.Error())
continue
}
if len(nodeMatchList) > 0 && !k8s.MatchNodeinNodeList(addr, nodeMatchList) {
continue
}
endpoints = append(endpoints, addr)
}

Expand All @@ -1110,7 +1183,7 @@ func (m *Manager) getEndpointsForLB(nodes []*corev1.Node, addrType string) []str
func (m *Manager) checkUpdateExternalIP(ingSvcPairs []SvcPair, svc *corev1.Service) bool {
for _, ingSvcPair := range ingSvcPairs {
if ingSvcPair.InRange || ingSvcPair.StaticIP {
retIngress := corev1.LoadBalancerIngress{Hostname: "llb-" + ingSvcPair.IPString}
retIngress := corev1.LoadBalancerIngress{Hostname: genExtIPName(ingSvcPair.IPString)}
if !m.checkServiceIngressIPExists(svc, retIngress.Hostname) {
klog.V(4).Infof("checkUpdateExternalIP: ingSvcPair %v has external IP but service %s has no IP. need update.", ingSvcPair, svc.Name)
return true
Expand Down Expand Up @@ -1185,19 +1258,22 @@ func (m *Manager) getServiceIngressIPs(service *corev1.Service) []string {
ingressIP = ingress.IP

} else if ingress.Hostname != "" {
llbHost := strings.Split(ingress.Hostname, "-")

if len(llbHost) != 2 {
if net.ParseIP(llbHost[0]) != nil {
ingressIP = llbHost[0]

}
if ingress.Hostname == "llbanyextip" {
ingressIP = "0.0.0.0"
} else {
if llbHost[0] == "llb" {
if net.ParseIP(llbHost[1]) != nil {
ingressIP = llbHost[1]
llbHost := strings.Split(ingress.Hostname, "-")

if len(llbHost) != 2 {
if net.ParseIP(llbHost[0]) != nil {
ingressIP = llbHost[0]

}
} else {
if llbHost[0] == "llb" {
if net.ParseIP(llbHost[1]) != nil {
ingressIP = llbHost[1]
}
}
}
}
}
Expand Down
19 changes: 15 additions & 4 deletions pkg/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ import (
"context"
"errors"
"fmt"
"net"
"time"

tk "github.com/loxilb-io/loxilib"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
clientset "k8s.io/client-go/kubernetes"
"net"
"time"
)

// GetNodeAddr gets the available IP address of a Node.
Expand All @@ -53,8 +52,17 @@ func GetNodeAddr(node *v1.Node) (net.IP, error) {
return ipAddr, nil
}

func MatchNodeinNodeList(node string, nodeMatchList []string) bool {
for _, n := range nodeMatchList {
if n == node {
return true
}
}
return false
}

// GetServiceLocalEndpoints - Get HostIPs of pods belonging to the given service
func GetServiceLocalEndpoints(kubeClient clientset.Interface, svc *corev1.Service, addrType string) ([]string, error) {
func GetServiceLocalEndpoints(kubeClient clientset.Interface, svc *corev1.Service, addrType string, nodeMatchList []string) ([]string, error) {
var epList []string

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
Expand All @@ -72,6 +80,9 @@ func GetServiceLocalEndpoints(kubeClient clientset.Interface, svc *corev1.Servic
if addrType == "ipv6" && !tk.IsNetIPv6(pod.Status.HostIP) {
continue
}
if len(nodeMatchList) > 0 && !MatchNodeinNodeList(pod.Status.HostIP, nodeMatchList) {
continue
}
if _, found := epMap[pod.Status.HostIP]; !found {
epMap[pod.Status.HostIP] = struct{}{}
epList = append(epList, pod.Status.HostIP)
Expand Down

0 comments on commit 6f38df1

Please sign in to comment.