Skip to content

Commit

Permalink
optimize sends via bulk inserts and updatesO
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed May 9, 2019
1 parent 7aa4845 commit 82dbc10
Show file tree
Hide file tree
Showing 24 changed files with 797 additions and 156 deletions.
44 changes: 41 additions & 3 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/garyburd/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/batch"
"github.com/nyaruka/courier/chatbase"
"github.com/nyaruka/courier/queue"
"github.com/nyaruka/courier/utils"
Expand Down Expand Up @@ -244,9 +245,15 @@ func (b *backend) WriteMsgStatus(ctx context.Context, status courier.MsgStatus)
timeout, cancel := context.WithTimeout(ctx, backendTimeout)
defer cancel()

err := writeMsgStatus(timeout, b, status)
if err != nil {
return err
// if we have an ID, we can have our batch commit for us
if status.ID() != courier.NilMsgID {
b.statusCommitter.Queue(status.(*DBMsgStatus))
} else {
// otherwise, write normally (synchronously)
err := writeMsgStatus(timeout, b, status)
if err != nil {
return err
}
}

// if we have an id and are marking an outgoing msg as errored, then clear our sent flag
Expand Down Expand Up @@ -572,6 +579,24 @@ func (b *backend) Start() error {
log.Info("spool directories ok")
}

// create our status committer and start it
b.statusCommitter = batch.NewCommitter("status committer", b.db, bulkUpdateMsgStatusSQL, time.Millisecond*250, b.waitGroup,
func(err error, value batch.Value) {
logrus.WithField("comp", "status committer").WithError(err).Error("error writing status")
err = courier.WriteToSpool(b.config.SpoolDir, "statuses", value)
if err != nil {
logrus.WithField("comp", "status committer").WithError(err).Error("error writing status to spool")
}
})
b.statusCommitter.Start()

// create our log committer and start it
b.logCommitter = batch.NewCommitter("log committer", b.db, insertLogSQL, time.Millisecond*250, b.waitGroup,
func(err error, value batch.Value) {
logrus.WithField("comp", "log committer").WithError(err).Error("error writing channel log")
})
b.logCommitter.Start()

// register and start our spool flushers
courier.RegisterFlusher(path.Join(b.config.SpoolDir, "msgs"), b.flushMsgFile)
courier.RegisterFlusher(path.Join(b.config.SpoolDir, "statuses"), b.flushStatusFile)
Expand All @@ -590,6 +615,16 @@ func (b *backend) Stop() error {
// close our stop channel
close(b.stopChan)

// stop our status committer
if b.statusCommitter != nil {
b.statusCommitter.Stop()
}

// stop our log committer
if b.logCommitter != nil {
b.logCommitter.Stop()
}

// wait for our threads to exit
b.waitGroup.Wait()
return nil
Expand Down Expand Up @@ -621,6 +656,9 @@ func newBackend(config *courier.Config) courier.Backend {
type backend struct {
config *courier.Config

statusCommitter batch.Committer
logCommitter batch.Committer

db *sqlx.DB
redisPool *redis.Pool
s3Client s3iface.S3API
Expand Down
69 changes: 49 additions & 20 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/queue"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/null"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -211,7 +212,7 @@ func (ts *BackendTestSuite) TestContact() {
ts.Equal(contact.UUID_, contact2.UUID_)
ts.Equal(contact.ID_, contact2.ID_)
ts.Equal(knChannel.OrgID(), contact2.OrgID_)
ts.Equal("Ryan Lewis", contact2.Name_.String)
ts.Equal(null.String("Ryan Lewis"), contact2.Name_)
ts.True(contact2.ModifiedOn_.After(now))
ts.True(contact2.CreatedOn_.After(now))
ts.True(contact2.ModifiedOn_.Before(now2))
Expand All @@ -223,7 +224,7 @@ func (ts *BackendTestSuite) TestContact() {
ts.NoError(err)
ts.NotNil(contact)

ts.Equal("", contact.Name_.String)
ts.Equal(null.String(""), contact.Name_)
ts.Equal("a984069d-0008-4d8c-a772-b14a8a6acccc", contact.UUID_.String())

urn, _ = urns.NewTelURNForCountry("12065551519", "US")
Expand All @@ -234,7 +235,7 @@ func (ts *BackendTestSuite) TestContact() {
contact3, err := contactForURN(ctx, ts.b, knChannel.OrgID(), knChannel, urn, "", longName)
ts.NoError(err)

ts.Equal(longName[0:127], contact3.Name_.String)
ts.Equal(null.String(longName[0:127]), contact3.Name_)

}

Expand Down Expand Up @@ -324,14 +325,14 @@ func (ts *BackendTestSuite) TestContactURN() {

contactURNs, err := contactURNsForContact(tx, contact.ID_)
ts.NoError(err)
ts.Equal("chestnut", contactURNs[0].Auth.String)
ts.Equal(null.String("chestnut"), contactURNs[0].Auth)

// now build a URN for our number with the kannel channel
knURN, err := contactURNForURN(tx, knChannel.OrgID_, knChannel.ID_, contact.ID_, urn, "sesame")
ts.NoError(err)
ts.NoError(tx.Commit())
ts.Equal(knURN.OrgID, knChannel.OrgID_)
ts.Equal("sesame", knURN.Auth.String)
ts.Equal(null.String("sesame"), knURN.Auth)

tx, err = ts.b.db.Beginx()
ts.NoError(err)
Expand All @@ -351,7 +352,7 @@ func (ts *BackendTestSuite) TestContactURN() {
ts.Equal(twURN.ChannelID, twChannel.ID())

// auth should be unchanged
ts.Equal("sesame", twURN.Auth.String)
ts.Equal(null.String("sesame"), twURN.Auth)

tx, err = ts.b.db.Beginx()
ts.NoError(err)
Expand All @@ -360,7 +361,7 @@ func (ts *BackendTestSuite) TestContactURN() {
twURN, err = contactURNForURN(tx, twChannel.OrgID_, twChannel.ID_, contact.ID_, urn, "peanut")
ts.NoError(err)
ts.NoError(tx.Commit())
ts.Equal("peanut", twURN.Auth.String)
ts.Equal(null.String("peanut"), twURN.Auth)

// test that we don't use display when looking up URNs
tgChannel := ts.getChannel("TG", "dbc126ed-66bc-4e28-b67b-81dc3327c98a")
Expand All @@ -382,7 +383,7 @@ func (ts *BackendTestSuite) TestContactURN() {
ts.NoError(err)
ts.NoError(tx.Commit())
ts.Equal(tgContact.URNID_, tgContactURN.ID)
ts.Equal("Jane", tgContactURN.Display.String)
ts.Equal(null.String("Jane"), tgContactURN.Display)

// try to create two contacts at the same time in goroutines, this tests our transaction rollbacks
urn2, _ := urns.NewTelURNForCountry("12065551616", "US")
Expand All @@ -404,8 +405,8 @@ func (ts *BackendTestSuite) TestContactURN() {
wait.Wait()
ts.NotNil(contact2)
ts.NotNil(contact3)
ts.Equal(contact2.ID_.Int64, contact3.ID_.Int64)
ts.Equal(contact2.URNID_.Int64, contact3.URNID_.Int64)
ts.Equal(contact2.ID_, contact3.ID_)
ts.Equal(contact2.URNID_, contact3.URNID_)
}

func (ts *BackendTestSuite) TestContactURNPriority() {
Expand Down Expand Up @@ -448,6 +449,23 @@ func (ts *BackendTestSuite) TestContactURNPriority() {
ts.Equal(twChannel.ID(), urns[1].ChannelID)
}

func (ts *BackendTestSuite) TestMsgStatusPerformance() {
ctx := context.Background()
channel := ts.getChannel("KN", "dbc126ed-66bc-4e28-b67b-81dc3327c95d")
time.Sleep(2 * time.Millisecond)

start := time.Now()

for i := 0; i < 10000; i++ {
status := ts.b.NewMsgStatusForID(channel, courier.NewMsgID(10001), courier.MsgWired)
status.SetExternalID("ext0")
err := ts.b.WriteMsgStatus(ctx, status)
ts.NoError(err)
}

fmt.Printf("Took: %s\n", time.Since(start))
}

func (ts *BackendTestSuite) TestMsgStatus() {
ctx := context.Background()
channel := ts.getChannel("KN", "dbc126ed-66bc-4e28-b67b-81dc3327c95d")
Expand All @@ -459,27 +477,32 @@ func (ts *BackendTestSuite) TestMsgStatus() {
status.SetExternalID("ext0")
err := ts.b.WriteMsgStatus(ctx, status)
ts.NoError(err)
time.Sleep(time.Second)

m, err := readMsgFromDB(ts.b, courier.NewMsgID(10001))
ts.NoError(err)
ts.Equal(m.Status_, courier.MsgWired)
ts.Equal(m.ExternalID_.String, "ext0")
ts.Equal(m.ExternalID_, null.String("ext0"))
ts.True(m.ModifiedOn_.After(now))
ts.True(m.SentOn_.After(now))

// update by id, no external id, shouldn't overwrite it
status = ts.b.NewMsgStatusForID(channel, courier.NewMsgID(10001), courier.MsgSent)
err = ts.b.WriteMsgStatus(ctx, status)
ts.NoError(err)
time.Sleep(time.Second)

m, err = readMsgFromDB(ts.b, courier.NewMsgID(10001))
ts.NoError(err)
ts.Equal(m.Status_, courier.MsgSent)
ts.Equal(m.ExternalID_.String, "ext0")
ts.Equal(m.ExternalID_, null.String("ext0"))
ts.True(m.ModifiedOn_.After(now))

// update by external id
status = ts.b.NewMsgStatusForExternalID(channel, "ext1", courier.MsgFailed)
err = ts.b.WriteMsgStatus(ctx, status)
ts.NoError(err)
time.Sleep(time.Second)
m, err = readMsgFromDB(ts.b, courier.NewMsgID(10000))
ts.NoError(err)
ts.Equal(m.Status_, courier.MsgFailed)
Expand All @@ -493,13 +516,16 @@ func (ts *BackendTestSuite) TestMsgStatus() {
// reset our status to sent
status = ts.b.NewMsgStatusForExternalID(channel, "ext1", courier.MsgSent)
err = ts.b.WriteMsgStatus(ctx, status)
time.Sleep(time.Second)

// error our msg
now = time.Now().In(time.UTC)
time.Sleep(2 * time.Millisecond)
status = ts.b.NewMsgStatusForExternalID(channel, "ext1", courier.MsgErrored)
err = ts.b.WriteMsgStatus(ctx, status)
ts.NoError(err)
time.Sleep(time.Second)

m, err = readMsgFromDB(ts.b, courier.NewMsgID(10000))
ts.NoError(err)
ts.Equal(m.Status_, courier.MsgErrored)
Expand All @@ -511,6 +537,8 @@ func (ts *BackendTestSuite) TestMsgStatus() {
status = ts.b.NewMsgStatusForExternalID(channel, "ext1", courier.MsgErrored)
err = ts.b.WriteMsgStatus(ctx, status)
ts.NoError(err)
time.Sleep(time.Second)

m, err = readMsgFromDB(ts.b, courier.NewMsgID(10000))
ts.NoError(err)
ts.Equal(m.Status_, courier.MsgErrored)
Expand All @@ -519,6 +547,8 @@ func (ts *BackendTestSuite) TestMsgStatus() {
// third go
status = ts.b.NewMsgStatusForExternalID(channel, "ext1", courier.MsgErrored)
err = ts.b.WriteMsgStatus(ctx, status)
time.Sleep(time.Second)

ts.NoError(err)
m, err = readMsgFromDB(ts.b, courier.NewMsgID(10000))
ts.NoError(err)
Expand Down Expand Up @@ -861,8 +891,7 @@ func (ts *BackendTestSuite) TestWriteMsg() {
ts.Equal(contactURN.ID, m.ContactURNID_)
ts.Equal(MsgIncoming, m.Direction_)
ts.Equal(courier.MsgPending, m.Status_)
ts.False(m.HighPriority_.Bool)
ts.False(m.HighPriority_.Valid)
ts.False(m.HighPriority_)
ts.Equal("ext123", m.ExternalID())
ts.Equal("test123", m.Text_)
ts.Equal(0, len(m.Attachments()))
Expand All @@ -875,7 +904,7 @@ func (ts *BackendTestSuite) TestWriteMsg() {
ts.NotNil(m.QueuedOn_)

contact, err := contactForURN(ctx, ts.b, m.OrgID_, knChannel, urn, "", "")
ts.Equal("test contact", contact.Name_.String)
ts.Equal(null.String("test contact"), contact.Name_)
ts.Equal(m.OrgID_, contact.OrgID_)
ts.Equal(m.ContactID_, contact.ID_)
ts.NotNil(contact.UUID_)
Expand All @@ -900,7 +929,7 @@ func (ts *BackendTestSuite) TestWriteMsg() {
// check that our mailroom queue has an item
rc := ts.b.redisPool.Get()
defer rc.Close()
rc.Do("DEL", "handler:1", "handler:active", fmt.Sprintf("c:1:%d", msg.ContactID_.Int64))
rc.Do("DEL", "handler:1", "handler:active", fmt.Sprintf("c:1:%d", msg.ContactID_))

// test queuing to mailroom
knChannel.OrgFlowServerEnabled_ = true
Expand All @@ -916,7 +945,7 @@ func (ts *BackendTestSuite) TestWriteMsg() {
ts.NoError(err)
ts.Equal(1, count)

count, err = redis.Int(rc.Do("LLEN", fmt.Sprintf("c:1:%d", msg.ContactID_.Int64)))
count, err = redis.Int(rc.Do("LLEN", fmt.Sprintf("c:1:%d", msg.ContactID_)))
ts.NoError(err)
ts.Equal(1, count)
}
Expand All @@ -932,7 +961,7 @@ func (ts *BackendTestSuite) TestChannelEvent() {

contact, err := contactForURN(ctx, ts.b, channel.OrgID_, channel, urn, "", "")
ts.NoError(err)
ts.Equal("kermit frog", contact.Name_.String)
ts.Equal(null.String("kermit frog"), contact.Name_)

dbE := event.(*DBChannelEvent)
dbE, err = readChannelEventFromDB(ts.b, dbE.ID_)
Expand Down Expand Up @@ -975,7 +1004,7 @@ func (ts *BackendTestSuite) TestMailroomEvents() {

contact, err := contactForURN(ctx, ts.b, channel.OrgID_, channel, urn, "", "")
ts.NoError(err)
ts.Equal("kermit frog", contact.Name_.String)
ts.Equal(null.String("kermit frog"), contact.Name_)

dbE := event.(*DBChannelEvent)
dbE, err = readChannelEventFromDB(ts.b, dbE.ID_)
Expand All @@ -993,7 +1022,7 @@ func (ts *BackendTestSuite) TestMailroomEvents() {
ts.NoError(err)
ts.Equal(1, count)

count, err = redis.Int(rc.Do("LLEN", fmt.Sprintf("c:1:%d", contact.ID_.Int64)))
count, err = redis.Int(rc.Do("LLEN", fmt.Sprintf("c:1:%d", contact.ID_)))
ts.NoError(err)
ts.Equal(1, count)
}
Expand Down
Loading

0 comments on commit 82dbc10

Please sign in to comment.