Skip to content

Commit

Permalink
backend buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter committed Feb 22, 2024
1 parent 12cf026 commit d207bd2
Show file tree
Hide file tree
Showing 5 changed files with 389 additions and 80 deletions.
228 changes: 185 additions & 43 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,66 @@ package main
import (
"sync"
"time"

"github.com/sirupsen/logrus"
)

var BULK_WINDOW_MS int64 = 100
var FLUSH_BUFFER_SIZE = 1000
var MAX_MESSAGE_COUNT = 100_000

type CursorStatus string

const CURSOR_STOPPED CursorStatus = "stopped"
const CURSOR_FOLLOWING CursorStatus = "following"

type Client struct {
done chan struct{}
ch chan []Message
buffer []Message
bufferOpMu sync.Mutex
id string
done chan struct{}
ch chan []Message
buffer []Message

cursorStatus CursorStatus
cursorPosition string // last delivered message id
}

func (c *Client) handleMessage(m Message) {
func (c *Client) handleMessage(m Message, force bool) {
if !force && c.cursorStatus == CURSOR_STOPPED {
logger.Debug("Client: Status stopped discarding message")
return
}
c.buffer = append(c.buffer, m)
}

func (c *Client) flushBuffer() {
if len(c.buffer) == 0 {
return
}

for i := 0; i < len(c.buffer); i += FLUSH_BUFFER_SIZE {
end := i + FLUSH_BUFFER_SIZE
if end > len(c.buffer) {
end = len(c.buffer)
}

batch := c.buffer[i:end]
c.ch <- batch
}
}

func (c *Client) clearBuffer() {
c.buffer = []Message{}
}

func (c *Client) close() {
c.done <- struct{}{}
}

func (c *Client) waitForBufferDrain() {
for len(c.buffer) > 0 {
time.Sleep(5 * time.Millisecond)
}
}

// Messages are delivered in bulks to avoid
// ddossing the client (browser) with too many messages produced
// in a very short timespan
Expand All @@ -41,18 +82,26 @@ func (c *Client) startBufferFlushLoop() {
}

logger.WithField("count", len(c.buffer)).Debug("Client: Flushing buffer")
c.ch <- c.buffer
c.buffer = []Message{}
c.cursorPosition = c.buffer[len(c.buffer)-1].Id
c.bufferOpMu.Lock()

c.flushBuffer()
c.clearBuffer()

c.bufferOpMu.Unlock()
}

}
}

func NewClient() *Client {
c := &Client{
done: make(chan struct{}),
ch: make(chan []Message, 100),
buffer: []Message{},
bufferOpMu: sync.Mutex{},
done: make(chan struct{}),
ch: make(chan []Message, BULK_WINDOW_MS*25),
cursorStatus: CURSOR_STOPPED,
cursorPosition: "",
id: RandStringRunes(6),
}

go c.startBufferFlushLoop()
Expand All @@ -61,69 +110,162 @@ func NewClient() *Client {
}

type Clients struct {
started bool
mu sync.Mutex
mainChan <-chan Message
clients map[int]*Client
clients map[string]*Client
buffer []Message
currentlyConnected int
stats Stats
}

func NewClients(msgs <-chan Message) *Clients {
return &Clients{
cls := &Clients{
mu: sync.Mutex{},
mainChan: msgs,
clients: map[int]*Client{},
clients: map[string]*Client{},
currentlyConnected: 0,
buffer: []Message{},
stats: Stats{
Count: 0,
},
}

go cls.Start()

return cls
}

func (c *Clients) Start() {
for {
msg := <-c.mainChan
c.mu.Lock()
if c.currentlyConnected == 0 {
logger.Debug("Received a log message but no client is connected, buffering message")
c.buffer = append(c.buffer, msg)
func (c *Clients) Load(clientId string, start int, count int, includeStart bool) {
c.PauseFollowing(clientId)
cl := c.clients[clientId]
cl.waitForBufferDrain()

cl.bufferOpMu.Lock()
defer cl.bufferOpMu.Unlock()

seen := false
sent := 0
for i, msg := range c.buffer {
if i+1 == start {
seen = true
if !includeStart {
continue
}
}

for _, ch := range c.clients {
ch.handleMessage(msg)
if !seen {
continue
}

sent++
cl.handleMessage(msg, true)

if count > 0 && sent >= count {
break
}
c.mu.Unlock()
}
cl.flushBuffer()

}

func (c *Clients) Join(id int) *Client {
func (c *Clients) PeekLog(idxs []int) []Message {
msgs := []Message{}

if _, ok := c.clients[id]; ok {
panic("Client already exists")
for _, idx := range idxs {
if len(c.buffer)-1 < idx {
continue
}
msgs = append(msgs, c.buffer[idx])
}

c.mu.Lock()
defer func() {
if len(c.buffer) == 0 {
return
}
return msgs
}

type Stats struct {
Count int
FirstMessageAt time.Time
LastMessageAt time.Time
}

func (c *Clients) Stats() Stats {
return c.stats
}

logger.WithFields(logrus.Fields{
"msg_count": len(c.buffer),
}).Info("Flushing log messages buffer to a recently connected client")
func (c *Clients) ResumeFollowing(clientId string, sinceCursor bool) {
//pump back the items until last element seen

c.clients[clientId].bufferOpMu.Lock()
if sinceCursor {
seen := false
for _, msg := range c.buffer {
cl := c.clients[id]
cl.handleMessage(msg)
if msg.Id == c.clients[clientId].cursorPosition {
seen = true
continue
}

if !seen {
continue
}

c.clients[clientId].handleMessage(msg, true)
}
}
c.clients[clientId].flushBuffer()
c.clients[clientId].cursorStatus = CURSOR_FOLLOWING
c.clients[clientId].bufferOpMu.Unlock()
}

c.buffer = []Message{}
}()
defer c.mu.Unlock()
func (c *Clients) PauseFollowing(clientId string) {
c.clients[clientId].cursorStatus = CURSOR_STOPPED
c.clients[clientId].waitForBufferDrain()
}

c.clients[id] = NewClient()
// starts a delivery channel to all clients
func (c *Clients) Start() {
if c.started {
logger.Debug("Clients delivery loop already started")
return
}

c.started = true
for {
msg := <-c.mainChan
if c.stats.FirstMessageAt.IsZero() {
c.stats.FirstMessageAt = time.Now()
}
c.buffer = append(c.buffer, msg)
c.stats.Count++
c.stats.LastMessageAt = time.Now()

for _, ch := range c.clients {
ch.bufferOpMu.Lock()
ch.handleMessage(msg, false)
ch.bufferOpMu.Unlock()
}
}
}

func (c *Clients) Join(tailLen int) *Client {
cl := NewClient()
c.clients[cl.id] = cl
c.currentlyConnected++
return c.clients[id]

// deliver last N messages from a buffer upon connection
idx := 0
if len(c.buffer) > tailLen {
idx = len(c.buffer) - tailLen
}

for _, msg := range c.buffer[idx:] {
cl.handleMessage(msg, true)
}
c.clients[cl.id].cursorStatus = CURSOR_FOLLOWING

return c.clients[cl.id]
}

func (c *Clients) Close(id int) {
func (c *Clients) Close(id string) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down
Loading

0 comments on commit d207bd2

Please sign in to comment.