Skip to content

Commit

Permalink
Add eth filter
Browse files Browse the repository at this point in the history
  • Loading branch information
dcb9 committed Sep 21, 2018
1 parent 4348be9 commit dd59271
Show file tree
Hide file tree
Showing 12 changed files with 321 additions and 71 deletions.
55 changes: 55 additions & 0 deletions pkg/eth/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package eth

import (
"math/big"
"sync"
"sync/atomic"
)

type FilterType int

const (
NewFilterTy FilterType = iota
NewBlockFilterTy
NewPendingTransactionFilterTy
)

type Filter struct {
ID uint64
Type FilterType
Request interface{}
LastBlockNum *big.Int
Data sync.Map
}

type FilterSimulator struct {
filters sync.Map
maxFilterID *uint64
}

func NewFilterSimulator() *FilterSimulator {
id := uint64(0)
return &FilterSimulator{
maxFilterID: &id,
}
}

func (f *FilterSimulator) New(ty FilterType, req ...interface{}) *Filter {
id := atomic.AddUint64(f.maxFilterID, 1)
filter := &Filter{ID: id, Type: ty}
if ty == NewFilterTy {
filter.Request = req[0]
}

f.filters.Store(id, filter)

return filter
}

func (f *FilterSimulator) Uninstall(filterID uint64) {
f.filters.Delete(filterID)
}

func (f *FilterSimulator) Filter(filterID uint64) (value interface{}, ok bool) {
return f.filters.Load(filterID)
}
36 changes: 33 additions & 3 deletions pkg/eth/rpc_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,7 @@ func (r *UninstallFilterRequest) UnmarshalJSON(data []byte) error {
// the filter id
type GetFilterChangesRequest string

//For filters created with eth_newBlockFilter the return are block hashes (DATA, 32 Bytes),
// e.g. ["0x3454645634534..."].
type GetFilterChangesResponse []string
type GetFilterChangesResponse []interface{}

func (r *GetFilterChangesRequest) UnmarshalJSON(data []byte) error {
var params []string
Expand Down Expand Up @@ -387,3 +385,35 @@ func (r *GetBlockByNumberRequest) UnmarshalJSON(data []byte) error {

return nil
}

// ========== eth_newFilter ============= //

type NewFilterRequest struct {
FromBlock json.RawMessage `json:"fromBlock"`
ToBlock json.RawMessage `json:"toBlock"`
Address json.RawMessage `json:"address"`
Topics []interface{} `json:"topics"`
}

func (r *NewFilterRequest) UnmarshalJSON(data []byte) error {
var params []json.RawMessage
if err := json.Unmarshal(data, &params); err != nil {
return err
}

if len(params) == 0 {
return errors.New("params must be set")
}
type Req NewFilterRequest
var req Req

if err := json.Unmarshal(params[0], &req); err != nil {
return err
}

*r = NewFilterRequest(req)

return nil
}

type NewFilterResponse string
7 changes: 7 additions & 0 deletions pkg/qtum/method.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,10 @@ func (m *Method) Generate(blockNum int, maxTries *int) (resp GenerateResponse, e
err = m.Request(MethodGenerate, &req, &resp)
return
}

func (m *Method) SearchLogs(req *SearchLogsRequest) (receipts SearchLogsResponse, err error) {
if err := m.Request(MethodSearchLogs, req, &receipts); err != nil {
return nil, err
}
return
}
13 changes: 11 additions & 2 deletions pkg/qtum/rpc_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ type (
FromBlock *big.Int
ToBlock *big.Int
Addresses []string
Topics []interface{}
}

SearchLogsResponse []TransactionReceiptStruct
Expand All @@ -612,13 +613,21 @@ func (r *SearchLogsRequest) MarshalJSON() ([]byte, error) {
4. "topics" (string, optional) An array of values from which at least one must appear in the log entries. The order is important, if you want to leave topics out use null, e.g. ["null", "0x00..."].
5. "minconf" (uint, optional, default=0) Minimal number of confirmations before a log is returned
*/
return json.Marshal([]interface{}{
data := []interface{}{
r.FromBlock,
r.ToBlock,
map[string][]string{
"addresses": r.Addresses,
},
})
}

if len(r.Topics) > 0 {
data = append(data, map[string][]interface{}{
"topics": r.Topics,
})
}

return json.Marshal(data)
}

// ========== GetAccountInfo ============= //
Expand Down
2 changes: 1 addition & 1 deletion pkg/transformer/eth_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (p *ProxyETHCall) ToRequest(ethreq *eth.CallRequest) (*qtum.CallContractReq

return &qtum.CallContractRequest{
To: ethreq.To,
From: ethreq.From,
From: from,
Data: ethreq.Data,
GasLimit: gasLimit,
}, nil
Expand Down
128 changes: 118 additions & 10 deletions pkg/transformer/eth_getFilterChanges.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package transformer

import (
"encoding/json"
"math/big"

"github.com/pkg/errors"

"github.com/dcb9/janus/pkg/eth"
"github.com/dcb9/janus/pkg/qtum"
"github.com/dcb9/janus/pkg/utils"
Expand All @@ -12,7 +15,7 @@ import (
// ProxyETHGetFilterChanges implements ETHProxy
type ProxyETHGetFilterChanges struct {
*qtum.Qtum
blockFilter *BlockFilterSimulator
filter *eth.FilterSimulator
}

func (p *ProxyETHGetFilterChanges) Method() string {
Expand All @@ -25,20 +28,38 @@ func (p *ProxyETHGetFilterChanges) Request(rawreq *eth.JSONRPCRequest) (interfac
return nil, err
}

return p.request(&req)
filterID, err := hexutil.DecodeUint64(string(req))
if err != nil {
return nil, err
}

_filter, ok := p.filter.Filter(filterID)
if !ok {
return nil, errors.New("Invalid filter id")
}
filter := _filter.(*eth.Filter)

switch filter.Type {
case eth.NewFilterTy:
return p.requestFilter(filter)
case eth.NewBlockFilterTy:
return p.requestBlockFilter(filter)
case eth.NewPendingTransactionFilterTy:
fallthrough
default:

return nil, errors.New("Unknown filter type")
}
}

func (p *ProxyETHGetFilterChanges) request(ethreq *eth.GetFilterChangesRequest) (qtumresp eth.GetFilterChangesResponse, err error) {
func (p *ProxyETHGetFilterChanges) requestBlockFilter(filter *eth.Filter) (qtumresp eth.GetFilterChangesResponse, err error) {
qtumresp = make(eth.GetFilterChangesResponse, 0)
filterID, err := hexutil.DecodeUint64(string(*ethreq))
if err != nil {
return qtumresp, err
}

lastBlockNumber, err := p.blockFilter.GetBlockNumber(filterID)
if err != nil {
return qtumresp, err
_lastBlockNumber, ok := filter.Data.Load("lastBlockNumber")
if !ok {
return qtumresp, errors.New("Could not get lastBlockNumber")
}
lastBlockNumber := _lastBlockNumber.(uint64)

blockCountBigInt, err := p.GetBlockCount()
if err != nil {
Expand All @@ -61,5 +82,92 @@ func (p *ProxyETHGetFilterChanges) request(ethreq *eth.GetFilterChangesRequest)
}

qtumresp = hashes
filter.Data.Store("lastBlockNumber", blockCount)
return
}
func (p *ProxyETHGetFilterChanges) requestFilter(filter *eth.Filter) (qtumresp eth.GetFilterChangesResponse, err error) {
qtumresp = make(eth.GetFilterChangesResponse, 0)

_lastBlockNumber, ok := filter.Data.Load("lastBlockNumber")
if !ok {
return qtumresp, errors.New("Could not get lastBlockNumber")
}
lastBlockNumber := _lastBlockNumber.(uint64)

blockCountBigInt, err := p.GetBlockCount()
if err != nil {
return qtumresp, err
}
blockCount := blockCountBigInt.Uint64()

differ := blockCount - lastBlockNumber

if differ == 0 {
return eth.GetFilterChangesResponse{}, nil
}

searchLogsReq, err := p.toSearchLogsReq(filter, big.NewInt(int64(lastBlockNumber+1)), big.NewInt(int64(blockCount)))
if err != nil {
return nil, err
}

return p.doSearchLogs(searchLogsReq)
}

func (p *ProxyETHGetFilterChanges) doSearchLogs(req *qtum.SearchLogsRequest) (eth.GetFilterChangesResponse, error) {
resp, err := p.SearchLogs(req)
if err != nil {
return nil, err
}

receiptToResult := func(receipt *qtum.TransactionReceiptStruct) []interface{} {
logs := getEthLogs(receipt)
res := make([]interface{}, len(logs))
for i, _ := range res {
res[i] = logs[i]
}
return res
}
results := make(eth.GetFilterChangesResponse, 0)
for _, receipt := range resp {
r := qtum.TransactionReceiptStruct(receipt)
results = append(results, receiptToResult(&r)...)
}

return results, nil
}

func (p *ProxyETHGetFilterChanges) toSearchLogsReq(filter *eth.Filter, from, to *big.Int) (*qtum.SearchLogsRequest, error) {
ethreq := filter.Request.(*eth.NewFilterRequest)
var err error
var addresses []string
if ethreq.Address != nil {
if isString(ethreq.Address) {
var addr string
if err = json.Unmarshal(ethreq.Address, &addr); err != nil {
return nil, err
}
addresses = append(addresses, addr)
} else {
if err = json.Unmarshal(ethreq.Address, &addresses); err != nil {
return nil, err
}
}
for i, _ := range addresses {
addresses[i] = utils.RemoveHexPrefix(addresses[i])
}
}

qtumreq := &qtum.SearchLogsRequest{
Addresses: addresses,
FromBlock: from,
ToBlock: to,
}

topics, ok := filter.Data.Load("topics")
if ok {
qtumreq.Topics = topics.([]interface{})
}

return qtumreq, nil
}
10 changes: 10 additions & 0 deletions pkg/transformer/eth_getFilterLogs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package transformer

// ProxyETHGetFilterLogs implements ETHProxy
type ProxyETHGetFilterLogs struct {
*ProxyETHGetFilterChanges
}

func (p *ProxyETHGetFilterLogs) Method() string {
return "eth_getFilterLogs"
}
4 changes: 2 additions & 2 deletions pkg/transformer/eth_getLogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func (p *ProxyETHGetLogs) Request(rawreq *eth.JSONRPCRequest) (interface{}, erro
}

func (p *ProxyETHGetLogs) request(req *qtum.SearchLogsRequest) (*eth.GetLogsResponse, error) {
var receipts qtum.SearchLogsResponse
if err := p.Qtum.Request(qtum.MethodSearchLogs, req, &receipts); err != nil {
receipts, err := p.SearchLogs(req)
if err != nil {
return nil, err
}

Expand Down
Loading

0 comments on commit dd59271

Please sign in to comment.