Skip to content

Commit

Permalink
implemented ring buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter committed Feb 22, 2024
1 parent d207bd2 commit ad367a0
Show file tree
Hide file tree
Showing 12 changed files with 536 additions and 41 deletions.
52 changes: 33 additions & 19 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package main
import (
"sync"
"time"

"logdy/ring"
)

var BULK_WINDOW_MS int64 = 100
var FLUSH_BUFFER_SIZE = 1000
var MAX_MESSAGE_COUNT = 100_000

type CursorStatus string

Expand Down Expand Up @@ -114,18 +115,18 @@ type Clients struct {
mu sync.Mutex
mainChan <-chan Message
clients map[string]*Client
buffer []Message
ring *ring.RingQueue[Message]
currentlyConnected int
stats Stats
}

func NewClients(msgs <-chan Message) *Clients {
func NewClients(msgs <-chan Message, maxCount int64) *Clients {
cls := &Clients{
mu: sync.Mutex{},
mainChan: msgs,
clients: map[string]*Client{},
currentlyConnected: 0,
buffer: []Message{},
ring: ring.NewRingQueue[Message](maxCount),
stats: Stats{
Count: 0,
},
Expand All @@ -146,25 +147,27 @@ func (c *Clients) Load(clientId string, start int, count int, includeStart bool)

seen := false
sent := 0
for i, msg := range c.buffer {
c.ring.Scan(func(msg Message, i int) bool {
if i+1 == start {
seen = true
if !includeStart {
continue
return false
}
}

if !seen {
continue
return false
}

sent++
cl.handleMessage(msg, true)

if count > 0 && sent >= count {
break
return true
}
}
return false
})

cl.flushBuffer()

}
Expand All @@ -173,10 +176,14 @@ func (c *Clients) PeekLog(idxs []int) []Message {
msgs := []Message{}

for _, idx := range idxs {
if len(c.buffer)-1 < idx {
if c.ring.Size()-1 < idx {
continue
}
msgs = append(msgs, c.buffer[idx])
msg, err := c.ring.PeekIdx(idx)
if err != nil {
panic(err)
}
msgs = append(msgs, msg)
}

return msgs
Expand All @@ -198,18 +205,20 @@ func (c *Clients) ResumeFollowing(clientId string, sinceCursor bool) {
c.clients[clientId].bufferOpMu.Lock()
if sinceCursor {
seen := false
for _, msg := range c.buffer {
c.ring.Scan(func(msg Message, _ int) bool {
if msg.Id == c.clients[clientId].cursorPosition {
seen = true
continue
return false
}

if !seen {
continue
return false
}

c.clients[clientId].handleMessage(msg, true)
}
return false
})

}
c.clients[clientId].flushBuffer()
c.clients[clientId].cursorStatus = CURSOR_FOLLOWING
Expand All @@ -234,7 +243,8 @@ func (c *Clients) Start() {
if c.stats.FirstMessageAt.IsZero() {
c.stats.FirstMessageAt = time.Now()
}
c.buffer = append(c.buffer, msg)

c.ring.PushSafe(msg)
c.stats.Count++
c.stats.LastMessageAt = time.Now()

Expand All @@ -253,11 +263,15 @@ func (c *Clients) Join(tailLen int) *Client {

// deliver last N messages from a buffer upon connection
idx := 0
if len(c.buffer) > tailLen {
idx = len(c.buffer) - tailLen
if c.ring.Size() > tailLen {
idx = c.ring.Size() - tailLen
}
sl, err := c.ring.PeekSlice(idx)

for _, msg := range c.buffer[idx:] {
if err != nil {
panic(err)
}
for _, msg := range sl {
cl.handleMessage(msg, true)
}
c.clients[cl.id].cursorStatus = CURSOR_FOLLOWING
Expand Down
47 changes: 33 additions & 14 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,36 @@ import (

func TestClientStartAddToBuffer(t *testing.T) {
ch := make(chan Message)
c := NewClients(ch)
c := NewClients(ch, 1000)

assert.Equal(t, len(c.buffer), 0)
assert.Equal(t, c.ring.Size(), 0)
ch <- Message{}
time.Sleep(1 * time.Millisecond)
assert.Equal(t, len(c.buffer), 1)
assert.Equal(t, c.ring.Size(), 1)
}

func TestClientStartAddToBufferOverSize(t *testing.T) {
ch := make(chan Message)
c := NewClients(ch, 100)

assert.Equal(t, c.ring.Size(), 0)
for i := 0; i <= 1000; i++ {
ch <- Message{Id: strconv.Itoa(i)}
}
assert.Equal(t, c.ring.Size(), 100)

msg, err := c.ring.PeekIdx(0)
assert.Equal(t, err, nil)
assert.Equal(t, msg.Id, "901")

msg, err = c.ring.PeekIdx(99)
assert.Equal(t, err, nil)
assert.Equal(t, msg.Id, "1000")
}

func TestClientJoinSingle(t *testing.T) {
ch := make(chan Message)
c := NewClients(ch)
c := NewClients(ch, 1000)
client := c.Join(10)

ch <- Message{Content: "foo"}
Expand All @@ -33,7 +52,7 @@ func TestClientJoinSingle(t *testing.T) {

func TestClientJoinSingleAfterMessage(t *testing.T) {
ch := make(chan Message)
c := NewClients(ch)
c := NewClients(ch, 1000)
ch <- Message{Content: "foo"}
client := c.Join(10)
msg := <-client.ch
Expand All @@ -46,7 +65,7 @@ func TestClientJoinSingleTailLen(t *testing.T) {
// tailLen is shorter than num of messages produced

ch := make(chan Message)
c := NewClients(ch)
c := NewClients(ch, 1000)

for i := 0; i < 20; i++ {
ch <- Message{Content: strconv.Itoa(i)}
Expand All @@ -63,7 +82,7 @@ func TestClientJoinSingleTailLen(t *testing.T) {

func TestClientJoinMultiple(t *testing.T) {
ch := make(chan Message)
c := NewClients(ch)
c := NewClients(ch, 1000)
client1 := c.Join(10)
client2 := c.Join(10)
client3 := c.Join(10)
Expand All @@ -85,7 +104,7 @@ func TestClientJoinMultiple(t *testing.T) {

func TestClientBulkWindow(t *testing.T) {
ch := make(chan Message)
c := NewClients(ch)
c := NewClients(ch, 1000)
client1 := c.Join(10)

ch <- Message{Content: "foo1"}
Expand All @@ -103,23 +122,23 @@ func TestClientBulkWindow(t *testing.T) {

func TestClientSignalQuit(t *testing.T) {
ch := make(chan Message)
c := NewClients(ch)
c := NewClients(ch, 1000)

cl := c.Join(10)
c.Close(cl.id)
}

func TestClientCloseError(t *testing.T) {
ch := make(chan Message)
c := NewClients(ch)
c := NewClients(ch, 1000)

c.Close("1")
c.Close("2")
}

func TestClientStopFollowAndResume(t *testing.T) {
ch := make(chan Message)
c := NewClients(ch)
c := NewClients(ch, 1000)
client := c.Join(0)
closed := false

Expand Down Expand Up @@ -185,7 +204,7 @@ L:

func TestClientStats(t *testing.T) {
ch := make(chan Message)
c := NewClients(ch)
c := NewClients(ch, 1000)
c.Join(0)

i := 0
Expand All @@ -208,7 +227,7 @@ func TestClientStats(t *testing.T) {

func TestClientPeekLog(t *testing.T) {
ch := make(chan Message)
c := NewClients(ch)
c := NewClients(ch, 1000)
c.Join(0)

i := 0
Expand All @@ -230,7 +249,7 @@ func TestClientPeekLog(t *testing.T) {

func TestClientLoad(t *testing.T) {
ch := make(chan Message)
c := NewClients(ch)
c := NewClients(ch, 1000)
client := c.Join(0)
closed := false

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/logdyhq/logdy-core
module logdy

go 1.21.4

Expand Down
3 changes: 3 additions & 0 deletions go.work
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
go 1.21.4

use .
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
8 changes: 4 additions & 4 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func handleStatus(configFilePath string, analyticsEnabled bool, uiPass string) f
}
}

func handleWs(uiPass string, msgs <-chan Message) func(w http.ResponseWriter, r *http.Request) {
clients := NewClients(msgs)
func handleWs(uiPass string, msgs <-chan Message, maxMessageCount int64) func(w http.ResponseWriter, r *http.Request) {
clients := NewClients(msgs, maxMessageCount)

// go clients.Start()

Expand Down Expand Up @@ -147,7 +147,7 @@ func handleWs(uiPass string, msgs <-chan Message) func(w http.ResponseWriter, r
}
}

func handleHttp(msgs <-chan Message, httpPort string, analyticsEnabled bool, uiPass string, configFilePath string, bulkWindowMs int64) {
func handleHttp(msgs <-chan Message, httpPort string, analyticsEnabled bool, uiPass string, configFilePath string, bulkWindowMs int64, maxMessageCount int64) {
assets, _ := Assets()

BULK_WINDOW_MS = bulkWindowMs
Expand All @@ -160,7 +160,7 @@ func handleHttp(msgs <-chan Message, httpPort string, analyticsEnabled bool, uiP
http.HandleFunc("/api/status", handleStatus(configFilePath, analyticsEnabled, uiPass))

// Listen for WebSocket connections on port 8080.
http.HandleFunc("/ws", handleWs(uiPass, msgs))
http.HandleFunc("/ws", handleWs(uiPass, msgs, maxMessageCount))

logger.WithFields(logrus.Fields{
"port": httpPort,
Expand Down
4 changes: 2 additions & 2 deletions log.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package main

import (
"os"
"io/ioutil"

log "github.com/sirupsen/logrus"
)
Expand All @@ -10,7 +10,7 @@ var logger *log.Logger

func initLogger() {
logger = &log.Logger{
Out: os.Stdout,
Out: ioutil.Discard,
Level: log.DebugLevel,
Formatter: &log.TextFormatter{
DisableColors: false,
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ where you can filter and browse well formatted application output.
FallthroughGlobal, _ = cmd.Flags().GetBool("fallthrough")
verbose, _ := cmd.Flags().GetBool("verbose")
bulkWindow, _ := cmd.Flags().GetInt64("bulk-window")
maxMessageCount, _ := cmd.Flags().GetInt64("max-message-count")

if !noanalytics {
logger.Warn("No opt-out from analytics, we'll be receiving anonymous usage data, which will be used to improve the product. To opt-out use the flag --no-analytics.")
Expand All @@ -56,7 +57,7 @@ where you can filter and browse well formatted application output.
logger.SetLevel(logrus.InfoLevel)
}

handleHttp(ch, httpPort, !noanalytics, uiPass, configFile, bulkWindow)
handleHttp(ch, httpPort, !noanalytics, uiPass, configFile, bulkWindow, maxMessageCount)
},
}

Expand Down Expand Up @@ -156,6 +157,7 @@ func init() {
}

func main() {
logger.Out = os.Stdout
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
22 changes: 22 additions & 0 deletions ring/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
https://github.com/sombr/go-container-roundrobin
MIT License

Copyright (c) 2023 Serge Toro

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Loading

0 comments on commit ad367a0

Please sign in to comment.