Skip to content

Commit

Permalink
Include metadata in mcap merge (#958)
Browse files Browse the repository at this point in the history
Includes metadata records from input files in mcap merge via a new read
option. This required a breaking change to read options to avoid a
dependency cycle: since I need to supply a callback option to apply to
metadata records, the readopts package required awareness of "mcap"
while "mcap" required awareness of readopts for configuration.

To address this I have moved readopts.go under the mcap package. Users
who upgrade the library will need to swap out the package name if they
are using any options.
  • Loading branch information
Wyatt Alt authored Nov 2, 2023
1 parent 4ab424e commit a164ec1
Show file tree
Hide file tree
Showing 12 changed files with 381 additions and 159 deletions.
9 changes: 4 additions & 5 deletions go/cli/mcap/cmd/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/foxglove/mcap/go/cli/mcap/utils"
"github.com/foxglove/mcap/go/cli/mcap/utils/ros"
"github.com/foxglove/mcap/go/mcap"
"github.com/foxglove/mcap/go/mcap/readopts"
"github.com/spf13/cobra"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -165,14 +164,14 @@ func (w *jsonOutputWriter) writeMessage(
return nil
}

func getReadOpts(useIndex bool) []readopts.ReadOpt {
func getReadOpts(useIndex bool) []mcap.ReadOpt {
topics := strings.FieldsFunc(catTopics, func(c rune) bool { return c == ',' })
opts := []readopts.ReadOpt{readopts.UsingIndex(useIndex), readopts.WithTopics(topics)}
opts := []mcap.ReadOpt{mcap.UsingIndex(useIndex), mcap.WithTopics(topics)}
if catStart != 0 {
opts = append(opts, readopts.After(catStart*1e9))
opts = append(opts, mcap.After(catStart*1e9))
}
if catEnd != math.MaxInt64 {
opts = append(opts, readopts.Before(catEnd*1e9))
opts = append(opts, mcap.Before(catEnd*1e9))
}
return opts
}
Expand Down
130 changes: 92 additions & 38 deletions go/cli/mcap/cmd/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,46 @@ import (
"container/heap"
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"os"

"github.com/foxglove/mcap/go/cli/mcap/utils"
"github.com/foxglove/mcap/go/mcap"
"github.com/foxglove/mcap/go/mcap/readopts"
"github.com/spf13/cobra"
)

// ErrDuplicateMetadataName indicates that a metadata record reuses a name
// that was already merged into the output.
type ErrDuplicateMetadataName struct {
	Name string
}

// Is reports whether target is an *ErrDuplicateMetadataName, so callers can
// match this error type with errors.Is regardless of the Name it carries.
// Pointer receiver for consistency with Error below.
func (e *ErrDuplicateMetadataName) Is(target error) bool {
	_, ok := target.(*ErrDuplicateMetadataName)
	return ok
}

// Error implements the error interface.
func (e *ErrDuplicateMetadataName) Error() string {
	return fmt.Sprintf("metadata name '%s' was previously encountered. "+
		"Supply --allow-duplicate-metadata to override.", e.Name)
}

// Command-line flag storage for the merge command; bound in init.
var (
	mergeCompression            string
	mergeChunkSize              int64
	mergeIncludeCRC             bool
	mergeChunked                bool
	mergeOutputFile             string
	mergeAllowDuplicateMetadata bool
)

// mergeOpts carries the merge configuration derived from CLI flags.
type mergeOpts struct {
	compression            string
	chunkSize              int64
	includeCRC             bool
	chunked                bool
	allowDuplicateMetadata bool
}

// schemaID uniquely identifies a schema across the inputs.
Expand All @@ -46,17 +62,20 @@ type mcapMerger struct {
schemaIDs map[schemaID]uint16
channelIDs map[channelID]uint16
schemaIDByHash map[string]uint16

nextChannelID uint16
nextSchemaID uint16
opts mergeOpts
metadataHashes map[string]bool
metadataNames map[string]bool
nextChannelID uint16
nextSchemaID uint16
opts mergeOpts
}

func newMCAPMerger(opts mergeOpts) *mcapMerger {
return &mcapMerger{
schemaIDs: make(map[schemaID]uint16),
channelIDs: make(map[channelID]uint16),
schemaIDByHash: make(map[string]uint16),
metadataHashes: make(map[string]bool),
metadataNames: make(map[string]bool),
nextChannelID: 1,
nextSchemaID: 1,
opts: opts,
Expand All @@ -82,6 +101,37 @@ func (m *mcapMerger) outputSchemaID(inputID int, inputSchemaID uint16) (uint16,
return v, ok
}

// hashMetadata returns a hex-encoded MD5 digest identifying a metadata record
// by its name and content. It is used only for in-memory deduplication during
// a merge, not as a security boundary. A zero byte separates the name from the
// JSON-encoded content so distinct (name, content) pairs cannot collide by
// concatenation. For map-valued content, json.Marshal emits keys in sorted
// order, so equal content hashes equally.
func hashMetadata(metadata *mcap.Metadata) (string, error) {
	hasher := md5.New()
	hasher.Write([]byte(metadata.Name))
	hasher.Write([]byte{0}) // separator: metadata names could otherwise blur into content
	content, err := json.Marshal(metadata.Metadata)
	if err != nil {
		return "", fmt.Errorf("failed to marshal metadata: %w", err)
	}
	hasher.Write(content)
	return hex.EncodeToString(hasher.Sum(nil)), nil
}

// addMetadata writes a metadata record to the output unless an identical
// record (same name and content) was already written, in which case it is
// silently skipped. A record that reuses a previously seen name with
// DIFFERENT content is rejected unless --allow-duplicate-metadata was
// supplied.
//
// The exact-duplicate check runs before the name check: otherwise two inputs
// carrying byte-identical metadata (the common overlap case) would error
// without the flag, defeating the hash-based dedup entirely.
func (m *mcapMerger) addMetadata(w *mcap.Writer, metadata *mcap.Metadata) error {
	hash, err := hashMetadata(metadata)
	if err != nil {
		return fmt.Errorf("failed to compute metadata hash: %w", err)
	}
	if m.metadataHashes[hash] {
		// Identical record already in the output: nothing to do.
		return nil
	}
	if m.metadataNames[metadata.Name] && !m.opts.allowDuplicateMetadata {
		return &ErrDuplicateMetadataName{Name: metadata.Name}
	}
	if err := w.WriteMetadata(metadata); err != nil {
		return fmt.Errorf("failed to write metadata: %w", err)
	}
	m.metadataHashes[hash] = true
	m.metadataNames[metadata.Name] = true
	return nil
}

func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Channel) (uint16, error) {
outputSchemaID, ok := m.outputSchemaID(inputID, channel.SchemaID)
if !ok {
Expand Down Expand Up @@ -161,9 +211,6 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error {
if err != nil {
return fmt.Errorf("failed to create writer: %w", err)
}
if err != nil {
return fmt.Errorf("failed to write header: %w", err)
}

iterators := make([]mcap.MessageIterator, len(inputs))
profiles := make([]string, len(inputs))
Expand All @@ -181,23 +228,22 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error {
// renumbered IDs, and load the message (with renumbered IDs) into the
// priority queue.
for inputID, input := range inputs {
err := func() error {
reader, err := mcap.NewReader(input.reader)
if err != nil {
return fmt.Errorf("failed to open reader on %s: %w", input.name, err)
}
defer reader.Close()
profiles[inputID] = reader.Header().Profile
iterator, err := reader.Messages(readopts.UsingIndex(false))
if err != nil {
return fmt.Errorf("failed to read messages on %s: %w", input.name, err)
}
iterators[inputID] = iterator
return nil
}()
reader, err := mcap.NewReader(input.reader)
if err != nil {
return fmt.Errorf("failed to open reader on %s: %w", input.name, err)
}
defer reader.Close() //nolint:gocritic // we actually want these defered in the loop.
profiles[inputID] = reader.Header().Profile
opts := []mcap.ReadOpt{
mcap.UsingIndex(false),
mcap.WithMetadataCallback(func(metadata *mcap.Metadata) error {
return m.addMetadata(writer, metadata)
})}
iterator, err := reader.Messages(opts...)
if err != nil {
return err
}
iterators[inputID] = iterator
}
if err := writer.WriteHeader(&mcap.Header{Profile: outputProfile(profiles)}); err != nil {
return err
Expand All @@ -210,7 +256,7 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error {
// the file may be an empty mcap. if so, just ignore it.
continue
}
return fmt.Errorf("failed to read first message on input %s: %w", inputName, err)
return fmt.Errorf("error on input %s: %w", inputName, err)
}
if schema != nil {
err = m.addSchema(writer, inputID, schema)
Expand Down Expand Up @@ -250,7 +296,7 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error {
// will break.
continue
}
return fmt.Errorf("failed to pull next message on %s: %w", inputs[msg.InputID].name, err)
return fmt.Errorf("error on input on %s: %w", inputs[msg.InputID].name, err)
}

// if the channel is unknown, need to add it to the output
Expand Down Expand Up @@ -300,10 +346,11 @@ var mergeCmd = &cobra.Command{
readers = append(readers, namedReader{name: arg, reader: f})
}
opts := mergeOpts{
compression: mergeCompression,
chunkSize: mergeChunkSize,
includeCRC: mergeIncludeCRC,
chunked: mergeChunked,
compression: mergeCompression,
chunkSize: mergeChunkSize,
includeCRC: mergeIncludeCRC,
chunked: mergeChunked,
allowDuplicateMetadata: mergeAllowDuplicateMetadata,
}
merger := newMCAPMerger(opts)
var writer io.Writer
Expand All @@ -319,7 +366,7 @@ var mergeCmd = &cobra.Command{
}
err := merger.mergeInputs(writer, readers)
if err != nil {
die(err.Error())
die("Merge failure: " + err.Error())
}
},
}
Expand Down Expand Up @@ -361,4 +408,11 @@ func init() {
true,
"chunk the output file",
)
mergeCmd.PersistentFlags().BoolVarP(
&mergeAllowDuplicateMetadata,
"allow-duplicate-metadata",
"",
false,
"Allow duplicate-named metadata records to be merged in the output",
)
}
Loading

0 comments on commit a164ec1

Please sign in to comment.