Skip to content

Commit

Permalink
feat(parquet): Move footerOffset into FileMetaData (#217)
Browse files Browse the repository at this point in the history
### Rationale for this change

After looking into the code, I found that `footerOffset` is mainly used
when parsing the metadata of a Parquet file.

I think we can store it in `FileMetaData`, so we don't need to get
`footerOffset` again when we use `WithMetadata`.

### What changes are included in this PR?

Move `Reader.footerOffset` into `FileMetaData`.

### Are these changes tested?


### Are there any user-facing changes?
  • Loading branch information
joechenrh authored Dec 13, 2024
1 parent 82be143 commit 244d47c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 20 deletions.
29 changes: 13 additions & 16 deletions parquet/file/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type Reader struct {
r parquet.ReaderAtSeeker
props *parquet.ReaderProperties
metadata *metadata.FileMetaData
footerOffset int64
fileDecryptor encryption.FileDecryptor

bufferPool sync.Pool
Expand Down Expand Up @@ -100,19 +99,11 @@ func OpenParquetFile(filename string, memoryMap bool, opts ...ReadOption) (*Read
// If no read properties are provided then the default ReaderProperties will be used. The WithMetadata
// option can be used to provide a FileMetaData object rather than reading the file metadata from the file.
func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, error) {
var err error
f := &Reader{r: r}
for _, o := range opts {
o(f)
}

if f.footerOffset <= 0 {
f.footerOffset, err = r.Seek(0, io.SeekEnd)
if err != nil {
return nil, fmt.Errorf("parquet: could not retrieve footer offset: %w", err)
}
}

if f.props == nil {
f.props = parquet.NewReaderProperties(memory.NewGoAllocator())
}
Expand Down Expand Up @@ -156,13 +147,18 @@ func (f *Reader) MetaData() *metadata.FileMetaData { return f.metadata }

// parseMetaData handles parsing the metadata from the opened file.
func (f *Reader) parseMetaData() error {
if f.footerOffset <= int64(footerSize) {
return fmt.Errorf("parquet: file too small (size=%d)", f.footerOffset)
footerOffset, err := f.r.Seek(0, io.SeekEnd)
if err != nil {
return fmt.Errorf("parquet: could not retrieve footer offset: %w", err)
}

if footerOffset <= int64(footerSize) {
return fmt.Errorf("parquet: file too small (size=%d)", footerOffset)
}

buf := make([]byte, footerSize)
// backup 8 bytes to read the footer size (first four bytes) and the magic bytes (last 4 bytes)
n, err := f.r.ReadAt(buf, f.footerOffset-int64(footerSize))
n, err := f.r.ReadAt(buf, footerOffset-int64(footerSize))
if err != nil && err != io.EOF {
return fmt.Errorf("parquet: could not read footer: %w", err)
}
Expand All @@ -171,7 +167,7 @@ func (f *Reader) parseMetaData() error {
}

size := int64(binary.LittleEndian.Uint32(buf[:4]))
if size < 0 || size+int64(footerSize) > f.footerOffset {
if size < 0 || size+int64(footerSize) > footerOffset {
return errInconsistentFileMetadata
}

Expand All @@ -180,14 +176,15 @@ func (f *Reader) parseMetaData() error {
switch {
case bytes.Equal(buf[4:], magicBytes): // non-encrypted metadata
buf = make([]byte, size)
if _, err := f.r.ReadAt(buf, f.footerOffset-int64(footerSize)-size); err != nil {
if _, err := f.r.ReadAt(buf, footerOffset-int64(footerSize)-size); err != nil {
return fmt.Errorf("parquet: could not read footer: %w", err)
}

f.metadata, err = metadata.NewFileMetaData(buf, nil)
if err != nil {
return fmt.Errorf("parquet: could not read footer: %w", err)
}
f.metadata.SetSourceFileSize(footerOffset)

if !f.metadata.IsSetEncryptionAlgorithm() {
if fileDecryptProps != nil && !fileDecryptProps.PlaintextFilesAllowed() {
Expand All @@ -200,7 +197,7 @@ func (f *Reader) parseMetaData() error {
}
case bytes.Equal(buf[4:], magicEBytes): // encrypted metadata
buf = make([]byte, size)
if _, err := f.r.ReadAt(buf, f.footerOffset-int64(footerSize)-size); err != nil {
if _, err := f.r.ReadAt(buf, footerOffset-int64(footerSize)-size); err != nil {
return fmt.Errorf("parquet: could not read footer: %w", err)
}

Expand All @@ -223,6 +220,7 @@ func (f *Reader) parseMetaData() error {
if err != nil {
return fmt.Errorf("parquet: could not read footer: %w", err)
}
f.metadata.SetSourceFileSize(footerOffset)
default:
return fmt.Errorf("parquet: magic bytes not found in footer. Either the file is corrupted or this isn't a parquet file")
}
Expand Down Expand Up @@ -310,7 +308,6 @@ func (f *Reader) RowGroup(i int) *RowGroupReader {
rgMetadata: metadata.NewRowGroupMetaData(rg, f.metadata.Schema, f.WriterVersion(), f.fileDecryptor),
props: f.props,
r: f.r,
sourceSz: f.footerOffset,
fileDecryptor: f.fileDecryptor,
bufferPool: &f.bufferPool,
}
Expand Down
8 changes: 4 additions & 4 deletions parquet/file/row_group_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ const (
// RowGroupReader is the primary interface for reading a single row group
type RowGroupReader struct {
r parquet.ReaderAtSeeker
sourceSz int64
fileMetadata *metadata.FileMetaData
rgMetadata *metadata.RowGroupMetaData
props *parquet.ReaderProperties
Expand Down Expand Up @@ -85,16 +84,17 @@ func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
colLen := col.TotalCompressedSize()
// PARQUET-816 workaround for old files created by older parquet-mr
if r.fileMetadata.WriterVersion().LessThan(metadata.Parquet816FixedVersion) {
sourceSz := r.fileMetadata.GetSourceFileSize()
// The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
// dictionary page header size in total_compressed_size and total_uncompressed_size
// (see IMPALA-694). We add padding to compensate.
if colStart < 0 || colLen < 0 {
return nil, fmt.Errorf("invalid column chunk metadata, offset (%d) and length (%d) should both be positive", colStart, colLen)
}
if colStart > r.sourceSz || colLen > r.sourceSz {
return nil, fmt.Errorf("invalid column chunk metadata, offset (%d) and length (%d) must both be less than total source size (%d)", colStart, colLen, r.sourceSz)
if colStart > sourceSz || colLen > sourceSz {
return nil, fmt.Errorf("invalid column chunk metadata, offset (%d) and length (%d) must both be less than total source size (%d)", colStart, colLen, sourceSz)
}
bytesRemain := r.sourceSz - (colStart + colLen)
bytesRemain := sourceSz - (colStart + colLen)
padding := utils.Min(maxDictHeaderSize, bytesRemain)
colLen += padding
}
Expand Down
11 changes: 11 additions & 0 deletions parquet/metadata/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ type FileMetaData struct {
// size of the raw bytes of the metadata in the file which were
// decoded by thrift, Size() getter returns the value.
metadataLen int

// sourceFileSize is not part of the thrift FileMetaData; it is stored here because it is mainly used when parsing metadata.
// Users can set this value manually, in which case they are responsible for its validity.
sourceFileSize int64
}

// NewFileMetaData takes in the raw bytes of the serialized metadata to deserialize
Expand Down Expand Up @@ -275,6 +279,12 @@ func NewFileMetaData(data []byte, fileDecryptor encryption.FileDecryptor) (*File
// Size reports the length, in bytes, of the raw serialized metadata that was
// read from the file footer.
func (f *FileMetaData) Size() int {
	return f.metadataLen
}

// GetSourceFileSize returns the total size of the source file as recorded on
// this metadata object.
func (f *FileMetaData) GetSourceFileSize() int64 {
	return f.sourceFileSize
}

// SetSourceFileSize records sourceFileSize as the total size of the source
// file on this metadata object. The value is stored as given, without any
// validation.
func (f *FileMetaData) SetSourceFileSize(sourceFileSize int64) {
	f.sourceFileSize = sourceFileSize
}

// NumSchemaElements is the length of the flattened schema list in the thrift
func (f *FileMetaData) NumSchemaElements() int {
return len(f.FileMetaData.Schema)
Expand Down Expand Up @@ -388,6 +398,7 @@ func (f *FileMetaData) Subset(rowGroups []int) (*FileMetaData, error) {
f.FileDecryptor,
f.version,
0,
f.sourceFileSize,
}

out.RowGroups = make([]*format.RowGroup, 0, len(rowGroups))
Expand Down

0 comments on commit 244d47c

Please sign in to comment.