Skip to content

Commit

Permalink
Merge pull request #3 from w-h-a/cleanup
Browse files Browse the repository at this point in the history
refactor: clean up some stuff around stores and brokers
  • Loading branch information
w-h-a authored Aug 27, 2024
2 parents e3c245d + b6f73c0 commit a20b13b
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 70 deletions.
2 changes: 2 additions & 0 deletions cmd/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ var (
RpcAddress = os.Getenv("RPC_ADDRESS")
StoreAddress = os.Getenv("STORE_ADDRESS")
BrokerAddress = os.Getenv("BROKER_ADDRESS")
Store = os.Getenv("STORE")
Stores = Split(os.Getenv("STORES"))
Broker = os.Getenv("BROKER")
Producers = Split(os.Getenv("PRODUCERS"))
Consumers = Split(os.Getenv("CONSUMERS"))
ServiceName = os.Getenv("SERVICE_NAME")
Expand Down
87 changes: 23 additions & 64 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,13 @@ import (
"github.com/w-h-a/pkg/api"
"github.com/w-h-a/pkg/api/httpapi"
"github.com/w-h-a/pkg/broker"
memorybroker "github.com/w-h-a/pkg/broker/memory"
"github.com/w-h-a/pkg/broker/snssqs"
"github.com/w-h-a/pkg/client/grpcclient"
"github.com/w-h-a/pkg/client/httpclient"
"github.com/w-h-a/pkg/server"
"github.com/w-h-a/pkg/server/grpcserver"
"github.com/w-h-a/pkg/sidecar"
"github.com/w-h-a/pkg/sidecar/custom"
"github.com/w-h-a/pkg/store"
"github.com/w-h-a/pkg/store/cockroach"
memorystore "github.com/w-h-a/pkg/store/memory"
"github.com/w-h-a/pkg/telemetry/log"
)

Expand All @@ -37,75 +33,38 @@ func run(ctx *cli.Context) {

brokers := map[string]broker.Broker{}

if len(config.Memory) > 0 {
for _, s := range config.Stores {
if len(s) == 0 {
continue
}
st, err := GetStoreBuilder(config.Store)
if err != nil {
log.Fatal(err)
}

stores[s] = memorystore.NewStore(
store.StoreWithTable(s),
)
for _, s := range config.Stores {
if len(s) == 0 {
continue
}

for _, s := range config.Consumers {
if len(s) == 0 {
continue
}

publishOptions := broker.NewPublishOptions(
broker.PublishWithTopic(s),
)
stores[s] = MakeStore(st, []string{config.StoreAddress}, config.ServiceName, s)
}

subscribeOptions := broker.NewSubscribeOptions(
broker.SubscribeWithGroup(s),
)
bk, err := GetBrokerBuilder(config.Broker)
if err != nil {
log.Fatal(err)
}

brokers[s] = memorybroker.NewBroker(
broker.BrokerWithPublishOptions(publishOptions),
broker.BrokerWithSubscribeOptions(subscribeOptions),
)
}
} else {
for _, s := range config.Stores {
if len(s) == 0 {
continue
}

stores[s] = cockroach.NewStore(
store.StoreWithNodes(config.StoreAddress),
store.StoreWithDatabase(config.ServiceName),
store.StoreWithTable(s),
)
for _, s := range config.Producers {
if len(s) == 0 {
continue
}

for _, s := range config.Producers {
if len(s) == 0 {
continue
}

publishOptions := broker.NewPublishOptions(
broker.PublishWithTopic(s),
)
brokers[s] = MakeProducer(bk, []string{config.BrokerAddress}, s)
}

brokers[s] = snssqs.NewBroker(
broker.BrokerWithPublishOptions(publishOptions),
)
for _, s := range config.Consumers {
if len(s) == 0 {
continue
}

for _, s := range config.Consumers {
if len(s) == 0 {
continue
}

subscribeOptions := broker.NewSubscribeOptions(
broker.SubscribeWithGroup(s),
)

brokers[s] = snssqs.NewBroker(
broker.BrokerWithSubscribeOptions(subscribeOptions),
)
}
brokers[s] = MakeConsumer(bk, []string{config.BrokerAddress}, s, len(config.Memory) > 0)
}

// get services
Expand Down Expand Up @@ -187,7 +146,7 @@ func run(ctx *cli.Context) {
}()

// block here
err := <-errCh
err = <-errCh
if err != nil {
log.Errorf("failed to start action: %v", err)
}
Expand Down
81 changes: 81 additions & 0 deletions cmd/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package cmd

import (
"fmt"

"github.com/w-h-a/pkg/broker"
memorybroker "github.com/w-h-a/pkg/broker/memory"
"github.com/w-h-a/pkg/broker/snssqs"
"github.com/w-h-a/pkg/store"
"github.com/w-h-a/pkg/store/cockroach"
memorystore "github.com/w-h-a/pkg/store/memory"
)

var (
defaultStores = map[string]func(...store.StoreOption) store.Store{
"cockroach": cockroach.NewStore,
"memory": memorystore.NewStore,
}

defaultBrokers = map[string]func(...broker.BrokerOption) broker.Broker{
"snssqs": snssqs.NewBroker,
"memory": memorybroker.NewBroker,
}
)

func GetStoreBuilder(s string) (func(...store.StoreOption) store.Store, error) {
storeBuilder, exists := defaultStores[s]
if !exists {
return nil, fmt.Errorf("store %s is not supported", s)
}
return storeBuilder, nil
}

func MakeStore(storeBuilder func(...store.StoreOption) store.Store, nodes []string, database, table string) store.Store {
return storeBuilder(
store.StoreWithNodes(nodes...),
store.StoreWithDatabase(database),
store.StoreWithTable(table),
)
}

func GetBrokerBuilder(s string) (func(...broker.BrokerOption) broker.Broker, error) {
brokerBuilder, exists := defaultBrokers[s]
if !exists {
return nil, fmt.Errorf("broker %s is not supported", s)
}
return brokerBuilder, nil
}

func MakeProducer(brokerBuilder func(...broker.BrokerOption) broker.Broker, nodes []string, topic string) broker.Broker {
options := broker.NewPublishOptions(
broker.PublishWithTopic(topic),
)

return brokerBuilder(
broker.BrokerWithNodes(nodes...),
broker.BrokerWithPublishOptions(options),
)
}

func MakeConsumer(brokerBuilder func(...broker.BrokerOption) broker.Broker, nodes []string, group string, memory bool) broker.Broker {
subOptions := broker.NewSubscribeOptions(
broker.SubscribeWithGroup(group),
)

if memory {
pubOptions := broker.NewPublishOptions(
broker.PublishWithTopic(group),
)

return brokerBuilder(
broker.BrokerWithPublishOptions(pubOptions),
broker.BrokerWithSubscribeOptions(subOptions),
)
}

return brokerBuilder(
broker.BrokerWithNodes(nodes...),
broker.BrokerWithSubscribeOptions(subOptions),
)
}
10 changes: 7 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ services:
HTTP_ADDRESS: ':3501'
RPC_ADDRESS: ':50001'
SERVICE_NAME: 'python'
STORE: 'memory'
BROKER: 'memory'
MEMORY: 'memory'
ports:
- '3501:3501'
Expand All @@ -30,12 +32,14 @@ services:
VERSION: '0.1.0-alpha.0'
HTTP_ADDRESS: ':3501'
RPC_ADDRESS: ':50001'
STORES: 'orders'
PRODUCERS: 'neworder-queue'
CONSUMERS: 'neworder-queue'
SERVICE_NAME: 'node'
SERVICE_PORT: '3000'
SERVICE_PROTOCOL: 'http'
STORE: 'memory'
STORES: 'orders'
BROKER: 'memory'
PRODUCERS: 'neworder-queue'
CONSUMERS: 'neworder-queue'
MEMORY: 'memory'
ports:
- '3502:3501'
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.23.0
require (
github.com/gorilla/mux v1.8.1
github.com/urfave/cli v1.22.15
github.com/w-h-a/pkg v0.19.0
github.com/w-h-a/pkg v0.20.0
google.golang.org/protobuf v1.34.2
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/urfave/cli v1.22.15 h1:nuqt+pdC/KqswQKhETJjo7pvn/k4xMUxgW6liI7XpnM=
github.com/urfave/cli v1.22.15/go.mod h1:wSan1hmo5zeyLGBjRJbzRTNk8gwoYa2B9n4q9dmRIc0=
github.com/w-h-a/pkg v0.19.0 h1:rzjMdlfKgP7A4MDihsXpRZFMqIv5CMfpHazpRBBhGjk=
github.com/w-h-a/pkg v0.19.0/go.mod h1:d2Z/O3YVN9L+9Sas5g5oeSpsgCaJ/+40bXctvEHXk+s=
github.com/w-h-a/pkg v0.20.0 h1:/y90BIO3Wr3I1O516f6WxzqGNL996gA1RE44T6ZXe3A=
github.com/w-h-a/pkg v0.20.0/go.mod h1:d2Z/O3YVN9L+9Sas5g5oeSpsgCaJ/+40bXctvEHXk+s=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
Expand Down

0 comments on commit a20b13b

Please sign in to comment.