Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send to exact match only when notify in poller #537

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/aio/plugin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package aio

import "github.com/resonatehq/resonate/pkg/message"

type Message struct {
Type message.Type
Data []byte
Body []byte
Done func(bool, error)
Expand Down
6 changes: 6 additions & 0 deletions internal/app/plugins/poll/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/metrics"
"github.com/resonatehq/resonate/internal/util"
"github.com/resonatehq/resonate/pkg/message"
)

type Config struct {
Expand Down Expand Up @@ -305,6 +306,11 @@ func (w *PollWorker) Process(mesg *aio.Message) {
return
}

if mesg.Type == message.Notify && conn.id != data.Id {
mesg.Done(false, fmt.Errorf("no connection found for group %s and id %s", data.Group, data.Id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that not being able to deliver a notify message is not an error in that our guarantee is "at most once", should we return success here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering the exact same thing!

The reason I went with an error is so that the sender subsystem could retry notify messages if we ever want to do an enqueue retry loop (but still go directly to completed on successful enqueue). I figure the poller in the sdk may lose and re-establish a connection from time to time so it is possible that at one moment there is no active connection and later there is where the sdk instance itself did not actually restart.

return
}

// send message to connection
select {
case conn.ch <- mesg.Body:
Expand Down
16 changes: 16 additions & 0 deletions internal/app/plugins/poll/poll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/metrics"
"github.com/resonatehq/resonate/pkg/message"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -190,6 +191,21 @@ func TestPollPlugin(t *testing.T) {
{"foo", []string{"a", "b", "c"}, "data: ok3"},
},
},
{
name: "NotifyMustBeSameGroupAndId",
mc: 5,
connections: []*Conn{
{"foo", "a"},
},
messages: []*Mesg{
{true, &aio.Message{Type: message.Notify, Data: []byte(`{"group":"foo","id":"a"}`), Body: []byte("ok1")}},
{false, &aio.Message{Type: message.Notify, Data: []byte(`{"group":"foo","id":"b"}`), Body: []byte("ok2")}},
{false, &aio.Message{Type: message.Notify, Data: []byte(`{"group":"foo","id":"c"}`), Body: []byte("ok3")}},
},
expected: []*Resp{
{"foo", []string{"a"}, "data: ok1"},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
config := &Config{
Expand Down
9 changes: 5 additions & 4 deletions internal/app/subsystems/aio/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,17 +248,17 @@
var body []byte
var err error

messgType := sqe.Submission.Sender.Task.Mesg.Type
mesgType := sqe.Submission.Sender.Task.Mesg.Type

Check warning on line 251 in internal/app/subsystems/aio/sender/sender.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/sender/sender.go#L251

Added line #L251 was not covered by tests

if messgType == message.Notify {
if mesgType == message.Notify {

Check warning on line 253 in internal/app/subsystems/aio/sender/sender.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/sender/sender.go#L253

Added line #L253 was not covered by tests
util.Assert(sqe.Submission.Sender.Promise != nil, "promise must not be nil for a notify message")
body, err = json.Marshal(map[string]interface{}{
"type": messgType,
"type": mesgType,

Check warning on line 256 in internal/app/subsystems/aio/sender/sender.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/sender/sender.go#L256

Added line #L256 was not covered by tests
"promise": sqe.Submission.Sender.Promise,
})
} else {
body, err = json.Marshal(map[string]interface{}{
"type": messgType,
"type": mesgType,

Check warning on line 261 in internal/app/subsystems/aio/sender/sender.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/sender/sender.go#L261

Added line #L261 was not covered by tests
"task": sqe.Submission.Sender.Task,
"href": map[string]string{
"claim": sqe.Submission.Sender.ClaimHref,
Expand All @@ -277,6 +277,7 @@
counter := w.metrics.AioInFlight.WithLabelValues(plugin.String())

ok := plugin.Enqueue(&aio.Message{
Type: mesgType,

Check warning on line 280 in internal/app/subsystems/aio/sender/sender.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/sender/sender.go#L280

Added line #L280 was not covered by tests
Data: recv.Data,
Body: body,
Done: func(success bool, err error) {
Expand Down