Skip to content

Commit

Permalink
Flow name now passed as a argument
Browse files Browse the repository at this point in the history
this also fix a issue with different flow is listening to same queue

Update README.md
  • Loading branch information
s8sg committed Jul 21, 2020
1 parent f659297 commit 6edf939
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 25 deletions.
37 changes: 34 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Go-Flow
A Golang based workflow framework
A Golang based high performance, scalable and distributed workflow framework

![Build](https://github.com/faasflow/goflow/workflows/GO-Flow-Build/badge.svg)
[![GoDoc](https://godoc.org/github.com/faasflow/goflow?status.svg)](https://godoc.org/github.com/faasflow/goflow)
Expand Down Expand Up @@ -44,7 +44,7 @@ func main() {
OpenTraceUrl: "localhost:5775",
WorkerConcurrency: 5,
}
fs.Start(DefineWorkflow)
fs.Start("my-flow", DefineWorkflow)
}
```

Expand All @@ -60,4 +60,35 @@ curl -d HelloWorld localhost:8080
```

## Scale It
GoFlow scale horizontally, you can distribute the load by just adding more instances
GoFlow scale horizontally, you can distribute the load by just adding more instances.

#### Worker Mode
Alternatively you can start your goflow in worker mode. As a worker goflow only handles the workload,
and if required you can only scale the workers
```go
func main() {
fs := &goflow.FlowService{
RedisURL: "localhost:6379",
OpenTraceUrl: "localhost:5775",
WorkerConcurrency: 5,
}
fs.StartWorker(DefineWorkflow)
}
```

#### Server Mode
Similarly you can start your goflow as a server. As a server goflow only handles the incoming http requests,
and you will need to run workers to handle the workload
```go
func main() {
fs := &goflow.FlowService{
Port: 8080,
RedisURL: "localhost:6379",
OpenTraceUrl: "localhost:5775",
WorkerConcurrency: 5,
}
fs.StartWorker(DefineWorkflow)
}
```

> By default goflow runs both as a Server and a Worker mode
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/faasflow/faas-flow-redis-datastore v1.0.1-0.20200718081732-431d3cc7894a
github.com/faasflow/faas-flow-redis-statestore v1.0.1-0.20200718082116-d90985fdbde1
github.com/faasflow/lib v1.1.1-0.20200718152038-a5cb46dd9e65
github.com/faasflow/lib v1.1.1-0.20200719042107-174c40f5070b
github.com/faasflow/runtime v0.2.2
github.com/faasflow/sdk v1.0.0
github.com/garyburd/redigo v1.6.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ github.com/faasflow/faas-flow-redis-datastore v1.0.1-0.20200718081732-431d3cc789
github.com/faasflow/faas-flow-redis-datastore v1.0.1-0.20200718081732-431d3cc7894a/go.mod h1:vV/vLuH5WGs4T040uqIbhdbQmdImGY7iYVBkqKGeUrc=
github.com/faasflow/faas-flow-redis-statestore v1.0.1-0.20200718082116-d90985fdbde1 h1:k10t8Eq5jley1oVlMK1suXq0mmF9IaqYJm0VnrAHQYQ=
github.com/faasflow/faas-flow-redis-statestore v1.0.1-0.20200718082116-d90985fdbde1/go.mod h1:F2ZiHritsST6NSjqrQVgoU75rWtoimLV7qKXkCmNVgM=
github.com/faasflow/lib v1.1.1-0.20200718152038-a5cb46dd9e65 h1:jVdg6lEezJEnyphhxAbbW/UtJ1XY0TcYUzmY42WLojU=
github.com/faasflow/lib v1.1.1-0.20200718152038-a5cb46dd9e65/go.mod h1:8WerNZmVn5ad2Idf6hufkILrB/xXOJOnraIfykz+2Qc=
github.com/faasflow/lib v1.1.1-0.20200719042107-174c40f5070b h1:7XAaPgB8on80ZJ0dKuhE6D9cUSpjKsAT2hajnW4XbFg=
github.com/faasflow/lib v1.1.1-0.20200719042107-174c40f5070b/go.mod h1:8WerNZmVn5ad2Idf6hufkILrB/xXOJOnraIfykz+2Qc=
github.com/faasflow/runtime v0.2.2 h1:XaKJU9X9DuLuVZhc5Von7R98aw1GgtYjfQNvWQWHma0=
github.com/faasflow/runtime v0.2.2/go.mod h1:fd+6ZuXgYquHpKeaWSwbTWUrJuirfqIrt/Lrm3Rr/kY=
github.com/faasflow/sdk v0.0.0-20200705012738-72f2bcdb62d1/go.mod h1:cpcCvb40uzDNzTT0qxiA6QGuOu8a71LMV2w/ikAW5LU=
Expand Down
19 changes: 16 additions & 3 deletions goflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ type FlowService struct {
runtime *runtime.FlowRuntime
}

func (fs *FlowService) Start(handler runtime.FlowDefinitionHandler) error {
func (fs *FlowService) Start(flowName string, handler runtime.FlowDefinitionHandler) error {
if flowName == "" {
return fmt.Errorf("flow-name must not be empty and a unique for each flow")
}

fs.ConfigureDefault()
fs.runtime = &runtime.FlowRuntime{
FlowName: flowName,
Handler: handler,
OpenTracingUrl: fs.OpenTraceUrl,
RedisURL: fs.RedisURL,
Expand All @@ -37,9 +42,13 @@ func (fs *FlowService) Start(handler runtime.FlowDefinitionHandler) error {
return err
}

func (fs *FlowService) StartServer(handler runtime.FlowDefinitionHandler) error {
func (fs *FlowService) StartServer(flowName string, handler runtime.FlowDefinitionHandler) error {
if flowName == "" {
return fmt.Errorf("flow-name must not be empty and a unique for each flow")
}
fs.ConfigureDefault()
fs.runtime = &runtime.FlowRuntime{
FlowName: flowName,
Handler: handler,
OpenTracingUrl: fs.OpenTraceUrl,
RedisURL: fs.RedisURL,
Expand All @@ -50,9 +59,13 @@ func (fs *FlowService) StartServer(handler runtime.FlowDefinitionHandler) error
return fmt.Errorf("server has stopped, error: %v", err)
}

func (fs *FlowService) StartWorker(handler runtime.FlowDefinitionHandler) error {
func (fs *FlowService) StartWorker(flowName string, handler runtime.FlowDefinitionHandler) error {
if flowName == "" {
return fmt.Errorf("flow-name must not be empty and a unique for each flow")
}
fs.ConfigureDefault()
fs.runtime = &runtime.FlowRuntime{
FlowName: flowName,
Handler: handler,
OpenTracingUrl: fs.OpenTraceUrl,
RedisURL: fs.RedisURL,
Expand Down
11 changes: 8 additions & 3 deletions runtime/flow_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

type FlowRuntime struct {
FlowName string
Handler FlowDefinitionHandler
OpenTracingUrl string
RedisURL string
Expand Down Expand Up @@ -92,7 +93,7 @@ func (fRuntime *FlowRuntime) StartQueueWorker(redisUri string, concurrency int)
fRuntime.settings = goworker.WorkerSettings{
URI: redisUri,
Connections: 100,
Queues: []string{PartialRequestQueue},
Queues: []string{fRuntime.queueId()},
UseNumber: true,
ExitOnComplete: false,
Concurrency: concurrency,
Expand All @@ -110,7 +111,7 @@ func (fRuntime *FlowRuntime) EnqueueRequest(pr *runtime.Request) error {
return fmt.Errorf("failed to marshal request while enqueing, error %v", error)
}
return goworker.Enqueue(&goworker.Job{
Queue: PartialRequestQueue,
Queue: fRuntime.queueId(),
Payload: goworker.Payload{
Class: "QueueWorker",
Args: []interface{}{string(encodedRequest)},
Expand All @@ -120,7 +121,7 @@ func (fRuntime *FlowRuntime) EnqueueRequest(pr *runtime.Request) error {

func (fRuntime *FlowRuntime) queueReceiver(queue string, args ...interface{}) error {
fRuntime.Logger.Log(fmt.Sprintf("Request received by worker at queue %v", queue))
if queue != PartialRequestQueue {
if queue != fRuntime.queueId() {
return nil
}

Expand Down Expand Up @@ -154,3 +155,7 @@ func (fRuntime *FlowRuntime) queueReceiver(queue string, args ...interface{}) er

return nil
}

func (fRuntime *FlowRuntime) queueId() string {
return fmt.Sprint("%s_%s", PartialRequestQueue, fRuntime.FlowName)
}
20 changes: 7 additions & 13 deletions runtime/new_request_handler_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"io/ioutil"
"net/http"
"regexp"

runtimepkg "github.com/faasflow/runtime"

Expand Down Expand Up @@ -37,7 +36,7 @@ func newRequestHandlerWrapper(runtime runtimepkg.Runtime, handler func(*runtimep
request := &runtimepkg.Request{
Body: body,
Header: req.Header,
FlowName: getWorkflowNameFromHost(req.Host),
FlowName: getFlowName(runtime),
RequestID: id,
Query: reqParams,
RawQuery: req.URL.RawQuery,
Expand Down Expand Up @@ -65,15 +64,10 @@ func newRequestHandlerWrapper(runtime runtimepkg.Runtime, handler func(*runtimep
}
}

// internal

var re = regexp.MustCompile(`(?m)^[^:.]+\s*`)

// getWorkflowNameFromHostFromHost returns the flow name from env
func getWorkflowNameFromHost(host string) string {
matches := re.FindAllString(host, -1)
if matches[0] != "" {
return matches[0]
func getFlowName(runtime runtimepkg.Runtime) string {
fr, ok := runtime.(*FlowRuntime)
if !ok {
return ""
}
return ""
}
return fr.FlowName
}

0 comments on commit 6edf939

Please sign in to comment.