Skip to content

Commit

Permalink
feat: support simulator source (#2993)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer authored Jul 8, 2024
1 parent 461b48b commit 22f7daf
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 1 deletion.
3 changes: 2 additions & 1 deletion internal/binder/io/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/io/mqtt"
mqttCon "github.com/lf-edge/ekuiper/v2/internal/io/mqtt/client"
"github.com/lf-edge/ekuiper/v2/internal/io/neuron"
"github.com/lf-edge/ekuiper/v2/internal/io/simulator"
"github.com/lf-edge/ekuiper/v2/internal/io/sink"
"github.com/lf-edge/ekuiper/v2/internal/io/websocket"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
Expand All @@ -37,7 +38,7 @@ func init() {
modules.RegisterSource("memory", func() api.Source { return memory.GetSource() })
modules.RegisterSource("neuron", neuron.GetSource)
modules.RegisterSource("websocket", func() api.Source { return websocket.GetSource() })
// modules.RegisterSource("simulator", func() api.Source { return &simulator.Source{} })
modules.RegisterSource("simulator", func() api.Source { return simulator.GetSource() })

modules.RegisterSink("log", sink.NewLogSink)
modules.RegisterSink("logToMemory", sink.NewLogSinkToMemory)
Expand Down
69 changes: 69 additions & 0 deletions internal/io/simulator/simulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package simulator

import (
"fmt"
"time"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
)

type SimulatorSource struct {
index int
cfg *sConfig
}

type sConfig struct {
Data []map[string]any `json:"data"`
Loop bool `json:"loop"`
}

func (s *SimulatorSource) Provision(ctx api.StreamContext, configs map[string]any) error {
cfg := &sConfig{}
if err := cast.MapToStruct(configs, cfg); err != nil {
return err
}
s.cfg = cfg
return nil
}

func (s SimulatorSource) Close(ctx api.StreamContext) error {
return nil
}

func (s *SimulatorSource) Connect(ctx api.StreamContext) error {
return nil
}

func (s *SimulatorSource) Pull(ctx api.StreamContext, trigger time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest) {
if s.index >= len(s.cfg.Data) {
if s.cfg.Loop {
s.index = 0
} else {
ingestError(ctx, fmt.Errorf("simulator source message running out"))
return
}
}
ingest(ctx, s.cfg.Data[s.index], nil, trigger)
s.index++
}

func GetSource() api.Source {
return &SimulatorSource{}
}

var _ api.PullTupleSource = &SimulatorSource{}
72 changes: 72 additions & 0 deletions internal/io/simulator/simulator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package simulator

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/contract/v2/api"
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
)

func TestSourcePull(t *testing.T) {
data := make([]map[string]any, 0)
data = append(data, map[string]any{
"a": 1,
})
props1 := map[string]any{
"data": data,
"loop": true,
}
s1 := &SimulatorSource{}
ctx := mockContext.NewMockContext("1", "2")
require.NoError(t, s1.Provision(ctx, props1))
require.NoError(t, s1.Connect(ctx))
recvData := make(chan any, 10)
s1.Pull(ctx, time.Now(), func(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) {
recvData <- data
}, func(ctx api.StreamContext, err error) {})
expData := map[string]any{
"a": 1,
}
require.Equal(t, expData, <-recvData)
s1.Pull(ctx, time.Now(), func(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) {
recvData <- data
}, func(ctx api.StreamContext, err error) {})
require.Equal(t, expData, <-recvData)
require.NoError(t, s1.Close(ctx))

props2 := map[string]any{
"data": data,
"loop": false,
}
s2 := &SimulatorSource{}
require.NoError(t, s2.Provision(ctx, props2))
require.NoError(t, s2.Connect(ctx))
s2.Pull(ctx, time.Now(), func(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) {
recvData <- data
}, func(ctx api.StreamContext, err error) {})
require.Equal(t, expData, <-recvData)
s2.Pull(ctx, time.Now(), func(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) {}, func(ctx api.StreamContext, err error) {
recvData <- err
})
expErr := fmt.Errorf("simulator source message running out")
require.Equal(t, expErr, <-recvData)
require.NoError(t, s2.Close(ctx))
}

0 comments on commit 22f7daf

Please sign in to comment.