From 583118881ff266363453a4e47c6482594721b4af Mon Sep 17 00:00:00 2001
From: Aleksandr Nogikh
Date: Tue, 17 Dec 2024 15:38:19 +0100
Subject: [PATCH] pkg/email/lore: extract patch series

Refactor the code to make it more reusable. Add a method that
specifically extracts the list of new patch series.
---
 pkg/email/lore/parse.go      | 169 +++++++++++++++++++++++++++++--
 pkg/email/lore/parse_test.go | 186 +++++++++++++++++++++++++++++++++++
 pkg/email/lore/read.go       |  37 +++++--
 tools/syz-lore/query_lkml.go |  33 ++++---
 4 files changed, 398 insertions(+), 27 deletions(-)

diff --git a/pkg/email/lore/parse.go b/pkg/email/lore/parse.go
index d0e4d4fe242f..98152b4083da 100644
--- a/pkg/email/lore/parse.go
+++ b/pkg/email/lore/parse.go
@@ -4,14 +4,17 @@
 package lore
 
 import (
+	"fmt"
 	"regexp"
 	"sort"
+	"strconv"
 	"strings"
 
 	"github.com/google/syzkaller/dashboard/dashapi"
 	"github.com/google/syzkaller/pkg/email"
 )
 
+// Thread is a generic representation of a single discussion in the mailing list.
 type Thread struct {
 	Subject   string
 	MessageID string
@@ -20,9 +23,28 @@ type Thread struct {
 	Messages  []*email.Email
 }
 
+// Series represents a single patch series sent over email.
+type Series struct {
+	Subject   string
+	MessageID string
+	Version   int
+	Corrupted string // If non-empty, contains a reason why the series should be ignored.
+	Patches   []Patch
+}
+
+type Patch struct {
+	Seq int
+	*email.Email
+}
+
 // Threads extracts individual threads from a list of emails.
 func Threads(emails []*email.Email) []*Thread {
+	return listThreads(emails, 0)
+}
+
+func listThreads(emails []*email.Email, maxDepth int) []*Thread {
 	ctx := &parseCtx{
+		maxDepth: maxDepth,
 		messages: map[string]*email.Email{},
 		next:     map[*email.Email][]*email.Email{},
 	}
@@ -33,6 +55,68 @@ func Threads(emails []*email.Email) []*Thread {
 	return ctx.threads
 }
 
+// PatchSeries is similar to Threads, but returns only the patch series submitted to the mailing lists.
+func PatchSeries(emails []*email.Email) []*Series {
+	var ret []*Series
+	// Normally, all subsequent patches of a series are sent in reply to the first email.
+	// So there's no sense in looking at deeper replies.
+	for _, thread := range listThreads(emails, 1) {
+		if thread.Type != dashapi.DiscussionPatch {
+			continue
+		}
+		patch, ok := parsePatchSubject(thread.Subject)
+		if !ok {
+			// This should never happen.
+			panic("DiscussionPatch is set, but we fail to parse the thread subject")
+		}
+		total := patch.Total.ValueOr(1)
+		series := &Series{
+			Subject:   patch.Title,
+			MessageID: thread.MessageID,
+			Version:   patch.Version.ValueOr(1),
+		}
+		ret = append(ret, series)
+		if patch.Seq.IsSet() && patch.Seq.Value() > 1 {
+			series.Corrupted = "the first patch has seq>1"
+			continue
+		}
+		hasSeq := map[int]bool{}
+		for _, email := range thread.Messages {
+			patch, ok := parsePatchSubject(email.Subject)
+			if !ok {
+				continue
+			}
+			seq := patch.Seq.ValueOr(1)
+			if seq == 0 {
+				// The cover email is not of interest.
+				continue
+			}
+			if hasSeq[seq] {
+				// It's weird if that really happens, but let's skip it for now.
+				continue
+			}
+			hasSeq[seq] = true
+			series.Patches = append(series.Patches, Patch{
+				Seq:   seq,
+				Email: email,
+			})
+		}
+		if len(hasSeq) != total {
+			series.Corrupted = fmt.Sprintf("the subject mentions %d patches, %d are found",
+				total, len(hasSeq))
+			continue
+		}
+		if len(series.Patches) == 0 {
+			series.Corrupted = "0 patches"
+			continue
+		}
+		sort.Slice(series.Patches, func(i, j int) bool {
+			return series.Patches[i].Seq < series.Patches[j].Seq
+		})
+	}
+	return ret
+}
+
 // DiscussionType extracts the specific discussion type from an email.
 func DiscussionType(msg *email.Email) dashapi.DiscussionType {
 	discType := dashapi.DiscussionMention
@@ -40,7 +124,7 @@ func DiscussionType(msg *email.Email) dashapi.DiscussionType {
 		discType = dashapi.DiscussionReport
 	}
 	// This is very crude, but should work for now.
-	if patchSubjectRe.MatchString(strings.ToLower(msg.Subject)) {
+	if _, ok := parsePatchSubject(msg.Subject); ok {
 		discType = dashapi.DiscussionPatch
 	} else if strings.Contains(msg.Subject, "Monthly") {
 		discType = dashapi.DiscussionReminder
@@ -48,9 +132,51 @@ func DiscussionType(msg *email.Email) dashapi.DiscussionType {
 	return discType
 }
 
-var patchSubjectRe = regexp.MustCompile(`\[(?:(?:rfc|resend)\s+)*patch`)
+type PatchSubject struct {
+	Title   string
+	Tags    []string // Sometimes there's e.g. "net" or "net-next" in the subject.
+	Version Optional[int]
+	Seq     Optional[int] // The "Seq/Total" part.
+	Total   Optional[int]
+}
+
+// nolint: lll
+var patchSubjectRe = regexp.MustCompile(`(?mi)^\[(?:([\w\s-]+)\s)?PATCH(?:\s([\w\s-]+))??(?:\s0*(\d+)\/(\d+))?\]\s*(.+)`)
+
+func parsePatchSubject(subject string) (PatchSubject, bool) {
+	var ret PatchSubject
+	groups := patchSubjectRe.FindStringSubmatch(subject)
+	if len(groups) == 0 {
+		return ret, false
+	}
+	tags := strings.Fields(groups[1])
+	for _, tag := range append(tags, strings.Fields(groups[2])...) {
+		if strings.HasPrefix(tag, "v") {
+			val, err := strconv.Atoi(strings.TrimPrefix(tag, "v"))
+			if err == nil {
+				ret.Version.Set(val)
+				continue
+			}
+		}
+		ret.Tags = append(ret.Tags, tag)
+	}
+	sort.Strings(ret.Tags)
+	if groups[3] != "" {
+		if val, err := strconv.Atoi(groups[3]); err == nil {
+			ret.Seq.Set(val)
+		}
+	}
+	if groups[4] != "" {
+		if val, err := strconv.Atoi(groups[4]); err == nil {
+			ret.Total.Set(val)
+		}
+	}
+	ret.Title = groups[5]
+	return ret, true
+}
 
 type parseCtx struct {
+	maxDepth int
 	threads  []*Thread
 	messages map[string]*email.Email
 	next     map[*email.Email][]*email.Email
@@ -73,7 +199,7 @@ func (c *parseCtx) process() {
 	}
 	// Iterate starting from these tree nodes.
 	for _, node := range nodes {
-		c.visit(node, nil)
+		c.visit(node, nil, 0)
 	}
 	// Collect BugIDs.
 	for _, thread := range c.threads {
@@ -92,7 +218,7 @@
 	}
 }
 
-func (c *parseCtx) visit(msg *email.Email, thread *Thread) {
+func (c *parseCtx) visit(msg *email.Email, thread *Thread, depth int) {
 	var oldInfo *email.OldThreadInfo
 	if thread != nil {
 		oldInfo = &email.OldThreadInfo{
@@ -114,7 +240,38 @@ func (c *parseCtx) visit(msg *email.Email, thread *Thread) {
 		}
 		c.threads = append(c.threads, thread)
 	}
-	for _, nextMsg := range c.next[msg] {
-		c.visit(nextMsg, thread)
+	if c.maxDepth == 0 || depth < c.maxDepth {
+		for _, nextMsg := range c.next[msg] {
+			c.visit(nextMsg, thread, depth+1)
+		}
 	}
 }
+
+type Optional[T any] struct {
+	val T
+	set bool
+}
+
+func value[T any](val T) Optional[T] {
+	return Optional[T]{val: val, set: true}
+}
+
+func (o Optional[T]) IsSet() bool {
+	return o.set
+}
+
+func (o Optional[T]) Value() T {
+	return o.val
+}
+
+func (o Optional[T]) ValueOr(def T) T {
+	if o.set {
+		return o.val
+	}
+	return def
+}
+
+func (o *Optional[T]) Set(val T) {
+	o.val = val
+	o.set = true
+}
diff --git a/pkg/email/lore/parse_test.go b/pkg/email/lore/parse_test.go
index 8349859ec9be..125b572c5bd8 100644
--- a/pkg/email/lore/parse_test.go
+++ b/pkg/email/lore/parse_test.go
@@ -4,6 +4,7 @@
 package lore
 
 import (
+	"fmt"
 	"sort"
 	"strings"
 	"testing"
@@ -12,6 +13,7 @@ import (
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/syzkaller/dashboard/dashapi"
 	"github.com/google/syzkaller/pkg/email"
+	"github.com/stretchr/testify/assert"
 )
 
 func TestThreadsCollection(t *testing.T) {
@@ -242,6 +244,72 @@ Bug report`,
 	}
 }
 
+func TestParsePatchSubject(t *testing.T) {
+	tests := []struct {
+		subj string
+		ret  PatchSubject
+	}{
+		{
+			subj: `[PATCH] abcd`,
+			ret:  PatchSubject{Title: "abcd"},
+		},
+		{
+			subj: `[PATCH 00/20] abcd`,
+			ret:  PatchSubject{Title: "abcd", Seq: value[int](0), Total: value[int](20)},
+		},
+		{
+			subj: `[PATCH 5/6] abcd`,
+			ret:  PatchSubject{Title: "abcd", Seq: value[int](5), Total: value[int](6)},
+		},
+		{
+			subj: `[PATCH RFC v3 0/4] abcd`,
+			ret: PatchSubject{
+				Title:   "abcd",
+				Tags:    []string{"RFC"},
+				Version: value[int](3),
+				Seq:     value[int](0),
+				Total:   value[int](4),
+			},
+		},
+		{
+			subj: `[RFC PATCH] abcd`,
+			ret:  PatchSubject{Title: "abcd", Tags: []string{"RFC"}},
+		},
+		{
+			subj: `[PATCH net-next v2 00/21] abcd`,
+			ret: PatchSubject{
+				Title:   "abcd",
+				Tags:    []string{"net-next"},
+				Version: value[int](2),
+				Seq:     value[int](0),
+				Total:   value[int](21),
+			},
+		},
+		{
+			subj: `[PATCH v2 RESEND] abcd`,
+			ret:  PatchSubject{Title: "abcd", Version: value[int](2), Tags: []string{"RESEND"}},
+		},
+		{
+			subj: `[PATCH RFC net-next v3 05/21] abcd`,
+			ret: PatchSubject{
+				Title:   "abcd",
+				Tags:    []string{"RFC", "net-next"},
+				Version: value[int](3),
+				Seq:     value[int](5),
+				Total:   value[int](21),
+			},
+		},
+	}
+	for id, test := range tests {
+		test := test
+		t.Run(fmt.Sprint(id), func(t *testing.T) {
+			ret, ok := parsePatchSubject(test.subj)
+			assert.True(t, ok)
+			assert.Equal(t, test.ret, ret)
+		})
+	}
+}
+
 func TestDiscussionType(t *testing.T) {
 	tests := []struct {
 		msg    *email.Email
@@ -299,3 +367,121 @@ func TestDiscussionType(t *testing.T) {
 		}
 	}
 }
+
+func TestParseSeries(t *testing.T) {
+	messages := []string{
+		// A simple patch series.
+		`Date: Sun, 7 May 2017 19:54:00 -0700
+Subject: [PATCH] Small patch
+Message-ID: 
+From: UserA
+Content-Type: text/plain
+
+
+Some text`,
+		// A series with a cover.
+		`Date: Sun, 7 May 2017 19:55:00 -0700
+Subject: [PATCH v2 00/02] A longer series
+Message-ID: 
+From: UserB
+To: UserA
+Content-Type: text/plain
+
+Some cover`,
+		`Date: Sun, 7 May 2017 19:56:00 -0700
+Subject: [PATCH v2 01/02] First patch
+Message-ID: 
+From: UserC
+To: UserA , UserB
+Content-Type: text/plain
+In-Reply-To: 
+
+
+Patch 1/2`,
+		`Date: Sun, 7 May 2017 19:56:00 -0700
+Subject: [PATCH v2 02/02] Second patch
+Message-ID: 
+From: UserC
+To: UserA , UserB
+Content-Type: text/plain
+In-Reply-To: 
+
+
+Patch 2/2`,
+		// Missing patches.
+		`Date: Sun, 7 May 2017 19:57:00 -0700
+Subject: [PATCH 01/03] Series
+Message-ID: 
+From: Someone
+Content-Type: text/plain
+
+Bug report`,
+	}
+
+	emails := []*email.Email{}
+	for _, m := range messages {
+		msg, err := email.Parse(strings.NewReader(m), nil, nil, nil)
+		if err != nil {
+			t.Fatal(err)
+		}
+		emails = append(emails, msg)
+	}
+
+	series := PatchSeries(emails)
+	assert.Len(t, series, 3)
+
+	expectPerID := map[string]*Series{
+		"": {
+			Subject: "Small patch",
+			Version: 1,
+			Patches: []Patch{
+				{
+					Seq:   1,
+					Email: &email.Email{Subject: "[PATCH] Small patch"},
+				},
+			},
+		},
+		"": {
+			Subject: "A longer series",
+			Version: 2,
+			Patches: []Patch{
+				{
+					Seq:   1,
+					Email: &email.Email{Subject: "[PATCH v2 01/02] First patch"},
+				},
+				{
+					Seq:   2,
+					Email: &email.Email{Subject: "[PATCH v2 02/02] Second patch"},
+				},
+			},
+		},
+		"": {
+			Subject:   "Series",
+			Version:   1,
+			Corrupted: "the subject mentions 3 patches, 1 are found",
+			Patches: []Patch{
+				{
+					Seq:   1,
+					Email: &email.Email{Subject: "[PATCH 01/03] Series"},
+				},
+			},
+		},
+	}
+	for _, s := range series {
+		expect := expectPerID[s.MessageID]
+		if expect == nil {
+			t.Fatalf("unexpected message: %q", s.MessageID)
+		}
+		expectPerID[s.MessageID] = nil
+		t.Run(s.MessageID, func(t *testing.T) {
+			assert.Equal(t, expect.Corrupted, s.Corrupted, "corrupted differs")
+			assert.Equal(t, expect.Subject, s.Subject, "subject differs")
+			assert.Equal(t, expect.Version, s.Version, "version differs")
+			assert.Len(t, s.Patches, len(expect.Patches), "patch count differs")
+			for i, expectPatch := range expect.Patches {
+				got := s.Patches[i]
+				assert.Equal(t, expectPatch.Seq, got.Seq, "seq differs")
+			}
+		})
+	}
+}
diff --git a/pkg/email/lore/read.go b/pkg/email/lore/read.go
index 7489f750f117..ef0a0faa0e33 100644
--- a/pkg/email/lore/read.go
+++ b/pkg/email/lore/read.go
@@ -4,30 +4,49 @@
 package lore
 
 import (
+	"bytes"
 	"fmt"
 	"time"
 
+	"github.com/google/syzkaller/pkg/email"
 	"github.com/google/syzkaller/pkg/vcs"
 )
 
 type EmailReader struct {
-	Extract func() ([]byte, error)
+	Read func() ([]byte, error)
 }
 
 // ReadArchive queries the parsed messages from a single LKML message archive.
-func ReadArchive(dir string, messages chan<- *EmailReader) error {
-	repo := vcs.NewLKMLRepo(dir)
-	commits, err := repo.ListCommitHashes("HEAD", time.Time{})
+func ReadArchive(repo vcs.Repo, fromTime time.Time) ([]EmailReader, error) {
+	commits, err := repo.ListCommitHashes("HEAD", fromTime)
 	if err != nil {
-		return fmt.Errorf("failed to get recent commits: %w", err)
+		return nil, fmt.Errorf("failed to get recent commits: %w", err)
 	}
+	var ret []EmailReader
 	for _, iterCommit := range commits {
 		commit := iterCommit
-		messages <- &EmailReader{
-			Extract: func() ([]byte, error) {
+		ret = append(ret, EmailReader{
+			Read: func() ([]byte, error) {
 				return repo.Object("m", commit)
 			},
-		}
+		})
 	}
-	return nil
+	return ret, nil
+}
+
+func (er *EmailReader) Parse(emails, domains []string) (*email.Email, error) {
+	body, err := er.Read()
+	if err != nil {
+		return nil, err
+	}
+	msg, err := email.Parse(bytes.NewReader(body), emails, nil, domains)
+	if err != nil {
+		return nil, err
+	}
+	// Keep memory consumption low.
+	msg.Body = ""
+	msg.Patch = ""
+	// TODO: We definitely don't care about the patch here. Add an option to avoid extracting it?
+	// TODO: If emails/domains are nil, we also don't need to parse the body at all.
+	return msg, nil
 }
diff --git a/tools/syz-lore/query_lkml.go b/tools/syz-lore/query_lkml.go
index d6e20bf5a95a..ff1dbe2001fd 100644
--- a/tools/syz-lore/query_lkml.go
+++ b/tools/syz-lore/query_lkml.go
@@ -4,7 +4,6 @@
 package main
 
 import (
-	"bytes"
 	"context"
 	"encoding/json"
 	"flag"
@@ -14,6 +13,8 @@ import (
 	"runtime"
 	"strings"
 	"sync"
+	"sync/atomic"
+	"time"
 
 	"github.com/google/syzkaller/dashboard/dashapi"
 	"github.com/google/syzkaller/pkg/email"
@@ -21,6 +22,7 @@ import (
 	"github.com/google/syzkaller/pkg/hash"
 	"github.com/google/syzkaller/pkg/osutil"
 	"github.com/google/syzkaller/pkg/tool"
+	"github.com/google/syzkaller/pkg/vcs"
 	"golang.org/x/sync/errgroup"
 )
 
@@ -106,7 +108,7 @@ func processArchives(dir string, emails, domains []string) []*lore.Thread {
 		tool.Failf("failed to read directory: %v", err)
 	}
 	threads := runtime.NumCPU()
-	messages := make(chan *lore.EmailReader, threads*2)
+	messages := make(chan lore.EmailReader, threads*2)
 	wg := sync.WaitGroup{}
 
 	g, _ := errgroup.WithContext(context.Background())
@@ -120,28 +122,32 @@ func processArchives(dir string, emails, domains []string) []*lore.Thread {
 		wg.Add(1)
 		g.Go(func() error {
 			defer wg.Done()
-			return lore.ReadArchive(path, messages)
+			repo := vcs.NewLKMLRepo(path)
+			list, err := lore.ReadArchive(repo, time.Time{})
+			if err != nil {
+				return err
+			}
+			for _, reader := range list {
+				messages <- reader
+			}
+			return nil
 		})
 	}
 
 	// Set up some worker threads.
 	var repoEmails []*email.Email
 	var mu sync.Mutex
+	var skipped atomic.Int64
 	for i := 0; i < threads; i++ {
 		g.Go(func() error {
 			for rawMsg := range messages {
-				body, err := rawMsg.Extract()
-				if err != nil {
-					continue
-				}
-				msg, err := email.Parse(bytes.NewReader(body), emails, nil, domains)
+				msg, err := rawMsg.Parse(emails, domains)
 				if err != nil {
+					// There are many broken messages in LKML,
+					// no sense in printing them all each time.
+					skipped.Add(1)
 					continue
 				}
-				// Keep memory consumption low.
-				msg.Body = ""
-				msg.Patch = ""
-
 				mu.Lock()
 				repoEmails = append(repoEmails, msg)
 				mu.Unlock()
@@ -156,6 +162,9 @@ func processArchives(dir string, emails, domains []string) []*lore.Thread {
 	if err := g.Wait(); err != nil {
 		tool.Failf("%s", err)
 	}
+	if cnt := skipped.Load(); cnt > 0 {
+		log.Printf("skipped %d messages because of parsing errors", cnt)
+	}
 	list := lore.Threads(repoEmails)
 	log.Printf("collected %d email threads", len(list))