Skip to content

Commit

Permalink
file structure changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter committed Feb 28, 2024
1 parent 7443f23 commit 1491615
Show file tree
Hide file tree
Showing 24 changed files with 294 additions and 254 deletions.
26 changes: 7 additions & 19 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"sync"
"time"

. "logdy/models"
"logdy/ring"
"logdy/utils"
)

var BULK_WINDOW_MS int64 = 100
Expand All @@ -28,7 +30,7 @@ type Client struct {

func (c *Client) handleMessage(m Message, force bool) {
if !force && c.cursorStatus == CURSOR_STOPPED {
logger.Debug("Client: Status stopped discarding message")
utils.Logger.Debug("Client: Status stopped discarding message")
return
}
c.buffer = append(c.buffer, m)
Expand Down Expand Up @@ -72,7 +74,7 @@ func (c *Client) startBufferFlushLoop() {
time.Sleep(time.Millisecond * time.Duration(BULK_WINDOW_MS))
select {
case <-c.done:
logger.Debug("Client: received done signal, quitting")
utils.Logger.Debug("Client: received done signal, quitting")
defer close(c.done)
defer close(c.ch)
return
Expand All @@ -82,7 +84,7 @@ func (c *Client) startBufferFlushLoop() {
continue
}

logger.WithField("count", len(c.buffer)).Debug("Client: Flushing buffer")
utils.Logger.WithField("count", len(c.buffer)).Debug("Client: Flushing buffer")
c.cursorPosition = c.buffer[len(c.buffer)-1].Id
c.bufferOpMu.Lock()

Expand All @@ -102,7 +104,7 @@ func NewClient() *Client {
ch: make(chan []Message, BULK_WINDOW_MS*25),
cursorStatus: CURSOR_STOPPED,
cursorPosition: "",
id: RandStringRunes(6),
id: utils.RandStringRunes(6),
}

go c.startBufferFlushLoop()
Expand Down Expand Up @@ -195,20 +197,6 @@ func (c *Clients) PeekLog(idxs []int) []Message {
return msgs
}

type Stats struct {
Count int `json:"msg_count"`
FirstMessageAt time.Time `json:"first_message_at"`
LastMessageAt time.Time `json:"last_message_at"`
}

type ClientStats struct {
LastDeliveredId string `json:"last_delivered_id"`
LastDeliveredIdIdx int `json:"last_delivered_id_idx"`
// number of messages the client is behind the tail
// by tail we mean a recent message
CountToTail int `json:"count_to_tail"`
}

func (c *Clients) Stats() Stats {
return c.stats
}
Expand Down Expand Up @@ -269,7 +257,7 @@ func (c *Clients) PauseFollowing(clientId string) {
// starts a delivery channel to all clients
func (c *Clients) Start() {
if c.started {
logger.Debug("Clients delivery loop already started")
utils.Logger.Debug("Clients delivery loop already started")
return
}

Expand Down
2 changes: 2 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"
"time"

. "logdy/models"

"github.com/stretchr/testify/assert"
)

Expand Down
52 changes: 0 additions & 52 deletions forward.go

This file was deleted.

3 changes: 0 additions & 3 deletions go.work

This file was deleted.

2 changes: 0 additions & 2 deletions go.work.sum

This file was deleted.

87 changes: 45 additions & 42 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,28 @@ import (
"errors"
"fmt"
"log"
"logdy/utils"
"net/http"
"strconv"
"time"

"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"

"logdy/models"
)

func handleCheckPass(uiPass string) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
logger.Debug("/api/check-pass")
utils.Logger.Debug("/api/check-pass")
pass := r.URL.Query().Get("password")
if uiPass == "" {
w.WriteHeader(200)
return
}

if pass == "" || uiPass != pass {
logger.WithFields(logrus.Fields{
utils.Logger.WithFields(logrus.Fields{
"ip": r.RemoteAddr,
"ua": r.Header.Get("user-agent"),
}).Info("Client denied")
Expand All @@ -38,17 +41,17 @@ func handleCheckPass(uiPass string) func(w http.ResponseWriter, r *http.Request)
func handleStatus(configFilePath string, analyticsEnabled bool, uiPass string) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {

logger.Debug("/api/status")
utils.Logger.Debug("/api/status")

configStr := ""
if configFilePath != "" {
logger.Debug("Reading config file")
configStr = loadFile(configFilePath)
utils.Logger.Debug("Reading config file")
configStr = utils.LoadFile(configFilePath)
}

initMsg, _ := json.Marshal(InitMessage{
BaseMessage: BaseMessage{
MessageType: MessageTypeInit,
initMsg, _ := json.Marshal(models.InitMessage{
BaseMessage: models.BaseMessage{
MessageType: models.MessageTypeInit,
},
AnalyticsEnabled: analyticsEnabled,
AuthRequired: uiPass != "",
Expand All @@ -59,7 +62,7 @@ func handleStatus(configFilePath string, analyticsEnabled bool, uiPass string) f
}
}

func handleWs(uiPass string, msgs <-chan Message, clients *Clients) func(w http.ResponseWriter, r *http.Request) {
func handleWs(uiPass string, msgs <-chan models.Message, clients *Clients) func(w http.ResponseWriter, r *http.Request) {

wsUpgrader := websocket.Upgrader{
ReadBufferSize: 1024,
Expand All @@ -73,7 +76,7 @@ func handleWs(uiPass string, msgs <-chan Message, clients *Clients) func(w http.
if uiPass != "" {
pass := r.URL.Query().Get("password")
if pass == "" || uiPass != pass {
logger.WithFields(logrus.Fields{
utils.Logger.WithFields(logrus.Fields{
"ip": r.RemoteAddr,
"ua": r.Header.Get("user-agent"),
}).Info("Client denied")
Expand All @@ -89,14 +92,14 @@ func handleWs(uiPass string, msgs <-chan Message, clients *Clients) func(w http.
return
}

logger.Info("New Web UI client connected")
utils.Logger.Info("New Web UI client connected")

ch := clients.Join(100, r.URL.Query().Get("should_follow") == "true")
clientId := ch.id

bts, err := json.Marshal(ClientJoined{
BaseMessage: BaseMessage{
MessageType: MessageTypeClientJoined,
bts, err := json.Marshal(models.ClientJoined{
BaseMessage: models.BaseMessage{
MessageType: models.MessageTypeClientJoined,
},
ClientId: ch.id,
})
Expand All @@ -119,8 +122,8 @@ func handleWs(uiPass string, msgs <-chan Message, clients *Clients) func(w http.
_, _, err := conn.ReadMessage()
log.Println("ERROR", err)
if err != nil {
logger.Debug(err)
logger.WithField("client_id", clientId).Info("Closed client")
utils.Logger.Debug(err)
utils.Logger.WithField("client_id", clientId).Info("Closed client")
clients.Close(clientId)
return
}
Expand All @@ -131,9 +134,9 @@ func handleWs(uiPass string, msgs <-chan Message, clients *Clients) func(w http.
for {
time.Sleep(1 * time.Second)
if ch.cursorStatus == CURSOR_STOPPED {
bts, err = json.Marshal(ClientMsgStatus{
BaseMessage: BaseMessage{
MessageType: MessageTypeClientMsgStatus,
bts, err = json.Marshal(models.ClientMsgStatus{
BaseMessage: models.BaseMessage{
MessageType: models.MessageTypeClientMsgStatus,
},
Client: clients.ClientStats(ch.id),
Stats: clients.Stats(),
Expand All @@ -146,9 +149,9 @@ func handleWs(uiPass string, msgs <-chan Message, clients *Clients) func(w http.
err = conn.WriteMessage(1, bts)

if err != nil {
logger.Error("Err", err)
utils.Logger.Error("Err", err)
clients.Close(clientId)
logger.WithField("client_id", clientId).Info("Closed client")
utils.Logger.WithField("client_id", clientId).Info("Closed client")
break
}

Expand All @@ -158,21 +161,21 @@ func handleWs(uiPass string, msgs <-chan Message, clients *Clients) func(w http.

for {
msgs := <-ch.ch
bulk := MessageBulk{
BaseMessage: BaseMessage{
MessageType: MessageTypeLogBulk,
bulk := models.MessageBulk{
BaseMessage: models.BaseMessage{
MessageType: models.MessageTypeLogBulk,
},
Messages: msgs,
Status: clients.Stats(),
}

logger.WithField("count", len(msgs)).Debug("Received messages")
utils.Logger.WithField("count", len(msgs)).Debug("Received messages")

if logger.Level <= logrus.DebugLevel {
if utils.Logger.Level <= logrus.DebugLevel {
for _, msg := range msgs {
mbts, _ := json.Marshal(msg)
logger.WithFields(logrus.Fields{
"msg": trunc(string(mbts), 45),
utils.Logger.WithFields(logrus.Fields{
"msg": utils.Trunc(string(mbts), 45),
"clientId": clientId,
}).Debug("Sending message through WebSocket")
}
Expand All @@ -187,15 +190,15 @@ func handleWs(uiPass string, msgs <-chan Message, clients *Clients) func(w http.
err = conn.WriteMessage(1, bts)

if err != nil {
logger.Error("Err", err)
utils.Logger.Error("Err", err)
clients.Close(clientId)
logger.WithField("client_id", clientId).Info("Closed client")
utils.Logger.WithField("client_id", clientId).Info("Closed client")
break
}

bts, err = json.Marshal(ClientMsgStatus{
BaseMessage: BaseMessage{
MessageType: MessageTypeClientMsgStatus,
bts, err = json.Marshal(models.ClientMsgStatus{
BaseMessage: models.BaseMessage{
MessageType: models.MessageTypeClientMsgStatus,
},
Client: clients.ClientStats(ch.id),
Stats: clients.Stats(),
Expand All @@ -209,9 +212,9 @@ func handleWs(uiPass string, msgs <-chan Message, clients *Clients) func(w http.
err = conn.WriteMessage(1, bts)

if err != nil {
logger.Error("Err", err)
utils.Logger.Error("Err", err)
clients.Close(clientId)
logger.WithField("client_id", clientId).Info("Closed client")
utils.Logger.WithField("client_id", clientId).Info("Closed client")
break
}
}
Expand All @@ -238,15 +241,15 @@ func getClientOrErr(r *http.Request, w http.ResponseWriter, clients *Clients) *C
cid, err := getClientId(r)

if err != nil {
logger.Error("Missing client id")
utils.Logger.Error("Missing client id")
w.WriteHeader(http.StatusBadRequest)
return nil
}

cl, ok := clients.GetClient(cid)

if !ok {
logger.WithField("client_id", cid).Error("Missing client")
utils.Logger.WithField("client_id", cid).Error("Missing client")
w.WriteHeader(http.StatusBadRequest)
return nil
}
Expand All @@ -269,7 +272,7 @@ func handleClientStatus(clients *Clients) func(w http.ResponseWriter, r *http.Re
case string(CURSOR_STOPPED):
clients.PauseFollowing(cl.id)
default:
logger.Error("Unrecognized status")
utils.Logger.Error("Unrecognized status")
w.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -290,13 +293,13 @@ func handleClientLoad(clients *Clients) func(w http.ResponseWriter, r *http.Requ

startInt, err := strconv.Atoi(start)
if err != nil {
logger.Error("Invalid start")
utils.Logger.Error("Invalid start")
w.WriteHeader(http.StatusBadRequest)
return
}
countInt, err := strconv.Atoi(count)
if err != nil {
logger.Error("Invalid count")
utils.Logger.Error("Invalid count")
w.WriteHeader(http.StatusBadRequest)
return
}
Expand Down Expand Up @@ -337,7 +340,7 @@ func handleClientPeek(clients *Clients) func(w http.ResponseWriter, r *http.Requ
}
}

func handleHttp(msgs <-chan Message, httpPort string, analyticsEnabled bool, uiPass string, configFilePath string, bulkWindowMs int64, maxMessageCount int64) {
func handleHttp(msgs <-chan models.Message, httpPort string, analyticsEnabled bool, uiPass string, configFilePath string, bulkWindowMs int64, maxMessageCount int64) {
assets, _ := Assets()
clients := NewClients(msgs, maxMessageCount)

Expand All @@ -356,7 +359,7 @@ func handleHttp(msgs <-chan Message, httpPort string, analyticsEnabled bool, uiP
// Listen for WebSocket connections on port 8080.
http.HandleFunc("/ws", handleWs(uiPass, msgs, clients))

logger.WithFields(logrus.Fields{
utils.Logger.WithFields(logrus.Fields{
"port": httpPort,
}).Info("WebUI started, visit http://localhost:" + httpPort)

Expand Down
Binary file added logdy
Binary file not shown.
Loading

0 comments on commit 1491615

Please sign in to comment.