Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement endpoint to list off-chain data by hashes #68

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Factory interface {
// Client is the interface that defines the implementation of all the endpoints
type Client interface {
GetOffChainData(ctx context.Context, hash common.Hash) ([]byte, error)
ListOffChainData(ctx context.Context, hashes []common.Hash) (map[common.Hash][]byte, error)
SignSequence(signedSequence types.SignedSequence) ([]byte, error)
}

Expand Down Expand Up @@ -84,3 +85,27 @@ func (c *client) GetOffChainData(ctx context.Context, hash common.Hash) ([]byte,

return result, nil
}

// ListOffChainData returns data based on the given hashes
func (c *client) ListOffChainData(ctx context.Context, hashes []common.Hash) (map[common.Hash][]byte, error) {
response, err := rpc.JSONRPCCallWithContext(ctx, c.url, "sync_listOffChainData", hashes)
if err != nil {
return nil, err
}

if response.Error != nil {
return nil, fmt.Errorf("%v %v", response.Error.Code, response.Error.Message)
}

result := make(map[common.Hash]types.ArgBytes)
if err = json.Unmarshal(response.Result, &result); err != nil {
return nil, err
}

preparedResult := make(map[common.Hash][]byte)
for key, val := range result {
preparedResult[key] = val
}

return preparedResult, nil
}
78 changes: 78 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,81 @@ func TestClient_GetOffChainData(t *testing.T) {
})
}
}

func TestClient_ListOffChainData(t *testing.T) {
tests := []struct {
name string
hashes []common.Hash
result string
data map[common.Hash][]byte
statusCode int
err error
}{
{
name: "successfully got offhcain data",
hashes: []common.Hash{common.BytesToHash([]byte("hash"))},
result: fmt.Sprintf(`{"result":{"%s":"%s"}}`,
common.BytesToHash([]byte("hash")).Hex(), hex.EncodeToString([]byte("offchaindata"))),
data: map[common.Hash][]byte{
common.BytesToHash([]byte("hash")): []byte("offchaindata"),
},
},
{
name: "error returned by server",
hashes: []common.Hash{common.BytesToHash([]byte("hash"))},
result: `{"error":{"code":123,"message":"test error"}}`,
err: errors.New("123 test error"),
},
{
name: "invalid offchain data returned by server",
hashes: []common.Hash{common.BytesToHash([]byte("hash"))},
result: fmt.Sprintf(`{"result":{"%s":"invalid-signature"}}`,
common.BytesToHash([]byte("hash")).Hex()),
data: map[common.Hash][]byte{
common.BytesToHash([]byte("hash")): nil,
},
},
{
name: "unsuccessful status code returned by server",
hashes: []common.Hash{common.BytesToHash([]byte("hash"))},
statusCode: http.StatusUnauthorized,
err: errors.New("invalid status code, expected: 200, found: 401"),
},
}
for _, tt := range tests {
tt := tt

t.Run(tt.name, func(t *testing.T) {
t.Parallel()

svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var res rpc.Request
require.NoError(t, json.NewDecoder(r.Body).Decode(&res))
require.Equal(t, "sync_listOffChainData", res.Method)

var params [][]common.Hash
require.NoError(t, json.Unmarshal(res.Params, &params))
require.Equal(t, tt.hashes, params[0])

if tt.statusCode > 0 {
w.WriteHeader(tt.statusCode)
}

_, err := fmt.Fprint(w, tt.result)
require.NoError(t, err)
}))
defer svr.Close()

c := &client{url: svr.URL}

got, err := c.ListOffChainData(context.Background(), tt.hashes)
if tt.err != nil {
require.Error(t, err)
require.EqualError(t, tt.err, err.Error())
} else {
require.NoError(t, err)
require.Equal(t, tt.data, got)
}
})
}
}
4 changes: 2 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ func start(cliCtx *cli.Context) error {
[]rpc.Service{
{
Name: sync.APISYNC,
Service: sync.NewSyncEndpoints(storage),
Service: sync.NewEndpoints(storage),
},
{
Name: datacom.APIDATACOM,
Service: datacom.NewDataComEndpoints(
Service: datacom.NewEndpoints(
storage,
pk,
sequencerTracker,
Expand Down
49 changes: 49 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type DB interface {

Exists(ctx context.Context, key common.Hash) bool
GetOffChainData(ctx context.Context, key common.Hash, dbTx sqlx.QueryerContext) (types.ArgBytes, error)
ListOffChainData(ctx context.Context, keys []common.Hash, dbTx sqlx.QueryerContext) (map[common.Hash]types.ArgBytes, error)
StoreOffChainData(ctx context.Context, od []types.OffChainData, dbTx sqlx.ExecerContext) error
}

Expand Down Expand Up @@ -218,6 +219,54 @@ func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash, dbTx sqlx.
return common.FromHex(hexValue), nil
}

// ListOffChainData returns values identified by the given keys
func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash, dbTx sqlx.QueryerContext) (map[common.Hash]types.ArgBytes, error) {
if len(keys) == 0 {
return nil, nil
}

const listOffchainDataSQL = `
SELECT key, value
FROM data_node.offchain_data
WHERE key IN (?);
`

preparedKeys := make([]string, len(keys))
for i, key := range keys {
preparedKeys[i] = key.Hex()
}

query, args, err := sqlx.In(listOffchainDataSQL, preparedKeys)
if err != nil {
return nil, err
}

// sqlx.In returns queries with the `?` bindvar, we can rebind it for our backend
query = db.pg.Rebind(query)

rows, err := db.querier(dbTx).QueryxContext(ctx, query, args...)
if err != nil {
return nil, err
}

defer rows.Close()

list := make(map[common.Hash]types.ArgBytes)
for rows.Next() {
data := struct {
Key string `db:"key"`
Value string `db:"value"`
}{}
if err = rows.StructScan(&data); err != nil {
return nil, err
}

list[common.HexToHash(data.Key)] = common.FromHex(data.Value)
}

return list, nil
}

func (db *pgDB) execer(dbTx sqlx.ExecerContext) sqlx.ExecerContext {
if dbTx != nil {
return dbTx
Expand Down
120 changes: 120 additions & 0 deletions db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"context"
"database/sql/driver"
"errors"
"testing"

Expand Down Expand Up @@ -478,6 +479,125 @@ func Test_DB_GetOffChainData(t *testing.T) {
}
}

func Test_DB_ListOffChainData(t *testing.T) {
testTable := []struct {
name string
od []types.OffChainData
keys []common.Hash
expected map[common.Hash]types.ArgBytes
sql string
returnErr error
}{
{
name: "successfully selected one value",
od: []types.OffChainData{{
Key: common.HexToHash("key1"),
Value: []byte("value1"),
}},
keys: []common.Hash{
common.BytesToHash([]byte("key1")),
},
expected: map[common.Hash]types.ArgBytes{
common.BytesToHash([]byte("key1")): []byte("value1"),
},
sql: `SELECT key, value FROM data_node\.offchain_data WHERE key IN \(\$1\)`,
},
{
name: "successfully selected two values",
od: []types.OffChainData{{
Key: common.HexToHash("key1"),
Value: []byte("value1"),
}, {
Key: common.HexToHash("key2"),
Value: []byte("value2"),
}},
keys: []common.Hash{
common.BytesToHash([]byte("key1")),
common.BytesToHash([]byte("key2")),
},
expected: map[common.Hash]types.ArgBytes{
common.BytesToHash([]byte("key1")): []byte("value1"),
common.BytesToHash([]byte("key2")): []byte("value2"),
},
sql: `SELECT key, value FROM data_node\.offchain_data WHERE key IN \(\$1\, \$2\)`,
},
{
name: "error returned",
od: []types.OffChainData{{
Key: common.HexToHash("key1"),
Value: []byte("value1"),
}},
keys: []common.Hash{
common.BytesToHash([]byte("key1")),
},
sql: `SELECT key, value FROM data_node\.offchain_data WHERE key IN \(\$1\)`,
returnErr: errors.New("test error"),
},
{
name: "no rows",
od: []types.OffChainData{{
Key: common.HexToHash("key1"),
Value: []byte("value1"),
}},
keys: []common.Hash{
common.BytesToHash([]byte("underfined")),
},
sql: `SELECT key, value FROM data_node\.offchain_data WHERE key IN \(\$1\)`,
returnErr: ErrStateNotSynchronized,
},
}

for _, tt := range testTable {
tt := tt

t.Run(tt.name, func(t *testing.T) {
t.Parallel()

db, mock, err := sqlmock.New()
require.NoError(t, err)

defer db.Close()

wdb := sqlx.NewDb(db, "postgres")

// Seed data
seedOffchainData(t, wdb, mock, tt.od)

preparedKeys := make([]driver.Value, len(tt.keys))
for i, key := range tt.keys {
preparedKeys[i] = key.Hex()
}

expected := mock.ExpectQuery(tt.sql).
WithArgs(preparedKeys...)

if tt.returnErr != nil {
expected.WillReturnError(tt.returnErr)
} else {
returnData := sqlmock.NewRows([]string{"key", "value"})

for key, val := range tt.expected {
returnData = returnData.AddRow(key.Hex(), common.Bytes2Hex(val))
}

expected.WillReturnRows(returnData)
}

dbPG := New(wdb)

data, err := dbPG.ListOffChainData(context.Background(), tt.keys, wdb)
if tt.returnErr != nil {
require.ErrorIs(t, err, tt.returnErr)
} else {
require.NoError(t, err)
require.Equal(t, tt.expected, data)
}

require.NoError(t, mock.ExpectationsWereMet())
})
}
}

func Test_DB_Exist(t *testing.T) {
testTable := []struct {
name string
Expand Down
59 changes: 59 additions & 0 deletions mocks/client.generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading