Skip to content

Commit

Permalink
Make sure to update stats monitor on ingress started/ended (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
biglittlebigben authored Jun 23, 2023
1 parent 7ac35ee commit 2c186c5
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 16 deletions.
6 changes: 0 additions & 6 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ type Config struct {
NodeID string `yaml:"-"`
}

type WhipConfig struct {
// TODO add IceLite, NAT1To1IPs
ICEPortRange []uint16 `yaml:"ice_port_range"`
EnableLoopbackCandidate bool `yaml:"enable_loopback_candidate"`
}

type CPUCostConfig struct {
RTMPCpuCost float64 `yaml:"rtmp_cpu_cost"`
WHIPCpuCost float64 `yaml:"whip_cpu_cost"`
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/process_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *ProcessManager) launchHandler(ctx context.Context, resp *rpc.GetIngress
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

s.sm.IngressStarted(resp.Info.IngressId, SessionType_HandlerProcess)
s.sm.IngressStarted(resp.Info, SessionType_HandlerProcess)
h := &process{
info: resp.Info,
cmd: cmd,
Expand All @@ -122,7 +122,7 @@ func (s *ProcessManager) awaitCleanup(h *process) {
}

h.closed.Break()
s.sm.IngressEnded(h.info.IngressId)
s.sm.IngressEnded(h.info)

s.mu.Lock()
defer s.mu.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (s *Service) HandleWHIPPublishRequest(streamKey, resourceId string, ihs rpc
p.SetStatus(livekit.IngressState_ENDPOINT_PUBLISHING, "")
p.SendStateUpdate(ctx)

s.sm.IngressStarted(p.IngressId, SessionType_Service)
s.sm.IngressStarted(p.IngressInfo, SessionType_Service)
} else {
extraParams.MimeTypes = mimeTypes

Expand All @@ -202,7 +202,7 @@ func (s *Service) HandleWHIPPublishRequest(streamKey, resourceId string, ihs rpc
}

p.SendStateUpdate(ctx)
s.sm.IngressEnded(p.IngressId)
s.sm.IngressEnded(p.IngressInfo)
DeregisterIngressRpcHandlers(rpcServer, p.IngressInfo, p.ExtraParams)
}
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/service/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"

"github.com/livekit/ingress/pkg/stats"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)

Expand All @@ -28,22 +29,26 @@ func NewSessionManager(monitor *stats.Monitor) *SessionManager {
}
}

func (sm *SessionManager) IngressStarted(ingressID string, t SessionType) {
logger.Infow("ingress started", "ingressID", ingressID, "type", t)
func (sm *SessionManager) IngressStarted(info *livekit.IngressInfo, t SessionType) {
logger.Infow("ingress started", "ingressID", info.IngressId, "type", t)

sm.lock.Lock()
defer sm.lock.Unlock()

sm.sessions[ingressID] = t
sm.sessions[info.IngressId] = t

sm.monitor.IngressStarted(info)
}

func (sm *SessionManager) IngressEnded(ingressID string) {
logger.Infow("ingress ended", "ingressID", ingressID)
func (sm *SessionManager) IngressEnded(info *livekit.IngressInfo) {
logger.Infow("ingress ended", "ingressID", info.IngressId)

sm.lock.Lock()
defer sm.lock.Unlock()

delete(sm.sessions, ingressID)
delete(sm.sessions, info.IngressId)

sm.monitor.IngressEnded(info)
}

func (sm *SessionManager) IsIdle() bool {
Expand Down

0 comments on commit 2c186c5

Please sign in to comment.