From b6f73c04b1a74bc74e8ff93e9305cdcd12a6da06 Mon Sep 17 00:00:00 2001 From: w-h-a Date: Tue, 27 Aug 2024 07:08:45 -0700 Subject: [PATCH] refactor: clean up some stuff around stores and brokers --- cmd/config/config.go | 2 + cmd/run.go | 87 ++++++++++++-------------------------------- cmd/utils.go | 81 +++++++++++++++++++++++++++++++++++++++++ docker-compose.yml | 10 +++-- go.mod | 2 +- go.sum | 4 +- 6 files changed, 116 insertions(+), 70 deletions(-) create mode 100644 cmd/utils.go diff --git a/cmd/config/config.go b/cmd/config/config.go index d1e1600..eba540b 100644 --- a/cmd/config/config.go +++ b/cmd/config/config.go @@ -10,7 +10,9 @@ var ( RpcAddress = os.Getenv("RPC_ADDRESS") StoreAddress = os.Getenv("STORE_ADDRESS") BrokerAddress = os.Getenv("BROKER_ADDRESS") + Store = os.Getenv("STORE") Stores = Split(os.Getenv("STORES")) + Broker = os.Getenv("BROKER") Producers = Split(os.Getenv("PRODUCERS")) Consumers = Split(os.Getenv("CONSUMERS")) ServiceName = os.Getenv("SERVICE_NAME") diff --git a/cmd/run.go b/cmd/run.go index 2056427..fb29ef6 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -13,8 +13,6 @@ import ( "github.com/w-h-a/pkg/api" "github.com/w-h-a/pkg/api/httpapi" "github.com/w-h-a/pkg/broker" - memorybroker "github.com/w-h-a/pkg/broker/memory" - "github.com/w-h-a/pkg/broker/snssqs" "github.com/w-h-a/pkg/client/grpcclient" "github.com/w-h-a/pkg/client/httpclient" "github.com/w-h-a/pkg/server" @@ -22,8 +20,6 @@ import ( "github.com/w-h-a/pkg/sidecar" "github.com/w-h-a/pkg/sidecar/custom" "github.com/w-h-a/pkg/store" - "github.com/w-h-a/pkg/store/cockroach" - memorystore "github.com/w-h-a/pkg/store/memory" "github.com/w-h-a/pkg/telemetry/log" ) @@ -37,75 +33,38 @@ func run(ctx *cli.Context) { brokers := map[string]broker.Broker{} - if len(config.Memory) > 0 { - for _, s := range config.Stores { - if len(s) == 0 { - continue - } + st, err := GetStoreBuilder(config.Store) + if err != nil { + log.Fatal(err) + } - stores[s] = memorystore.NewStore( - store.StoreWithTable(s), - ) + for _, s := range config.Stores { + if len(s) == 0 { + continue } - for _, s := range config.Consumers { - if len(s) == 0 { - continue - } - - publishOptions := broker.NewPublishOptions( - broker.PublishWithTopic(s), - ) + stores[s] = MakeStore(st, []string{config.StoreAddress}, config.ServiceName, s) + } - subscribeOptions := broker.NewSubscribeOptions( - broker.SubscribeWithGroup(s), - ) + bk, err := GetBrokerBuilder(config.Broker) + if err != nil { + log.Fatal(err) + } - brokers[s] = memorybroker.NewBroker( - broker.BrokerWithPublishOptions(publishOptions), - broker.BrokerWithSubscribeOptions(subscribeOptions), - ) - } - } else { - for _, s := range config.Stores { - if len(s) == 0 { - continue - } - - stores[s] = cockroach.NewStore( - store.StoreWithNodes(config.StoreAddress), - store.StoreWithDatabase(config.ServiceName), - store.StoreWithTable(s), - ) + for _, s := range config.Producers { + if len(s) == 0 { + continue } - for _, s := range config.Producers { - if len(s) == 0 { - continue - } - - publishOptions := broker.NewPublishOptions( - broker.PublishWithTopic(s), - ) + brokers[s] = MakeProducer(bk, []string{config.BrokerAddress}, s) + } - brokers[s] = snssqs.NewBroker( - broker.BrokerWithPublishOptions(publishOptions), - ) + for _, s := range config.Consumers { + if len(s) == 0 { + continue } - for _, s := range config.Consumers { - if len(s) == 0 { - continue - } - - subscribeOptions := broker.NewSubscribeOptions( - broker.SubscribeWithGroup(s), - ) - - brokers[s] = snssqs.NewBroker( - broker.BrokerWithSubscribeOptions(subscribeOptions), - ) - } + brokers[s] = MakeConsumer(bk, []string{config.BrokerAddress}, s, len(config.Memory) > 0) } // get services @@ -187,7 +146,7 @@ func run(ctx *cli.Context) { }() // block here - err := <-errCh + err = <-errCh if err != nil { log.Errorf("failed to start action: %v", err) } diff --git a/cmd/utils.go b/cmd/utils.go new file mode 100644 index 0000000..f3cbf75 --- /dev/null +++ b/cmd/utils.go @@ -0,0 +1,81 @@ +package cmd + +import ( + "fmt" + + "github.com/w-h-a/pkg/broker" + memorybroker "github.com/w-h-a/pkg/broker/memory" + "github.com/w-h-a/pkg/broker/snssqs" + "github.com/w-h-a/pkg/store" + "github.com/w-h-a/pkg/store/cockroach" + memorystore "github.com/w-h-a/pkg/store/memory" +) + +var ( + defaultStores = map[string]func(...store.StoreOption) store.Store{ + "cockroach": cockroach.NewStore, + "memory": memorystore.NewStore, + } + + defaultBrokers = map[string]func(...broker.BrokerOption) broker.Broker{ + "snssqs": snssqs.NewBroker, + "memory": memorybroker.NewBroker, + } +) + +func GetStoreBuilder(s string) (func(...store.StoreOption) store.Store, error) { + storeBuilder, exists := defaultStores[s] + if !exists { + return nil, fmt.Errorf("store %s is not supported", s) + } + return storeBuilder, nil +} + +func MakeStore(storeBuilder func(...store.StoreOption) store.Store, nodes []string, database, table string) store.Store { + return storeBuilder( + store.StoreWithNodes(nodes...), + store.StoreWithDatabase(database), + store.StoreWithTable(table), + ) +} + +func GetBrokerBuilder(s string) (func(...broker.BrokerOption) broker.Broker, error) { + brokerBuilder, exists := defaultBrokers[s] + if !exists { + return nil, fmt.Errorf("broker %s is not supported", s) + } + return brokerBuilder, nil +} + +func MakeProducer(brokerBuilder func(...broker.BrokerOption) broker.Broker, nodes []string, topic string) broker.Broker { + options := broker.NewPublishOptions( + broker.PublishWithTopic(topic), + ) + + return brokerBuilder( + broker.BrokerWithNodes(nodes...), + broker.BrokerWithPublishOptions(options), + ) +} + +func MakeConsumer(brokerBuilder func(...broker.BrokerOption) broker.Broker, nodes []string, group string, memory bool) broker.Broker { + subOptions := broker.NewSubscribeOptions( + broker.SubscribeWithGroup(group), + ) + + if memory { + pubOptions := broker.NewPublishOptions( + broker.PublishWithTopic(group), + ) + + return brokerBuilder( + broker.BrokerWithPublishOptions(pubOptions), + broker.BrokerWithSubscribeOptions(subOptions), + ) + } + + return brokerBuilder( + broker.BrokerWithNodes(nodes...), + broker.BrokerWithSubscribeOptions(subOptions), + ) +} diff --git a/docker-compose.yml b/docker-compose.yml index 01d753a..27f1767 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,6 +11,8 @@ services: HTTP_ADDRESS: ':3501' RPC_ADDRESS: ':50001' SERVICE_NAME: 'python' + STORE: 'memory' + BROKER: 'memory' MEMORY: 'memory' ports: - '3501:3501' @@ -30,12 +32,14 @@ services: VERSION: '0.1.0-alpha.0' HTTP_ADDRESS: ':3501' RPC_ADDRESS: ':50001' - STORES: 'orders' - PRODUCERS: 'neworder-queue' - CONSUMERS: 'neworder-queue' SERVICE_NAME: 'node' SERVICE_PORT: '3000' SERVICE_PROTOCOL: 'http' + STORE: 'memory' + STORES: 'orders' + BROKER: 'memory' + PRODUCERS: 'neworder-queue' + CONSUMERS: 'neworder-queue' MEMORY: 'memory' ports: - '3502:3501' diff --git a/go.mod b/go.mod index d71098d..34f8403 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.0 require ( github.com/gorilla/mux v1.8.1 github.com/urfave/cli v1.22.15 - github.com/w-h-a/pkg v0.19.0 + github.com/w-h-a/pkg v0.20.0 google.golang.org/protobuf v1.34.2 ) diff --git a/go.sum b/go.sum index 381da7e..9c29b17 100644 --- a/go.sum +++ b/go.sum @@ -65,8 +65,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/urfave/cli v1.22.15 h1:nuqt+pdC/KqswQKhETJjo7pvn/k4xMUxgW6liI7XpnM= github.com/urfave/cli v1.22.15/go.mod h1:wSan1hmo5zeyLGBjRJbzRTNk8gwoYa2B9n4q9dmRIc0= -github.com/w-h-a/pkg v0.19.0 h1:rzjMdlfKgP7A4MDihsXpRZFMqIv5CMfpHazpRBBhGjk= -github.com/w-h-a/pkg v0.19.0/go.mod h1:d2Z/O3YVN9L+9Sas5g5oeSpsgCaJ/+40bXctvEHXk+s= +github.com/w-h-a/pkg v0.20.0 h1:/y90BIO3Wr3I1O516f6WxzqGNL996gA1RE44T6ZXe3A= +github.com/w-h-a/pkg v0.20.0/go.mod h1:d2Z/O3YVN9L+9Sas5g5oeSpsgCaJ/+40bXctvEHXk+s= golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=