Skip to content

Commit

Permalink
Check whether outgoing message is in a loop before sending
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Aug 16, 2019
1 parent f754203 commit 55ad528
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 11 deletions.
4 changes: 4 additions & 0 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ type Backend interface {
// a backend wants to implement a failsafe against double sending messages (say if they were double queued)
WasMsgSent(context.Context, Msg) (bool, error)

// IsMsgLoop returns whether the passed in message is part of a message loop, possibly with another bot. Backends should
// implement their own logic to implement this.
IsMsgLoop(ctx context.Context, msg Msg) (bool, error)

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel. The optional status parameter can be
// used to determine any sort of deduping of msg sends
Expand Down
56 changes: 55 additions & 1 deletion backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (b *backend) PopNextOutgoingMsg(ctx context.Context) (courier.Msg, error) {
}

var luaSent = redis.NewScript(3,
`-- KEYS: [TodayKey, YesterdayKey, MsgId]
`-- KEYS: [TodayKey, YesterdayKey, MsgID]
local found = redis.call("sismember", KEYS[1], KEYS[3])
if found == 1 then
return 1
Expand All @@ -183,6 +183,60 @@ func (b *backend) WasMsgSent(ctx context.Context, msg courier.Msg) (bool, error)
return redis.Bool(luaSent.Do(rc, todayKey, yesterdayKey, msg.ID().String()))
}

var luaMsgLoop = redis.NewScript(3, `-- KEYS: [key, contact_id, text]
local key = KEYS[1]
local contact_id = KEYS[2]
local text = KEYS[3]
local count = 1
-- try to look up in window
local record = redis.call("hget", key, contact_id)
if record then
local record_count = string.sub(record, 1, 2)
local record_text = string.sub(record, 4, -1)
if record_text == text then
count = tonumber(record_count) + 1
else
count = 1
end
end
-- create our new record with our updated count
record = string.format("%02d:%s", count, text)
-- write our new record witn updated count
redis.call("hset", key, contact_id, record)
return count
`)

// IsMsgLoop checks whether the passed in message is part of a loop
func (b *backend) IsMsgLoop(ctx context.Context, msg courier.Msg) (bool, error) {
m := msg.(*DBMsg)

// things that aren't replies can't be loops, neither do we count retries
if m.ResponseToID_ == courier.NilMsgID || m.ErrorCount_ > 0 {
return false, nil
}

// otherwise run our script to check whether this is a loop in the past 5 minutes
rc := b.redisPool.Get()
defer rc.Close()

keyTime := time.Now().UTC().Round(time.Minute * 5)
key := fmt.Sprintf(sentSetName, fmt.Sprintf("loop_msgs:%s", keyTime.Format("2006-01-02-15:04")))
count, err := redis.Int(luaMsgLoop.Do(rc, key, m.ContactID_, m.Text_))
if err != nil {
return false, errors.Wrapf(err, "error while checking for msg loop")
}

if count >= 20 {
return true, nil
}
return false, nil
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.Msg, status courier.MsgStatus) {
rc := b.redisPool.Get()
Expand Down
24 changes: 24 additions & 0 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,30 @@ func (ts *BackendTestSuite) TestExternalIDDupes() {
ts.True(m2.alreadyWritten)
}

func (ts *BackendTestSuite) TestLoop() {
ctx := context.Background()
dbMsg, err := readMsgFromDB(ts.b, courier.NewMsgID(10000))
ts.NoError(err)

dbMsg.ResponseToID_ = courier.MsgID(5)

loop, err := ts.b.IsMsgLoop(ctx, dbMsg)
ts.NoError(err)
ts.False(loop)

// call it 18 times more, no loop still
for i := 0; i < 18; i++ {
loop, err = ts.b.IsMsgLoop(ctx, dbMsg)
ts.NoError(err)
ts.False(loop)
}

// last one should make us a loop
loop, err = ts.b.IsMsgLoop(ctx, dbMsg)
ts.NoError(err)
ts.True(loop)
}

func (ts *BackendTestSuite) TestStatus() {
// our health should just contain the header
ts.True(strings.Contains(ts.b.Status(), "Channel"), ts.b.Status())
Expand Down
33 changes: 23 additions & 10 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ func (w *Sender) sendMessage(msg Msg) {
sendCTX, cancel := context.WithTimeout(context.Background(), time.Second*35)
defer cancel()

msgLog := log.WithField("msg_id", msg.ID().String()).WithField("msg_text", msg.Text()).WithField("msg_urn", msg.URN().Identity())
log = log.WithField("msg_id", msg.ID().String()).WithField("msg_text", msg.Text()).WithField("msg_urn", msg.URN().Identity())
if len(msg.Attachments()) > 0 {
msgLog = msgLog.WithField("attachments", msg.Attachments())
log = log.WithField("attachments", msg.Attachments())
}
if len(msg.QuickReplies()) > 0 {
msgLog = msgLog.WithField("quick_replies", msg.QuickReplies())
log = log.WithField("quick_replies", msg.QuickReplies())
}

start := time.Now()
Expand All @@ -177,21 +177,34 @@ func (w *Sender) sendMessage(msg Msg) {

// failing on a lookup isn't a halting problem but we should log it
if err != nil {
msgLog.WithError(err).Warning("error looking up msg was sent")
log.WithError(err).Warning("error looking up msg was sent")
}

// is this msg in a loop?
loop, err := backend.IsMsgLoop(sendCTX, msg)

// failing on loop lookup isn't permanent, but log
if err != nil {
log.WithError(err).Warning("error looking up msg loop")
}

if sent {
// if this message was already sent, create a wired status for it
status = backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgWired)
msgLog.Warning("duplicate send, marking as wired")
log.Warning("duplicate send, marking as wired")
} else if loop {
// if this contact is in a loop, fail the message immediately without sending
status = backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgFailed)
status.AddLog(NewChannelLogFromError("Message Loop", msg.Channel(), msg.ID(), 0, fmt.Errorf("message loop detected, failing message without send")))
log.Error("message loop detected, failing message")
} else {
// send our message
status, err = server.SendMsg(sendCTX, msg)
duration := time.Now().Sub(start)
secondDuration := float64(duration) / float64(time.Second)

if err != nil {
msgLog.WithError(err).WithField("elapsed", duration).Error("error sending message")
log.WithError(err).WithField("elapsed", duration).Error("error sending message")
if status == nil {
status = backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgErrored)
status.AddLog(NewChannelLogFromError("Sending Error", msg.Channel(), msg.ID(), duration, err))
Expand All @@ -200,10 +213,10 @@ func (w *Sender) sendMessage(msg Msg) {

// report to librato and log locally
if status.Status() == MsgErrored || status.Status() == MsgFailed {
msgLog.WithField("elapsed", duration).Warning("msg errored")
log.WithField("elapsed", duration).Warning("msg errored")
librato.Gauge(fmt.Sprintf("courier.msg_send_error_%s", msg.Channel().ChannelType()), secondDuration)
} else {
msgLog.WithField("elapsed", duration).Info("msg sent")
log.WithField("elapsed", duration).Info("msg sent")
librato.Gauge(fmt.Sprintf("courier.msg_send_%s", msg.Channel().ChannelType()), secondDuration)
}
}
Expand All @@ -214,13 +227,13 @@ func (w *Sender) sendMessage(msg Msg) {

err = backend.WriteMsgStatus(writeCTX, status)
if err != nil {
msgLog.WithError(err).Info("error writing msg status")
log.WithError(err).Info("error writing msg status")
}

// write our logs as well
err = backend.WriteChannelLogs(writeCTX, status.Logs())
if err != nil {
msgLog.WithError(err).Info("error writing msg logs")
log.WithError(err).Info("error writing msg logs")
}

// mark our send task as complete
Expand Down
5 changes: 5 additions & 0 deletions test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ func (mb *MockBackend) WasMsgSent(ctx context.Context, msg Msg) (bool, error) {
return mb.sentMsgs[msg.ID()], nil
}

// IsMsgLoop returns whether the passed in msg is a loop
func (mb *MockBackend) IsMsgLoop(ctx context.Context, msg Msg) (bool, error) {
return false, nil
}

// MarkOutgoingMsgComplete marks the passed msg as having been dealt with
func (mb *MockBackend) MarkOutgoingMsgComplete(ctx context.Context, msg Msg, s MsgStatus) {
mb.mutex.Lock()
Expand Down

0 comments on commit 55ad528

Please sign in to comment.