diff --git a/storage/ndb/rest-server/rest-api-server/internal/dal/operations_feature_store.go b/storage/ndb/rest-server/rest-api-server/internal/dal/operations_feature_store.go index 3dc78d312e0..7e9f7a131e9 100644 --- a/storage/ndb/rest-server/rest-api-server/internal/dal/operations_feature_store.go +++ b/storage/ndb/rest-server/rest-api-server/internal/dal/operations_feature_store.go @@ -271,16 +271,22 @@ type FeatureGroupAvroSchema struct { Fields []AvroField `json:"fields"` } -func (c *FeatureGroupAvroSchema) GetSchemaByFeatureName(featureName string) (json.RawMessage, error) { - for _, field := range c.Fields { - if field.Name == featureName { - return field.Type, nil - } +// Unlike FeatureGroupAvroSchema which contains Avro schema containing all features in the FG +// this structure stores Avro schema per feature +type PerFeatureAvroSchema struct { + Schemas map[string]string +} + +func (c *PerFeatureAvroSchema) GetSchemaByFeatureName(featureName string) (string, error) { + schema, ok := c.Schemas[featureName] + if ok { + return schema, nil + } else { + return "", fmt.Errorf("Cannot find schema for feature %s", featureName) } - return nil, fmt.Errorf("Cannot find schema for feature %s", featureName) } -func GetFeatureGroupAvroSchema(fgName string, fgVersion int, projectId int) (*FeatureGroupAvroSchema, error) { +func GetFeatureGroupAvroSchema(fgName string, fgVersion int, projectId int) (*PerFeatureAvroSchema, error) { subjectName := fmt.Sprintf("%s_%d", fgName, fgVersion) log.Debugf("subject name is: %s", subjectName) cSubjectName := C.CString(subjectName) @@ -312,5 +318,25 @@ func GetFeatureGroupAvroSchema(fgName string, fgVersion int, projectId int) (*Fe if err != nil { return nil, err } - return &avroSchema, nil + + // FeatureGroupAvroSchema contain all features in on avro schema. + // we have to create sepate Avro schemas (PerFeatureAvroSchema) for + //the fields as each field is deserialized independently. + // And not all features are encoded using avro + var perFeatureAvroSchema PerFeatureAvroSchema + perFeatureAvroSchema.Schemas = make(map[string]string) + for _, field := range avroSchema.Fields { + var schema FeatureGroupAvroSchema // only populate one field + schema.Name = avroSchema.Name + schema.Namespace = avroSchema.Namespace + schema.Type = avroSchema.Type + schema.Fields = []AvroField{field} + schemaBytes, err := json.Marshal(schema) + if err != nil { + return nil, err + } + perFeatureAvroSchema.Schemas[field.Name] = string(schemaBytes) + } + + return &perFeatureAvroSchema, nil } diff --git a/storage/ndb/rest-server/rest-api-server/internal/feature_store/metadata.go b/storage/ndb/rest-server/rest-api-server/internal/feature_store/metadata.go index 67a399bd797..9fb25209c25 100644 --- a/storage/ndb/rest-server/rest-api-server/internal/feature_store/metadata.go +++ b/storage/ndb/rest-server/rest-api-server/internal/feature_store/metadata.go @@ -21,6 +21,7 @@ import ( "encoding/json" "errors" "fmt" + "reflect" "sort" "strings" "time" @@ -37,6 +38,11 @@ var CleanupInterval time.Duration = 15 * time.Minute const ERROR_NOT_FOUND = "Not Found" +type ComplexFeature struct { + Schema *avro.Schema + Struct *reflect.Type +} + type FeatureViewMetadata struct { FeatureStoreName string FeatureStoreId int @@ -56,7 +62,7 @@ type FeatureViewMetadata struct { PrefixJoinKeyMap map[string][]string // key: serving-key-prefix + fName, value: list of feature which join on the key. Used for filling in pk value. JoinKeyMap map[string][]string // key: fName, value: list of feature which join on the key. Used for filling in pk value. RequiredJoinKeyMap map[string][]string // key: serving-key-prefix + fName, value: list of feature which join on the key. Used for filling in pk value. - ComplexFeatures map[string]*avro.Schema // key: joinIndex + fgId + fName, label are excluded. joinIndex is needed because of self-join + ComplexFeatures map[string]*ComplexFeature // key: joinIndex + fgId + fName, label are excluded. joinIndex is needed because of self-join } type FeatureGroupFeatures struct { @@ -184,8 +190,8 @@ func newFeatureViewMetadata( featureCount++ } - var complexFeatures = make(map[string]*avro.Schema) - var fgSchemaCache = make(map[int]*dal.FeatureGroupAvroSchema) + var complexFeatures = make(map[string]*ComplexFeature) + var fgSchemaCache = make(map[int]*dal.PerFeatureAvroSchema) for _, fgFeature := range fgFeaturesArray { for _, feature := range fgFeature.Features { if (*feature).IsComplex() { @@ -209,12 +215,17 @@ func newFeatureViewMetadata( if err != nil { return nil, errors.New("Failed to get feature schema for feature: " + feature.Name) } - schema, err := avro.Parse(string(schemaStr)) + avroSchema, err := avro.Parse(string(schemaStr)) if err != nil { return nil, errors.New("Failed to parse feature schema.") } + avroStruct, err := ConvertAvroSchemaToStruct(avroSchema) + if err != nil { + return nil, errors.New("Failed to parse avro schema.") + } + featureIndexKey := GetFeatureIndexKeyByFeature(feature) - complexFeatures[featureIndexKey] = &schema + complexFeatures[featureIndexKey] = &ComplexFeature{Schema: &avroSchema, Struct: &avroStruct} } } @@ -424,3 +435,208 @@ func GetFeatureViewMetadata(featureStoreName, featureViewName string, featureVie } return featureViewMetadata, nil } + +// parser +func ConvertAvroSchemaToStruct(schema avro.Schema) (reflect.Type, error) { + parserlog(fmt.Sprintf("-----------------------------------------------\n")) + parserlog(fmt.Sprintf("Called %v\n", schema)) + + switch schema.Type() { + case avro.Record: + { + parserlog(fmt.Sprintf("avro.Record %T\n", schema)) + switch schema.(type) { + case *avro.PrimitiveSchema: + { + return nil, errors.New("*avro.PrimitiveSchema ***IMPLEMENT ME***\n") + } + case *avro.RecordSchema: + { + parserlog(fmt.Sprintf("*avro.RecordSchema \n")) + rs := schema.(*avro.RecordSchema) + var fields []reflect.StructField + for _, field := range rs.Fields() { + parserlog(fmt.Sprintf("Name: %s, Type: %T\n", field.Name(), field)) + ret, err := ConvertAvroSchemaToStruct(field.Type()) + if err != nil { + return nil, err + } else { + fields = append(fields, + reflect.StructField{Name: capitalizeMember(field.Name()), + Type: ret, + Tag: reflect.StructTag(fmt.Sprintf(`avro:"%s"`, field.Name()))}) + } + } + record := reflect.StructOf(fields) + parserlog(fmt.Sprintf("RETURNING RecordSchema, Record %v\n", record)) + return record, nil + + } + case *avro.UnionSchema: + { + return nil, errors.New("**avro.EnumSchema ***IMPLEMENT ME***\n") + } + case *avro.EnumSchema: + { + return nil, errors.New("*avro.EnumSchema ***IMPLEMENT ME***\n") + } + case *avro.ArraySchema: + { + return nil, errors.New("*avro.ArraySchema ***IMPLEMENT ME***\n") + } + case *avro.MapSchema: + { + return nil, errors.New("*avro.MapSchema ***IMPLEMENT ME***\n") + } + case *avro.FixedSchema: + { + return nil, errors.New("*avro.FixedSchema ***IMPLEMENT ME***\n") + } + case *avro.NullSchema: + { + return nil, errors.New("*avro.NullSchema ***IMPLEMENT ME***\n") + } + case *avro.RefSchema: + { + return nil, errors.New("*avro.RefSchema ***IMPLEMENT ME***\n") + } + //case *avro.PrimitiveLogicalSchema: + //case *avro.DecimalLogicalSchema: + default: + { + return nil, errors.New("Unsupported Crap ***IMPLEMENT ME*** \n") + } + } + } + case avro.Error: + { + return nil, errors.New("*avro.Error ***IMPLEMENT ME***\n") + } + case avro.Ref: + { + return nil, errors.New("*avro.Ref ***IMPLEMENT ME***\n") + } + case avro.Enum: + { + return nil, errors.New("*avro.Enum ***IMPLEMENT ME***\n") + } + case avro.Array: + { + parserlog(fmt.Sprintf("avro.Array %T\n", schema)) + as := schema.(*avro.ArraySchema) + items := as.Items() + ret, err := ConvertAvroSchemaToStruct(items) + if err != nil { + return nil, err + } else { + parserlog(fmt.Sprintf("RETURNING Array of %v\n", ret)) + return reflect.SliceOf(ret), nil + } + } + case avro.Map: + { + return nil, errors.New("*avro.Map ***IMPLEMENT ME***\n") + } + case avro.Union: + { + parserlog(fmt.Sprintf("avro.Union %T\n", schema)) + us := schema.(*avro.UnionSchema) + + if len(us.Types()) != 2 { + parserlog(fmt.Sprintf("Case error\n")) + return nil, errors.New("Invalid Union") + } + + var toConvert avro.Schema + if isAvroNullType(us.Types()[0]) { + toConvert = us.Types()[1] + } else { + toConvert = us.Types()[0] + } + + ret, err := ConvertAvroSchemaToStruct(toConvert) + if err != nil { + return nil, err + } else { + retPtr := reflect.PointerTo(ret) + parserlog(fmt.Sprintf("RETURNING UnionSchema %v\n", retPtr)) + return retPtr, nil + } + } + case avro.Fixed: + { + return nil, errors.New("*avro.Fixed ***IMPLEMENT ME***\n") + } + case avro.String: + { + parserlog(fmt.Sprintf("avro.String %T\n", schema)) + return reflect.TypeOf(""), nil + } + case avro.Bytes: + { + return nil, errors.New("*avro.Bytes ***IMPLEMENT ME***\n") + } + case avro.Int: + { + return nil, errors.New("*avro.Int ***IMPLEMENT ME***\n") + } + case avro.Long: + { + parserlog(fmt.Sprintf("avro.Long %T\n", schema)) + //TODO handle all logical type + if ps, ok := schema.(*avro.PrimitiveSchema); ok { + if ps.Logical() != nil { + if ps.Logical().Type() == avro.TimestampMicros { + parserlog(fmt.Sprintf("time\n")) + return reflect.TypeOf(time.Time{}), nil + } + } + } + parserlog(fmt.Sprintf("long\n")) + return reflect.TypeOf(int64(0)), nil + } + case avro.Float: + { + parserlog(fmt.Sprintf("*avro.Float\n")) + return reflect.TypeOf(float32(0)), nil + } + case avro.Double: + { + parserlog(fmt.Sprintf("*avro.Double\n")) + return reflect.TypeOf(float64(0)), nil + } + case avro.Boolean: + { + parserlog(fmt.Sprintf("*avro.Boolean\n")) + return reflect.TypeOf(bool(false)), nil + } + case avro.Null: + { + parserlog(fmt.Sprintf("*avro.Null\n")) + return reflect.TypeOf((*interface{})(nil)), nil + } + default: + { + return nil, errors.New("unsupported crap ***IMPLEMENT ME*** \n") + } + } +} + +func parserlog(msg string) { + // fmt.Printf(msg) +} + +func isAvroNullType(schema avro.Schema) bool { + if primitiveSchema, ok := schema.(*avro.NullSchema); ok { + return primitiveSchema.Type() == avro.Null + } + return false +} + +// This exports Struct's members +func capitalizeMember(s string) string { + if len(s) == 0 { + return s + } + return string(s[0]-32) + s[1:] +} diff --git a/storage/ndb/rest-server/rest-api-server/internal/handlers/feature_store/handler.go b/storage/ndb/rest-server/rest-api-server/internal/handlers/feature_store/handler.go index 30ad17f4136..c66837ba836 100644 --- a/storage/ndb/rest-server/rest-api-server/internal/handlers/feature_store/handler.go +++ b/storage/ndb/rest-server/rest-api-server/internal/handlers/feature_store/handler.go @@ -24,6 +24,7 @@ import ( "regexp" "strconv" "strings" + "sync" "hopsworks.ai/rdrs/internal/common" "hopsworks.ai/rdrs/internal/config" @@ -303,6 +304,13 @@ func TranslateRonDbError(code int, err string) *feature_store.RestErrorCode { return fsError } +type Result struct { + Index int + Value *interface{} + Err *feature_store.RestErrorCode + Status api.FeatureStatus +} + func GetFeatureValues(ronDbResult *[]*api.PKReadResponseWithCodeJSON, entries *map[string]*json.RawMessage, featureView *feature_store.FeatureViewMetadata, includeDetailedStatus bool) (*[]interface{}, api.FeatureStatus, []*api.DetailedStatus, *feature_store.RestErrorCode) { featureValues := make([]interface{}, featureView.NumOfFeatures) var status = api.FEATURE_STATUS_COMPLETE @@ -331,23 +339,61 @@ func GetFeatureValues(ronDbResult *[]*api.PKReadResponseWithCodeJSON, entries *m } else if *response.Code != http.StatusOK { status = api.FEATURE_STATUS_ERROR } + + complexFeatures := 0 + for featureName, _ := range *response.Body.Data { + featureIndexKey := feature_store.GetFeatureIndexKeyByFgIndexKey(*response.Body.OperationID, featureName) + // When only primary key is selected, Rondb will return all columns, so not all value from response are needed. + if _, ok := (featureView.FeatureIndexLookup)[featureIndexKey]; ok { + if _, ok := (featureView.ComplexFeatures)[featureIndexKey]; ok { + complexFeatures++ + } + } + } + + var wg sync.WaitGroup + results := make(chan Result, complexFeatures) + for featureName, value := range *response.Body.Data { featureIndexKey := feature_store.GetFeatureIndexKeyByFgIndexKey(*response.Body.OperationID, featureName) // When only primary key is selected, Rondb will return all columns, so not all value from response are needed. if index, ok := (featureView.FeatureIndexLookup)[featureIndexKey]; ok { - if schema, ok := (featureView.ComplexFeatures)[featureIndexKey]; ok { - var deser, err1 = DeserialiseComplexFeature(value, schema) - if err1 != nil { - status = api.FEATURE_STATUS_ERROR - err = feature_store.DESERIALISE_FEATURE_FAIL.NewMessage(fmt.Sprintf("Feature name: %s; %s", featureName, err1.Error())) - } else { - featureValues[index] = deser - } + if complexFeature, ok := (featureView.ComplexFeatures)[featureIndexKey]; ok { + + wg.Add(1) + go func(idx int, complexFeature *feature_store.ComplexFeature) { + //fmt.Printf("Go routine started for index %d\n", idx) + defer wg.Done() + //defer fmt.Printf("Go routine stopped for index %d\n", idx) + myIndex := idx + deser, e := DeserialiseComplexFeature(value, complexFeature) + + if e != nil { + myStatus := api.FEATURE_STATUS_ERROR + myErr := feature_store.DESERIALISE_FEATURE_FAIL.NewMessage(fmt.Sprintf("Feature name: %s; %s", featureName, e.Error())) + results <- Result{Index: myIndex, Err: myErr, Status: myStatus} + } else { + results <- Result{Index: myIndex, Value: deser} + } + }(index, complexFeature) + } else { featureValues[index] = value } } } + wg.Wait() + close(results) + + for res := range results { + if res.Err != nil { + fmt.Printf("Failed: %v\n", res.Err) + err = res.Err + status = res.Status + } else { + featureValues[res.Index] = res.Value + } + } } // Fill in primary key value from request into the vector // If multiple matched entries are found, the priority of the entry follows the order in `GetBatchPkReadParams` diff --git a/storage/ndb/rest-server/rest-api-server/internal/handlers/feature_store/util.go b/storage/ndb/rest-server/rest-api-server/internal/handlers/feature_store/util.go index e688b7bf5eb..2570ca621cd 100644 --- a/storage/ndb/rest-server/rest-api-server/internal/handlers/feature_store/util.go +++ b/storage/ndb/rest-server/rest-api-server/internal/handlers/feature_store/util.go @@ -4,13 +4,15 @@ import ( "encoding/base64" "encoding/json" "fmt" + "reflect" "strings" "github.com/hamba/avro/v2" + "hopsworks.ai/rdrs/internal/feature_store" "hopsworks.ai/rdrs/internal/log" ) -func DeserialiseComplexFeature(value *json.RawMessage, schema *avro.Schema) (*interface{}, error) { +func DeserialiseComplexFeature(value *json.RawMessage, complexFeature *feature_store.ComplexFeature) (*interface{}, error) { valueString, err := decodeJSONString(value) if err != nil { if log.IsDebug() { @@ -26,8 +28,9 @@ func DeserialiseComplexFeature(value *json.RawMessage, schema *avro.Schema) (*in } return nil, err } - var avroDeserialized interface{} - err = avro.Unmarshal(*schema, jsonDecode, &avroDeserialized) + // var avroDeserialized interface{} + avroDeserialized := reflect.New(*complexFeature.Struct).Interface() + err = avro.Unmarshal(*complexFeature.Schema, jsonDecode, &avroDeserialized) if err != nil { if log.IsDebug() { log.Debugf("Failed to deserialize avro") @@ -35,7 +38,9 @@ func DeserialiseComplexFeature(value *json.RawMessage, schema *avro.Schema) (*in return nil, err } - nativeJson := ConvertAvroToJson(avroDeserialized) + // disard the top most wapper + nativeJson := reflect.ValueOf(avroDeserialized).Elem().Field(0).Interface() + // nativeJson := ConvertAvroToJson(avroDeserialized) return &nativeJson, err } diff --git a/storage/ndb/rest-server/rest-api-server/internal/integrationtests/feature_store/handler_test.go b/storage/ndb/rest-server/rest-api-server/internal/integrationtests/feature_store/handler_test.go index d1fcb459190..6e0404e0acc 100644 --- a/storage/ndb/rest-server/rest-api-server/internal/integrationtests/feature_store/handler_test.go +++ b/storage/ndb/rest-server/rest-api-server/internal/integrationtests/feature_store/handler_test.go @@ -1302,14 +1302,28 @@ func Test_GetFeatureVector_Success_ComplexType(t *testing.T) { if err != nil { t.Fatalf("Cannot get sample data with error %s ", err) } - mapSchema, err := avro.Parse(`["null",{"type":"record","name":"r854762204","namespace":"struct","fields":[{"name":"int1","type":["null","long"]},{"name":"int2","type":["null","long"]}]}]`) + + // Map + mapSchema, err := avro.Parse(`{"type":"record","name":"sample_complex_type_1","namespace":"test_ken_featurestore.db","fields":[{"name":"struct","type":["null",{"type":"record","name":"r854762204","namespace":"struct","fields":[{"name":"int1","type":["null","long"]},{"name":"int2","type":["null","long"]}]}]}]}`) if err != nil { t.Fatal(err.Error()) } - arraySchema, err := avro.Parse(`["null",{"type":"array","items":["null","long"]}]`) + mapStruct, err := fsmetadata.ConvertAvroSchemaToStruct(mapSchema) if err != nil { t.Fatal(err.Error()) } + mapComplexFeature := fsmetadata.ComplexFeature{Schema: &mapSchema, Struct: &mapStruct} + + // Array + arraySchema, err := avro.Parse(`{"type":"record","name":"sample_complex_type_1","namespace":"test_ken_featurestore.db","fields":[{"name":"array","type":["null",{"type":"array","items":["null","long"]}]}]}`) + if err != nil { + t.Fatal(err.Error()) + } + arrayStruct, err := fsmetadata.ConvertAvroSchemaToStruct(arraySchema) + if err != nil { + t.Fatal(err.Error()) + } + arrayComplexFeature := fsmetadata.ComplexFeature{Schema: &arraySchema, Struct: &arrayStruct} for _, row := range rows { var fsReq = CreateFeatureStoreRequest( @@ -1329,7 +1343,7 @@ func Test_GetFeatureVector_Success_ComplexType(t *testing.T) { if err != nil { t.Fatalf("Cannot convert to json with error %s ", err) } - arrayPt, err := feature_store.DeserialiseComplexFeature(arrayJson, &arraySchema) // array + arrayPt, err := feature_store.DeserialiseComplexFeature(arrayJson, &arrayComplexFeature) // array row[2] = *arrayPt if err != nil { t.Fatalf("Cannot deserailize feature with error %s ", err) @@ -1339,7 +1353,7 @@ func Test_GetFeatureVector_Success_ComplexType(t *testing.T) { if err != nil { t.Fatalf("Cannot convert to json with error %s ", err) } - mapPt, err := feature_store.DeserialiseComplexFeature(mapJson, &mapSchema) // map + mapPt, err := feature_store.DeserialiseComplexFeature(mapJson, &mapComplexFeature) // map row[3] = *mapPt if err != nil { t.Fatalf("Cannot deserailize feature with error %s ", err) @@ -1358,10 +1372,15 @@ func Test_GetFeatureVector_Date_Array_Success_ComplexType(t *testing.T) { if err != nil { t.Fatalf("Cannot get sample data with error %s ", err) } - dataSchema, err := avro.Parse(`["null",{"type":"array","items":["null",{"type":"record","name":"myRecName","namespace":"data","fields":[{"name":"sku","type":["null","string"]},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-micros"}]}]}]}]`) + dataSchema, err := avro.Parse(`{"type":"record","name":"date_array_1","namespace":"salmanap_featurestore.db","fields":[{"name":"data0","type":["null",{"type":"array","items":["null",{"type":"record","name":"r515636140","namespace":"data","fields":[{"name":"sku","type":["null","string"]},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-micros"}]}]}]}]}]}`) + if err != nil { + t.Fatal(err.Error()) + } + dataStruct, err := fsmetadata.ConvertAvroSchemaToStruct(dataSchema) if err != nil { t.Fatal(err.Error()) } + dateComplexFeature := fsmetadata.ComplexFeature{Schema: &dataSchema, Struct: &dataStruct} for _, row := range rows { var fsReq = CreateFeatureStoreRequest( @@ -1376,21 +1395,24 @@ func Test_GetFeatureVector_Date_Array_Success_ComplexType(t *testing.T) { fsReq.MetadataRequest = &api.MetadataRequest{FeatureName: true, FeatureType: true} fsResp := GetFeatureStoreResponse(t, fsReq) - //indented, err := json.MarshalIndent(fsResp, "", " ") - //if err != nil { - // t.Fatalf("Cannot MarshalIndent. Error %s ", err) - //} - //fmt.Printf("Response: %s", string(indented)) + // indented, err := json.MarshalIndent(fsResp, "", " ") + // if err != nil { + // t.Fatalf("Cannot MarshalIndent. Error %s ", err) + // } + // fmt.Printf("Response: %s", string(indented)) + + for i := 1; i <= 5; i++ { + // convert data to object in json format + arrayJson, err := ConvertBinaryToJsonMessage(row[1+i]) // col 0=pk , 1=ts. therefore we start with 2 + if err != nil { + t.Fatalf("Cannot convert to json with error %s ", err) + } + arrayPt, err := feature_store.DeserialiseComplexFeature(arrayJson, &dateComplexFeature) // array + row[1+i] = *arrayPt + if err != nil { + t.Fatalf("Cannot deserailize feature with error %s ", err) + } - // convert data to object in json format - arrayJson, err := ConvertBinaryToJsonMessage(row[2]) - if err != nil { - t.Fatalf("Cannot convert to json with error %s ", err) - } - arrayPt, err := feature_store.DeserialiseComplexFeature(arrayJson, &dataSchema) // array - row[2] = *arrayPt - if err != nil { - t.Fatalf("Cannot deserailize feature with error %s ", err) } // validate ValidateResponseWithData(t, &row, &cols, fsResp) diff --git a/storage/ndb/rest-server/rest-api-server/internal/integrationtests/feature_store/performance_test.go b/storage/ndb/rest-server/rest-api-server/internal/integrationtests/feature_store/performance_test.go index c1f29e02028..0dae5933f6a 100644 --- a/storage/ndb/rest-server/rest-api-server/internal/integrationtests/feature_store/performance_test.go +++ b/storage/ndb/rest-server/rest-api-server/internal/integrationtests/feature_store/performance_test.go @@ -48,6 +48,10 @@ import ( const totalNumRequest = 100000 +func Benchmark_date_arrays(b *testing.B) { + run(b, testdbs.FSDB002, "date_array", 1) +} + func Benchmark(b *testing.B) { run(b, testdbs.FSDB001, "sample_1", 1) } @@ -56,7 +60,7 @@ func Benchmark_join(b *testing.B) { run(b, testdbs.FSDB001, "sample_1n2", 1) } -const nrows = 100 +const nrows = 1 func getSampleData(fsName string, fvName string, fvVersion int) ([][]interface{}, []string, []string, error) { switch fmt.Sprintf("%s|%s|%d", fsName, fvName, fvVersion) { @@ -66,6 +70,8 @@ func getSampleData(fsName string, fvName string, fvVersion int) ([][]interface{} return GetNSampleData(testdbs.FSDB001, "sample_3_1", nrows) case "fsdb001|sample_1n2|1": return GetNSampleDataWithJoin(nrows, testdbs.FSDB001, "sample_1_1", testdbs.FSDB001, "sample_2_1", "fg2_") + case "fsdb002|date_array|1": + return GetNSampleData(testdbs.FSDB002, "date_array_1", 1) default: return nil, nil, nil, nil } diff --git a/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/FSDB002.sql b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/FSDB002.sql index 00b6607cad2..66ad1b458de 100644 --- a/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/FSDB002.sql +++ b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/FSDB002.sql @@ -272,17 +272,21 @@ VALUES CREATE TABLE `date_array_1` ( `pk` varchar(10) COLLATE utf8mb4_unicode_ci NOT NULL, `ts` timestamp NULL DEFAULT NULL, - `data` varbinary(100) DEFAULT NULL, + `data0` varbinary(100) DEFAULT NULL, + `data1` varbinary(100) DEFAULT NULL, + `data2` varbinary(100) DEFAULT NULL, + `data3` varbinary(100) DEFAULT NULL, + `data4` varbinary(100) DEFAULT NULL, PRIMARY KEY (`pk`) USING HASH ) ENGINE=ndbcluster; -INSERT INTO `date_array_1` VALUES ('pk-1','2025-01-08 15:33:04',0x020A02020E736B752D312D3002B8EF96BEF2CC950602020E736B752D312D3102B8EF96BEF2CC950602020E736B752D312D3202BAEF96BEF2CC950602020E736B752D312D3302BAEF96BEF2CC950602020E736B752D312D3402BCEF96BEF2CC950600); -INSERT INTO `date_array_1` VALUES ('pk-6','2025-01-08 15:33:04',0x020A02020E736B752D362D3002DAEF96BEF2CC950602020E736B752D362D3102DAEF96BEF2CC950602020E736B752D362D3202DCEF96BEF2CC950602020E736B752D362D3302DCEF96BEF2CC950602020E736B752D362D3402DEEF96BEF2CC950600); -INSERT INTO `date_array_1` VALUES ('pk-5','2025-01-08 15:33:04',0x020A02020E736B752D352D3002D2EF96BEF2CC950602020E736B752D352D3102D4EF96BEF2CC950602020E736B752D352D3202D4EF96BEF2CC950602020E736B752D352D3302D6EF96BEF2CC950602020E736B752D352D3402D6EF96BEF2CC950600); -INSERT INTO `date_array_1` VALUES ('pk-3','2025-01-08 15:33:04',0x020A02020E736B752D332D3002C6EF96BEF2CC950602020E736B752D332D3102C6EF96BEF2CC950602020E736B752D332D3202C8EF96BEF2CC950602020E736B752D332D3302C8EF96BEF2CC950602020E736B752D332D3402C8EF96BEF2CC950600); -INSERT INTO `date_array_1` VALUES ('pk-8','2025-01-08 15:33:04',0x020A02020E736B752D382D3002F6F296BEF2CC950602020E736B752D382D3102F6F296BEF2CC950602020E736B752D382D3202F8F296BEF2CC950602020E736B752D382D3302F8F296BEF2CC950602020E736B752D382D3402F8F296BEF2CC950600); -INSERT INTO `date_array_1` VALUES ('pk-0','2025-01-08 15:33:04',0x020A02020E736B752D302D3002AEEF96BEF2CC950602020E736B752D302D3102B0EF96BEF2CC950602020E736B752D302D3202B2EF96BEF2CC950602020E736B752D302D3302B2EF96BEF2CC950602020E736B752D302D3402B4EF96BEF2CC950600); -INSERT INTO `date_array_1` VALUES ('pk-4','2025-01-08 15:33:04',0x020A02020E736B752D342D3002CCEF96BEF2CC950602020E736B752D342D3102CCEF96BEF2CC950602020E736B752D342D3202CEEF96BEF2CC950602020E736B752D342D3302D0EF96BEF2CC950602020E736B752D342D3402D0EF96BEF2CC950600); -INSERT INTO `date_array_1` VALUES ('pk-7','2025-01-08 15:33:04',0x020A02020E736B752D372D3002E0EF96BEF2CC950602020E736B752D372D3102E0EF96BEF2CC950602020E736B752D372D3202F0F296BEF2CC950602020E736B752D372D3302F0F296BEF2CC950602020E736B752D372D3402F2F296BEF2CC950600); -INSERT INTO `date_array_1` VALUES ('pk-2','2025-01-08 15:33:04',0x020A02020E736B752D322D3002BEEF96BEF2CC950602020E736B752D322D3102C0EF96BEF2CC950602020E736B752D322D3202C0EF96BEF2CC950602020E736B752D322D3302C2EF96BEF2CC950602020E736B752D322D3402C2EF96BEF2CC950600); -INSERT INTO `date_array_1` VALUES ('pk-9','2025-01-08 15:33:04',0x020A02020E736B752D392D3002FCF296BEF2CC950602020E736B752D392D3102FEF296BEF2CC950602020E736B752D392D3202FEF296BEF2CC950602020E736B752D392D3302FEF296BEF2CC950602020E736B752D392D340280F396BEF2CC950600); +INSERT INTO `date_array_1` VALUES ('pk-1','2025-01-08 15:33:04',0x020A02020E736B752D312D3002B8EF96BEF2CC950602020E736B752D312D3102B8EF96BEF2CC950602020E736B752D312D3202BAEF96BEF2CC950602020E736B752D312D3302BAEF96BEF2CC950602020E736B752D312D3402BCEF96BEF2CC950600,0x020A02020E736B752D312D3002B8EF96BEF2CC950602020E736B752D312D3102B8EF96BEF2CC950602020E736B752D312D3202BAEF96BEF2CC950602020E736B752D312D3302BAEF96BEF2CC950602020E736B752D312D3402BCEF96BEF2CC950600,0x020A02020E736B752D312D3002B8EF96BEF2CC950602020E736B752D312D3102B8EF96BEF2CC950602020E736B752D312D3202BAEF96BEF2CC950602020E736B752D312D3302BAEF96BEF2CC950602020E736B752D312D3402BCEF96BEF2CC950600,0x020A02020E736B752D312D3002B8EF96BEF2CC950602020E736B752D312D3102B8EF96BEF2CC950602020E736B752D312D3202BAEF96BEF2CC950602020E736B752D312D3302BAEF96BEF2CC950602020E736B752D312D3402BCEF96BEF2CC950600,0x020A02020E736B752D312D3002B8EF96BEF2CC950602020E736B752D312D3102B8EF96BEF2CC950602020E736B752D312D3202BAEF96BEF2CC950602020E736B752D312D3302BAEF96BEF2CC950602020E736B752D312D3402BCEF96BEF2CC950600); +INSERT INTO `date_array_1` VALUES ('pk-6','2025-01-08 15:33:04',0x020A02020E736B752D362D3002DAEF96BEF2CC950602020E736B752D362D3102DAEF96BEF2CC950602020E736B752D362D3202DCEF96BEF2CC950602020E736B752D362D3302DCEF96BEF2CC950602020E736B752D362D3402DEEF96BEF2CC950600,0x020A02020E736B752D362D3002DAEF96BEF2CC950602020E736B752D362D3102DAEF96BEF2CC950602020E736B752D362D3202DCEF96BEF2CC950602020E736B752D362D3302DCEF96BEF2CC950602020E736B752D362D3402DEEF96BEF2CC950600,0x020A02020E736B752D362D3002DAEF96BEF2CC950602020E736B752D362D3102DAEF96BEF2CC950602020E736B752D362D3202DCEF96BEF2CC950602020E736B752D362D3302DCEF96BEF2CC950602020E736B752D362D3402DEEF96BEF2CC950600,0x020A02020E736B752D362D3002DAEF96BEF2CC950602020E736B752D362D3102DAEF96BEF2CC950602020E736B752D362D3202DCEF96BEF2CC950602020E736B752D362D3302DCEF96BEF2CC950602020E736B752D362D3402DEEF96BEF2CC950600,0x020A02020E736B752D362D3002DAEF96BEF2CC950602020E736B752D362D3102DAEF96BEF2CC950602020E736B752D362D3202DCEF96BEF2CC950602020E736B752D362D3302DCEF96BEF2CC950602020E736B752D362D3402DEEF96BEF2CC950600); +INSERT INTO `date_array_1` VALUES ('pk-5','2025-01-08 15:33:04',0x020A02020E736B752D352D3002D2EF96BEF2CC950602020E736B752D352D3102D4EF96BEF2CC950602020E736B752D352D3202D4EF96BEF2CC950602020E736B752D352D3302D6EF96BEF2CC950602020E736B752D352D3402D6EF96BEF2CC950600,0x020A02020E736B752D352D3002D2EF96BEF2CC950602020E736B752D352D3102D4EF96BEF2CC950602020E736B752D352D3202D4EF96BEF2CC950602020E736B752D352D3302D6EF96BEF2CC950602020E736B752D352D3402D6EF96BEF2CC950600,0x020A02020E736B752D352D3002D2EF96BEF2CC950602020E736B752D352D3102D4EF96BEF2CC950602020E736B752D352D3202D4EF96BEF2CC950602020E736B752D352D3302D6EF96BEF2CC950602020E736B752D352D3402D6EF96BEF2CC950600,0x020A02020E736B752D352D3002D2EF96BEF2CC950602020E736B752D352D3102D4EF96BEF2CC950602020E736B752D352D3202D4EF96BEF2CC950602020E736B752D352D3302D6EF96BEF2CC950602020E736B752D352D3402D6EF96BEF2CC950600,0x020A02020E736B752D352D3002D2EF96BEF2CC950602020E736B752D352D3102D4EF96BEF2CC950602020E736B752D352D3202D4EF96BEF2CC950602020E736B752D352D3302D6EF96BEF2CC950602020E736B752D352D3402D6EF96BEF2CC950600); +INSERT INTO `date_array_1` VALUES ('pk-3','2025-01-08 15:33:04',0x020A02020E736B752D332D3002C6EF96BEF2CC950602020E736B752D332D3102C6EF96BEF2CC950602020E736B752D332D3202C8EF96BEF2CC950602020E736B752D332D3302C8EF96BEF2CC950602020E736B752D332D3402C8EF96BEF2CC950600,0x020A02020E736B752D332D3002C6EF96BEF2CC950602020E736B752D332D3102C6EF96BEF2CC950602020E736B752D332D3202C8EF96BEF2CC950602020E736B752D332D3302C8EF96BEF2CC950602020E736B752D332D3402C8EF96BEF2CC950600,0x020A02020E736B752D332D3002C6EF96BEF2CC950602020E736B752D332D3102C6EF96BEF2CC950602020E736B752D332D3202C8EF96BEF2CC950602020E736B752D332D3302C8EF96BEF2CC950602020E736B752D332D3402C8EF96BEF2CC950600,0x020A02020E736B752D332D3002C6EF96BEF2CC950602020E736B752D332D3102C6EF96BEF2CC950602020E736B752D332D3202C8EF96BEF2CC950602020E736B752D332D3302C8EF96BEF2CC950602020E736B752D332D3402C8EF96BEF2CC950600,0x020A02020E736B752D332D3002C6EF96BEF2CC950602020E736B752D332D3102C6EF96BEF2CC950602020E736B752D332D3202C8EF96BEF2CC950602020E736B752D332D3302C8EF96BEF2CC950602020E736B752D332D3402C8EF96BEF2CC950600); +INSERT INTO `date_array_1` VALUES ('pk-8','2025-01-08 15:33:04',0x020A02020E736B752D382D3002F6F296BEF2CC950602020E736B752D382D3102F6F296BEF2CC950602020E736B752D382D3202F8F296BEF2CC950602020E736B752D382D3302F8F296BEF2CC950602020E736B752D382D3402F8F296BEF2CC950600,0x020A02020E736B752D382D3002F6F296BEF2CC950602020E736B752D382D3102F6F296BEF2CC950602020E736B752D382D3202F8F296BEF2CC950602020E736B752D382D3302F8F296BEF2CC950602020E736B752D382D3402F8F296BEF2CC950600,0x020A02020E736B752D382D3002F6F296BEF2CC950602020E736B752D382D3102F6F296BEF2CC950602020E736B752D382D3202F8F296BEF2CC950602020E736B752D382D3302F8F296BEF2CC950602020E736B752D382D3402F8F296BEF2CC950600,0x020A02020E736B752D382D3002F6F296BEF2CC950602020E736B752D382D3102F6F296BEF2CC950602020E736B752D382D3202F8F296BEF2CC950602020E736B752D382D3302F8F296BEF2CC950602020E736B752D382D3402F8F296BEF2CC950600,0x020A02020E736B752D382D3002F6F296BEF2CC950602020E736B752D382D3102F6F296BEF2CC950602020E736B752D382D3202F8F296BEF2CC950602020E736B752D382D3302F8F296BEF2CC950602020E736B752D382D3402F8F296BEF2CC950600); +INSERT INTO `date_array_1` VALUES ('pk-0','2025-01-08 15:33:04',0x020A02020E736B752D302D3002AEEF96BEF2CC950602020E736B752D302D3102B0EF96BEF2CC950602020E736B752D302D3202B2EF96BEF2CC950602020E736B752D302D3302B2EF96BEF2CC950602020E736B752D302D3402B4EF96BEF2CC950600,0x020A02020E736B752D302D3002AEEF96BEF2CC950602020E736B752D302D3102B0EF96BEF2CC950602020E736B752D302D3202B2EF96BEF2CC950602020E736B752D302D3302B2EF96BEF2CC950602020E736B752D302D3402B4EF96BEF2CC950600,0x020A02020E736B752D302D3002AEEF96BEF2CC950602020E736B752D302D3102B0EF96BEF2CC950602020E736B752D302D3202B2EF96BEF2CC950602020E736B752D302D3302B2EF96BEF2CC950602020E736B752D302D3402B4EF96BEF2CC950600,0x020A02020E736B752D302D3002AEEF96BEF2CC950602020E736B752D302D3102B0EF96BEF2CC950602020E736B752D302D3202B2EF96BEF2CC950602020E736B752D302D3302B2EF96BEF2CC950602020E736B752D302D3402B4EF96BEF2CC950600,0x020A02020E736B752D302D3002AEEF96BEF2CC950602020E736B752D302D3102B0EF96BEF2CC950602020E736B752D302D3202B2EF96BEF2CC950602020E736B752D302D3302B2EF96BEF2CC950602020E736B752D302D3402B4EF96BEF2CC950600); +INSERT INTO `date_array_1` VALUES ('pk-4','2025-01-08 15:33:04',0x020A02020E736B752D342D3002CCEF96BEF2CC950602020E736B752D342D3102CCEF96BEF2CC950602020E736B752D342D3202CEEF96BEF2CC950602020E736B752D342D3302D0EF96BEF2CC950602020E736B752D342D3402D0EF96BEF2CC950600,0x020A02020E736B752D342D3002CCEF96BEF2CC950602020E736B752D342D3102CCEF96BEF2CC950602020E736B752D342D3202CEEF96BEF2CC950602020E736B752D342D3302D0EF96BEF2CC950602020E736B752D342D3402D0EF96BEF2CC950600,0x020A02020E736B752D342D3002CCEF96BEF2CC950602020E736B752D342D3102CCEF96BEF2CC950602020E736B752D342D3202CEEF96BEF2CC950602020E736B752D342D3302D0EF96BEF2CC950602020E736B752D342D3402D0EF96BEF2CC950600,0x020A02020E736B752D342D3002CCEF96BEF2CC950602020E736B752D342D3102CCEF96BEF2CC950602020E736B752D342D3202CEEF96BEF2CC950602020E736B752D342D3302D0EF96BEF2CC950602020E736B752D342D3402D0EF96BEF2CC950600,0x020A02020E736B752D342D3002CCEF96BEF2CC950602020E736B752D342D3102CCEF96BEF2CC950602020E736B752D342D3202CEEF96BEF2CC950602020E736B752D342D3302D0EF96BEF2CC950602020E736B752D342D3402D0EF96BEF2CC950600); +INSERT INTO `date_array_1` VALUES ('pk-7','2025-01-08 15:33:04',0x020A02020E736B752D372D3002E0EF96BEF2CC950602020E736B752D372D3102E0EF96BEF2CC950602020E736B752D372D3202F0F296BEF2CC950602020E736B752D372D3302F0F296BEF2CC950602020E736B752D372D3402F2F296BEF2CC950600,0x020A02020E736B752D372D3002E0EF96BEF2CC950602020E736B752D372D3102E0EF96BEF2CC950602020E736B752D372D3202F0F296BEF2CC950602020E736B752D372D3302F0F296BEF2CC950602020E736B752D372D3402F2F296BEF2CC950600,0x020A02020E736B752D372D3002E0EF96BEF2CC950602020E736B752D372D3102E0EF96BEF2CC950602020E736B752D372D3202F0F296BEF2CC950602020E736B752D372D3302F0F296BEF2CC950602020E736B752D372D3402F2F296BEF2CC950600,0x020A02020E736B752D372D3002E0EF96BEF2CC950602020E736B752D372D3102E0EF96BEF2CC950602020E736B752D372D3202F0F296BEF2CC950602020E736B752D372D3302F0F296BEF2CC950602020E736B752D372D3402F2F296BEF2CC950600,0x020A02020E736B752D372D3002E0EF96BEF2CC950602020E736B752D372D3102E0EF96BEF2CC950602020E736B752D372D3202F0F296BEF2CC950602020E736B752D372D3302F0F296BEF2CC950602020E736B752D372D3402F2F296BEF2CC950600); +INSERT INTO `date_array_1` VALUES ('pk-2','2025-01-08 15:33:04',0x020A02020E736B752D322D3002BEEF96BEF2CC950602020E736B752D322D3102C0EF96BEF2CC950602020E736B752D322D3202C0EF96BEF2CC950602020E736B752D322D3302C2EF96BEF2CC950602020E736B752D322D3402C2EF96BEF2CC950600,0x020A02020E736B752D322D3002BEEF96BEF2CC950602020E736B752D322D3102C0EF96BEF2CC950602020E736B752D322D3202C0EF96BEF2CC950602020E736B752D322D3302C2EF96BEF2CC950602020E736B752D322D3402C2EF96BEF2CC950600,0x020A02020E736B752D322D3002BEEF96BEF2CC950602020E736B752D322D3102C0EF96BEF2CC950602020E736B752D322D3202C0EF96BEF2CC950602020E736B752D322D3302C2EF96BEF2CC950602020E736B752D322D3402C2EF96BEF2CC950600,0x020A02020E736B752D322D3002BEEF96BEF2CC950602020E736B752D322D3102C0EF96BEF2CC950602020E736B752D322D3202C0EF96BEF2CC950602020E736B752D322D3302C2EF96BEF2CC950602020E736B752D322D3402C2EF96BEF2CC950600,0x020A02020E736B752D322D3002BEEF96BEF2CC950602020E736B752D322D3102C0EF96BEF2CC950602020E736B752D322D3202C0EF96BEF2CC950602020E736B752D322D3302C2EF96BEF2CC950602020E736B752D322D3402C2EF96BEF2CC950600); +INSERT INTO `date_array_1` VALUES ('pk-9','2025-01-08 15:33:04',0x020A02020E736B752D392D3002FCF296BEF2CC950602020E736B752D392D3102FEF296BEF2CC950602020E736B752D392D3202FEF296BEF2CC950602020E736B752D392D3302FEF296BEF2CC950602020E736B752D392D340280F396BEF2CC950600,0x020A02020E736B752D392D3002FCF296BEF2CC950602020E736B752D392D3102FEF296BEF2CC950602020E736B752D392D3202FEF296BEF2CC950602020E736B752D392D3302FEF296BEF2CC950602020E736B752D392D340280F396BEF2CC950600,0x020A02020E736B752D392D3002FCF296BEF2CC950602020E736B752D392D3102FEF296BEF2CC950602020E736B752D392D3202FEF296BEF2CC950602020E736B752D392D3302FEF296BEF2CC950602020E736B752D392D340280F396BEF2CC950600,0x020A02020E736B752D392D3002FCF296BEF2CC950602020E736B752D392D3102FEF296BEF2CC950602020E736B752D392D3202FEF296BEF2CC950602020E736B752D392D3302FEF296BEF2CC950602020E736B752D392D340280F396BEF2CC950600,0x020A02020E736B752D392D3002FCF296BEF2CC950602020E736B752D392D3102FEF296BEF2CC950602020E736B752D392D3202FEF296BEF2CC950602020E736B752D392D3302FEF296BEF2CC950602020E736B752D392D340280F396BEF2CC950600); diff --git a/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/hopsworks_40_data.sql b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/hopsworks_40_data.sql index 2d359b3c4c0..8095239b8b3 100644 --- a/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/hopsworks_40_data.sql +++ b/storage/ndb/rest-server/rest-api-server/resources/testdbs/fixed/hopsworks_40_data.sql @@ -812,7 +812,19 @@ VALUES 62, NULL, 33, 'ts', 'timestamp', 27, 1, 0, 0, 0, 21, NULL ), ( - 63, NULL, 33, 'data', 'array>', 27, 2, 0, 0, 0, 21, NULL + 63, NULL, 33, 'data0', 'array>', 27, 2, 0, 0, 0, 21, NULL + ), + ( + 64, NULL, 33, 'data1', 'array>', 27, 3, 0, 0, 0, 21, NULL + ), + ( + 65, NULL, 33, 'data2', 'array>', 27, 4, 0, 0, 0, 21, NULL + ), + ( + 66, NULL, 33, 'data3', 'array>', 27, 5, 0, 0, 0, 21, NULL + ), + ( + 67, NULL, 33, 'data4', 'array>', 27, 6, 0, 0, 0, 21, NULL ), ( 5148, NULL, 2069, 'data1', 'bigint', 5133, 2, 0, 0, 0, 4117, NULL @@ -1063,7 +1075,7 @@ VALUES 21, '{"type":"record","name":"sample_complex_type_1","namespace":"test_ken_featurestore.db","fields":[{"name":"id1","type":["null","long"]},{"name":"ts","type":["null","long"]},{"name":"array","type":["null",{"type":"array","items":["null","long"]}]},{"name":"struct","type":["null",{"type":"record","name":"r854762204","namespace":"struct","fields":[{"name":"int1","type":["null","long"]},{"name":"int2","type":["null","long"]}]}]}]}', 1001 ), ( - 23, '{"type":"record","name":"date_array_1","namespace":"salmanap_featurestore.db","fields":[{"name":"pk","type":["null","string"]},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-micros"}]},{"name":"data","type":["null",{"type":"array","items":["null",{"type":"record","name":"r515636140","namespace":"data","fields":[{"name":"sku","type":["null","string"]},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-micros"}]}]}]}]}]}', 1001 + 23, '{"type":"record","name":"date_array_1","namespace":"salmanap_featurestore.db","fields":[{"name":"pk","type":["null","string"]},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-micros"}]},{"name":"data0","type":["null",{"type":"array","items":["null",{"type":"record","name":"r515636140","namespace":"data","fields":[{"name":"sku","type":["null","string"]},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-micros"}]}]}]}]},{"name":"data1","type":["null",{"type":"array","items":["null",{"type":"record","name":"r515636140","namespace":"data","fields":[{"name":"sku","type":["null","string"]},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-micros"}]}]}]}]},{"name":"data2","type":["null",{"type":"array","items":["null",{"type":"record","name":"r515636140","namespace":"data","fields":[{"name":"sku","type":["null","string"]},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-micros"}]}]}]}]},{"name":"data3","type":["null",{"type":"array","items":["null",{"type":"record","name":"r515636140","namespace":"data","fields":[{"name":"sku","type":["null","string"]},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-micros"}]}]}]}]},{"name":"data4","type":["null",{"type":"array","items":["null",{"type":"record","name":"r515636140","namespace":"data","fields":[{"name":"sku","type":["null","string"]},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-micros"}]}]}]}]}]}', 1001 ); INSERT INTO