Skip to content

Commit

Permalink
go: fix channel auto coalescing in merge
Browse files Browse the repository at this point in the history
  • Loading branch information
bradsquicciarini-coco committed Jan 26, 2025
1 parent fb2b5f2 commit 68ae983
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
12 changes: 10 additions & 2 deletions go/cli/mcap/cmd/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"os"
"slices"

"github.com/foxglove/mcap/go/cli/mcap/utils"
"github.com/foxglove/mcap/go/mcap"
Expand Down Expand Up @@ -157,9 +158,16 @@ func getChannelHash(channel *mcap.Channel, coalesceChannels string) HashSum {

switch coalesceChannels {
case AutoCoalescing: // Include channel metadata in hash
for key, value := range channel.Metadata {
// sort keys so we get same metadata order in the hash
keys := make([]string, 0, len(channel.Metadata))
for key := range channel.Metadata {
keys = append(keys, key)
}
slices.Sort(keys)

for _, key := range keys {
hasher.Write([]byte(key))
hasher.Write([]byte(value))
hasher.Write([]byte(channel.Metadata[key]))
}
case ForceCoalescing: // Channel metadata is not included in hash
break
Expand Down
13 changes: 8 additions & 5 deletions go/cli/mcap/cmd/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,26 +537,29 @@ func TestSameSchemasNotDuplicated(t *testing.T) {

func TestChannelCoalesceBehavior(t *testing.T) {
expectedMsgCountByChannel := map[string]map[uint16]int{
"none": {1: 100, 2: 100, 3: 100, 4: 100},
"auto": {1: 200, 2: 100, 3: 100},
"force": {1: 300, 2: 100},
"none": {1: 100, 2: 100, 3: 100, 4: 100, 5: 100},
"auto": {1: 200, 2: 200, 3: 100},
"force": {1: 400, 2: 100},
}

for coalesceChannels, messagesByChannel := range expectedMsgCountByChannel {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
buf3 := &bytes.Buffer{}
buf4 := &bytes.Buffer{}
buf5 := &bytes.Buffer{}
prepInput(t, buf1, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 2, Topic: "/foo"})
prepInput(t, buf3, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 3, Topic: "/foo", Metadata: map[string]string{"k": "v"}})
prepInput(t, buf4, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 4, Topic: "/bar"})
prepInput(t, buf3, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 3, Topic: "/foo", Metadata: map[string]string{"k0": "v0", "k1": "v1", "k2": "v2"}})
prepInput(t, buf4, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 4, Topic: "/foo", Metadata: map[string]string{"k0": "v0", "k1": "v1", "k2": "v2"}})
prepInput(t, buf5, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 5, Topic: "/bar"})
output := &bytes.Buffer{}
inputs := []namedReader{
{"buf1", bytes.NewReader(buf1.Bytes())},
{"buf2", bytes.NewReader(buf2.Bytes())},
{"buf3", bytes.NewReader(buf3.Bytes())},
{"buf4", bytes.NewReader(buf4.Bytes())},
{"buf5", bytes.NewReader(buf5.Bytes())},
}
merger := newMCAPMerger(mergeOpts{coalesceChannels: coalesceChannels, allowDuplicateMetadata: true})
require.NoError(t, merger.mergeInputs(output, inputs))
Expand Down

0 comments on commit 68ae983

Please sign in to comment.