Skip to content

Commit 56ee609

Browse files
committed
Expand nest field to parquet path
1 parent 2964879 commit 56ee609

File tree

2 files changed

+93
-29
lines changed

2 files changed

+93
-29
lines changed

engine/engine.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,12 @@ func (p *ParquetEngine) executeSelect(stmt *parser.Select) (*RecordSet, error) {
9696
case *parser.ColExpr:
9797
cols = append(cols, field.(*parser.ColExpr).Name)
9898
case *parser.NestColExpr:
99-
fmt.Println(field.(*parser.NestColExpr).Subs)
99+
subs :=field.(*parser.NestColExpr).Subs
100+
_p, err := p.schema.GetFieldPath(subs)
101+
if err != nil {
102+
return nil, err
103+
}
104+
cols = append(cols, _p)
100105
}
101106
}
102107
filterCols := make([]string, 0)
@@ -111,7 +116,7 @@ func (p *ParquetEngine) executeSelect(stmt *parser.Select) (*RecordSet, error) {
111116
if stmt.Limit != nil {
112117
limit = stmt.Limit.Rowcount
113118
}
114-
cr, err := p.GetColumnReader()
119+
cr, err := p.GetReader()
115120
if err != nil {
116121
return nil, err
117122
}
@@ -162,15 +167,15 @@ func filter(expr parser.Expr, result *RecordSet) (*RecordSet, error) {
162167
}
163168

164169
func (p *ParquetEngine) GetTotalRowCount() (int64, error) {
165-
cr, err := p.GetColumnReader()
170+
cr, err := p.GetReader()
166171
if err != nil {
167172
return 0, err
168173
}
169174
return cr.GetNumRows(), nil
170175
}
171176

172-
func (p *ParquetEngine) GetColumnReader() (*reader.ParquetReader, error) {
173-
return reader.NewParquetColumnReader(p.fr, 2)
177+
func (p *ParquetEngine) GetReader() (*reader.ParquetReader, error) {
178+
return reader.NewParquetReader(p.fr,nil, 2)
174179
}
175180

176181
func (p *ParquetEngine) FetchRows(cr *reader.ParquetReader, cols []string, limit int) (result *RecordSet, err error) {

engine/schema.go

+83-24
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,16 @@ package engine
33
import (
44
"fmt"
55
"github.com/xitongsys/parquet-go/parquet"
6+
"strconv"
67
"strings"
78
)
89

10+
const (
11+
TYPE_STRUCT = "STRUCT"
12+
TYPE_LIST = "LIST"
13+
TYPE_MAP = "MAP"
14+
)
15+
916
type Schema interface {
1017
}
1118

@@ -18,32 +25,21 @@ type Field struct {
1825
}
1926

2027
func (f *Field) String() string {
21-
if len(f.Fields) == 0 {
22-
if f.SE.ConvertedType != nil {
23-
return f.SE.ConvertedType.String()
24-
}
25-
if f.SE.Type != nil {
26-
return f.SE.Type.String()
27-
}
28-
}
29-
if f.SE.ConvertedType == nil {
28+
switch f.Type {
29+
case TYPE_LIST:
30+
// LIST.ELEMENT
31+
return fmt.Sprintf("array<%s>", f.Fields[0].Fields[0])
32+
case TYPE_MAP:
33+
// map<KEY_VALUE.KEY, KEY_VALUE.VALUE>
34+
return fmt.Sprintf("map<%s,%s>", f.Fields[0].Fields[0], f.Fields[0].Fields[1])
35+
case TYPE_STRUCT:
3036
fields := make([]string, 0, len(f.Fields))
3137
for _, _f := range f.Fields {
3238
fields = append(fields, fmt.Sprintf("%s:%s", _f.Name, _f))
3339
}
3440
return fmt.Sprintf("struct<%s>", strings.Join(fields, ","))
35-
}
36-
switch *f.SE.ConvertedType {
37-
case parquet.ConvertedType_LIST:
38-
// LIST.ELEMENT
39-
return fmt.Sprintf("array<%s>", f.Fields[0].Fields[0])
40-
case parquet.ConvertedType_MAP:
41-
// map<KEY_VALUE.KEY, KEY_VALUE.VALUE>
42-
return fmt.Sprintf("map<%s,%s>", f.Fields[0].Fields[0], f.Fields[0].Fields[1])
43-
case parquet.ConvertedType_JSON:
44-
return "json"
4541
default:
46-
return f.SE.ConvertedType.String()
42+
return f.Type
4743
}
4844
}
4945

@@ -53,6 +49,47 @@ type ParquetSchema struct {
5349
Fields []*Field
5450
}
5551

52+
func (s *ParquetSchema) GetFieldPath(subs []string) (string, error) {
53+
paths := make([]string, 0)
54+
f := s.MFields[subs[0]]
55+
paths = append(paths, subs[0])
56+
subs = subs[1:]
57+
for len(subs) > 0 {
58+
sub := subs[0]
59+
idx, err := strconv.Atoi(sub)
60+
if err != nil {
61+
switch f.Type{
62+
case TYPE_MAP:
63+
paths = append(paths, []string{"Key_value", "Value"}...)
64+
f = f.Fields[0].Fields[1]
65+
case TYPE_STRUCT:
66+
found := false
67+
for _, _f := range f.Fields{
68+
if _f.Name == sub {
69+
found = true
70+
paths = append(paths, _f.Name)
71+
f = _f
72+
break
73+
}
74+
}
75+
if !found {
76+
return "", fmt.Errorf("can't find %s on %s:%s", sub, f.Name, f)
77+
}
78+
default:
79+
return "", fmt.Errorf("unsuport to get `%s` on %s:%s", sub, f.Name, f)
80+
}
81+
} else {
82+
if f.Type != TYPE_LIST {
83+
return "", fmt.Errorf("unsupport to retrive index `%d` on %s:%s", idx, f.Name, f)
84+
}
85+
paths = append(paths, "List", "Element")
86+
f = f.Fields[0].Fields[0]
87+
}
88+
subs = subs[1:]
89+
}
90+
return strings.Join(paths, "."), nil
91+
}
92+
5693
func (s *ParquetSchema) GetName() string {
5794
return s.Name
5895
}
@@ -86,11 +123,33 @@ func NewParquetSchema(schemas []*parquet.SchemaElement) *ParquetSchema {
86123
return &ParquetSchema{Name: root.Name, MFields: root.MFields, Fields: root.Fields}
87124
}
88125

89-
func NewField(schema *parquet.SchemaElement) *Field {
126+
func NewField(se *parquet.SchemaElement) *Field {
127+
TYPE := ""
128+
if se.GetNumChildren() == 0 {
129+
if se.ConvertedType != nil {
130+
TYPE = se.ConvertedType.String()
131+
} else if se.Type != nil {
132+
TYPE = se.Type.String()
133+
}
134+
} else {
135+
if se.ConvertedType == nil {
136+
TYPE = TYPE_STRUCT
137+
} else {
138+
switch *se.ConvertedType {
139+
case parquet.ConvertedType_MAP:
140+
TYPE = TYPE_MAP
141+
case parquet.ConvertedType_LIST:
142+
TYPE = TYPE_LIST
143+
default:
144+
TYPE = se.ConvertedType.String()
145+
}
146+
}
147+
}
90148
return &Field{
91-
Name: schema.Name,
92-
SE: schema,
149+
Name: se.Name,
150+
SE: se,
151+
Type: TYPE,
93152
MFields: make(map[string]*Field),
94-
Fields: make([]*Field, 0, schema.GetNumChildren()),
153+
Fields: make([]*Field, 0, se.GetNumChildren()),
95154
}
96155
}

0 commit comments

Comments
 (0)