Skip to content

Commit

Permalink
follow mode now allows to read entire file
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter committed Apr 3, 2024
1 parent f558c6b commit abe8593
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 21 deletions.
8 changes: 6 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ func NewClients(msgs <-chan Message, maxCount int64) *Clients {
currentlyConnected: 0,
ring: ring.NewRingQueue[Message](maxCount),
stats: Stats{
Count: 0,
MaxCount: maxCount,
Count: 0,
},
}

Expand Down Expand Up @@ -269,7 +270,10 @@ func (c *Clients) Start() {
}

c.ring.PushSafe(msg)
c.stats.Count++
if c.stats.Count < int(c.stats.MaxCount) {
c.stats.Count++
}

c.stats.LastMessageAt = time.Now()

for _, ch := range c.clients {
Expand Down
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ require (
)

require (
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/nxadm/tail v1.4.11 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
golang.org/x/sys v0.16.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand All @@ -21,6 +27,7 @@ require (
require (
github.com/brianvoe/gofakeit v3.18.0+incompatible
github.com/brianvoe/gofakeit/v6 v6.28.0
github.com/cheggaaa/pb/v3 v3.1.5
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/sirupsen/logrus v1.9.3
github.com/spf13/pflag v1.0.5 // indirect
Expand Down
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/brianvoe/gofakeit v3.18.0+incompatible h1:wDOmHc9DLG4nRjUVVaxA+CEglKOW72Y5+4WNxUIkjM8=
github.com/brianvoe/gofakeit v3.18.0+incompatible/go.mod h1:kfwdRA90vvNhPutZWfH7WPaDzUjz+CZFqG+rPkOjGOc=
github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo0tgAW4=
github.com/brianvoe/gofakeit/v6 v6.28.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs=
github.com/cheggaaa/pb/v3 v3.1.5 h1:QuuUzeM2WsAqG2gMqtzaWithDJv0i+i6UlnwSCI4QLk=
github.com/cheggaaa/pb/v3 v3.1.5/go.mod h1:CrxkeghYTXi1lQBEI7jSn+3svI3cuc19haAj6jM60XI=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY=
github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
Expand All @@ -32,7 +47,9 @@ github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLr
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
5 changes: 3 additions & 2 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,10 @@ func handleClientPeek(clients *Clients) func(w http.ResponseWriter, r *http.Requ
}
}

func handleHttp(msgs <-chan models.Message, httpPort string, uiIp string, analyticsEnabled bool, uiPass string, configFilePath string, bulkWindowMs int64, maxMessageCount int64) {
var clients *Clients

func handleHttp(httpPort string, uiIp string, analyticsEnabled bool, uiPass string, configFilePath string, bulkWindowMs int64) {
assets, _ := Assets()
clients := NewClients(msgs, maxMessageCount)

BULK_WINDOW_MS = bulkWindowMs

Expand Down
47 changes: 34 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ where you can filter and browse well formatted application output.
`,
Run: func(cmd *cobra.Command, args []string) {
},
PersistentPreRun: func(cmd *cobra.Command, args []string) {
verbose, _ := cmd.Flags().GetBool("verbose")
if verbose {
utils.Logger.SetLevel(logrus.TraceLevel)
} else {
utils.Logger.SetLevel(logrus.InfoLevel)
}
},
PersistentPostRun: func(cmd *cobra.Command, args []string) {

noupdates, _ := cmd.Flags().GetBool("no-updates")
Expand All @@ -47,29 +55,30 @@ where you can filter and browse well formatted application output.
uiIp, _ := cmd.Flags().GetString("ui-ip")
uiPass, _ := cmd.Flags().GetString("ui-pass")
configFile, _ := cmd.Flags().GetString("config")
appendToFile, _ := cmd.Flags().GetString("append-to-file")
appendToFileRaw, _ := cmd.Flags().GetBool("append-to-file-raw")
noanalytics, _ := cmd.Flags().GetBool("no-analytics")
modes.FallthroughGlobal, _ = cmd.Flags().GetBool("fallthrough")
verbose, _ := cmd.Flags().GetBool("verbose")
bulkWindow, _ := cmd.Flags().GetInt64("bulk-window")
maxMessageCount, _ := cmd.Flags().GetInt64("max-message-count")
modes.FallthroughGlobal, _ = cmd.Flags().GetBool("fallthrough")

if !noanalytics {
utils.Logger.Warn("No opt-out from analytics, we'll be receiving anonymous usage data, which will be used to improve the product. To opt-out use the flag --no-analytics.")
}

if verbose {
utils.Logger.SetLevel(logrus.TraceLevel)
} else {
utils.Logger.SetLevel(logrus.InfoLevel)
if clients == nil {
InitializeClients(cmd)
}

mainChan := utils.ProcessIncomingMessages(ch, appendToFile, appendToFileRaw)
handleHttp(mainChan, httpPort, uiIp, !noanalytics, uiPass, configFile, bulkWindow, maxMessageCount)
handleHttp(httpPort, uiIp, !noanalytics, uiPass, configFile, bulkWindow)
},
}

func InitializeClients(cmd *cobra.Command) {
appendToFile, _ := cmd.Flags().GetString("append-to-file")
appendToFileRaw, _ := cmd.Flags().GetBool("append-to-file-raw")
maxMessageCount, _ := cmd.Flags().GetInt64("max-message-count")
mainChan := utils.ProcessIncomingMessages(ch, appendToFile, appendToFileRaw)
clients = NewClients(mainChan, maxMessageCount)
}

var listenStdCmd = &cobra.Command{
Use: "stdin [command]",
Short: "Listens to STDOUT/STDERR of a provided command. Example `logdy stdin \"npm run dev\"`",
Expand All @@ -96,6 +105,13 @@ var followCmd = &cobra.Command{
Long: ``,
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
InitializeClients(cmd)
fullRead, _ := cmd.Flags().GetBool("full-read")

if fullRead {
modes.ReadFiles(ch, args)
}

modes.FollowFiles(ch, args)
},
}
Expand Down Expand Up @@ -156,15 +172,20 @@ func init() {
rootCmd.PersistentFlags().BoolP("no-analytics", "n", false, "Opt-out from sending anonymous analytical data that helps improve Logdy")
rootCmd.PersistentFlags().BoolP("no-updates", "u", false, "Opt-out from checking updates on program startup")
rootCmd.PersistentFlags().BoolP("fallthrough", "t", false, "Will fallthrough all of the stdin received to the terminal as is (will display incoming messages)")
demoSocketCmd.PersistentFlags().BoolP("sample-text", "", true, "By default demo data will produce JSON, use this flag to produce raw text")
listenSocketCmd.PersistentFlags().StringP("ip", "", "", "IP address to listen to, leave empty to listen on all IP addresses")

utils.InitLogger()

rootCmd.AddCommand(listenStdCmd)

listenSocketCmd.PersistentFlags().StringP("ip", "", "", "IP address to listen to, leave empty to listen on all IP addresses")
rootCmd.AddCommand(listenSocketCmd)

rootCmd.AddCommand(forwardCmd)

demoSocketCmd.PersistentFlags().BoolP("sample-text", "", true, "By default demo data will produce JSON, use this flag to produce raw text")
rootCmd.AddCommand(demoSocketCmd)

followCmd.Flags().BoolP("full-read", "", false, "Whether the the file(s) should be read entirely")
rootCmd.AddCommand(followCmd)
}

Expand Down
1 change: 1 addition & 0 deletions models/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import "time"

type Stats struct {
MaxCount int64 `json:"max_count"`
Count int `json:"msg_count"`
FirstMessageAt time.Time `json:"first_message_at"`
LastMessageAt time.Time `json:"last_message_at"`
Expand Down
33 changes: 29 additions & 4 deletions modes/follow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ func FollowFiles(ch chan models.Message, files []string) {
"error": err.Error(),
}).Error("Following file changes failed")
continue
} else {
utils.Logger.WithFields(logrus.Fields{
"path": file,
}).Info("Following file changes")
}
utils.Logger.WithFields(logrus.Fields{
"path": file,
}).Info("Following file changes")

go func(file string) {
t, err := tail.TailFile(
Expand All @@ -46,3 +45,29 @@ func FollowFiles(ch chan models.Message, files []string) {
}

}

func ReadFiles(ch chan models.Message, files []string) {
for _, file := range files {

_, err := os.Stat(file)
if err != nil {
utils.Logger.WithFields(logrus.Fields{
"path": file,
"error": err.Error(),
}).Error("Reading file failed")
continue
}

r, size, bar := utils.OpenFileForReading(file)
utils.Logger.WithFields(logrus.Fields{
"path": file,
"size_bytes": size,
}).Info("Reading file")

utils.LineCounterWithChannel(r, func(line utils.Line, cancel func()) {
produce(ch, string(line.Line), models.MessageTypeStdout, &models.MessageOrigin{File: file})
})
bar.Finish()

}
}
83 changes: 83 additions & 0 deletions utils/file_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package utils

import (
"bytes"
"context"
"io"
"os"

"github.com/cheggaaa/pb/v3"
)

type Line struct {
seq int
Line []byte
}

func LineCounterWithChannel(r io.Reader, fn func(line Line, cancel func())) error {
ctx, cancel := context.WithCancel(context.Background())
const bufferSize = 64 * 1024
buf := make([]byte, bufferSize)
var previousLine = []byte{}
seq := 0

for {

if ctx.Err() != nil {
cancel()
return nil
}

i, err := r.Read(buf)
if err != nil && err != io.EOF {
panic(err)
}

if i == 0 && len(previousLine) == 0 {
cancel()
return nil
}

previousLine = append(previousLine, buf[:i]...)
newlineIndex := bytes.IndexByte(previousLine, '\n')

if newlineIndex == -1 {

if i == bufferSize {
continue
}

if err == io.EOF || (i == 0 || i < bufferSize) {
fn(Line{seq: seq, Line: previousLine}, cancel)
return nil
}
}

lines := bytes.Split(previousLine, []byte{'\n'})
previousLine = lines[len(lines)-1]
lines = lines[:len(lines)-1]

for _, line := range lines {
seq++
fn(Line{seq: seq, Line: line}, cancel)
}

if err == io.EOF {
fn(Line{seq: seq, Line: previousLine}, cancel)
return nil
}
}
}

func OpenFileForReading(file string) (io.Reader, int64, *pb.ProgressBar) {
reader, err := os.Open(file)

if err != nil {
panic(err)
}

fi, _ := reader.Stat()
bar := pb.Full.Start64(fi.Size())
return bar.NewProxyReader(reader), fi.Size(), bar
// return reader, fi.Size()
}
46 changes: 46 additions & 0 deletions utils/file_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package utils

import (
"bytes"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func TestLineCounterWithChannel(t *testing.T) {

tests := []struct {
input string
buffer int
linesCount int
}{
{input: "123", buffer: 5, linesCount: 1},
{input: "123456", buffer: 5, linesCount: 1},
{input: "123456790abc", buffer: 5, linesCount: 1},

{input: "12\n3", buffer: 5, linesCount: 2},
{input: "12\n3456789", buffer: 5, linesCount: 2},
{input: "12\n3456\n789", buffer: 5, linesCount: 3},
{input: "12\n34aaabbb56\n789", buffer: 5, linesCount: 3},
{input: "12\n34aaabbbcccddd56\n789", buffer: 5, linesCount: 3},

{input: "1\n\n2", buffer: 5, linesCount: 3},
{input: "1\n\n\n\n2", buffer: 5, linesCount: 5},
}

for _, tc := range tests {

chars := ""
c := 0
LineCounterWithChannel(bytes.NewBufferString(tc.input), func(line Line, cancel func()) {
c++
chars = chars + string(line.Line)
})

assert.Equal(t, tc.linesCount, c)
assert.Equal(t, strings.ReplaceAll(tc.input, "\n", ""), chars)

}

}

0 comments on commit abe8593

Please sign in to comment.