Skip to content

Commit

Permalink
Merge pull request #34 from kanmu/support_datadog
Browse files Browse the repository at this point in the history
Support dd-trace-go sql tracer
  • Loading branch information
winebarrel authored Oct 3, 2024
2 parents 4c089b0 + 57b94f5 commit d45c478
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 29 deletions.
24 changes: 24 additions & 0 deletions connector.go → db.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package qg

import (
"database/sql"
"database/sql/driver"
"fmt"
"net/url"
Expand Down Expand Up @@ -35,3 +36,26 @@ func GetConnectorFromConnStr(connStr string) (driver.Connector, error) {

return connector, nil
}

// driver.Conn Wrapper
type ConnWrapper interface {
WrappedConn() driver.Conn
}

// Get *pgx.Conn from *sql.Conn and pass it to function.
func rawConn(conn *sql.Conn, f func(*pgx.Conn) error) error {
err := conn.Raw(func(driverConn any) error {
var stdlibConn *stdlib.Conn

if tracedConn, ok := driverConn.(ConnWrapper); ok {
stdlibConn = tracedConn.WrappedConn().(*stdlib.Conn)
} else {
stdlibConn = driverConn.(*stdlib.Conn)
}

pgxConn := stdlibConn.Conn()
return f(pgxConn)
})

return err
}
31 changes: 29 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
module github.com/kanmu/qg

go 1.22
go 1.22.0

toolchain go1.22.8

require (
github.com/jackc/pgx/v4 v4.18.3
gopkg.in/DataDog/dd-trace-go.v1 v1.68.0
gopkg.in/guregu/null.v3 v3.0.2-0.20160228005316-41961cea0328
)

require (
github.com/DataDog/appsec-internal-go v1.7.0 // indirect
github.com/DataDog/datadog-agent/pkg/obfuscate v0.48.0 // indirect
github.com/DataDog/datadog-agent/pkg/remoteconfig/state v0.48.1 // indirect
github.com/DataDog/datadog-go/v5 v5.3.0 // indirect
github.com/DataDog/go-libddwaf/v3 v3.3.0 // indirect
github.com/DataDog/go-tuf v1.0.2-0.5.2 // indirect
github.com/DataDog/sketches-go v1.4.5 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/ebitengine/purego v0.6.0-alpha.5 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.3 // indirect
github.com/jackc/pgio v1.0.0 // indirect
Expand All @@ -16,7 +31,19 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/lib/pq v1.10.3 // indirect
github.com/outcaste-io/ristretto v0.2.3 // indirect
github.com/philhofer/fwd v1.1.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
golang.org/x/crypto v0.20.0 // indirect
github.com/tinylib/msgp v1.1.8 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.16.1 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
179 changes: 172 additions & 7 deletions go.sum

Large diffs are not rendered by default.

19 changes: 6 additions & 13 deletions que.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
null "gopkg.in/guregu/null.v3"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
)

// Job is a single unit of work for Que to perform.
Expand Down Expand Up @@ -125,8 +124,7 @@ func (j *Job) DeleteContext(ctx context.Context) error {
return nil
}

err := j.conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
err := rawConn(j.conn, func(pgxConn *pgx.Conn) error {
_, err := pgxConn.Exec(ctx, "que_destroy_job", j.Queue, j.Priority, j.RunAt, j.ID)
return err
})
Expand Down Expand Up @@ -158,8 +156,7 @@ func (j *Job) DoneContext(ctx context.Context) {
var ok bool
// Swallow this error because we don't want an unlock failure to cause work to
// stop.
err := j.conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
err := rawConn(j.conn, func(pgxConn *pgx.Conn) error {
return pgxConn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok)
})

Expand Down Expand Up @@ -188,8 +185,7 @@ func (j *Job) ErrorContext(ctx context.Context, msg string) error {
errorCount := j.ErrorCount + 1
delay := intPow(int(errorCount), 4) + 3 // TODO: configurable delay

err := j.conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
err := rawConn(j.conn, func(pgxConn *pgx.Conn) error {
_, err := pgxConn.Exec(ctx, "que_set_error", errorCount, delay, msg, j.Queue, j.Priority, j.RunAt, j.ID)
return err
})
Expand Down Expand Up @@ -366,8 +362,7 @@ func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error)
j := Job{c: c, conn: conn}

for i := 0; i < maxLockJobAttempts; i++ {
err = conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
err = rawConn(conn, func(pgxConn *pgx.Conn) error {
return pgxConn.QueryRow(ctx, "que_lock_job", queue).Scan(
&j.Queue,
&j.Priority,
Expand Down Expand Up @@ -400,8 +395,7 @@ func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error)
// I'm not sure how to reliably commit a transaction that deletes
// the job in a separate thread between lock_job and check_job.
var ok bool
err = conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
err = rawConn(conn, func(pgxConn *pgx.Conn) error {
return pgxConn.QueryRow(ctx, "que_check_job", j.Queue, j.Priority, j.RunAt, j.ID).Scan(&ok)
})
if err == nil {
Expand All @@ -414,8 +408,7 @@ func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error)
// eventually causing the server to run out of locks.
//
// Also swallow the possible error, exactly like in Done.
conn.Raw(func(driverConn any) error { //nolint:errcheck
pgxConn := driverConn.(*stdlib.Conn).Conn()
rawConn(conn, func(pgxConn *pgx.Conn) error { //nolint:errcheck
pgxConn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok) //nolint:errcheck
return nil
})
Expand Down
14 changes: 11 additions & 3 deletions que_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package qg

import (
"database/sql"
"database/sql/driver"
"testing"
"time"

"github.com/jackc/pgx/v4"
sqltracer "gopkg.in/DataDog/dd-trace-go.v1/contrib/database/sql"
)

var testConnConfig = func() *pgx.ConnConfig {
Expand All @@ -17,12 +19,12 @@ var testConnConfig = func() *pgx.ConnConfig {

const maxConn = 5

func openTestClientMaxConns(t testing.TB, maxConnections int) *Client {
func openTestClientMaxConns(t testing.TB, maxConnections int, openDB func(driver.Connector) *sql.DB) *Client {
connector, err := GetConnector("localhost", 5432, "qgtest", "", "qgtest")
if err != nil {
t.Fatal(err)
}
db := sql.OpenDB(connector)
db := openDB(connector)
// using stdlib, it's difficult to open max conn from the beginning
// if we want to open connections till its limit, need to use go routine to
// concurrently open connections
Expand All @@ -38,7 +40,13 @@ func openTestClientMaxConns(t testing.TB, maxConnections int) *Client {
}

func openTestClient(t testing.TB) *Client {
return openTestClientMaxConns(t, maxConn)
return openTestClientMaxConns(t, maxConn, sql.OpenDB)
}

func openTestClientWithTracer(t testing.TB) *Client {
return openTestClientMaxConns(t, maxConn, func(c driver.Connector) *sql.DB {
return sqltracer.OpenDB(c)
})
}

func truncateAndClose(c *Client) {
Expand Down
7 changes: 3 additions & 4 deletions work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package qg

import (
"context"
"database/sql"
"fmt"
"sync"
"testing"
"time"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
)

func TestLockJob(t *testing.T) {
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestJobConnRace(t *testing.T) {

// Test the race condition in LockJob
func TestLockJobAdvisoryRace(t *testing.T) {
c := openTestClientMaxConns(t, 4)
c := openTestClientMaxConns(t, 4, sql.OpenDB)
defer truncateAndClose(c)
ctx := context.Background()

Expand Down Expand Up @@ -349,8 +349,7 @@ func TestLockJobAdvisoryRace(t *testing.T) {
if err != nil {
panic(err)
}
conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
rawConn(conn, func(pgxConn *pgx.Conn) error {
ourBackendID = getBackendID(pgxConn)
return nil
})
Expand Down
31 changes: 31 additions & 0 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,37 @@ func TestWorkerWorkOne(t *testing.T) {
}
}

func TestWorkerWorkOneWithTracer(t *testing.T) {
c := openTestClientWithTracer(t)
defer truncateAndClose(c)

success := false
wm := WorkMap{
"MyJob": func(j *Job) error {
success = true
return nil
},
}
w := NewWorker(c, wm)

didWork := w.WorkOne()
if didWork {
t.Errorf("want didWork=false when no job was queued")
}

if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil {
t.Fatal(err)
}

didWork = w.WorkOne()
if !didWork {
t.Errorf("want didWork=true")
}
if !success {
t.Errorf("want success=true")
}
}

func TestWorkerShutdown(t *testing.T) {
c := openTestClient(t)
defer truncateAndClose(c)
Expand Down

0 comments on commit d45c478

Please sign in to comment.