From abe8593b478379c4f9ed2ce471231e7b7e8b726c Mon Sep 17 00:00:00 2001 From: Peter <> Date: Wed, 3 Apr 2024 22:31:09 +0200 Subject: [PATCH] follow mode now allows to read entire file --- client.go | 8 +++- go.mod | 7 ++++ go.sum | 17 ++++++++ http.go | 5 ++- main.go | 47 ++++++++++++++++------ models/client.go | 1 + modes/follow.go | 33 ++++++++++++++-- utils/file_reader.go | 83 +++++++++++++++++++++++++++++++++++++++ utils/file_reader_test.go | 46 ++++++++++++++++++++++ 9 files changed, 226 insertions(+), 21 deletions(-) create mode 100644 utils/file_reader.go create mode 100644 utils/file_reader_test.go diff --git a/client.go b/client.go index 69a4f5c..a46d5ee 100644 --- a/client.go +++ b/client.go @@ -130,7 +130,8 @@ func NewClients(msgs <-chan Message, maxCount int64) *Clients { currentlyConnected: 0, ring: ring.NewRingQueue[Message](maxCount), stats: Stats{ - Count: 0, + MaxCount: maxCount, + Count: 0, }, } @@ -269,7 +270,10 @@ func (c *Clients) Start() { } c.ring.PushSafe(msg) - c.stats.Count++ + if c.stats.Count < int(c.stats.MaxCount) { + c.stats.Count++ + } + c.stats.LastMessageAt = time.Now() for _, ch := range c.clients { diff --git a/go.mod b/go.mod index 1348345..c5fb532 100644 --- a/go.mod +++ b/go.mod @@ -9,10 +9,16 @@ require ( ) require ( + github.com/VividCortex/ewma v1.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/fatih/color v1.15.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-runewidth v0.0.15 // indirect github.com/nxadm/tail v1.4.11 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rivo/uniseg v0.2.0 // indirect golang.org/x/sys v0.16.0 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect @@ -21,6 +27,7 @@ require ( require ( github.com/brianvoe/gofakeit v3.18.0+incompatible github.com/brianvoe/gofakeit/v6 v6.28.0 + github.com/cheggaaa/pb/v3 v3.1.5 github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/sirupsen/logrus v1.9.3 github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index c7d1add..2b41d73 100644 --- a/go.sum +++ b/go.sum @@ -1,21 +1,36 @@ +github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= +github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/brianvoe/gofakeit v3.18.0+incompatible h1:wDOmHc9DLG4nRjUVVaxA+CEglKOW72Y5+4WNxUIkjM8= github.com/brianvoe/gofakeit v3.18.0+incompatible/go.mod h1:kfwdRA90vvNhPutZWfH7WPaDzUjz+CZFqG+rPkOjGOc= github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo0tgAW4= github.com/brianvoe/gofakeit/v6 v6.28.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs= +github.com/cheggaaa/pb/v3 v3.1.5 h1:QuuUzeM2WsAqG2gMqtzaWithDJv0i+i6UlnwSCI4QLk= +github.com/cheggaaa/pb/v3 v3.1.5/go.mod h1:CrxkeghYTXi1lQBEI7jSn+3svI3cuc19haAj6jM60XI= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= +github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= @@ -32,7 +47,9 @@ github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLr golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/http.go b/http.go index b622f95..193dcc2 100644 --- a/http.go +++ b/http.go @@ -347,9 +347,10 @@ func handleClientPeek(clients *Clients) func(w http.ResponseWriter, r *http.Requ } } -func handleHttp(msgs <-chan models.Message, httpPort string, uiIp string, analyticsEnabled bool, uiPass string, configFilePath string, bulkWindowMs int64, maxMessageCount int64) { +var clients *Clients + +func handleHttp(httpPort string, uiIp string, analyticsEnabled bool, uiPass string, configFilePath string, bulkWindowMs int64) { assets, _ := Assets() - clients := NewClients(msgs, maxMessageCount) BULK_WINDOW_MS = bulkWindowMs diff --git a/main.go b/main.go index 537c100..0333dfc 100644 --- a/main.go +++ b/main.go @@ -31,6 +31,14 @@ where you can filter and browse well formatted application output. `, Run: func(cmd *cobra.Command, args []string) { }, + PersistentPreRun: func(cmd *cobra.Command, args []string) { + verbose, _ := cmd.Flags().GetBool("verbose") + if verbose { + utils.Logger.SetLevel(logrus.TraceLevel) + } else { + utils.Logger.SetLevel(logrus.InfoLevel) + } + }, PersistentPostRun: func(cmd *cobra.Command, args []string) { noupdates, _ := cmd.Flags().GetBool("no-updates") @@ -47,29 +55,30 @@ where you can filter and browse well formatted application output. uiIp, _ := cmd.Flags().GetString("ui-ip") uiPass, _ := cmd.Flags().GetString("ui-pass") configFile, _ := cmd.Flags().GetString("config") - appendToFile, _ := cmd.Flags().GetString("append-to-file") - appendToFileRaw, _ := cmd.Flags().GetBool("append-to-file-raw") noanalytics, _ := cmd.Flags().GetBool("no-analytics") - modes.FallthroughGlobal, _ = cmd.Flags().GetBool("fallthrough") - verbose, _ := cmd.Flags().GetBool("verbose") bulkWindow, _ := cmd.Flags().GetInt64("bulk-window") - maxMessageCount, _ := cmd.Flags().GetInt64("max-message-count") + modes.FallthroughGlobal, _ = cmd.Flags().GetBool("fallthrough") if !noanalytics { utils.Logger.Warn("No opt-out from analytics, we'll be receiving anonymous usage data, which will be used to improve the product. To opt-out use the flag --no-analytics.") } - if verbose { - utils.Logger.SetLevel(logrus.TraceLevel) - } else { - utils.Logger.SetLevel(logrus.InfoLevel) + if clients == nil { + InitializeClients(cmd) } - mainChan := utils.ProcessIncomingMessages(ch, appendToFile, appendToFileRaw) - handleHttp(mainChan, httpPort, uiIp, !noanalytics, uiPass, configFile, bulkWindow, maxMessageCount) + handleHttp(httpPort, uiIp, !noanalytics, uiPass, configFile, bulkWindow) }, } +func InitializeClients(cmd *cobra.Command) { + appendToFile, _ := cmd.Flags().GetString("append-to-file") + appendToFileRaw, _ := cmd.Flags().GetBool("append-to-file-raw") + maxMessageCount, _ := cmd.Flags().GetInt64("max-message-count") + mainChan := utils.ProcessIncomingMessages(ch, appendToFile, appendToFileRaw) + clients = NewClients(mainChan, maxMessageCount) +} + var listenStdCmd = &cobra.Command{ Use: "stdin [command]", Short: "Listens to STDOUT/STDERR of a provided command. Example `logdy stdin \"npm run dev\"`", @@ -96,6 +105,13 @@ var followCmd = &cobra.Command{ Long: ``, Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, args []string) { + InitializeClients(cmd) + fullRead, _ := cmd.Flags().GetBool("full-read") + + if fullRead { + modes.ReadFiles(ch, args) + } + modes.FollowFiles(ch, args) }, } @@ -156,15 +172,20 @@ func init() { rootCmd.PersistentFlags().BoolP("no-analytics", "n", false, "Opt-out from sending anonymous analytical data that helps improve Logdy") rootCmd.PersistentFlags().BoolP("no-updates", "u", false, "Opt-out from checking updates on program startup") rootCmd.PersistentFlags().BoolP("fallthrough", "t", false, "Will fallthrough all of the stdin received to the terminal as is (will display incoming messages)") - demoSocketCmd.PersistentFlags().BoolP("sample-text", "", true, "By default demo data will produce JSON, use this flag to produce raw text") - listenSocketCmd.PersistentFlags().StringP("ip", "", "", "IP address to listen to, leave empty to listen on all IP addresses") utils.InitLogger() rootCmd.AddCommand(listenStdCmd) + + listenSocketCmd.PersistentFlags().StringP("ip", "", "", "IP address to listen to, leave empty to listen on all IP addresses") rootCmd.AddCommand(listenSocketCmd) + rootCmd.AddCommand(forwardCmd) + + demoSocketCmd.PersistentFlags().BoolP("sample-text", "", true, "By default demo data will produce JSON, use this flag to produce raw text") rootCmd.AddCommand(demoSocketCmd) + + followCmd.Flags().BoolP("full-read", "", false, "Whether the the file(s) should be read entirely") rootCmd.AddCommand(followCmd) } diff --git a/models/client.go b/models/client.go index b10f8c2..c8b7797 100644 --- a/models/client.go +++ b/models/client.go @@ -3,6 +3,7 @@ package models import "time" type Stats struct { + MaxCount int64 `json:"max_count"` Count int `json:"msg_count"` FirstMessageAt time.Time `json:"first_message_at"` LastMessageAt time.Time `json:"last_message_at"` diff --git a/modes/follow.go b/modes/follow.go index a1bc768..d3dc5e2 100644 --- a/modes/follow.go +++ b/modes/follow.go @@ -22,11 +22,10 @@ func FollowFiles(ch chan models.Message, files []string) { "error": err.Error(), }).Error("Following file changes failed") continue - } else { - utils.Logger.WithFields(logrus.Fields{ - "path": file, - }).Info("Following file changes") } + utils.Logger.WithFields(logrus.Fields{ + "path": file, + }).Info("Following file changes") go func(file string) { t, err := tail.TailFile( @@ -46,3 +45,29 @@ func FollowFiles(ch chan models.Message, files []string) { } } + +func ReadFiles(ch chan models.Message, files []string) { + for _, file := range files { + + _, err := os.Stat(file) + if err != nil { + utils.Logger.WithFields(logrus.Fields{ + "path": file, + "error": err.Error(), + }).Error("Reading file failed") + continue + } + + r, size, bar := utils.OpenFileForReading(file) + utils.Logger.WithFields(logrus.Fields{ + "path": file, + "size_bytes": size, + }).Info("Reading file") + + utils.LineCounterWithChannel(r, func(line utils.Line, cancel func()) { + produce(ch, string(line.Line), models.MessageTypeStdout, &models.MessageOrigin{File: file}) + }) + bar.Finish() + + } +} diff --git a/utils/file_reader.go b/utils/file_reader.go new file mode 100644 index 0000000..fcc6059 --- /dev/null +++ b/utils/file_reader.go @@ -0,0 +1,83 @@ +package utils + +import ( + "bytes" + "context" + "io" + "os" + + "github.com/cheggaaa/pb/v3" +) + +type Line struct { + seq int + Line []byte +} + +func LineCounterWithChannel(r io.Reader, fn func(line Line, cancel func())) error { + ctx, cancel := context.WithCancel(context.Background()) + const bufferSize = 64 * 1024 + buf := make([]byte, bufferSize) + var previousLine = []byte{} + seq := 0 + + for { + + if ctx.Err() != nil { + cancel() + return nil + } + + i, err := r.Read(buf) + if err != nil && err != io.EOF { + panic(err) + } + + if i == 0 && len(previousLine) == 0 { + cancel() + return nil + } + + previousLine = append(previousLine, buf[:i]...) + newlineIndex := bytes.IndexByte(previousLine, '\n') + + if newlineIndex == -1 { + + if i == bufferSize { + continue + } + + if err == io.EOF || (i == 0 || i < bufferSize) { + fn(Line{seq: seq, Line: previousLine}, cancel) + return nil + } + } + + lines := bytes.Split(previousLine, []byte{'\n'}) + previousLine = lines[len(lines)-1] + lines = lines[:len(lines)-1] + + for _, line := range lines { + seq++ + fn(Line{seq: seq, Line: line}, cancel) + } + + if err == io.EOF { + fn(Line{seq: seq, Line: previousLine}, cancel) + return nil + } + } +} + +func OpenFileForReading(file string) (io.Reader, int64, *pb.ProgressBar) { + reader, err := os.Open(file) + + if err != nil { + panic(err) + } + + fi, _ := reader.Stat() + bar := pb.Full.Start64(fi.Size()) + return bar.NewProxyReader(reader), fi.Size(), bar + // return reader, fi.Size() +} diff --git a/utils/file_reader_test.go b/utils/file_reader_test.go new file mode 100644 index 0000000..66fab1e --- /dev/null +++ b/utils/file_reader_test.go @@ -0,0 +1,46 @@ +package utils + +import ( + "bytes" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLineCounterWithChannel(t *testing.T) { + + tests := []struct { + input string + buffer int + linesCount int + }{ + {input: "123", buffer: 5, linesCount: 1}, + {input: "123456", buffer: 5, linesCount: 1}, + {input: "123456790abc", buffer: 5, linesCount: 1}, + + {input: "12\n3", buffer: 5, linesCount: 2}, + {input: "12\n3456789", buffer: 5, linesCount: 2}, + {input: "12\n3456\n789", buffer: 5, linesCount: 3}, + {input: "12\n34aaabbb56\n789", buffer: 5, linesCount: 3}, + {input: "12\n34aaabbbcccddd56\n789", buffer: 5, linesCount: 3}, + + {input: "1\n\n2", buffer: 5, linesCount: 3}, + {input: "1\n\n\n\n2", buffer: 5, linesCount: 5}, + } + + for _, tc := range tests { + + chars := "" + c := 0 + LineCounterWithChannel(bytes.NewBufferString(tc.input), func(line Line, cancel func()) { + c++ + chars = chars + string(line.Line) + }) + + assert.Equal(t, tc.linesCount, c) + assert.Equal(t, strings.ReplaceAll(tc.input, "\n", ""), chars) + + } + +}