-
Notifications
You must be signed in to change notification settings - Fork 310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Follow with filter #152
base: main
Are you sure you want to change the base?
Follow with filter #152
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,10 @@ import ( | |
"fmt" | ||
"io" | ||
"log" | ||
"strings" | ||
"time" | ||
|
||
"golang.org/x/net/context" | ||
) | ||
|
||
var ( | ||
|
@@ -49,6 +52,22 @@ type JournalReader struct { | |
journal *Journal | ||
} | ||
|
||
// FollowFilter is a function which you pass into Follow to determine if you to | ||
// filter key value pair from a journal entry. | ||
type FollowFilter func(key, value string) bool | ||
|
||
// AllKeys is a FollowFilter that allows all keys of a given journal entry through the filter | ||
func AllKeys(key, value string) bool { | ||
return true | ||
} | ||
|
||
func OnlyMessages(key, value string) bool { | ||
if key == "MESSAGE" { | ||
return true | ||
} | ||
return false | ||
} | ||
|
||
// NewJournalReader creates a new JournalReader with configuration options that are similar to the | ||
// systemd journalctl tool's iteration and filtering features. | ||
func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) { | ||
|
@@ -129,63 +148,85 @@ func (r *JournalReader) Close() error { | |
return r.journal.Close() | ||
} | ||
|
||
// Follow synchronously follows the JournalReader, writing each new journal entry to writer. The | ||
// follow will continue until a single time.Time is received on the until channel. | ||
func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) (err error) { | ||
|
||
// Process journal entries and events. Entries are flushed until the tail or | ||
// timeout is reached, and then we wait for new events or the timeout. | ||
var msg = make([]byte, 64*1<<(10)) | ||
process: | ||
for { | ||
c, err := r.Read(msg) | ||
if err != nil && err != io.EOF { | ||
break process | ||
} | ||
// Follow asynchronously follows the JournalReader it takes in a context to stop the following the Journal, it returns a | ||
// buffered channel of errors and will stop following the journal on the first given error | ||
func (r *JournalReader) Follow(ctx context.Context, msgs chan<- map[string]interface{}, filter FollowFilter) <-chan error { | ||
errChan := make(chan error, 1) | ||
go func() { | ||
for { | ||
kvMap := make(map[string]interface{}) | ||
select { | ||
case <-ctx.Done(): | ||
errChan <- ctx.Err() | ||
return | ||
default: | ||
var err error | ||
var c int | ||
|
||
select { | ||
case <-until: | ||
return ErrExpired | ||
default: | ||
if c > 0 { | ||
writer.Write(msg[:c]) | ||
continue process | ||
} | ||
} | ||
// Advance the journal cursor | ||
c, err = r.journal.Next() | ||
|
||
// We're at the tail, so wait for new events or time out. | ||
// Holds journal events to process. Tightly bounded for now unless there's a | ||
// reason to unblock the journal watch routine more quickly. | ||
events := make(chan int, 1) | ||
pollDone := make(chan bool, 1) | ||
go func() { | ||
for { | ||
select { | ||
case <-pollDone: | ||
// An unexpected error | ||
if err != nil { | ||
errChan <- err | ||
return | ||
default: | ||
events <- r.journal.Wait(time.Duration(1) * time.Second) | ||
} | ||
|
||
// We have a new journal entry go over the fields | ||
// get the data for what we care about and return | ||
r.journal.RestartData() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about moving this below |
||
if c > 0 { | ||
fields: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really need this tag? |
||
for { | ||
s, err := r.journal.EnumerateData() | ||
if err != nil || len(s) == 0 { | ||
break fields | ||
} | ||
s = s[:len(s)] | ||
arr := strings.SplitN(s, "=", 2) | ||
// if we want the pair, | ||
// add it to the map | ||
if filter(arr[0], arr[1]) { | ||
kvMap[arr[0]] = arr[1] | ||
} | ||
} | ||
msgs <- kvMap | ||
} | ||
} | ||
}() | ||
|
||
select { | ||
case <-until: | ||
pollDone <- true | ||
return ErrExpired | ||
case e := <-events: | ||
pollDone <- true | ||
switch e { | ||
case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE: | ||
// TODO: need to account for any of these? | ||
default: | ||
log.Printf("Received unknown event: %d\n", e) | ||
|
||
// we're at the tail, so wait for new events or time out. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I dont think this is true, we haven't drained the journal yet.
A better way would be put all these into a function to make it cleaner
|
||
// holds journal events to process. tightly bounded for now unless there's a | ||
// reason to unblock the journal watch routine more quickly | ||
events := make(chan int, 1) | ||
pollDone := make(chan bool, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe change the size to 0, and close the channel instead of sending a bool? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or better, can we reuse these channels ? |
||
go func() { | ||
for { | ||
select { | ||
case <-pollDone: | ||
return | ||
default: | ||
events <- r.journal.Wait(time.Duration(1) * time.Second) | ||
} | ||
} | ||
}() | ||
|
||
select { | ||
case <-ctx.Done(): | ||
errChan <- ctx.Err() | ||
pollDone <- true | ||
return | ||
case e := <-events: | ||
pollDone <- true | ||
switch e { | ||
case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE: | ||
// TODO: need to account for any of these? | ||
default: | ||
log.Printf("Received unknown event: %d\n", e) | ||
} | ||
} | ||
continue process | ||
} | ||
} | ||
|
||
return | ||
}() | ||
return errChan | ||
} | ||
|
||
// buildMessage returns a string representing the current journal entry in a simple format which | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why value type is
interface{}
? Aren't strings enough?