From e82fe622b1efb7b0956bb94fe6ab2b4bc994fdff Mon Sep 17 00:00:00 2001 From: ngjaying Date: Thu, 4 Jul 2024 15:07:15 +0800 Subject: [PATCH] feat(graph): restore graph api (#2979) Signed-off-by: Jiyong Huang --- internal/server/rule_manager.go | 9 +- internal/server/rule_migration.go | 306 ++++++ internal/topo/graph/io.go | 166 ++++ internal/topo/graph/node.go | 70 ++ internal/topo/node/contract.go | 7 + internal/topo/planner/ext_graph_node.go | 50 + internal/topo/planner/planner.go | 3 +- internal/topo/planner/planner_graph.go | 824 ++++++++++++++++ internal/topo/planner/planner_graph_test.go | 998 ++++++++++++++++++++ internal/topo/planner/planner_sink.go | 200 ++-- internal/topo/planner/planner_sink_test.go | 2 +- test/run_jmeter.sh | 34 +- 12 files changed, 2570 insertions(+), 99 deletions(-) create mode 100644 internal/topo/graph/io.go create mode 100644 internal/topo/graph/node.go create mode 100644 internal/topo/planner/ext_graph_node.go create mode 100644 internal/topo/planner/planner_graph.go create mode 100644 internal/topo/planner/planner_graph_test.go diff --git a/internal/server/rule_manager.go b/internal/server/rule_manager.go index 1fb2ccb223..fc4792a56c 100644 --- a/internal/server/rule_manager.go +++ b/internal/server/rule_manager.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 EMQ Technologies Co., Ltd. +// Copyright 2021-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/pkg/def" "github.com/lf-edge/ekuiper/v2/internal/pkg/store" "github.com/lf-edge/ekuiper/v2/internal/server/promMetrics" + "github.com/lf-edge/ekuiper/v2/internal/topo/planner" "github.com/lf-edge/ekuiper/v2/internal/topo/rule" "github.com/lf-edge/ekuiper/v2/internal/xsql" "github.com/lf-edge/ekuiper/v2/pkg/cast" @@ -458,6 +459,12 @@ func validateRule(name, ruleJson string) ([]string, bool, error) { return nil, false, err } } + } else if rule.Graph != nil { + tp, err := planner.PlanByGraph(rule) + if err != nil { + return nil, false, fmt.Errorf("invalid rule graph: %v", err) + } + sources = tp.GetTopo().Sources } return sources, true, nil } diff --git a/internal/server/rule_migration.go b/internal/server/rule_migration.go index 3a0230c335..f470fee426 100644 --- a/internal/server/rule_migration.go +++ b/internal/server/rule_migration.go @@ -16,15 +16,19 @@ package server import ( "encoding/json" + "errors" + "fmt" "strings" "github.com/lf-edge/ekuiper/v2/internal/meta" "github.com/lf-edge/ekuiper/v2/internal/pkg/def" store2 "github.com/lf-edge/ekuiper/v2/internal/pkg/store" "github.com/lf-edge/ekuiper/v2/internal/processor" + "github.com/lf-edge/ekuiper/v2/internal/topo/graph" "github.com/lf-edge/ekuiper/v2/internal/topo/node/conf" "github.com/lf-edge/ekuiper/v2/internal/xsql" "github.com/lf-edge/ekuiper/v2/pkg/ast" + "github.com/lf-edge/ekuiper/v2/pkg/cast" ) type RuleMigrationProcessor struct { @@ -65,6 +69,7 @@ type dependencies struct { func ruleTraverse(rule *def.Rule, de *dependencies) { sql := rule.Sql + ruleGraph := rule.Graph if sql != "" { stmt, err := xsql.GetStatementFromSql(sql) if err != nil { @@ -145,6 +150,162 @@ func ruleTraverse(rule *def.Rule, de *dependencies) { // Rules de.rules = append(de.rules, rule.Id) + } else { + for _, gn := range ruleGraph.Nodes { + switch gn.Type { + case "source": + sourceOption := &ast.Options{} + err := cast.MapToStruct(gn.Props, sourceOption) + if err != nil { + break + } + 
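+				// Note: the node's nodeType (e.g. "mqtt") doubles as the source plugin
+				// type; it is recorded on sourceOption so the config keys and schema
+				// ids collected below are grouped per source type.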
sourceOption.TYPE = gn.NodeType
+
+				de.sources = append(de.sources, sourceOption.TYPE)
+				// get config key
+				_, ok := de.sourceConfigKeys[sourceOption.TYPE]
+				if ok {
+					de.sourceConfigKeys[sourceOption.TYPE] = append(de.sourceConfigKeys[sourceOption.TYPE], sourceOption.CONF_KEY)
+				} else {
+					var confKeys []string
+					confKeys = append(confKeys, sourceOption.CONF_KEY)
+					de.sourceConfigKeys[sourceOption.TYPE] = confKeys
+				}
+				// get schema id
+				if sourceOption.SCHEMAID != "" {
+					r := strings.Split(sourceOption.SCHEMAID, ".")
+					de.schemas = append(de.schemas, sourceOption.FORMAT+"_"+r[0])
+				}
+			case "sink":
+				sinkType := gn.NodeType
+				props := gn.Props
+				de.sinks = append(de.sinks, sinkType)
+				resourceId, ok := props[conf.ResourceID].(string)
+				if ok {
+					_, ok := de.sinkConfigKeys[sinkType]
+					if ok {
+						de.sinkConfigKeys[sinkType] = append(de.sinkConfigKeys[sinkType], resourceId)
+					} else {
+						var confKeys []string
+						confKeys = append(confKeys, resourceId)
+						de.sinkConfigKeys[sinkType] = confKeys
+					}
+				}
+
+				format, ok := props["format"].(string)
+				if ok && format != "json" {
+					schemaId, ok := props["schemaId"].(string)
+					if ok {
+						r := strings.Split(schemaId, ".")
+						de.schemas = append(de.schemas, format+"_"+r[0])
+					}
+				}
+			case "operator":
+				// Each operator kind is parsed into an AST fragment; walk the
+				// fragment once to collect the functions it references.
+				var (
+					op  ast.Node
+					err error
+				)
+				switch strings.ToLower(gn.NodeType) {
+				case "function", "aggfunc":
+					op, err = parseFunc(gn.Props)
+				case "filter":
+					op, err = parseFilter(gn.Props)
+				case "pick":
+					op, err = parsePick(gn.Props)
+				case "join":
+					op, err = parseJoin(gn.Props)
+				case "groupby":
+					op, err = parseGroupBy(gn.Props)
+				case "orderby":
+					op, err = parseOrderBy(gn.Props)
+				case "switch":
+					opArray, e := parseSwitch(gn.Props)
+					if e != nil {
+						break
+					}
+					for _, o := range opArray {
+						ast.WalkFunc(o, func(n ast.Node) bool {
+							if f, ok := n.(*ast.Call); ok {
+								de.functions = append(de.functions, f.Name)
+							}
+							return true
+						})
+					}
+				}
+				if err != nil || op == nil {
+					break
+				}
+				ast.WalkFunc(op, func(n ast.Node) bool {
+					if f, ok := n.(*ast.Call); ok {
+						de.functions = append(de.functions, f.Name)
+					}
+					return true
+				})
+			}
+		}
+	}
+}
 
@@ -271,3 +432,148 @@ func (p *RuleMigrationProcessor) exportSelected(de *dependencies, config *Config
 	config.Uploads = uploadsExport()
 }
+
+func parsePick(props map[string]interface{}) (*ast.SelectStatement, error) {
+	n := &graph.Select{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	stmt, err := xsql.NewParser(strings.NewReader("select " + strings.Join(n.Fields, ",") + " from nonexist")).Parse()
+	if err != nil {
+		return nil, err
+	} else {
+		return stmt, nil
+	}
+}
+
+func parseFunc(props map[string]interface{}) (*ast.SelectStatement, error) {
+	m, ok := props["expr"]
+	if !ok {
+		return nil, errors.New("no expr")
+	}
+	funcExpr, ok := m.(string)
+	if !ok {
+		return nil, fmt.Errorf("expr %v is not string", m)
+	}
+	stmt, err := xsql.NewParser(strings.NewReader("select " + funcExpr + " from nonexist")).Parse()
+	if err != nil {
+		return nil, err
+	} else {
+		return stmt, nil
+	}
+}
+
+func parseFilter(props map[string]interface{}) (ast.Expr, error) {
+	m, ok := props["expr"]
+	if !ok {
+		return nil, errors.New("no expr")
+	}
+	conditionExpr, ok := m.(string)
+	if !ok {
+		return nil, fmt.Errorf("expr %v is not string", m)
+	}
+	p := xsql.NewParser(strings.NewReader(" where " + conditionExpr))
+	if exp, err := p.ParseCondition(); err != nil {
+		return nil, err
+	} else {
+		return exp, nil
+	}
+}
+
+func parseHaving(props map[string]interface{}) (ast.Expr, error) {
+	m, ok := props["expr"]
+	if !ok {
+		return nil, errors.New("no expr")
+	}
+	conditionExpr, ok := m.(string)
+	if !ok {
+		return nil, fmt.Errorf("expr %v is not string", m)
+	}
+	p := xsql.NewParser(strings.NewReader("where " + conditionExpr))
+	if exp, err := p.ParseCondition(); err != nil {
+		return nil, err
+	} else {
+		return exp, nil
+	}
+}
+
+func parseSwitch(props map[string]interface{}) ([]ast.Expr, error) {
+	n := &graph.Switch{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	if len(n.Cases) == 0 {
+		return nil, fmt.Errorf("switch node must have at least one case")
+	}
+	caseExprs := make([]ast.Expr, len(n.Cases))
+	for i, c := range n.Cases {
+		p := xsql.NewParser(strings.NewReader("where " + c))
+		if exp, err := p.ParseCondition(); err != nil {
+			return nil, fmt.Errorf("parse case %d error: %v", i, err)
+		} else {
+			if exp != nil {
+				caseExprs[i] = exp
+			}
+		}
+	}
+	return caseExprs, nil
+}
+
+func parseOrderBy(props map[string]interface{}) (*ast.SelectStatement, error) {
+	n := &graph.Orderby{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	stmt := "SELECT * FROM unknown ORDER BY"
+	for _, s := range n.Sorts {
+		stmt += " " + s.Field + " "
+		if s.Desc {
+			stmt += "DESC"
+		}
+	}
+	p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
+	if err != nil {
+		return nil, fmt.Errorf("invalid order by statement error: %v", err)
+	} else {
+		return p, nil
+	}
+}
+
+func parseGroupBy(props map[string]interface{}) (*ast.SelectStatement, error) {
+	n := &graph.Groupby{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	if len(n.Dimensions) == 0 {
+		return nil, fmt.Errorf("groupby must have at least one dimension")
+	}
+	stmt := "SELECT * FROM unknown Group By " + strings.Join(n.Dimensions, ",")
+	p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
+	if err != nil {
+		return nil, fmt.Errorf("invalid group by statement error: %v", err)
+	} else {
+		return p, nil
+	}
+}
+
+func parseJoin(props map[string]interface{}) (*ast.SelectStatement, error) {
+	n := &graph.Join{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	stmt := "SELECT * FROM " + n.From
+	for _, join := range n.Joins {
+		// include the join source name; without it the statement cannot parse
+		stmt += " " + join.Type + " JOIN " + join.Name + " ON " + join.On
+	}
+	p, err := xsql.NewParser(strings.NewReader(stmt)).Parse()
+	if err != nil {
+		return 
nil, fmt.Errorf("invalid join statement error: %v", err) + } else { + return p, nil + } +} diff --git a/internal/topo/graph/io.go b/internal/topo/graph/io.go new file mode 100644 index 0000000000..63ed88e398 --- /dev/null +++ b/internal/topo/graph/io.go @@ -0,0 +1,166 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package graph + +import "fmt" + +type ( + IoInputType uint8 + IoRowType uint8 + IoCollectionType uint8 +) + +const ( + IOINPUT_TYPE_SAME IoInputType = iota + IOINPUT_TYPE_ROW // 0b01 + IOINPUT_TYPE_COLLECTION // 0b10 + IOINPUT_TYPE_ANY // 0b11 +) + +var inputTypes = map[IoInputType]string{ + IOINPUT_TYPE_ROW: "row", + IOINPUT_TYPE_COLLECTION: "collection", + IOINPUT_TYPE_ANY: "any", + IOINPUT_TYPE_SAME: "same", +} + +const ( + IOROW_TYPE_SAME IoRowType = iota + IOROW_TYPE_SINGLE // 0b01 + IOROW_TYPE_MERGED // 0b10 + IOROW_TYPE_ANY // 0b11 +) + +var rowTypes = map[IoRowType]string{ + IOROW_TYPE_SINGLE: "single emitter row", + IOROW_TYPE_MERGED: "merged row", + IOROW_TYPE_ANY: "any", + IOROW_TYPE_SAME: "same", +} + +const ( + IOCOLLECTION_TYPE_SAME IoCollectionType = iota + IOCOLLECTION_TYPE_SINGLE + IOCOLLECTION_TYPE_GROUPED + IOCOLLECTION_TYPE_ANY +) + +var collectionsTypes = map[IoCollectionType]string{ + IOCOLLECTION_TYPE_SINGLE: "non-grouped collection", + IOCOLLECTION_TYPE_GROUPED: "grouped collection", + IOCOLLECTION_TYPE_ANY: "any", + IOCOLLECTION_TYPE_SAME: "same", +} + +// IOType is the type of input/output +// all fields are default to any +type IOType struct { + Type IoInputType `json:"type"` + RowType IoRowType `json:"rowType"` + CollectionType IoCollectionType `json:"collectionType"` + AllowMulti bool `json:"allowMulti"` +} + +// NewIOType creates a new IOType +func NewIOType() *IOType { + return &IOType{ + Type: IOINPUT_TYPE_ANY, + RowType: IOROW_TYPE_ANY, + CollectionType: IOCOLLECTION_TYPE_ANY, + } +} + +func Fit(value, condition *IOType) (bool, error) { + if value.Type&condition.Type == 0 { + return false, fmt.Errorf("input type mismatch, expect %s, got %s", inputTypes[condition.Type], inputTypes[value.Type]) + } + if value.RowType&condition.RowType == 0 { + return false, fmt.Errorf("row type mismatch, expect %s, got %s", rowTypes[condition.RowType], rowTypes[value.RowType]) + } + if value.CollectionType&condition.CollectionType == 0 { + return false, fmt.Errorf("collection type mismatch, expect %s, got %s", collectionsTypes[condition.CollectionType], collectionsTypes[value.CollectionType]) + } + return true, nil +} + +func MapOut(previous, origin *IOType) (result *IOType) { + result = NewIOType() + if origin.Type == IOINPUT_TYPE_SAME { + result.Type = previous.Type + result.RowType = previous.RowType + result.CollectionType = previous.CollectionType + } else { + result.Type = origin.Type + if origin.RowType == IOROW_TYPE_SAME { + result.RowType = previous.RowType + } else { + result.RowType = origin.RowType + } + if origin.CollectionType == IOCOLLECTION_TYPE_SAME { + result.CollectionType = 
previous.CollectionType + } else { + result.CollectionType = origin.CollectionType + } + } + return +} + +// OpIO The io constraints for a node +var OpIO = map[string][]*IOType{ + "aggfunc": { + {Type: IOINPUT_TYPE_COLLECTION, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY}, + {Type: IOINPUT_TYPE_COLLECTION, CollectionType: IOCOLLECTION_TYPE_GROUPED}, + }, + "filter": { + {Type: IOINPUT_TYPE_ANY, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY}, + {Type: IOINPUT_TYPE_SAME}, + }, + "function": { + {Type: IOINPUT_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_SINGLE, RowType: IOROW_TYPE_ANY}, + {Type: IOINPUT_TYPE_SAME}, + }, + "groupby": { + {Type: IOINPUT_TYPE_COLLECTION, CollectionType: IOCOLLECTION_TYPE_SINGLE, RowType: IOROW_TYPE_ANY}, + {Type: IOINPUT_TYPE_COLLECTION, CollectionType: IOCOLLECTION_TYPE_GROUPED}, + }, + "join": { + {Type: IOINPUT_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_SINGLE, RowType: IOROW_TYPE_SINGLE, AllowMulti: true}, + {Type: IOINPUT_TYPE_COLLECTION, CollectionType: IOCOLLECTION_TYPE_SINGLE, RowType: IOROW_TYPE_MERGED}, + }, + "orderby": { + {Type: IOINPUT_TYPE_COLLECTION, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY}, + {Type: IOINPUT_TYPE_SAME}, + }, + "pick": { + {Type: IOINPUT_TYPE_ANY, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY}, + {Type: IOINPUT_TYPE_SAME}, + }, + "watermark": { + {Type: IOINPUT_TYPE_ROW, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY, AllowMulti: true}, + {Type: IOINPUT_TYPE_SAME}, + }, + "window": { + {Type: IOINPUT_TYPE_ROW, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY, AllowMulti: true}, + {Type: IOINPUT_TYPE_COLLECTION, CollectionType: IOCOLLECTION_TYPE_SINGLE, RowType: IOROW_TYPE_SINGLE}, + }, + "switch": { + {Type: IOINPUT_TYPE_ANY, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY}, + {Type: IOINPUT_TYPE_SAME}, + }, + "script": { + {Type: IOINPUT_TYPE_ANY, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY}, + {Type: IOINPUT_TYPE_SAME}, + }, +} diff --git a/internal/topo/graph/node.go b/internal/topo/graph/node.go new file mode 100644 index 0000000000..f1afbd601f --- /dev/null +++ b/internal/topo/graph/node.go @@ -0,0 +1,70 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
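+
+// Note: the structs below mirror the JSON "props" objects of the graph rule
+// API's operator nodes; planners decode a node's props into them via
+// cast.MapToStruct (see planner_graph.go and rule_migration.go).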
+
+package graph
+
+type Function struct {
+	Expr string `json:"expr"`
+}
+
+type Filter struct {
+	Expr string `json:"expr"`
+}
+
+type Select struct {
+	Fields []string `json:"fields"`
+	IsAgg  bool     `json:"isAgg"`
+}
+
+type Watermark struct {
+	Emitters      []string `json:"emitters"`
+	SendWatermark bool     `json:"sendWatermark"`
+}
+
+type Window struct {
+	Type     string `json:"type"`
+	Unit     string `json:"unit"`
+	Size     int    `json:"size"`
+	Interval int    `json:"interval"`
+}
+
+type Join struct {
+	From  string `json:"from"`
+	Joins []struct {
+		Name string `json:"name"`
+		Type string `json:"type"`
+		On   string `json:"on"`
+	}
+}
+
+type Groupby struct {
+	Dimensions []string `json:"dimensions"`
+}
+
+type Orderby struct {
+	Sorts []struct {
+		Field string `json:"field"`
+		Desc  bool   `json:"desc"`
+	}
+}
+
+type Switch struct {
+	Cases            []string `json:"cases"`
+	StopAtFirstMatch bool     `json:"stopAtFirstMatch"`
+}
+
+type Script struct {
+	Script string `json:"script"`
+	IsAgg  bool   `json:"isAgg"`
+}
diff --git a/internal/topo/node/contract.go b/internal/topo/node/contract.go
index 284d32b7e9..2a10ad12d6 100644
--- a/internal/topo/node/contract.go
+++ b/internal/topo/node/contract.go
@@ -33,6 +33,13 @@ type TopNode interface {
 	GetName() string
 }
 
+// CompNode is a composite node for implicitly split nodes. For example, a sink
+// or source node may be implemented internally as a collection of connected nodes.
+type CompNode interface {
+	TopNode
+	Nodes() []TopNode
+}
+
 type MetricNode interface {
 	GetMetrics() []any
 	RemoveMetrics(ruleId string)
diff --git a/internal/topo/planner/ext_graph_node.go b/internal/topo/planner/ext_graph_node.go
new file mode 100644
index 0000000000..0efd6dbf84
--- /dev/null
+++ b/internal/topo/planner/ext_graph_node.go
@@ -0,0 +1,50 @@
+// Copyright 2023-2024 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
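+
+// Note: this file is compiled only when the "script" build tag is set; it
+// registers the "script" operator into extNodes so PlanByGraph can plan
+// script nodes in builds that include the script runtime.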
+ +//go:build script + +package planner + +import ( + "fmt" + + "github.com/lf-edge/ekuiper/v2/internal/pkg/def" + "github.com/lf-edge/ekuiper/v2/internal/topo/graph" + "github.com/lf-edge/ekuiper/v2/internal/topo/node" + "github.com/lf-edge/ekuiper/v2/internal/topo/operator" + "github.com/lf-edge/ekuiper/v2/pkg/cast" +) + +func init() { + extNodes["script"] = func(name string, props map[string]interface{}, options *def.RuleOption) (node.TopNode, error) { + sop, err := parseScript(props) + if err != nil { + return nil, err + } + op := Transform(sop, name, options) + return op, nil + } +} + +func parseScript(props map[string]interface{}) (*operator.ScriptOp, error) { + n := &graph.Script{} + err := cast.MapToStruct(props, n) + if err != nil { + return nil, err + } + if n.Script == "" { + return nil, fmt.Errorf("script node must have script") + } + return operator.NewScriptOp(n.Script, n.IsAgg) +} diff --git a/internal/topo/planner/planner.go b/internal/topo/planner/planner.go index 179e50283c..6863c8e277 100644 --- a/internal/topo/planner/planner.go +++ b/internal/topo/planner/planner.go @@ -34,8 +34,9 @@ import ( func Plan(rule *def.Rule) (*topo.Topo, error) { if rule.Sql != "" { return PlanSQLWithSourcesAndSinks(rule, nil) + } else { + return PlanByGraph(rule) } - return nil, nil } // PlanSQLWithSourcesAndSinks For test only diff --git a/internal/topo/planner/planner_graph.go b/internal/topo/planner/planner_graph.go new file mode 100644 index 0000000000..72d84c8a64 --- /dev/null +++ b/internal/topo/planner/planner_graph.go @@ -0,0 +1,824 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
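+
+// A graph rule is described by JSON of the following shape (node names here
+// are illustrative; see planner_graph_test.go for full examples):
+//
+//	{"nodes": {"src": {"type": "source", "nodeType": "mqtt", "props": {"datasource": "demo"}},
+//	           "log": {"type": "sink", "nodeType": "log", "props": {}}},
+//	 "topo": {"sources": ["src"], "edges": {"src": ["log"]}}}
+//
+// PlanByGraph plans the sources first, then builds each operator and sink
+// node, validates the edges and IO compatibility in topological order, and
+// finally wires the linkages into the returned topology.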
+ +package planner + +import ( + "errors" + "fmt" + "strings" + "time" + + "github.com/lf-edge/ekuiper/v2/internal/binder/function" + "github.com/lf-edge/ekuiper/v2/internal/pkg/def" + store2 "github.com/lf-edge/ekuiper/v2/internal/pkg/store" + "github.com/lf-edge/ekuiper/v2/internal/topo" + "github.com/lf-edge/ekuiper/v2/internal/topo/graph" + "github.com/lf-edge/ekuiper/v2/internal/topo/node" + "github.com/lf-edge/ekuiper/v2/internal/topo/operator" + "github.com/lf-edge/ekuiper/v2/internal/xsql" + "github.com/lf-edge/ekuiper/v2/pkg/ast" + "github.com/lf-edge/ekuiper/v2/pkg/cast" + "github.com/lf-edge/ekuiper/v2/pkg/kv" +) + +type genNodeFunc func(name string, props map[string]interface{}, options *def.RuleOption) (node.TopNode, error) + +var extNodes = map[string]genNodeFunc{} + +type sourceType int + +const ( + ILLEGAL sourceType = iota + STREAM + SCANTABLE + LOOKUPTABLE +) + +// PlanByGraph returns a topo.Topo object by a graph +func PlanByGraph(rule *def.Rule) (*topo.Topo, error) { + ruleGraph := rule.Graph + if ruleGraph == nil { + return nil, errors.New("no graph") + } + tp, err := topo.NewWithNameAndOptions(rule.Id, rule.Options) + if err != nil { + return nil, err + } + var ( + nodeMap = make(map[string]node.TopNode) + sinks = make(map[string]bool) + sources = make(map[string]bool) + store kv.KeyValue + lookupTableChildren = make(map[string]*ast.Options) + scanTableEmitters []string + sourceNames []string + streamEmitters = make(map[string]struct{}) + ) + for _, srcName := range ruleGraph.Topo.Sources { + gn, ok := ruleGraph.Nodes[srcName] + if !ok { + return nil, fmt.Errorf("source node %s not defined", srcName) + } + if _, ok := ruleGraph.Topo.Edges[srcName]; !ok { + return nil, fmt.Errorf("no edge defined for source node %s", srcName) + } + srcNode, srcType, name, ops, err := parseSource(srcName, gn, rule, tp, store, lookupTableChildren) + if err != nil { + return nil, fmt.Errorf("parse source %s with %v error: %w", srcName, gn.Props, err) + } + switch srcType { + case STREAM: + streamEmitters[name] = struct{}{} + sourceNames = append(sourceNames, name) + case SCANTABLE: + scanTableEmitters = append(scanTableEmitters, name) + sourceNames = append(sourceNames, name) + case LOOKUPTABLE: + sourceNames = append(sourceNames, name) + } + if srcNode != nil { + nodeMap[srcName] = srcNode + tp.AddSrc(srcNode) + inputs := []node.Emitter{srcNode} + for _, e := range ops { + tp.AddOperator(inputs, e) + inputs = []node.Emitter{e} + nodeMap[srcName] = e + } + } + sources[srcName] = true + } + tp.SetStreams(sourceNames) + for nodeName, gn := range ruleGraph.Nodes { + switch gn.Type { + case "source": // handled above, + continue + case "sink": + if _, ok := ruleGraph.Topo.Edges[nodeName]; ok { + return nil, fmt.Errorf("sink %s has edge", nodeName) + } + cn, err := SinkToComp(tp, gn.NodeType, nodeName, gn.Props, rule, len(sourceNames)) + if err != nil { + return nil, err + } + nodeMap[nodeName] = cn + sinks[nodeName] = true + case "operator": + if _, ok := ruleGraph.Topo.Edges[nodeName]; !ok { + return nil, fmt.Errorf("no edge defined for operator node %s", nodeName) + } + nt := strings.ToLower(gn.NodeType) + switch nt { + case "watermark": + n, err := parseWatermark(gn.Props, streamEmitters) + if err != nil { + return nil, fmt.Errorf("parse watermark %s with %v error: %w", nodeName, gn.Props, err) + } + op := node.NewWatermarkOp(nodeName, n.SendWatermark, n.Emitters, rule.Options) + nodeMap[nodeName] = op + case "function": + fop, err := parseFunc(gn.Props, sourceNames) + if err != nil { + 
return nil, fmt.Errorf("parse function %s with %v error: %w", nodeName, gn.Props, err) + } + op := Transform(fop, nodeName, rule.Options) + nodeMap[nodeName] = op + case "aggfunc": + fop, err := parseFunc(gn.Props, sourceNames) + if err != nil { + return nil, fmt.Errorf("parse aggfunc %s with %v error: %w", nodeName, gn.Props, err) + } + fop.IsAgg = true + op := Transform(fop, nodeName, rule.Options) + nodeMap[nodeName] = op + case "filter": + fop, err := parseFilter(gn.Props, sourceNames) + if err != nil { + return nil, fmt.Errorf("parse filter %s with %v error: %w", nodeName, gn.Props, err) + } + op := Transform(fop, nodeName, rule.Options) + nodeMap[nodeName] = op + case "pick": + pop, err := parsePick(gn.Props, sourceNames) + if err != nil { + return nil, fmt.Errorf("parse pick %s with %v error: %w", nodeName, gn.Props, err) + } + op := Transform(pop, nodeName, rule.Options) + nodeMap[nodeName] = op + case "window": + wconf, err := parseWindow(gn.Props) + if err != nil { + return nil, fmt.Errorf("parse window conf %s with %v error: %w", nodeName, gn.Props, err) + } + op, err := node.NewWindowOp(nodeName, *wconf, rule.Options) + if err != nil { + return nil, fmt.Errorf("parse window %s with %v error: %w", nodeName, gn.Props, err) + } + nodeMap[nodeName] = op + case "join": + stmt, err := parseJoinAst(gn.Props, sourceNames) + if err != nil { + return nil, fmt.Errorf("parse join %s with %v error: %w", nodeName, gn.Props, err) + } + fromNode := stmt.Sources[0].(*ast.Table) + if _, ok := streamEmitters[fromNode.Name]; !ok { + return nil, fmt.Errorf("parse join %s with %v error: join source %s is not a stream", nodeName, gn.Props, fromNode.Name) + } + hasLookup := false + if stmt.Joins != nil { + if len(lookupTableChildren) > 0 { + var joins []ast.Join + for _, join := range stmt.Joins { + if hasLookup { + return nil, fmt.Errorf("parse join %s with %v error: only support to join one lookup table with one stream", nodeName, gn.Props) + } + if streamOpt, ok := lookupTableChildren[join.Name]; ok { + hasLookup = true + lookupPlan := LookupPlan{ + joinExpr: join, + options: streamOpt, + } + if !lookupPlan.validateAndExtractCondition() { + return nil, fmt.Errorf("parse join %s with %v error: join condition %s is invalid, at least one equi-join predicate is required", nodeName, gn.Props, join.Expr) + } + op, err := planLookupSource(tp.GetContext(), &lookupPlan, rule.Options) + if err != nil { + return nil, fmt.Errorf("parse join %s with %v error: fail to create lookup node", nodeName, gn.Props) + } + nodeMap[nodeName] = op.(node.TopNode) + } else { + joins = append(joins, join) + } + } + stmt.Joins = joins + } + // Not all joins are lookup joins, so we need to create a join plan for the remaining joins + if len(stmt.Joins) > 0 && !hasLookup { + if len(scanTableEmitters) > 0 { + return nil, fmt.Errorf("parse join %s with %v error: do not support scan table %s yet", nodeName, gn.Props, scanTableEmitters) + } + jop := &operator.JoinOp{Joins: stmt.Joins, From: fromNode} + op := Transform(jop, nodeName, rule.Options) + nodeMap[nodeName] = op + } + } + case "groupby": + gop, err := parseGroupBy(gn.Props, sourceNames) + if err != nil { + return nil, fmt.Errorf("parse groupby %s with %v error: %w", nodeName, gn.Props, err) + } + op := Transform(gop, nodeName, rule.Options) + nodeMap[nodeName] = op + case "orderby": + oop, err := parseOrderBy(gn.Props, sourceNames) + if err != nil { + return nil, fmt.Errorf("parse orderby %s with %v error: %w", nodeName, gn.Props, err) + } + op := Transform(oop, nodeName, 
rule.Options)
+				nodeMap[nodeName] = op
+			case "switch":
+				sconf, err := parseSwitch(gn.Props, sourceNames)
+				if err != nil {
+					return nil, fmt.Errorf("parse switch %s with %v error: %w", nodeName, gn.Props, err)
+				}
+				op, err := node.NewSwitchNode(nodeName, sconf, rule.Options)
+				if err != nil {
+					return nil, fmt.Errorf("create switch %s with %v error: %w", nodeName, gn.Props, err)
+				}
+				nodeMap[nodeName] = op
+			default:
+				gnf, ok := extNodes[nt]
+				if !ok {
+					return nil, fmt.Errorf("unknown operator type %s", gn.NodeType)
+				}
+				op, err := gnf(nodeName, gn.Props, rule.Options)
+				if err != nil {
+					return nil, err
+				}
+				nodeMap[nodeName] = op
+			}
+		default:
+			return nil, fmt.Errorf("unknown node type %s", gn.Type)
+		}
+	}
+
+	// validate source nodes
+	for _, nodeName := range ruleGraph.Topo.Sources {
+		if _, ok := sources[nodeName]; !ok {
+			return nil, fmt.Errorf("source %s is not a source type node", nodeName)
+		}
+	}
+
+	// reverse the edges. The value is a 2-dim array; only a switch node uses the second dimension
+	reversedEdges := make(map[string][][]string)
+	rclone := make(map[string][]string)
+	for fromNode, toNodes := range ruleGraph.Topo.Edges {
+		if _, ok := ruleGraph.Nodes[fromNode]; !ok {
+			return nil, fmt.Errorf("node %s is not defined", fromNode)
+		}
+		for i, toNode := range toNodes {
+			switch tn := toNode.(type) {
+			case string:
+				if _, ok := ruleGraph.Nodes[tn]; !ok {
+					return nil, fmt.Errorf("node %s is not defined", tn)
+				}
+				if _, ok := reversedEdges[tn]; !ok {
+					reversedEdges[tn] = make([][]string, 1)
+				}
+				reversedEdges[tn][0] = append(reversedEdges[tn][0], fromNode)
+				rclone[tn] = append(rclone[tn], fromNode)
+			case []interface{}:
+				for _, tni := range tn {
+					tnn, ok := tni.(string)
+					if !ok { // should never happen
+						return nil, fmt.Errorf("invalid edge toNode %v", toNode)
+					}
+					if _, ok := ruleGraph.Nodes[tnn]; !ok {
+						return nil, fmt.Errorf("node %s is not defined", tnn)
+					}
+					for len(reversedEdges[tnn]) <= i {
+						reversedEdges[tnn] = append(reversedEdges[tnn], []string{})
+					}
+					reversedEdges[tnn][i] = append(reversedEdges[tnn][i], fromNode)
+					rclone[tnn] = append(rclone[tnn], fromNode)
+				}
+			}
+		}
+	}
+	// sort the nodes by topological order
+	nodesInOrder := make([]string, len(ruleGraph.Nodes))
+	i := 0
+	genNodesInOrder(ruleGraph.Topo.Sources, ruleGraph.Topo.Edges, rclone, nodesInOrder, i)
+
+	// validate the topo
+	// the map records the output type of each node
+	dataFlow := make(map[string]*graph.IOType)
+	for _, n := range nodesInOrder {
+		gn := ruleGraph.Nodes[n]
+		if gn == nil {
+			return nil, fmt.Errorf("can't find node %s", n)
+		}
+		if gn.Type == "source" {
+			dataFlow[n] = &graph.IOType{
+				Type:           graph.IOINPUT_TYPE_ROW,
+				RowType:        graph.IOROW_TYPE_SINGLE,
+				CollectionType: graph.IOCOLLECTION_TYPE_ANY,
+				AllowMulti:     false,
+			}
+		} else if gn.Type == "sink" {
+			continue
+		} else {
+			nodeIO, ok := graph.OpIO[strings.ToLower(gn.NodeType)]
+			if !ok {
+				return nil, fmt.Errorf("can't find the io definition for node type %s", gn.NodeType)
+			}
+			dataInCondition := nodeIO[0]
+			indim := reversedEdges[n]
+			var innodes []string
+			for _, in := range indim {
+				innodes = append(innodes, in...)
+			}
+			if len(innodes) > 1 {
+				if dataInCondition.AllowMulti {
+					// special case for join, which accepts at most one stream input
+					if gn.NodeType == "join" {
+						joinStreams := 0
+						for _, innode := range innodes {
+							if _, isLookup := lookupTableChildren[innode]; !isLookup {
+								joinStreams++
+							}
+							if joinStreams > 1 {
+								return nil, fmt.Errorf("join node %s does not allow multiple stream inputs", n)
+							}
+						}
+					}
+					for _, innode := range innodes {
+						_, err = graph.Fit(dataFlow[innode], dataInCondition)
+						if err != nil {
+							return nil, fmt.Errorf("node %s output does not match node %s input: %v", innode, n, err)
+						}
+					}
+				} else {
+					return nil, fmt.Errorf("operator %s of type %s does not allow multiple inputs", n, gn.NodeType)
+				}
+			} else if len(innodes) == 1 {
+				_, err := graph.Fit(dataFlow[innodes[0]], dataInCondition)
+				if err != nil {
+					return nil, fmt.Errorf("node %s output does not match node %s input: %v", innodes[0], n, err)
+				}
+			} else {
+				return nil, fmt.Errorf("operator %s of type %s has no input", n, gn.NodeType)
+			}
+			out := nodeIO[1]
+			in := dataFlow[innodes[0]]
+			dataFlow[n] = graph.MapOut(in, out)
+			// convert filter to having if the input is aggregated
+			if gn.NodeType == "filter" && in.Type == graph.IOINPUT_TYPE_COLLECTION && in.CollectionType == graph.IOCOLLECTION_TYPE_GROUPED {
+				fop, err := parseHaving(gn.Props, sourceNames)
+				if err != nil {
+					return nil, err
+				}
+				op := Transform(fop, n, rule.Options)
+				nodeMap[n] = op
+			}
+		}
+	}
+	// add the linkages
+	for nodeName, fromNodes := range reversedEdges {
+		totalLen := 0
+		for _, fromNode := range fromNodes {
+			totalLen += len(fromNode)
+		}
+		inputs := make([]node.Emitter, 0, totalLen)
+		for i, fromNode := range fromNodes {
+			for _, from := range fromNode {
+				if i == 0 {
+					if src, ok := nodeMap[from].(node.Emitter); ok {
+						inputs = append(inputs, src)
+					}
+				} else {
+					switch sn := nodeMap[from].(type) {
+					case *node.SwitchNode:
+						inputs = append(inputs, sn.GetEmitter(i))
+					default:
+						return nil, fmt.Errorf("node %s is not a switch node but has multiple outputs", from)
+					}
+				}
+			}
+		}
+		n := nodeMap[nodeName]
+		if n == nil {
+			return nil, fmt.Errorf("node %s is not defined", nodeName)
+		}
+		if _, ok := sinks[nodeName]; ok {
+			switch nt := n.(type) {
+			case node.CompNode:
+				PlanSinkOps(tp, inputs, nt)
+			default:
+				tp.AddSink(inputs, nt.(*node.SinkNode))
+			}
+		} else {
+			tp.AddOperator(inputs, n.(node.OperatorNode))
+		}
+	}
+	return tp, nil
+}
+
+func genNodesInOrder(toNodes []string, edges map[string][]interface{}, flatReversedEdges map[string][]string, nodesInOrder []string, i int) int {
+	for _, src := range toNodes {
+		if len(flatReversedEdges[src]) > 1 {
+			flatReversedEdges[src] = flatReversedEdges[src][1:]
+			continue
+		}
+		nodesInOrder[i] = src
+		i++
+		tns := make([]string, 0, len(edges[src]))
+		for _, toNode := range edges[src] {
+			switch toNode.(type) {
+			case string:
+				tns = append(tns, toNode.(string))
+			case []interface{}:
+				for _, tni := range toNode.([]interface{}) {
+					tns = append(tns, tni.(string))
+				}
+			}
+		}
+		i = genNodesInOrder(tns, edges, flatReversedEdges, nodesInOrder, i)
+	}
+	return i
+}
+
+func parseSource(nodeName string, gn *def.GraphNode, rule *def.Rule, tp *topo.Topo, store kv.KeyValue, lookupTableChildren map[string]*ast.Options) (node.DataSourceNode, sourceType, string, []node.OperatorNode, error) {
+	sourceMeta := &def.SourceMeta{
+		SourceType: "stream",
+	}
+	err := cast.MapToStruct(gn.Props, sourceMeta)
+	if err != nil {
+		return nil, ILLEGAL, "", nil, err
+	}
+	if sourceMeta.SourceType != "stream" 
&& sourceMeta.SourceType != "table" { + return nil, ILLEGAL, "", nil, fmt.Errorf("source type %s not supported", sourceMeta.SourceType) + } + // If source name is specified, find the created stream/table from store + if sourceMeta.SourceName != "" { + if store == nil { + store, err = store2.GetKV("stream") + if err != nil { + return nil, ILLEGAL, "", nil, err + } + } + streamStmt, e := xsql.GetDataSource(store, sourceMeta.SourceName) + if e != nil { + return nil, ILLEGAL, "", nil, fmt.Errorf("fail to get stream %s, please check if stream is created", sourceMeta.SourceName) + } + if streamStmt.StreamType == ast.TypeStream && sourceMeta.SourceType == "table" { + return nil, ILLEGAL, "", nil, fmt.Errorf("stream %s is not a table", sourceMeta.SourceName) + } else if streamStmt.StreamType == ast.TypeTable && sourceMeta.SourceType == "stream" { + return nil, ILLEGAL, "", nil, fmt.Errorf("table %s is not a stream", sourceMeta.SourceName) + } + st := streamStmt.Options.TYPE + if st == "" { + st = "mqtt" + } + if st != gn.NodeType { + return nil, ILLEGAL, "", nil, fmt.Errorf("source type %s does not match the stream type %s", gn.NodeType, st) + } + sInfo, err := convertStreamInfo(streamStmt) + if err != nil { + return nil, ILLEGAL, "", nil, err + } + if sInfo.stmt.StreamType == ast.TypeTable && sInfo.stmt.Options.KIND == ast.StreamKindLookup { + lookupTableChildren[string(sInfo.stmt.Name)] = sInfo.stmt.Options + return nil, LOOKUPTABLE, string(sInfo.stmt.Name), nil, nil + } else { + // Use the plan to calculate the schema and other meta info + p := DataSourcePlan{ + name: sInfo.stmt.Name, + streamStmt: sInfo.stmt, + streamFields: sInfo.schema.ToJsonSchema(), + isSchemaless: sInfo.schema == nil, + iet: rule.Options.IsEventTime, + allMeta: rule.Options.SendMetaToSink, + }.Init() + + if sInfo.stmt.StreamType == ast.TypeStream { + err = p.PruneColumns(nil) + if err != nil { + return nil, ILLEGAL, "", nil, err + } + srcNode, ops, _, e := transformSourceNode(tp.GetContext(), p, nil, rule.Id, rule.Options, 1) + if e != nil { + return nil, ILLEGAL, "", nil, e + } + return srcNode, STREAM, string(sInfo.stmt.Name), ops, nil + } else { + return nil, SCANTABLE, string(sInfo.stmt.Name), nil, nil + } + } + } else { + sourceOption := &ast.Options{} + err = cast.MapToStruct(gn.Props, sourceOption) + if err != nil { + return nil, ILLEGAL, "", nil, err + } + sourceOption.TYPE = gn.NodeType + if sourceOption.SCHEMAID == "" && gn.Props["schemaName"] != nil && gn.Props["schemaMessage"] != nil { + schemaName, ok1 := gn.Props["schemaName"].(string) + schemaMessage, ok2 := gn.Props["schemaMessage"].(string) + if ok1 && ok2 { + sourceOption.SCHEMAID = schemaName + "." 
+ schemaMessage
+			}
+		}
+		p := DataSourcePlan{
+			name: ast.StreamName(nodeName),
+			streamStmt: &ast.StreamStmt{
+				Name:         ast.StreamName(nodeName),
+				StreamFields: nil,
+				Options:      sourceOption,
+			},
+			streamFields:    nil,
+			isSchemaless:    true,
+			iet:             rule.Options.IsEventTime,
+			allMeta:         rule.Options.SendMetaToSink,
+			timestampField:  sourceOption.TIMESTAMP,
+			timestampFormat: sourceOption.TIMESTAMP_FORMAT,
+		}.Init()
+		srcNode, ops, _, e := transformSourceNode(tp.GetContext(), p, nil, rule.Id, rule.Options, 1)
+		if e != nil {
+			return nil, ILLEGAL, "", nil, e
+		}
+		return srcNode, STREAM, nodeName, ops, nil
+	}
+}
+
+func parseOrderBy(props map[string]interface{}, sourceNames []string) (*operator.OrderOp, error) {
+	n := &graph.Orderby{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	stmt := "SELECT * FROM unknown ORDER BY"
+	for _, s := range n.Sorts {
+		stmt += " " + s.Field + " "
+		if s.Desc {
+			stmt += "DESC"
+		}
+	}
+	p, err := xsql.NewParserWithSources(strings.NewReader(stmt), sourceNames).Parse()
+	if err != nil {
+		return nil, fmt.Errorf("invalid order by statement error: %v", err)
+	}
+	if len(p.SortFields) == 0 {
+		return nil, fmt.Errorf("order by statement is empty")
+	}
+	return &operator.OrderOp{
+		SortFields: p.SortFields,
+	}, nil
+}
+
+func parseGroupBy(props map[string]interface{}, sourceNames []string) (*operator.AggregateOp, error) {
+	n := &graph.Groupby{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	if len(n.Dimensions) == 0 {
+		return nil, fmt.Errorf("groupby must have at least one dimension")
+	}
+	stmt := "SELECT * FROM unknown Group By " + strings.Join(n.Dimensions, ",")
+	p, err := xsql.NewParserWithSources(strings.NewReader(stmt), sourceNames).Parse()
+	if err != nil {
+		return nil, fmt.Errorf("invalid group by statement error: %v", err)
+	}
+	return &operator.AggregateOp{Dimensions: p.Dimensions}, nil
+}
+
+func parseJoinAst(props map[string]interface{}, sourceNames []string) (*ast.SelectStatement, error) {
+	n := &graph.Join{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	stmt := "SELECT * FROM " + n.From
+	for _, join := range n.Joins {
+		stmt += " " + join.Type + " JOIN " + join.Name + " ON " + join.On
+	}
+	return xsql.NewParserWithSources(strings.NewReader(stmt), sourceNames).Parse()
+}
+
+func parseWatermark(props map[string]interface{}, streamEmitters map[string]struct{}) (*graph.Watermark, error) {
+	n := &graph.Watermark{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	if len(n.Emitters) == 0 {
+		return nil, fmt.Errorf("watermark must have at least one emitter")
+	}
+	for _, e := range n.Emitters {
+		if _, ok := streamEmitters[e]; !ok {
+			return nil, fmt.Errorf("emitter %s does not exist", e)
+		}
+	}
+	return n, nil
+}
+
+func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
+	n := &graph.Window{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	if n.Size <= 0 {
+		return nil, fmt.Errorf("window size %d is invalid", n.Size)
+	}
+	var (
+		wt            ast.WindowType
+		length        time.Duration
+		interval      time.Duration
+		countLength   int
+		countInterval int
+		rawInterval   int
+	)
+	switch strings.ToLower(n.Type) {
+	case "tumblingwindow":
+		wt = ast.TUMBLING_WINDOW
+		if n.Interval != 0 && n.Interval != n.Size {
+			return nil, fmt.Errorf("tumbling window interval must equal the size")
+		}
+		rawInterval = n.Size
+	case "hoppingwindow":
+		wt = ast.HOPPING_WINDOW
+		if n.Interval <= 0 {
+			return nil, fmt.Errorf("hopping window interval must be greater than 0")
+		}
+		if n.Interval > n.Size {
+			return nil, fmt.Errorf("hopping window interval must not be greater than the size")
+		}
+		rawInterval = n.Interval
+	case "sessionwindow":
+		wt = ast.SESSION_WINDOW
+		if n.Interval <= 0 {
+			return nil, fmt.Errorf("session window interval must be greater than 0")
+		}
+		rawInterval = n.Size
+	case "slidingwindow":
+		wt = ast.SLIDING_WINDOW
+		if n.Interval != 0 && n.Interval != n.Size {
+			return nil, fmt.Errorf("sliding window interval must equal the size")
+		}
+	case "countwindow":
+		wt = ast.COUNT_WINDOW
+		if n.Interval < 0 {
+			return nil, fmt.Errorf("count window interval must be greater than or equal to 0")
+		}
+		if n.Interval > n.Size {
+			return nil, fmt.Errorf("count window interval must not be greater than the size")
+		}
+		if n.Interval == 0 {
+			n.Interval = n.Size
+		}
+	default:
+		return nil, fmt.Errorf("unknown window type %s", n.Type)
+	}
+	var timeUnit ast.Token
+	if wt == ast.COUNT_WINDOW {
+		countLength = n.Size
+		countInterval = n.Interval
+	} else {
+		var unit time.Duration
+		switch strings.ToLower(n.Unit) {
+		case "dd":
+			unit = 24 * time.Hour
+			timeUnit = ast.DD
+		case "hh":
+			unit = time.Hour
+			timeUnit = ast.HH
+		case "mi":
+			unit = time.Minute
+			timeUnit = ast.MI
+		case "ss":
+			unit = time.Second
+			timeUnit = ast.SS
+		case "ms":
+			unit = time.Millisecond
+			timeUnit = ast.MS
+		default:
+			return nil, fmt.Errorf("invalid unit %s", n.Unit)
+		}
+		length = time.Duration(n.Size) * unit
+		interval = time.Duration(n.Interval) * unit
+	}
+	return &node.WindowConfig{
+		RawInterval:   rawInterval,
+		Type:          wt,
+		Length:        length,
+		Interval:      interval,
+		CountLength:   countLength,
+		CountInterval: countInterval,
+		TimeUnit:      timeUnit,
+	}, nil
+}
+
+func parsePick(props map[string]interface{}, sourceNames []string) (*operator.ProjectOp, error) {
+	n := &graph.Select{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	stmt, err := xsql.NewParserWithSources(strings.NewReader("select "+strings.Join(n.Fields, ",")+" from nonexist"), sourceNames).Parse()
+	if err != nil {
+		return nil, err
+	}
+	t := ProjectPlan{
+		fields:      stmt.Fields,
+		isAggregate: n.IsAgg,
+	}.Init()
+	return &operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, ExceptNames: t.exceptNames, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, nil
+}
+
+func parseFunc(props map[string]interface{}, sourceNames []string) (*operator.FuncOp, error) {
+	m, ok := props["expr"]
+	if !ok {
+		return nil, errors.New("no expr")
+	}
+	funcExpr, ok := m.(string)
+	if !ok {
+		return nil, fmt.Errorf("expr %v is not string", m)
+	}
+	stmt, err := xsql.NewParserWithSources(strings.NewReader("select "+funcExpr+" from nonexist"), sourceNames).Parse()
+	if err != nil {
+		return nil, err
+	}
+	f := stmt.Fields[0]
+	c, ok := f.Expr.(*ast.Call)
+	if !ok {
+		// should never happen
+		return nil, fmt.Errorf("expr %s is not ast.Call", funcExpr)
+	}
+	var name string
+	if f.AName != "" {
+		name = f.AName
+	} else {
+		name = f.Name
+	}
+	return &operator.FuncOp{CallExpr: c, Name: name, IsAgg: function.IsAggFunc(name)}, nil
+}
+
+func parseFilter(props map[string]interface{}, sourceNames []string) (*operator.FilterOp, error) {
+	m, ok := props["expr"]
+	if !ok {
+		return nil, errors.New("no expr")
+	}
+	conditionExpr, ok := m.(string)
+	if !ok {
+		return nil, fmt.Errorf("expr %v is not string", m)
+	}
+	p := 
xsql.NewParserWithSources(strings.NewReader("where "+conditionExpr), sourceNames) + if exp, err := p.ParseCondition(); err != nil { + return nil, err + } else { + if exp != nil { + return &operator.FilterOp{Condition: exp}, nil + } + } + return nil, fmt.Errorf("expr %v is not a condition", m) +} + +func parseHaving(props map[string]interface{}, sourceNames []string) (*operator.HavingOp, error) { + m, ok := props["expr"] + if !ok { + return nil, errors.New("no expr") + } + conditionExpr, ok := m.(string) + if !ok { + return nil, fmt.Errorf("expr %v is not string", m) + } + p := xsql.NewParserWithSources(strings.NewReader("where "+conditionExpr), sourceNames) + if exp, err := p.ParseCondition(); err != nil { + return nil, err + } else { + if exp != nil { + return &operator.HavingOp{Condition: exp}, nil + } + } + return nil, fmt.Errorf("expr %v is not a condition", m) +} + +func parseSwitch(props map[string]interface{}, sourceNames []string) (*node.SwitchConfig, error) { + n := &graph.Switch{} + err := cast.MapToStruct(props, n) + if err != nil { + return nil, err + } + if len(n.Cases) == 0 { + return nil, fmt.Errorf("switch node must have at least one case") + } + caseExprs := make([]ast.Expr, len(n.Cases)) + for i, c := range n.Cases { + p := xsql.NewParserWithSources(strings.NewReader("where "+c), sourceNames) + if exp, err := p.ParseCondition(); err != nil { + return nil, fmt.Errorf("parse case %d error: %v", i, err) + } else { + if exp != nil { + caseExprs[i] = exp + } + } + } + return &node.SwitchConfig{ + Cases: caseExprs, + StopAtFirstMatch: n.StopAtFirstMatch, + }, nil +} diff --git a/internal/topo/planner/planner_graph_test.go b/internal/topo/planner/planner_graph_test.go new file mode 100644 index 0000000000..3a3ac821bb --- /dev/null +++ b/internal/topo/planner/planner_graph_test.go @@ -0,0 +1,998 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
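+
+// The cases below feed raw graph JSON into PlanByGraph and assert either a
+// successful plan or the exact validation error string.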
+ +package planner + +import ( + "encoding/json" + "fmt" + "reflect" + "testing" + "time" + + "github.com/lf-edge/ekuiper/v2/internal/pkg/def" + "github.com/lf-edge/ekuiper/v2/internal/pkg/store" + "github.com/lf-edge/ekuiper/v2/internal/testx" + "github.com/lf-edge/ekuiper/v2/internal/xsql" + "github.com/lf-edge/ekuiper/v2/pkg/ast" + "github.com/lf-edge/ekuiper/v2/pkg/cast" +) + +func TestPlannerGraphValidate(t *testing.T) { + tests := []struct { + graph string + err string + }{ + { + graph: `{ + "nodes": { + "abc": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo" + } + }, + "myfilter": { + "type": "operator", + "nodeType": "filter", + "props": { + "expr": "temperature > 20" + } + }, + "logfunc": { + "type": "operator", + "nodeType": "function", + "props": { + "expr": "log(temperature) as log_temperature" + } + }, + "sinfunc": { + "type": "operator", + "nodeType": "function", + "props": { + "expr": "sin(temperature) as sin_temperature" + } + }, + "pick": { + "type": "operator", + "nodeType": "pick", + "props": { + "fields": [ + "log_temperature", + "humidity" + ] + } + }, + "mqttpv": { + "type": "sink", + "nodeType": "mqtt", + "props": { + "server": "tcp://syno.home:1883", + "topic": "result", + "sendSingle": true + } + }, + "mqtt2": { + "type": "sink", + "nodeType": "mqtt", + "props": { + "server": "tcp://syno.home:1883", + "topic": "result2", + "sendSingle": true + } + } + }, + "topo": { + "sources": [ + "abc" + ], + "edges": { + "abc": [ + "myfilter", + "sinfunc" + ], + "myfilter": [ + "logfunc" + ], + "logfunc": [ + "pick" + ], + "pick": [ + "mqttpv" + ], + "sinfunc": [ + "mqtt2" + ] + } + } +}`, + err: "", + }, + { + graph: `{ + "nodes": { + "abc": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo" + } + }, + "mqtt2": { + "type": "sink", + "nodeType": "mqtt", + "props": { + "server": "tcp://syno.home:1883", + "topic": "result2", + "sendSingle": true + } + } + }, + "topo": { + "sources": [ + "abc" + ], + "edges": { + "abc": [ + "myfilter" + ] + } + } +}`, + err: "node myfilter is not defined", + }, + { + graph: `{ + "nodes": { + "abc": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo" + } + }, + "mqtt2": { + "type": "sink", + "nodeType": "mqtt", + "props": { + "server": "tcp://syno.home:1883", + "topic": "result2", + "sendSingle": true + } + } + }, + "topo": { + "sources": [ + "abc" + ], + "edges": { + } + } +}`, + err: "no edge defined for source node abc", + }, + { + graph: `{ + "nodes": { + "abc": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo" + } + }, + "aggfunc": { + "type": "operator", + "nodeType": "aggfunc", + "props": { + "expr": "avg(temperature) as avg_temperature" + } + }, + "mqtt2": { + "type": "sink", + "nodeType": "mqtt", + "props": { + "server": "tcp://syno.home:1883", + "topic": "result2", + "sendSingle": true + } + } + }, + "topo": { + "sources": [ + "abc" + ], + "edges": { + "abc": ["aggfunc"], + "aggfunc": ["mqtt2"] + } + } +}`, + err: "node abc output does not match node aggfunc input: input type mismatch, expect collection, got row", + }, + { + graph: `{ + "nodes": { + "abc": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo" + } + }, + "abc2": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo1" + } + }, + "joinop": { + "type": "operator", + "nodeType": "join", + "props": { + "from": "abc", + "joins": [ + { + "name": "abc2", + "type": "inner", + "on": "abc.id = abc2.id" + } + ] + } + 
}, + "mqtt2": { + "type": "sink", + "nodeType": "mqtt", + "props": { + "server": "tcp://syno.home:1883", + "topic": "result2", + "sendSingle": true + } + } + }, + "topo": { + "sources": [ + "abc","abc2" + ], + "edges": { + "abc": ["joinop"], + "abc2": ["joinop"], + "joinop": ["mqtt2"] + } + } +}`, + err: "join node joinop does not allow multiple stream inputs", + }, + { + graph: `{ + "nodes": { + "abc": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo" + } + }, + "abc2": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo1" + } + }, + "windowop": { + "type": "operator", + "nodeType": "window", + "props": { + "type": "hoppingwindow", + "unit": "ss", + "size": 10, + "interval": 5 + } + }, + "joinop": { + "type": "operator", + "nodeType": "join", + "props": { + "from": "abc", + "joins": [ + { + "name": "abc2", + "type": "inner", + "on": "abc.id = abc2.id" + } + ] + } + }, + "groupop": { + "type": "operator", + "nodeType": "groupby", + "props": { + "dimensions": ["id","userId"] + } + }, + "mqtt2": { + "type": "sink", + "nodeType": "mqtt", + "props": { + "server": "tcp://syno.home:1883", + "topic": "result2", + "sendSingle": true + } + } + }, + "topo": { + "sources": [ + "abc","abc2" + ], + "edges": { + "abc": ["windowop"], + "abc2": ["windowop"], + "windowop": ["joinop"], + "joinop": ["groupop"], + "groupop": ["mqtt2"] + } + } +}`, + err: "", + }, + { + graph: `{ + "nodes": { + "abc": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo" + } + }, + "abc2": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo1" + } + }, + "windowop": { + "type": "operator", + "nodeType": "window", + "props": { + "type": "hoppingwindow", + "unit": "ss", + "size": 10, + "interval": 5 + } + }, + "joinop": { + "type": "operator", + "nodeType": "join", + "props": { + "from": "abc", + "joins": [ + { + "name": "abc2", + "type": "inner", + "on": "abc.id = abc2.id" + } + ] + } + }, + "groupop": { + "type": "operator", + "nodeType": "groupby", + "props": { + "dimensions": ["id","userId"] + } + }, + "mqtt2": { + "type": "sink", + "nodeType": "mqtt", + "props": { + "server": "tcp://syno.home:1883", + "topic": "result2", + "sendSingle": true + } + } + }, + "topo": { + "sources": [ + "abc","abc2" + ], + "edges": { + "abc": ["windowop"], + "abc2": ["windowop"], + "windowop": ["groupop"], + "joinop": ["mqtt2"], + "groupop": ["joinop"] + } + } +}`, + err: "node groupop output does not match node joinop input: collection type mismatch, expect non-grouped collection, got grouped collection", + }, + { + graph: `{ + "nodes": { + "abc": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo" + } + }, + "abc2": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo1" + } + }, + "windowop": { + "type": "operator", + "nodeType": "window", + "props": { + "type": "hoppingwindow", + "unit": "ss", + "size": 10, + "interval": 5 + } + }, + "joinop": { + "type": "operator", + "nodeType": "join", + "props": { + "from": "abc", + "joins": [ + { + "name": "abc2", + "type": "inner", + "on": "abc.id = abc2.id" + } + ] + } + }, + "groupop": { + "type": "operator", + "nodeType": "groupby", + "props": { + "dimensions": ["id","userId"] + } + }, + "aggfunc": { + "type": "operator", + "nodeType": "aggFunc", + "props": { + "expr": "avg(temperature) as avg_temperature" + } + }, + "mqtt2": { + "type": "sink", + "nodeType": "mqtt", + "props": { + "server": "tcp://syno.home:1883", + "topic": "result2", + 
"sendSingle": true + } + } + }, + "topo": { + "sources": [ + "abc","abc2" + ], + "edges": { + "abc": ["windowop"], + "abc2": ["windowop"], + "windowop": ["groupop"], + "joinop": ["mqtt2"], + "groupop": ["aggfunc"], + "aggfunc": ["joinop"] + } + } +}`, + err: "node aggfunc output does not match node joinop input: collection type mismatch, expect non-grouped collection, got grouped collection", + }, + { + graph: `{ + "nodes": { + "abc": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo" + } + }, + "aggfunc": { + "type": "operator", + "nodeType": "aggfunc", + "props": { + "expr": "avg(,temperature) as avg_temperature" + } + }, + "mqtt2": { + "type": "sink", + "nodeType": "mqtt", + "props": { + "server": "tcp://syno.home:1883", + "topic": "result2", + "sendSingle": true + } + } + }, + "topo": { + "sources": [ + "abc" + ], + "edges": { + "abc": ["aggfunc"], + "aggfunc": ["mqtt2"] + } + } +}`, + err: "parse aggfunc aggfunc with map[expr:avg(,temperature) as avg_temperature] error: found \",\", expected expression.", + }, + { + graph: `{ + "nodes": { + "abc": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo" + } + }, + "myfilter": { + "type": "operator", + "nodeType": "filter", + "props": { + "expr": "data.nested.temperature > 20" + } + }, + "mqttpv": { + "type": "sink", + "nodeType": "mqtt", + "props": { + "server": "tcp://syno.home:1883", + "topic": "result", + "sendSingle": true + } + } + }, + "topo": { + "sources": [ + "abc" + ], + "edges": { + "abc": [ + "myfilter" + ], + "myfilter": [ + "mqttpv" + ] + } + } +}`, + err: "", + }, + { + graph: `{ + "nodes": { + "log": { + "type": "sink", + "nodeType": "log", + "props": {} + }, + "mqtt": { + "type": "source", + "nodeType": "mqtt", + "props": { + "datasource": "demo", + "format": "json", + "shared": false + } + } + }, + "topo": { + "sources": [ + "mqtt" + ], + "edges": { + "mqtt": [ + "log" + ] + } + } + }`, + err: "", + }, + } + + t.Logf("The test bucket size is %d.\n\n", len(tests)) + for i, tt := range tests { + rg := &def.RuleGraph{} + err := json.Unmarshal([]byte(tt.graph), rg) + if err != nil { + t.Error(err) + continue + } + _, err = PlanByGraph(&def.Rule{ + Triggered: false, + Id: fmt.Sprintf("rule%d", i), + Name: fmt.Sprintf("rule%d", i), + Graph: rg, + Options: &def.RuleOption{ + IsEventTime: false, + LateTol: 1000, + Concurrency: 1, + BufferLength: 1024, + SendMetaToSink: false, + SendError: true, + Qos: def.AtMostOnce, + CheckpointInterval: cast.DurationConf(30 * time.Second), + }, + }) + if !reflect.DeepEqual(tt.err, testx.Errstring(err)) { + t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.err, err) + } + } +} + +func TestPlannerGraphWithStream(t *testing.T) { + store, err := store.GetKV("stream") + if err != nil { + t.Error(err) + return + } + streamSqls := map[string]string{ + "src1": `CREATE STREAM src1 ( + id1 BIGINT, + temp BIGINT, + name string, + myarray array(string) + ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`, + "src2": `CREATE STREAM src2 ( + id2 BIGINT, + hum BIGINT + ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`, + "tableInPlanner": `CREATE TABLE tableInPlanner ( + id BIGINT, + name STRING, + value STRING, + hum BIGINT + ) WITH (TYPE="file");`, + "lookupT": `CREATE TABLE lookupT () WITH (DATASOURCE="alertVal", TYPE="memory", KIND="lookup", KEY="id");`, + } + types := map[string]ast.StreamType{ + "src1": ast.TypeStream, + "src2": ast.TypeStream, + "tableInPlanner": ast.TypeTable, + "lookupT": 
+func TestPlannerGraphWithStream(t *testing.T) {
+	store, err := store.GetKV("stream")
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	streamSqls := map[string]string{
+		"src1": `CREATE STREAM src1 (
+					id1 BIGINT,
+					temp BIGINT,
+					name string,
+					myarray array(string)
+				) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
+		"src2": `CREATE STREAM src2 (
+					id2 BIGINT,
+					hum BIGINT
+				) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
+		"tableInPlanner": `CREATE TABLE tableInPlanner (
+					id BIGINT,
+					name STRING,
+					value STRING,
+					hum BIGINT
+				) WITH (TYPE="file");`,
+		"lookupT": `CREATE TABLE lookupT () WITH (DATASOURCE="alertVal", TYPE="memory", KIND="lookup", KEY="id");`,
+	}
+	types := map[string]ast.StreamType{
+		"src1":           ast.TypeStream,
+		"src2":           ast.TypeStream,
+		"tableInPlanner": ast.TypeTable,
+		"lookupT":        ast.TypeTable,
+	}
+	for name, sql := range streamSqls {
+		s, err := json.Marshal(&xsql.StreamInfo{
+			StreamType: types[name],
+			Statement:  sql,
+		})
+		if err != nil {
+			t.Error(err)
+			t.Fail()
+		}
+		err = store.Set(name, string(s))
+		if err != nil {
+			t.Error(err)
+			t.Fail()
+		}
+	}
+	testCases := []struct {
+		name  string
+		graph string
+		err   error
+	}{
+		{
+			name: "test stream",
+			graph: `{
+  "nodes": {
+    "demo": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "sourceType": "stream",
+        "sourceName": "src1"
+      }
+    },
+    "log": {
+      "type": "sink",
+      "nodeType": "log",
+      "props": {}
+    }
+  },
+  "topo": {
+    "sources": ["demo"],
+    "edges": {
+      "demo": ["log"]
+    }
+  }
+}`,
+			err: nil,
+		},
+		{
+			name: "stream type wrong",
+			graph: `{
+  "nodes": {
+    "demo": {
+      "type": "source",
+      "nodeType": "file",
+      "props": {
+        "sourceType": "stream",
+        "sourceName": "src1"
+      }
+    },
+    "log": {
+      "type": "sink",
+      "nodeType": "log",
+      "props": {}
+    }
+  },
+  "topo": {
+    "sources": ["demo"],
+    "edges": {
+      "demo": ["log"]
+    }
+  }
+}`,
+			err: fmt.Errorf("parse source demo with map[sourceName:src1 sourceType:stream] error: source type file does not match the stream type mqtt"),
+		},
+		{
+			name: "non exist stream",
+			graph: `{
+  "nodes": {
+    "demo": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "sourceType": "stream",
+        "sourceName": "unknown"
+      }
+    },
+    "log": {
+      "type": "sink",
+      "nodeType": "log",
+      "props": {}
+    }
+  },
+  "topo": {
+    "sources": ["demo"],
+    "edges": {
+      "demo": ["log"]
+    }
+  }
+}`,
+			err: fmt.Errorf("parse source demo with map[sourceName:unknown sourceType:stream] error: fail to get stream unknown, please check if stream is created"),
+		},
+		{
+			name: "wrong source type",
+			graph: `{
+  "nodes": {
+    "demo": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "sourceType": "stream",
+        "sourceName": "tableInPlanner"
+      }
+    },
+    "log": {
+      "type": "sink",
+      "nodeType": "log",
+      "props": {}
+    }
+  },
+  "topo": {
+    "sources": ["demo"],
+    "edges": {
+      "demo": ["log"]
+    }
+  }
+}`,
+			err: fmt.Errorf("parse source demo with map[sourceName:tableInPlanner sourceType:stream] error: table tableInPlanner is not a stream"),
+		},
+		{
+			name: "stream and table",
+			graph: `{
+  "nodes": {
+    "demo": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "sourceType": "stream",
+        "sourceName": "src1"
+      }
+    },
+    "lookupT":{
+      "type": "source",
+      "nodeType": "memory",
+      "props": {
+        "sourceType": "table",
+        "sourceName": "lookupT"
+      }
+    },
+    "joinop": {
+      "type": "operator",
+      "nodeType": "join",
+      "props": {
+        "from": "src1",
+        "joins": [
+          {
+            "name": "lookupT",
+            "type": "inner",
+            "on": "src1.deviceKind = lookupT.id"
+          }
+        ]
+      }
+    },
+    "log": {
+      "type": "sink",
+      "nodeType": "log",
+      "props": {}
+    }
+  },
+  "topo": {
+    "sources": ["demo", "lookupT"],
+    "edges": {
+      "demo": ["joinop"],
+      "lookupT": ["joinop"],
+      "joinop": ["log"]
+    }
+  }
+}`,
+			err: nil,
+		},
+		{
+			name: "wrong join stream name",
+			graph: `{
+  "nodes": {
+    "demo": {
+      "type": "source",
+      "nodeType": "mqtt",
+      "props": {
+        "sourceType": "stream",
+        "sourceName": "src1"
+      }
+    },
+    "lookupT":{
+      "type": "source",
+      "nodeType": "memory",
+      "props": {
+        "sourceType": "table",
+        "sourceName": "lookupT"
+      }
+    },
+    "joinop": {
+      "type": "operator",
+      "nodeType": "join",
+      "props": {
+        "from": "demo",
+        "joins": [
+          {
+            "name": "lookupT",
+            "type": "inner",
+            "on": "demo.deviceKind = lookupT.id"
+          }
+        ]
+      }
+    },
+    "log": {
+      "type": "sink",
+      "nodeType": "log",
+      "props": {}
+    }
+  },
+  "topo": {
+    "sources": ["demo", "lookupT"],
"lookupT"], + "edges": { + "demo": ["joinop"], + "lookupT": ["joinop"], + "joinop": ["log"] + } + } +}`, + err: fmt.Errorf("parse join joinop with map[from:demo joins:[map[name:lookupT on:demo.deviceKind = lookupT.id type:inner]]] error: join source demo is not a stream"), + }, + { + name: "stream and scan table", + graph: `{ + "nodes": { + "demo": { + "type": "source", + "nodeType": "mqtt", + "props": { + "sourceType": "stream", + "sourceName": "src1" + } + }, + "lookupT":{ + "type": "source", + "nodeType": "file", + "props": { + "sourceType": "table", + "sourceName": "tableInPlanner" + } + }, + "joinop": { + "type": "operator", + "nodeType": "join", + "props": { + "from": "src1", + "joins": [ + { + "name": "lookupT", + "type": "inner", + "on": "demo.deviceKind = lookupT.id" + } + ] + } + }, + "log": { + "type": "sink", + "nodeType": "log", + "props": {} + } + }, + "topo": { + "sources": ["demo", "lookupT"], + "edges": { + "demo": ["joinop"], + "lookupT": ["joinop"], + "joinop": ["log"] + } + } +}`, + err: fmt.Errorf("parse join joinop with map[from:src1 joins:[map[name:lookupT on:demo.deviceKind = lookupT.id type:inner]]] error: do not support scan table [tableInPlanner] yet"), + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + rg := &def.RuleGraph{} + err := json.Unmarshal([]byte(tc.graph), rg) + if err != nil { + t.Error(err) + return + } + _, err = PlanByGraph(&def.Rule{ + Triggered: false, + Id: "test", + Graph: rg, + Options: &def.RuleOption{ + IsEventTime: false, + LateTol: 1000, + Concurrency: 1, + BufferLength: 1024, + SendMetaToSink: false, + SendError: true, + Qos: def.AtMostOnce, + CheckpointInterval: cast.DurationConf(30 * time.Second), + }, + }) + if tc.err == nil { + if err != nil { + t.Errorf("error mismatch:\n exp=%s\n got=%s\n\n", tc.err, err) + } + return + } + if !reflect.DeepEqual(tc.err.Error(), err.Error()) { + t.Errorf("error mismatch:\n exp=%s\n got=%s\n\n", tc.err, err) + } + }) + } +} diff --git a/internal/topo/planner/planner_sink.go b/internal/topo/planner/planner_sink.go index add7dbcbc6..2d49f6f47a 100644 --- a/internal/topo/planner/planner_sink.go +++ b/internal/topo/planner/planner_sink.go @@ -33,83 +33,116 @@ import ( func buildActions(tp *topo.Topo, rule *def.Rule, inputs []node.Emitter, streamCount int) error { for i, m := range rule.Actions { for name, action := range m { - s, _ := io.Sink(name) - if s == nil { - return fmt.Errorf("sink %s is not defined", name) - } props, ok := action.(map[string]any) if !ok { return fmt.Errorf("expect map[string]interface{} type for the action properties, but found %v", action) } - commonConf, err := node.ParseConf(tp.GetContext().GetLogger(), props) - if err != nil { - return fmt.Errorf("fail to parse sink configuration: %v", err) - } - templates := findTemplateProps(props) - // Split sink node sinkName := fmt.Sprintf("%s_%d", name, i) - newInputs, err := splitSink(tp, inputs, s, sinkName, rule.Options, commonConf, templates) - if err != nil { - return err - } - if err = s.Provision(tp.GetContext(), props); err != nil { - return err - } - tp.GetContext().GetLogger().Infof("provision sink %s with props %+v", sinkName, props) - - var snk node.DataSinkNode - switch ss := s.(type) { - case api.BytesCollector: - snk, err = node.NewBytesSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, false) - case api.TupleCollector: - snk, err = node.NewTupleSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, false) - default: - 
err = fmt.Errorf("sink type %s does not implement any collector", name) - } + cn, err := SinkToComp(tp, name, sinkName, props, rule, streamCount) if err != nil { return err } - tp.AddSink(newInputs, snk) - // Cache in alter queue, the topo becomes sink (fail) -> cache -> resendSink - // If no alter queue, the topo is cache -> sink - if commonConf.EnableCache && commonConf.ResendAlterQueue { - s, _ := io.Sink(name) - // TODO currently, the destination prop must be named topic - if commonConf.ResendDestination != "" { - props["topic"] = commonConf.ResendDestination - } - if err = s.Provision(tp.GetContext(), props); err != nil { - return err - } - tp.GetContext().GetLogger().Infof("provision sink %s with props %+v", sinkName, props) - - cacheOp, err := node.NewCacheOp(tp.GetContext(), fmt.Sprintf("%s_cache", sinkName), rule.Options, &commonConf.SinkConf) - if err != nil { - return err - } - tp.AddSinkAlterOperator(snk.(*node.SinkNode), cacheOp) - newInputs = []node.Emitter{cacheOp} - - sinkName := fmt.Sprintf("%s_resend_%d", name, i) - var snk node.DataSinkNode - switch ss := s.(type) { - case api.BytesCollector: - snk, err = node.NewBytesSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, true) - case api.TupleCollector: - snk, err = node.NewTupleSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, true) - default: - err = fmt.Errorf("sink type %s does not implement any collector", name) - } - if err != nil { - return err - } - tp.AddSink(newInputs, snk) - } + PlanSinkOps(tp, inputs, cn) } } return nil } +func PlanSinkOps(tp *topo.Topo, inputs []node.Emitter, cn node.CompNode) { + newInputs := inputs + var preSink node.DataSinkNode + for _, n := range cn.Nodes() { + switch nt := n.(type) { + // The case order is important, because sink node is also operator node + case *node.SinkNode: + preSink = nt + tp.AddSink(newInputs, nt) + case node.OperatorNode: + if preSink != nil { // resend + tp.AddSinkAlterOperator(preSink.(*node.SinkNode), nt) + preSink = nil + } else { + tp.AddOperator(newInputs, nt) + } + newInputs = []node.Emitter{nt} + } + } +} + +func SinkToComp(tp *topo.Topo, sinkType string, sinkName string, props map[string]any, rule *def.Rule, streamCount int) (node.CompNode, error) { + s, _ := io.Sink(sinkType) + if s == nil { + return nil, fmt.Errorf("sink %s is not defined", sinkType) + } + commonConf, err := node.ParseConf(tp.GetContext().GetLogger(), props) + if err != nil { + return nil, fmt.Errorf("fail to parse sink configuration: %v", err) + } + templates := findTemplateProps(props) + // Split sink node + sinkOps, err := splitSink(tp, s, sinkName, rule.Options, commonConf, templates) + if err != nil { + return nil, err + } + if err = s.Provision(tp.GetContext(), props); err != nil { + return nil, err + } + tp.GetContext().GetLogger().Infof("provision sink %s with props %+v", sinkName, props) + + result := &SinkCompNode{ + name: sinkName, + nodes: sinkOps, + } + var snk node.DataSinkNode + switch ss := s.(type) { + case api.BytesCollector: + snk, err = node.NewBytesSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, false) + case api.TupleCollector: + snk, err = node.NewTupleSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, false) + default: + err = fmt.Errorf("sink type %s does not implement any collector", sinkType) + } + if err != nil { + return nil, err + } + result.nodes = append(result.nodes, snk) + // Cache in alter 
+func SinkToComp(tp *topo.Topo, sinkType string, sinkName string, props map[string]any, rule *def.Rule, streamCount int) (node.CompNode, error) {
+	s, _ := io.Sink(sinkType)
+	if s == nil {
+		return nil, fmt.Errorf("sink %s is not defined", sinkType)
+	}
+	commonConf, err := node.ParseConf(tp.GetContext().GetLogger(), props)
+	if err != nil {
+		return nil, fmt.Errorf("fail to parse sink configuration: %v", err)
+	}
+	templates := findTemplateProps(props)
+	// Split sink node
+	sinkOps, err := splitSink(tp, s, sinkName, rule.Options, commonConf, templates)
+	if err != nil {
+		return nil, err
+	}
+	if err = s.Provision(tp.GetContext(), props); err != nil {
+		return nil, err
+	}
+	tp.GetContext().GetLogger().Infof("provision sink %s with props %+v", sinkName, props)
+
+	result := &SinkCompNode{
+		name:  sinkName,
+		nodes: sinkOps,
+	}
+	var snk node.DataSinkNode
+	switch ss := s.(type) {
+	case api.BytesCollector:
+		snk, err = node.NewBytesSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, false)
+	case api.TupleCollector:
+		snk, err = node.NewTupleSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, false)
+	default:
+		err = fmt.Errorf("sink type %s does not implement any collector", sinkType)
+	}
+	if err != nil {
+		return nil, err
+	}
+	result.nodes = append(result.nodes, snk)
+	// Cache in alter queue, the topo becomes sink (fail) -> cache -> resendSink
+	// If no alter queue, the topo is cache -> sink
+	if commonConf.EnableCache && commonConf.ResendAlterQueue {
+		s, _ := io.Sink(sinkType)
+		// TODO currently, the destination prop must be named topic
+		if commonConf.ResendDestination != "" {
+			props["topic"] = commonConf.ResendDestination
+		}
+		if err = s.Provision(tp.GetContext(), props); err != nil {
+			return nil, err
+		}
+		tp.GetContext().GetLogger().Infof("provision sink %s with props %+v", sinkName, props)
+
+		cacheOp, err := node.NewCacheOp(tp.GetContext(), fmt.Sprintf("%s_cache", sinkName), rule.Options, &commonConf.SinkConf)
+		if err != nil {
+			return nil, err
+		}
+		result.nodes = append(result.nodes, cacheOp)
+
+		sinkName := fmt.Sprintf("%s_resend", sinkName)
+		var snk node.DataSinkNode
+		switch ss := s.(type) {
+		case api.BytesCollector:
+			snk, err = node.NewBytesSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, true)
+		case api.TupleCollector:
+			snk, err = node.NewTupleSinkNode(tp.GetContext(), sinkName, ss, *rule.Options, streamCount, &commonConf.SinkConf, true)
+		default:
+			err = fmt.Errorf("sink type %s does not implement any collector", sinkType)
+		}
+		if err != nil {
+			return nil, err
+		}
+		result.nodes = append(result.nodes, snk)
+	}
+	return result, nil
+}
+
 func findTemplateProps(props map[string]any) []string {
 	var result []string
 	re := regexp.MustCompile(`{{(.*?)}}`)
@@ -128,9 +161,9 @@ func findTemplateProps(props map[string]any) []string {
 }
 
-// Split sink node according to the sink configuration. Return the new input emitters.
-func splitSink(tp *topo.Topo, inputs []node.Emitter, s api.Sink, sinkName string, options *def.RuleOption, sc *node.SinkConf, templates []string) ([]node.Emitter, error) {
+// Split sink node according to the sink configuration. Return the ordered list of sink ops.
+func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOption, sc *node.SinkConf, templates []string) ([]node.TopNode, error) {
 	index := 0
-	newInputs := inputs
+	result := make([]node.TopNode, 0)
 	// Batch enabled
 	if sc.BatchSize > 0 || sc.LingerInterval > 0 {
 		batchOp, err := node.NewBatchOp(fmt.Sprintf("%s_%d_batch", sinkName, index), options, sc.BatchSize, time.Duration(sc.LingerInterval))
@@ -138,8 +171,7 @@ func splitSink(tp *topo.Topo, inputs []node.Emitter, s api.Sink, sinkName string
 		if err != nil {
 			return nil, err
 		}
 		index++
-		tp.AddOperator(newInputs, batchOp)
-		newInputs = []node.Emitter{batchOp}
+		result = append(result, batchOp)
 	}
 	// Transform enabled
 	// Currently, the row to map is done here and is required. TODO: eliminate map and this could become optional
@@ -148,8 +180,7 @@ func splitSink(tp *topo.Topo, inputs []node.Emitter, s api.Sink, sinkName string
 		return nil, err
 	}
 	index++
-	tp.AddOperator(newInputs, transformOp)
-	newInputs = []node.Emitter{transformOp}
+	result = append(result, transformOp)
 	// Encode will convert the result to []byte
 	if _, ok := s.(api.BytesCollector); ok {
 		encodeOp, err := node.NewEncodeOp(tp.GetContext(), fmt.Sprintf("%s_%d_encode", sinkName, index), options, sc)
@@ -157,8 +188,7 @@ func splitSink(tp *topo.Topo, inputs []node.Emitter, s api.Sink, sinkName string
 		return nil, err
 	}
 	index++
-		tp.AddOperator(newInputs, encodeOp)
-		newInputs = []node.Emitter{encodeOp}
+		result = append(result, encodeOp)
 		_, isStreamWriter := s.(model.StreamWriter)
 		if !isStreamWriter && sc.Compression != "" {
 			compressOp, err := node.NewCompressOp(fmt.Sprintf("%s_%d_compress", sinkName, index), options, sc.Compression)
@@ -166,8 +196,7 @@ func splitSink(tp *topo.Topo, inputs []node.Emitter, s api.Sink, sinkName string
 			if err != nil {
 				return nil, err
 			}
 			index++
-			tp.AddOperator(newInputs, compressOp)
-			newInputs = []node.Emitter{compressOp}
+			result = append(result, compressOp)
 		}
 
 		if !isStreamWriter && sc.Encryption != "" {
@@ -176,8 +205,7 @@ func splitSink(tp *topo.Topo, inputs []node.Emitter, s api.Sink, sinkName string
 			if err != nil {
 				return nil, err
 			}
 			index++
-			tp.AddOperator(newInputs, encryptOp)
-			newInputs = []node.Emitter{encryptOp}
+			result = append(result, encryptOp)
 		}
 	}
 	// Caching
@@ -187,8 +215,22 @@ func splitSink(tp *topo.Topo, inputs []node.Emitter, s api.Sink, sinkName string
 		if err != nil {
 			return nil, err
 		}
 		index++
-		tp.AddOperator(newInputs, cacheOp)
-		newInputs = []node.Emitter{cacheOp}
+		result = append(result, cacheOp)
 	}
-	return newInputs, nil
+	return result, nil
 }
+
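+// SinkCompNode is the node.CompNode implementation that groups the ordered
+// ops and sink nodes of one sink pipeline for PlanSinkOps to wire up.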
+type SinkCompNode struct {
+	name  string
+	nodes []node.TopNode
+}
+
+func (s *SinkCompNode) GetName() string {
+	return s.name
+}
+
+func (s *SinkCompNode) Nodes() []node.TopNode {
+	return s.nodes
+}
+
+var _ node.CompNode = &SinkCompNode{}
diff --git a/internal/topo/planner/planner_sink_test.go b/internal/topo/planner/planner_sink_test.go
index 930916c2e3..2e5fcec581 100644
--- a/internal/topo/planner/planner_sink_test.go
+++ b/internal/topo/planner/planner_sink_test.go
@@ -241,7 +241,7 @@ func TestSinkPlan(t *testing.T) {
 					"op_log_0_cache",
 				},
 				"op_log_0_cache": {
-					"sink_log_resend_0",
+					"sink_log_0_resend",
 				},
 			},
 		},
diff --git a/test/run_jmeter.sh b/test/run_jmeter.sh
index b6c75febfd..78f65c8294 100755
--- a/test/run_jmeter.sh
+++ b/test/run_jmeter.sh
@@ -132,23 +132,23 @@ echo -e "---------------------------------------------\n"
 /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/shared_source_rules.jmx -Dfvt="$fvt_dir" -l jmeter_logs/shared_source_rules.jtl -j jmeter_logs/shared_source_rules.log
 echo -e "---------------------------------------------\n"
 
-#/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_condition_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_condition_rule.jtl -j jmeter_logs/graph_condition_rule.log
-#echo -e "---------------------------------------------\n"
-#
-#/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_group_order_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_group_order_rule.jtl -j jmeter_logs/graph_group_order_rule.log
-#echo -e "---------------------------------------------\n"
-#
-#/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_group_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_group_rule.jtl -j jmeter_logs/graph_group_rule.log
-#echo -e "---------------------------------------------\n"
-#
-#/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_join_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_join_rule.jtl -j jmeter_logs/graph_join_rule.log
-#echo -e "---------------------------------------------\n"
-#
-#/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_mix_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_mix_rule.jtl -j jmeter_logs/graph_mix_rule.log
-#echo -e "---------------------------------------------\n"
-#
-#/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_window_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_window_rule.jtl -j jmeter_logs/graph_window_rule.log
-#echo -e "---------------------------------------------\n"
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_condition_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_condition_rule.jtl -j jmeter_logs/graph_condition_rule.log
+echo -e "---------------------------------------------\n"
+
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_group_order_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_group_order_rule.jtl -j jmeter_logs/graph_group_order_rule.log
+echo -e "---------------------------------------------\n"
+
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_group_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_group_rule.jtl -j jmeter_logs/graph_group_rule.log
+echo -e "---------------------------------------------\n"
+
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_join_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_join_rule.jtl -j jmeter_logs/graph_join_rule.log
+echo -e "---------------------------------------------\n"
+
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_mix_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_mix_rule.jtl -j jmeter_logs/graph_mix_rule.log
+echo -e "---------------------------------------------\n"
+
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_window_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_window_rule.jtl -j jmeter_logs/graph_window_rule.log
+echo -e "---------------------------------------------\n"
 
 /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/lookup_table_memory.jmx -Dfvt="$fvt_dir" -l jmeter_logs/lookup_table_memory.jtl -j jmeter_logs/lookup_table_memory.log
 echo -e "---------------------------------------------\n"