diff --git a/sdjournal/journal.go b/sdjournal/journal.go index 0324983a..78773a5e 100644 --- a/sdjournal/journal.go +++ b/sdjournal/journal.go @@ -251,6 +251,31 @@ func (j *Journal) GetData(field string) (string, error) { return msg, nil } +// EnumerateData may be used to iterate through all data fields used in the opened journal files +// the order of the returned field names is not defined. +func (j *Journal) EnumerateData() (string, error) { + var d unsafe.Pointer + var l C.size_t + + j.mu.Lock() + r := C.sd_journal_enumerate_data(j.cjournal, &d, &l) + j.mu.Unlock() + + if r < 0 { + return "", fmt.Errorf("failed to read message: %d", r) + } + + msg := C.GoStringN((*C.char)(d), C.int(l)) + return msg, nil +} + +//RestartData resets the field name enumeration index to the beginning of the list. +func (j *Journal) RestartData() { + j.mu.Lock() + C.sd_journal_restart_data(j.cjournal) + j.mu.Unlock() +} + // GetDataValue gets the data object associated with a specific field from the // current journal entry, returning only the value of the object. func (j *Journal) GetDataValue(field string) (string, error) { diff --git a/sdjournal/read.go b/sdjournal/read.go index 8944448c..6fd30396 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -20,7 +20,10 @@ import ( "fmt" "io" "log" + "strings" "time" + + "golang.org/x/net/context" ) var ( @@ -49,6 +52,22 @@ type JournalReader struct { journal *Journal } +// FollowFilter is a function which you pass into Follow to determine if you to +// filter key value pair from a journal entry. +type FollowFilter func(key, value string) bool + +// AllKeys is a FollowFilter that allows all keys of a given journal entry through the filter +func AllKeys(key, value string) bool { + return true +} + +func OnlyMessages(key, value string) bool { + if key == "MESSAGE" { + return true + } + return false +} + // NewJournalReader creates a new JournalReader with configuration options that are similar to the // systemd journalctl tool's iteration and filtering features. func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) { @@ -129,63 +148,85 @@ func (r *JournalReader) Close() error { return r.journal.Close() } -// Follow synchronously follows the JournalReader, writing each new journal entry to writer. The -// follow will continue until a single time.Time is received on the until channel. -func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) (err error) { - - // Process journal entries and events. Entries are flushed until the tail or - // timeout is reached, and then we wait for new events or the timeout. - var msg = make([]byte, 64*1<<(10)) -process: - for { - c, err := r.Read(msg) - if err != nil && err != io.EOF { - break process - } +// Follow asynchronously follows the JournalReader it takes in a context to stop the following the Journal, it returns a +// buffered channel of errors and will stop following the journal on the first given error +func (r *JournalReader) Follow(ctx context.Context, msgs chan<- map[string]interface{}, filter FollowFilter) <-chan error { + errChan := make(chan error, 1) + go func() { + for { + kvMap := make(map[string]interface{}) + select { + case <-ctx.Done(): + errChan <- ctx.Err() + return + default: + var err error + var c int - select { - case <-until: - return ErrExpired - default: - if c > 0 { - writer.Write(msg[:c]) - continue process - } - } + // Advance the journal cursor + c, err = r.journal.Next() - // We're at the tail, so wait for new events or time out. - // Holds journal events to process. Tightly bounded for now unless there's a - // reason to unblock the journal watch routine more quickly. - events := make(chan int, 1) - pollDone := make(chan bool, 1) - go func() { - for { - select { - case <-pollDone: + // An unexpected error + if err != nil { + errChan <- err return - default: - events <- r.journal.Wait(time.Duration(1) * time.Second) + } + + // We have a new journal entry go over the fields + // get the data for what we care about and return + r.journal.RestartData() + if c > 0 { + fields: + for { + s, err := r.journal.EnumerateData() + if err != nil || len(s) == 0 { + break fields + } + s = s[:len(s)] + arr := strings.SplitN(s, "=", 2) + // if we want the pair, + // add it to the map + if filter(arr[0], arr[1]) { + kvMap[arr[0]] = arr[1] + } + } + msgs <- kvMap } } - }() - - select { - case <-until: - pollDone <- true - return ErrExpired - case e := <-events: - pollDone <- true - switch e { - case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE: - // TODO: need to account for any of these? - default: - log.Printf("Received unknown event: %d\n", e) + + // we're at the tail, so wait for new events or time out. + // holds journal events to process. tightly bounded for now unless there's a + // reason to unblock the journal watch routine more quickly + events := make(chan int, 1) + pollDone := make(chan bool, 1) + go func() { + for { + select { + case <-pollDone: + return + default: + events <- r.journal.Wait(time.Duration(1) * time.Second) + } + } + }() + + select { + case <-ctx.Done(): + errChan <- ctx.Err() + pollDone <- true + return + case e := <-events: + pollDone <- true + switch e { + case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE: + // TODO: need to account for any of these? + default: + log.Printf("Received unknown event: %d\n", e) + } } - continue process } - } - - return + }() + return errChan } // buildMessage returns a string representing the current journal entry in a simple format which