Skip to content

Commit

Permalink
split queue subs
Browse files Browse the repository at this point in the history
  • Loading branch information
paulwe committed Mar 9, 2024
1 parent 37278aa commit f0cfcae
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 18 deletions.
9 changes: 5 additions & 4 deletions info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
package psrpc

type RPCInfo struct {
Service string
Method string
Topic []string
Multi bool
Service string
Method string
Topic []string
Multi bool
Wildcard bool
}
2 changes: 1 addition & 1 deletion internal/internal.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 14 additions & 11 deletions pkg/info/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func GetResponseChannel(service, clientID string) bus.Channel {
func (i *RequestInfo) GetRPCChannel() bus.Channel {
return bus.Channel{
Legacy: formatChannel('|', i.Service, i.Method, i.Topic, "REQ"),
Primary: formatServerChannel(i.Service, i.Topic, i.Method, "REQ"),
Wildcard: formatServerWildcard(i.Service, i.Topic),
Primary: formatServerChannel(i.Service, i.Topic, i.Queue, i.Method, "REQ"),
Wildcard: formatServerWildcard(i.Service, i.Topic, i.Queue),
}
}

Expand All @@ -69,16 +69,16 @@ func (i *RequestInfo) GetHandlerKey() string {
func (i *RequestInfo) GetClaimResponseChannel() bus.Channel {
return bus.Channel{
Legacy: formatChannel('|', i.Service, i.Method, i.Topic, "RCLAIM"),
Primary: formatServerChannel(i.Service, i.Topic, i.Method, "RCLAIM"),
Wildcard: formatServerWildcard(i.Service, i.Topic),
Primary: formatServerChannel(i.Service, i.Topic, i.Queue, i.Method, "RCLAIM"),
Wildcard: formatServerWildcard(i.Service, i.Topic, i.Queue),
}
}

func (i *RequestInfo) GetStreamServerChannel() bus.Channel {
return bus.Channel{
Legacy: formatChannel('|', i.Service, i.Method, i.Topic, "STR"),
Primary: formatServerChannel(i.Service, i.Topic, i.Method, "STR"),
Wildcard: formatServerWildcard(i.Service, i.Topic),
Primary: formatServerChannel(i.Service, i.Topic, false, i.Method, "STR"),
Wildcard: formatServerWildcard(i.Service, i.Topic, false),
}
}

Expand All @@ -101,26 +101,29 @@ func formatClientChannel(service, clientID, channel string) string {
return string(b)
}

func formatServerChannel(service string, topic []string, method, channel string) string {
func formatServerChannel(service string, topic []string, queue bool, method, channel string) string {
p := scratch.Get().(*[]byte)
defer scratch.Put(p)
b := appendServerPrefix(*p, service, topic)
b := appendServerPrefix(*p, service, topic, queue)
b = append(b, method...)
b = append(b, '.')
b = append(b, channel...)
return string(b)
}

func formatServerWildcard(service string, topic []string) string {
func formatServerWildcard(service string, topic []string, queue bool) string {
p := scratch.Get().(*[]byte)
defer scratch.Put(p)
b := appendServerPrefix(*p, service, topic)
b := appendServerPrefix(*p, service, topic, queue)
b = append(b, "*.*"...)
return string(b)
}

func appendServerPrefix(b []byte, service string, topic []string) []byte {
func appendServerPrefix(b []byte, service string, topic []string, queue bool) []byte {
b = append(b, "SRV/"...)
if queue {
b = append(b, "Q/"...)
}
b = append(b, service...)
if len(topic) > 0 {
b = append(b, '/')
Expand Down
12 changes: 12 additions & 0 deletions pkg/info/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,17 @@ func TestChannelFormatters(t *testing.T) {
require.Equal(t, "SRV/foo/a/b/c.bar.STR", i.GetStreamServerChannel().Primary)
require.Equal(t, "SRV/foo/a/b/c.*.*", i.GetStreamServerChannel().Wildcard)

i.Queue = true

require.Equal(t, "foo|bar|a|b|c|REQ", i.GetRPCChannel().Legacy)
require.Equal(t, "SRV/Q/foo/a/b/c.bar.REQ", i.GetRPCChannel().Primary)
require.Equal(t, "SRV/Q/foo/a/b/c.*.*", i.GetRPCChannel().Wildcard)
require.Equal(t, "foo|bar|a|b|c|RCLAIM", i.GetClaimResponseChannel().Legacy)
require.Equal(t, "SRV/Q/foo/a/b/c.bar.RCLAIM", i.GetClaimResponseChannel().Primary)
require.Equal(t, "SRV/Q/foo/a/b/c.*.*", i.GetClaimResponseChannel().Wildcard)
require.Equal(t, "foo|bar|a|b|c|STR", i.GetStreamServerChannel().Legacy)
require.Equal(t, "SRV/foo/a/b/c.bar.STR", i.GetStreamServerChannel().Primary)
require.Equal(t, "SRV/foo/a/b/c.*.*", i.GetStreamServerChannel().Wildcard)

require.Equal(t, "U+0001f680_u+00c9.U+0001f6f0_bar.u+8f6fu+4ef6.END", formatChannel('.', "🚀_É", "🛰_bar", []string{"软件"}, "END"))
}
2 changes: 1 addition & 1 deletion protoc-gen-psrpc/options/options.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion testutils/testutils.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f0cfcae

Please sign in to comment.