Skip to content

Commit

Permalink
fix: batching (#17)
Browse files Browse the repository at this point in the history
* fix: segfault on Put or Delete for getTransaction error

* fix: batching

* refactor: cache queries

* fix: use single connection for commit

* fix: use shorter option name

* docs: readme updates
  • Loading branch information
Alan Shaw authored May 13, 2020
1 parent 5d3312a commit 3edef88
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 119 deletions.
45 changes: 43 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,61 @@
# SQL Datastore

[![CircleCI](https://circleci.com/gh/ipfs/go-ds-sql.svg?style=shield)](https://circleci.com/gh/ipfs/go-ds-sql)
[![Coverage](https://codecov.io/gh/ipfs/go-ds-sql/branch/master/graph/badge.svg)](https://codecov.io/gh/ipfs/go-ds-sql)
[![Standard README](https://img.shields.io/badge/readme%20style-standard-brightgreen.svg)](https://github.com/RichardLitt/standard-readme)
[![GoDoc](http://img.shields.io/badge/godoc-reference-5272B4.svg)](https://godoc.org/github.com/ipfs/go-ds-sql)
[![golang version](https://img.shields.io/badge/golang-%3E%3D1.14.0-orange.svg)](https://golang.org/)
[![Go Report Card](https://goreportcard.com/badge/github.com/ipfs/go-ds-sql)](https://goreportcard.com/report/github.com/ipfs/go-ds-sql)

An implementation of [the datastore interface](https://github.com/ipfs/go-datastore)
that can be backed by any SQL database.

## Install

```sh
go get github.com/ipfs/go-ds-sql
```

## Usage

Ensure a database is created and a table exists with `key` and `data` columns. For example, in PostgreSQL you can create a table with the following structure (replacing `table_name` with the name of the table the datastore will use - by default this is `blocks`):

```sql
CREATE TABLE IF NOT EXISTS table_name (key TEXT NOT NULL UNIQUE, data BYTEA)
```

It's recommended to create an index on the `key` column that is optimised for prefix scans. For example, in PostgreSQL you can create a `text_pattern_ops` index on the table:

```sql
CREATE INDEX IF NOT EXISTS table_name_key_text_pattern_ops_idx ON table_name (key text_pattern_ops)
```

Import and use in your application:

```go
import (
"database/sql"
"github.com/ipfs/go-ds-sql"
pg "github.com/ipfs/go-ds-sql/postgres"
)

mydb, _ := sql.Open("yourdb", "yourdbparameters")

ds := sqlds.NewDatastore(mydb)
// Implement the Queries interface for your SQL impl.
// ...or use the provided PostgreSQL queries
queries := pg.NewQueries("blocks")

ds := sqlds.NewDatastore(mydb, queries)
```

## API

[GoDoc Reference](https://godoc.org/github.com/ipfs/go-ds-sql)

## Contribute

Feel free to dive in! [Open an issue](https://github.com/ipfs/go-ds-sql/issues/new) or submit PRs.

## License
MIT

[MIT](LICENSE)
65 changes: 65 additions & 0 deletions batching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package sqlds

import (
"context"

ds "github.com/ipfs/go-datastore"
)

// op is one buffered batch operation for a single key: either a write of
// value or, when delete is set, a removal of the key.
type op struct {
	value  []byte // payload to store; not used when delete is true
	delete bool   // true for a delete, false for a put
}

type batch struct {
ds *Datastore
ops map[ds.Key]op
}

// Batch returns a new, empty set of deferred updates bound to this datastore.
// SQL has no true batch of updates, so Put and Delete calls on the batch are
// only buffered; they are executed one after another over a single
// connection when Commit is called. The returned error is always nil.
func (d *Datastore) Batch() (ds.Batch, error) {
	pending := make(map[ds.Key]op)
	b := &batch{ops: pending, ds: d}
	return b, nil
}

// Put buffers a write of val under key, replacing any operation previously
// buffered for the same key. Nothing is sent to the database until the batch
// is committed. It always returns nil.
func (bt *batch) Put(key ds.Key, val []byte) error {
	write := op{delete: false, value: val}
	bt.ops[key] = write
	return nil
}

// Delete buffers a removal of key, replacing any operation previously
// buffered for the same key. Nothing is sent to the database until the batch
// is committed. It always returns nil.
func (bt *batch) Delete(key ds.Key) error {
	removal := op{delete: true, value: nil}
	bt.ops[key] = removal
	return nil
}

// Commit applies the buffered operations using a background context, i.e.
// without a caller-supplied deadline or cancellation. See CommitContext.
func (bt *batch) Commit() error {
	ctx := context.Background()
	return bt.CommitContext(ctx)
}

// CommitContext executes every buffered operation against the database over
// a single connection, using ctx for the connection checkout and for each
// statement.
//
// An empty batch returns nil without touching the database. Otherwise the
// operations run one at a time, in unspecified order (Go map iteration), and
// the first error stops the run and is returned unchanged. The operations
// are not wrapped in a transaction here, so this method does not undo
// operations that were already executed before a failure.
//
// The buffered operations are not cleared after a commit.
func (bt *batch) CommitContext(ctx context.Context) error {
	// Nothing to do: skip checking a connection out of the pool, which can
	// block when the pool is exhausted.
	if len(bt.ops) == 0 {
		return nil
	}

	conn, err := bt.ds.db.Conn(ctx)
	if err != nil {
		return err
	}
	defer conn.Close()

	// The loop variable is named pending so it does not shadow the op type.
	for key, pending := range bt.ops {
		if pending.delete {
			_, err = conn.ExecContext(ctx, bt.ds.queries.Delete(), key.String())
		} else {
			_, err = conn.ExecContext(ctx, bt.ds.queries.Put(), key.String(), pending.value)
		}
		if err != nil {
			return err
		}
	}

	return nil
}

// Compile-time assertion that *Datastore satisfies the ds.Batching interface.
var _ ds.Batching = (*Datastore)(nil)
114 changes: 17 additions & 97 deletions dstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package sqlds

import (
"database/sql"
"errors"
"fmt"

ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
)

// Queries generates SQL queries for datastore operations.
type Queries interface {
Delete() string
Exists() string
Expand All @@ -21,118 +21,33 @@ type Queries interface {
GetSize() string
}

// Datastore is a SQL backed datastore.
type Datastore struct {
db *sql.DB // database handle that statements are executed against
queries Queries // supplies the SQL text for each datastore operation
}

// NewDatastore returns a new SQL datastore that uses db for storage and
// queries to generate the SQL for each operation.
func NewDatastore(db *sql.DB, queries Queries) *Datastore {
return &Datastore{db: db, queries: queries}
}

type batch struct {
db *sql.DB
queries Queries
txn *sql.Tx
}

func (b *batch) GetTransaction() (*sql.Tx, error) {
if b.txn != nil {
return b.txn, nil
}

newTransaction, err := b.db.Begin()
if err != nil {
if newTransaction != nil {
// nothing we can do about this error.
_ = newTransaction.Rollback()
}

return nil, err
}

b.txn = newTransaction
return newTransaction, nil
}

func (b *batch) Put(key ds.Key, val []byte) error {
txn, err := b.GetTransaction()
if err != nil {
_ = b.txn.Rollback()
return err
}

_, err = txn.Exec(b.queries.Put(), key.String(), val)
if err != nil {
_ = b.txn.Rollback()
return err
}

return nil
}

func (b *batch) Delete(key ds.Key) error {
txn, err := b.GetTransaction()
if err != nil {
_ = b.txn.Rollback()
return err
}

_, err = txn.Exec(b.queries.Delete(), key.String())
if err != nil {
_ = b.txn.Rollback()
return err
}

return err
}

func (b *batch) Commit() error {
if b.txn == nil {
return errors.New("no transaction started, cannot commit")
}
var err = b.txn.Commit()
if err != nil {
_ = b.txn.Rollback()
return err
}

return nil
}

func (d *Datastore) Batch() (ds.Batch, error) {
batch := &batch{
db: d.db,
queries: d.queries,
txn: nil,
}

return batch, nil
}

// Close closes the underlying SQL database and returns the error, if any,
// reported by the database handle.
func (d *Datastore) Close() error {
return d.db.Close()
}

// Delete removes a row from the SQL database by the given key.
func (d *Datastore) Delete(key ds.Key) error {
result, err := d.db.Exec(d.queries.Delete(), key.String())
_, err := d.db.Exec(d.queries.Delete(), key.String())
if err != nil {
return err
}

rows, err := result.RowsAffected()
if err != nil {
return err
}

if rows == 0 {
return ds.ErrNotFound
}

return nil
}

// Get retrieves a value from the SQL database by the given key.
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
row := d.db.QueryRow(d.queries.Get(), key.String())
var out []byte
Expand All @@ -147,6 +62,7 @@ func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
}
}

// Has determines if a value for the given key exists in the SQL database.
func (d *Datastore) Has(key ds.Key) (exists bool, err error) {
row := d.db.QueryRow(d.queries.Exists(), key.String())

Expand All @@ -160,6 +76,7 @@ func (d *Datastore) Has(key ds.Key) (exists bool, err error) {
}
}

// Put "upserts" a row into the SQL database.
func (d *Datastore) Put(key ds.Key, value []byte) error {
_, err := d.db.Exec(d.queries.Put(), key.String(), value)
if err != nil {
Expand All @@ -169,8 +86,9 @@ func (d *Datastore) Put(key ds.Key, value []byte) error {
return nil
}

// Query returns multiple rows from the SQL database based on the passed query parameters.
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
raw, err := d.RawQuery(q)
raw, err := d.rawQuery(q)
if err != nil {
return nil, err
}
Expand All @@ -194,11 +112,11 @@ func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
return raw, nil
}

func (d *Datastore) RawQuery(q dsq.Query) (dsq.Results, error) {
func (d *Datastore) rawQuery(q dsq.Query) (dsq.Results, error) {
var rows *sql.Rows
var err error

rows, err = QueryWithParams(d, q)
rows, err = queryWithParams(d, q)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -236,10 +154,12 @@ func (d *Datastore) RawQuery(q dsq.Query) (dsq.Results, error) {
return dsq.ResultsFromIterator(q, it), nil
}

// Sync is a no-op for SQL databases: it ignores key and always returns nil.
func (d *Datastore) Sync(key ds.Key) error {
return nil
}

// GetSize determines the size in bytes of the value for a given key.
func (d *Datastore) GetSize(key ds.Key) (int, error) {
row := d.db.QueryRow(d.queries.GetSize(), key.String())
var size int
Expand All @@ -254,8 +174,8 @@ func (d *Datastore) GetSize(key ds.Key) (int, error) {
}
}

// QueryWithParams applies prefix, limit, and offset params in pg query
func QueryWithParams(d *Datastore, q dsq.Query) (*sql.Rows, error) {
// queryWithParams applies prefix, limit, and offset params in pg query
func queryWithParams(d *Datastore, q dsq.Query) (*sql.Rows, error) {
var qNew = d.queries.Query()

if q.Prefix != "" {
Expand Down
Loading

0 comments on commit 3edef88

Please sign in to comment.