diff --git a/internal/binder/io/builtin.go b/internal/binder/io/builtin.go index eb7cc11371..b2c8be2c1a 100644 --- a/internal/binder/io/builtin.go +++ b/internal/binder/io/builtin.go @@ -23,6 +23,7 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/io/mqtt" mqttCon "github.com/lf-edge/ekuiper/v2/internal/io/mqtt/client" "github.com/lf-edge/ekuiper/v2/internal/io/neuron" + "github.com/lf-edge/ekuiper/v2/internal/io/simulator" "github.com/lf-edge/ekuiper/v2/internal/io/sink" "github.com/lf-edge/ekuiper/v2/internal/io/websocket" "github.com/lf-edge/ekuiper/v2/pkg/modules" @@ -37,7 +38,7 @@ func init() { modules.RegisterSource("memory", func() api.Source { return memory.GetSource() }) modules.RegisterSource("neuron", neuron.GetSource) modules.RegisterSource("websocket", func() api.Source { return websocket.GetSource() }) - // modules.RegisterSource("simulator", func() api.Source { return &simulator.Source{} }) + modules.RegisterSource("simulator", func() api.Source { return simulator.GetSource() }) modules.RegisterSink("log", sink.NewLogSink) modules.RegisterSink("logToMemory", sink.NewLogSinkToMemory) diff --git a/internal/io/simulator/simulator.go b/internal/io/simulator/simulator.go new file mode 100644 index 0000000000..2c8c28830d --- /dev/null +++ b/internal/io/simulator/simulator.go @@ -0,0 +1,69 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package simulator + +import ( + "fmt" + "time" + + "github.com/lf-edge/ekuiper/contract/v2/api" + "github.com/lf-edge/ekuiper/v2/pkg/cast" +) + +type SimulatorSource struct { + index int + cfg *sConfig +} + +type sConfig struct { + Data []map[string]any `json:"data"` + Loop bool `json:"loop"` +} + +func (s *SimulatorSource) Provision(ctx api.StreamContext, configs map[string]any) error { + cfg := &sConfig{} + if err := cast.MapToStruct(configs, cfg); err != nil { + return err + } + s.cfg = cfg + return nil +} + +func (s SimulatorSource) Close(ctx api.StreamContext) error { + return nil +} + +func (s *SimulatorSource) Connect(ctx api.StreamContext) error { + return nil +} + +func (s *SimulatorSource) Pull(ctx api.StreamContext, trigger time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest) { + if s.index >= len(s.cfg.Data) { + if s.cfg.Loop { + s.index = 0 + } else { + ingestError(ctx, fmt.Errorf("simulator source message running out")) + return + } + } + ingest(ctx, s.cfg.Data[s.index], nil, trigger) + s.index++ +} + +func GetSource() api.Source { + return &SimulatorSource{} +} + +var _ api.PullTupleSource = &SimulatorSource{} diff --git a/internal/io/simulator/simulator_test.go b/internal/io/simulator/simulator_test.go new file mode 100644 index 0000000000..4ced096217 --- /dev/null +++ b/internal/io/simulator/simulator_test.go @@ -0,0 +1,72 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package simulator + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/lf-edge/ekuiper/contract/v2/api" + mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" +) + +func TestSourcePull(t *testing.T) { + data := make([]map[string]any, 0) + data = append(data, map[string]any{ + "a": 1, + }) + props1 := map[string]any{ + "data": data, + "loop": true, + } + s1 := &SimulatorSource{} + ctx := mockContext.NewMockContext("1", "2") + require.NoError(t, s1.Provision(ctx, props1)) + require.NoError(t, s1.Connect(ctx)) + recvData := make(chan any, 10) + s1.Pull(ctx, time.Now(), func(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) { + recvData <- data + }, func(ctx api.StreamContext, err error) {}) + expData := map[string]any{ + "a": 1, + } + require.Equal(t, expData, <-recvData) + s1.Pull(ctx, time.Now(), func(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) { + recvData <- data + }, func(ctx api.StreamContext, err error) {}) + require.Equal(t, expData, <-recvData) + require.NoError(t, s1.Close(ctx)) + + props2 := map[string]any{ + "data": data, + "loop": false, + } + s2 := &SimulatorSource{} + require.NoError(t, s2.Provision(ctx, props2)) + require.NoError(t, s2.Connect(ctx)) + s2.Pull(ctx, time.Now(), func(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) { + recvData <- data + }, func(ctx api.StreamContext, err error) {}) + require.Equal(t, expData, <-recvData) + s2.Pull(ctx, time.Now(), func(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) {}, func(ctx api.StreamContext, err error) { + recvData <- err + }) + expErr := fmt.Errorf("simulator source message running out") + require.Equal(t, expErr, <-recvData) + require.NoError(t, s2.Close(ctx)) +}