Skip to content

Commit

Permalink
Merge pull request #225 from loxilb-io/egr-crd
Browse files Browse the repository at this point in the history
PR - loxilb-io/loxilb#877 egress support - handle addresses in any format
  • Loading branch information
TrekkieCoder authored Jan 6, 2025
2 parents 21dbf09 + 381140c commit 8b52ce9
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 23 deletions.
23 changes: 15 additions & 8 deletions cmd/loxilb-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ func run(o *Options) error {
loxilbPeerClients := api.NewLoxiClientPool()
loxiLBLiveCh := make(chan *api.LoxiClient, 50)
loxiLBPurgeCh := make(chan *api.LoxiClient, 5)
loxiLBSelMasterEvent := make(chan bool)
loxiLBDeadCh := make(chan struct{}, 64)
ticker := time.NewTicker(20 * time.Second)

Expand Down Expand Up @@ -266,16 +265,24 @@ func run(o *Options) error {
case <-loxiLBDeadCh:
if networkConfig.SetRoles != "" {
klog.Infof("Running select-roles")
lbManager.SelectLoxiLBRoles(true, loxiLBSelMasterEvent)
lbManager.SelectLoxiLBRoles(true, lbManager.ClientSelMasterCh)
}
case alive := <-loxiLBLiveCh:
klog.Infof("Client alive : %s", alive.Host)
lbManager.ClientAliveCh <- alive
egressMgr.ClientAliveCh <- alive
case dead := <-loxiLBDeadCh:
lbManager.ClientDeadCh <- dead
case purged := <-loxiLBPurgeCh:
lbManager.ClientPurgeCh <- purged
case <-ticker.C:
if len(networkConfig.LoxilbURLs) <= 0 {
lbManager.DiscoverLoxiLBServices(loxiLBLiveCh, loxiLBDeadCh, loxiLBPurgeCh, o.config.ExcludeRoleList)
}
lbManager.DiscoverLoxiLBPeerServices(loxiLBLiveCh, loxiLBDeadCh, loxiLBPurgeCh)

if networkConfig.SetRoles != "" {
lbManager.SelectLoxiLBRoles(true, loxiLBSelMasterEvent)
lbManager.SelectLoxiLBRoles(true, lbManager.ClientSelMasterCh)
}
case <-stopCh:
return
Expand All @@ -285,13 +292,13 @@ func run(o *Options) error {
log.StartLogFileNumberMonitor(stopCh)
informerFactory.Start(stopCh)

go lbManager.Run(stopCh, loxiLBLiveCh, loxiLBPurgeCh, loxiLBSelMasterEvent)
go lbManager.Run(stopCh)
if o.config.EnableBGPCRDs {
bgpCRDInformerFactory.Start(stopCh)
go BgpPeerManager.Run(stopCh, loxiLBLiveCh, loxiLBPurgeCh, loxiLBSelMasterEvent)
go BGPPolicyDefinedSetsManager.Run(stopCh, loxiLBLiveCh, loxiLBPurgeCh, loxiLBSelMasterEvent)
go BGPPolicyDefinitionManager.Run(stopCh, loxiLBLiveCh, loxiLBPurgeCh, loxiLBSelMasterEvent)
go BGPPolicyApplyManager.Run(stopCh, loxiLBLiveCh, loxiLBPurgeCh, loxiLBSelMasterEvent)
go BgpPeerManager.Run(stopCh)
go BGPPolicyDefinedSetsManager.Run(stopCh)
go BGPPolicyDefinitionManager.Run(stopCh)
go BGPPolicyApplyManager.Run(stopCh)
}

go loxilbURLMgr.Start(loxilbURLInformerFactory, stopCh, loxiLBLiveCh, loxiLBDeadCh, loxiLBPurgeCh)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/manager/bgppeer/bgppeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (m *Manager) enqueueService(obj interface{}) {
m.queue.Add(lb)
}

func (m *Manager) Run(stopCh <-chan struct{}, loxiLBLiveCh chan *api.LoxiClient, loxiLBPurgeCh chan *api.LoxiClient, masterEventCh <-chan bool) {
func (m *Manager) Run(stopCh <-chan struct{}) {
defer m.queue.ShutDown()

klog.Infof("Starting %s", mgrName)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/manager/bgppolicyapply/bgppolicyapply.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (m *Manager) enqueueService(obj interface{}) {
m.queue.Add(lb)
}

func (m *Manager) Run(stopCh <-chan struct{}, loxiLBLiveCh chan *api.LoxiClient, loxiLBPurgeCh chan *api.LoxiClient, masterEventCh <-chan bool) {
func (m *Manager) Run(stopCh <-chan struct{}) {
defer m.queue.ShutDown()

klog.Infof("Starting %s", mgrName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (m *Manager) enqueueService(obj interface{}) {
m.queue.Add(lb)
}

func (m *Manager) Run(stopCh <-chan struct{}, loxiLBLiveCh chan *api.LoxiClient, loxiLBPurgeCh chan *api.LoxiClient, masterEventCh <-chan bool) {
func (m *Manager) Run(stopCh <-chan struct{}) {
defer m.queue.ShutDown()

klog.Infof("Starting %s", mgrName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (m *Manager) enqueueService(obj interface{}) {
m.queue.Add(lb)
}

func (m *Manager) Run(stopCh <-chan struct{}, loxiLBLiveCh chan *api.LoxiClient, loxiLBPurgeCh chan *api.LoxiClient, masterEventCh <-chan bool) {
func (m *Manager) Run(stopCh <-chan struct{}) {
defer m.queue.ShutDown()

klog.Infof("Starting %s", mgrName)
Expand Down
34 changes: 33 additions & 1 deletion pkg/agent/manager/egress/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package egress
import (
"context"
"fmt"
"net"
"strings"
"time"

Expand All @@ -31,6 +32,7 @@ import (
crdLister "github.com/loxilb-io/kube-loxilb/pkg/egress-client/listers/egress/v1"
apiextensionclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
Expand All @@ -55,6 +57,7 @@ type Manager struct {
EgressListerSynced cache.InformerSynced
LoxiClients *api.LoxiClientPool
queue workqueue.RateLimitingInterface
ClientAliveCh chan *api.LoxiClient
}

// Create and Init Manager.
Expand All @@ -76,6 +79,7 @@ func NewEgressManager(
EgressLister: EgressInformer.Lister(),
EgressListerSynced: EgressInformer.Informer().HasSynced,
LoxiClients: LoxiClients,
ClientAliveCh: make(chan *api.LoxiClient, 50),

queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "Egress"),
}
Expand Down Expand Up @@ -126,6 +130,9 @@ func (m *Manager) Run(stopCh <-chan struct{}) {
m.EgressListerSynced) {
return
}

go m.manageLoxilbEgressLifeCycle(stopCh)

for i := 0; i < defaultWorkers; i++ {
go wait.Until(m.worker, time.Second, stopCh)
}
Expand Down Expand Up @@ -229,9 +236,14 @@ func (m *Manager) deleteEgress(egress *crdv1.Egress) error {
func (m *Manager) makeLoxiFirewallModel(egress *crdv1.Egress) []*api.FwRuleMod {
newFwModels := []*api.FwRuleMod{}
for _, address := range egress.Spec.Addresses {
cidrAddr := address
_, _, err := net.ParseCIDR(address)
if err != nil {
cidrAddr = address + "/32"
}
newFwModel := &api.FwRuleMod{
Rule: api.FwRuleArg{
SrcIP: address + "/32",
SrcIP: cidrAddr,
},
Opts: api.FwOptArg{
DoSnat: true,
Expand Down Expand Up @@ -308,3 +320,23 @@ func (m *Manager) callLoxiFirewallDeleteAPI(ctx context.Context, fwModel *api.Fw
}
return nil
}

func (m *Manager) manageLoxilbEgressLifeCycle(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
return
case c := <-m.ClientAliveCh:
klog.V(4).Infof("Resync all Egresses (%s:alive)", c.Host)
egresses, err := m.EgressLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get egress list - err: %v", err)
} else {
klog.V(4).Infof("Resync all Egresses - (%v)", egresses)
for _, egr := range egresses {
m.addEgress(egr)
}
}
}
}
}
28 changes: 18 additions & 10 deletions pkg/agent/manager/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ type Manager struct {
instAddrApplyCh chan struct{}
loxiInstAddrMap map[string]net.IP
zoneInstSelHint int
ClientAliveCh chan *api.LoxiClient
ClientPurgeCh chan *api.LoxiClient
ClientSelMasterCh chan bool
ClientDeadCh chan struct{}
}

type LbArgs struct {
Expand Down Expand Up @@ -263,6 +267,10 @@ func NewLoadBalancerManager(
instAddrApplyCh: make(chan struct{}),
loxiInstAddrMap: make(map[string]net.IP),
zoneInstRoleMap: make(map[string]*LoxiInstRole),
ClientAliveCh: make(chan *api.LoxiClient, 50),
ClientPurgeCh: make(chan *api.LoxiClient, 5),
ClientSelMasterCh: make(chan bool),
ClientDeadCh: make(chan struct{}, 64),

queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "loadbalancer"),
lbCache: make(LbCacheTable),
Expand Down Expand Up @@ -316,7 +324,7 @@ func (m *Manager) enqueueService(obj interface{}) {
m.queue.Add(key)
}

func (m *Manager) Run(stopCh <-chan struct{}, loxiLBLiveCh chan *api.LoxiClient, loxiLBPurgeCh chan *api.LoxiClient, masterEventCh <-chan bool) {
func (m *Manager) Run(stopCh <-chan struct{}) {
defer m.queue.ShutDown()

klog.Infof("Starting %s", mgrName)
Expand All @@ -330,7 +338,7 @@ func (m *Manager) Run(stopCh <-chan struct{}, loxiLBLiveCh chan *api.LoxiClient,
return
}

go m.manageLoxiLbLifeCycle(stopCh, loxiLBLiveCh, loxiLBPurgeCh, masterEventCh)
go m.manageLoxiLbLifeCycle(stopCh)

for i := 0; i < defaultWorkers; i++ {
go wait.Until(m.worker, time.Second, stopCh)
Expand Down Expand Up @@ -2301,15 +2309,15 @@ func (m *Manager) checkHandleBGPCfgErrors(loxiAliveCh chan *api.LoxiClient, peer
}
}

func (m *Manager) manageLoxiLbLifeCycle(stopCh <-chan struct{}, loxiAliveCh chan *api.LoxiClient, loxiPurgeCh chan *api.LoxiClient, masterEventCh <-chan bool) {
func (m *Manager) manageLoxiLbLifeCycle(stopCh <-chan struct{}) {
loop:
for {
select {
case <-stopCh:
break loop
case <-m.instAddrApplyCh:
m.updateAllLoxiLBServiceStatus()
case <-masterEventCh:
case <-m.ClientSelMasterCh:
for _, lc := range m.LoxiClients.Clients {
if !lc.IsAlive {
continue
Expand All @@ -2334,7 +2342,7 @@ loop:
}
}
}
case purgedClient := <-loxiPurgeCh:
case purgedClient := <-m.ClientPurgeCh:
klog.Infof("loxilb-lb(%s): purged", purgedClient.Host)
if m.networkConfig.SetBGP != 0 {
deleteNeigh := func(client *api.LoxiClient, neighIP string, remoteAs int) error {
Expand All @@ -2355,7 +2363,7 @@ loop:
}
}
}
case aliveClient := <-loxiAliveCh:
case aliveClient := <-m.ClientAliveCh:
aliveClient.DoBGPCfg = false
if m.networkConfig.SetRoles != "" && !aliveClient.PeeringOnly {

Expand Down Expand Up @@ -2424,7 +2432,7 @@ loop:
klog.Infof("loxilb(%s) set-bgp-global success", aliveClient.Host)
} else {
klog.Infof("loxilb(%s) set-bgp-global failed(%s)", aliveClient.Host, err)
m.checkHandleBGPCfgErrors(loxiAliveCh, aliveClient, err)
m.checkHandleBGPCfgErrors(m.ClientAliveCh, aliveClient, err)
}

for _, bgpPeer := range bgpPeers {
Expand All @@ -2438,7 +2446,7 @@ loop:
klog.Infof("set-bgp-neigh(%s->%s) success", aliveClient.Host, bgpPeer.Host)
} else {
klog.Infof("set-bgp-neigh(%s->%s) failed(%s)", aliveClient.Host, bgpPeer.Host, err)
m.checkHandleBGPCfgErrors(loxiAliveCh, aliveClient, err)
m.checkHandleBGPCfgErrors(m.ClientAliveCh, aliveClient, err)
}

bgpNeighCfg1, _ := m.makeLoxiLBBGNeighModel(int(m.networkConfig.SetBGP), aliveClient.Host, m.networkConfig.ListenBGPPort, false)
Expand All @@ -2451,7 +2459,7 @@ loop:
klog.Infof("set-bgp-neigh(%s->%s) success", bgpPeer.Host, aliveClient.Host)
} else {
klog.Infof("set-bgp-neigh(%s->%s) failed(%s)", bgpPeer.Host, aliveClient.Host, err)
m.checkHandleBGPCfgErrors(loxiAliveCh, bgpPeer, err)
m.checkHandleBGPCfgErrors(m.ClientAliveCh, bgpPeer, err)
}
}

Expand Down Expand Up @@ -2482,7 +2490,7 @@ loop:
klog.Infof("set-ebgp-neigh(%s:%v) cfg success", bgpRemoteIP.String(), asid)
} else {
klog.Infof("set-ebgp-neigh(%s:%v) cfg - failed (%s)", bgpRemoteIP.String(), asid, err)
m.checkHandleBGPCfgErrors(loxiAliveCh, aliveClient, err)
m.checkHandleBGPCfgErrors(m.ClientAliveCh, aliveClient, err)
}
}
}
Expand Down

0 comments on commit 8b52ce9

Please sign in to comment.