
Parquet support #600

Open
sumit-arora-greyorange opened this issue Jan 5, 2025 · 1 comment

@sumit-arora-greyorange

Are there any plans or is there any interest in adding support for indexing directly from parquet files?

In my current project, the corpus is provisioned as Parquet files, and I'm trying to implement building and updating indices from them as the source. Is there any prior work, interest, or plan to support this directly in the SDK? If not, would it make for a useful contribution?

From a cursory look, this seems relatively straightforward to add, e.g.:

// index_document.go
// Sketch only; assumes the Apache Arrow Go packages (module path/version may differ).

import (
	"context"

	"github.com/apache/arrow/go/v17/arrow"
	"github.com/apache/arrow/go/v17/arrow/memory"
	"github.com/apache/arrow/go/v17/parquet"
	"github.com/apache/arrow/go/v17/parquet/file"
	"github.com/apache/arrow/go/v17/parquet/pqarrow"
)

// DocumentTransform converts one row of an Arrow record into a document.
type DocumentTransform func(record arrow.Record, index int) (map[string]interface{}, error)

func (i *index) AddDocumentsParquetFromReaderInBatchesWithContext(ctx context.Context, objReader parquet.ReaderAtSeeker, batchSize int, transform DocumentTransform, primaryKey ...string) (resp []TaskInfo, err error) {

	sendParquetBatch := func(rows []map[string]interface{}) (*TaskInfo, error) {
		return i.AddDocumentsWithContext(ctx, rows, primaryKey...)
	}

	var batch []map[string]interface{}

	pool := memory.NewGoAllocator()

	readerProps := parquet.NewReaderProperties(pool)
	readerProps.BufferedStreamEnabled = true

	// Parquet reader
	parquetReader, err := file.NewParquetReader(objReader, file.WithReadProps(readerProps))
	if err != nil {
		return nil, err
	}
	defer parquetReader.Close()

	// Arrow reader
	arrowReader, err := pqarrow.NewFileReader(parquetReader, pqarrow.ArrowReadProperties{
		Parallel:  true,
		BatchSize: int64(batchSize),
	}, pool)
	if err != nil {
		return nil, err
	}

	// Record reader over all columns and row groups
	recordReader, err := arrowReader.GetRecordReader(ctx, nil, nil)
	if err != nil {
		return nil, err
	}
	defer recordReader.Release()

	for recordReader.Next() {
		// The record is owned by the reader and only valid until the next
		// call to Next, so it is consumed within this iteration.
		record := recordReader.Record()

		for row := 0; row < int(record.NumRows()); row++ {
			document, err := transform(record, row)
			if err != nil {
				// Skip rows that fail to transform; callers may prefer to
				// return the error instead.
				continue
			}
			batch = append(batch, document)
			if len(batch) >= batchSize {
				taskInfo, err := sendParquetBatch(batch)
				if err != nil {
					return nil, err
				}
				resp = append(resp, *taskInfo)
				batch = nil
			}
		}
	}
	if err := recordReader.Err(); err != nil {
		return nil, err
	}

	// Flush any remaining rows.
	if len(batch) > 0 {
		taskInfo, err := sendParquetBatch(batch)
		if err != nil {
			return nil, err
		}
		resp = append(resp, *taskInfo)
	}

	return resp, nil
}

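For illustration, a caller might wire this up roughly as follows. This is only a sketch: the client constructor, index name, file name, and column layout are assumptions for the example, and it assumes the new method is exposed on the SDK's index type.

package main

import (
	"context"
	"log"
	"os"

	"github.com/apache/arrow/go/v17/arrow"
	"github.com/meilisearch/meilisearch-go"
)

func main() {
	// Client/index setup; the constructor name may differ across SDK versions.
	client := meilisearch.New("http://localhost:7700")
	idx := client.Index("movies")

	f, err := os.Open("corpus.parquet") // *os.File satisfies parquet.ReaderAtSeeker
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Hypothetical transform for a schema whose first two columns are "id" and "title".
	transform := func(record arrow.Record, row int) (map[string]interface{}, error) {
		return map[string]interface{}{
			"id":    record.Column(0).GetOneForMarshal(row),
			"title": record.Column(1).GetOneForMarshal(row),
		}, nil
	}

	tasks, err := idx.AddDocumentsParquetFromReaderInBatchesWithContext(context.Background(), f, 1000, transform, "id")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("enqueued %d indexing tasks", len(tasks))
}
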
There could also be a default transform that works on simple flat schemas and skips nested fields etc.
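A minimal sketch of what such a default transform could look like, assuming a flat Arrow schema and relying on Arrow's per-value marshalling helper; the name DefaultFlatTransform is just a suggestion, not an existing API:

// DefaultFlatTransform maps each top-level column to a document field by name,
// skipping nested (struct/list/map) columns.
func DefaultFlatTransform(record arrow.Record, row int) (map[string]interface{}, error) {
	doc := make(map[string]interface{}, int(record.NumCols()))
	for col := 0; col < int(record.NumCols()); col++ {
		field := record.Schema().Field(col)
		switch field.Type.ID() {
		case arrow.STRUCT, arrow.LIST, arrow.LARGE_LIST, arrow.MAP:
			// Skip nested fields; a richer transform could flatten or JSON-encode them.
			continue
		}
		doc[field.Name] = record.Column(col).GetOneForMarshal(row)
	}
	return doc, nil
}
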

I understand that this will add the pqarrow dependency even when it isn't used, but that could probably be solved by moving this to a subpackage (probably under contrib) and/or behind build tags, as sketched below.
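
For example, the subpackage's files could be gated roughly like this (layout and tag name are only suggestions). Worth noting: a build tag only keeps the code out of default builds; keeping pqarrow out of the SDK's go.mod entirely would require the subpackage to be its own Go module.

//go:build parquet

// Hypothetical contrib-style subpackage, e.g. contrib/parquet, compiled only with -tags parquet.
package parquet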

Any advice / input is appreciated.

@Ja7ad
Collaborator

Ja7ad commented Jan 5, 2025


Unfortunately, adding another dependency would only solve your problem temporarily, and it would also cause problems for other users. We have tried to develop the SDK with as few dependencies as possible to minimize interference with other packages.
