From dd5927133dd3f0d390840d313d6f755d42f114c3 Mon Sep 17 00:00:00 2001 From: dcb9 Date: Tue, 11 Sep 2018 15:11:03 +0800 Subject: [PATCH] Add eth filter --- pkg/eth/filter.go | 55 ++++++++++ pkg/eth/rpc_types.go | 36 ++++++- pkg/qtum/method.go | 7 ++ pkg/qtum/rpc_types.go | 13 ++- pkg/transformer/eth_call.go | 2 +- pkg/transformer/eth_getFilterChanges.go | 128 ++++++++++++++++++++++-- pkg/transformer/eth_getFilterLogs.go | 10 ++ pkg/transformer/eth_getLogs.go | 4 +- pkg/transformer/eth_newBlockFilter.go | 51 +--------- pkg/transformer/eth_newFilter.go | 70 +++++++++++++ pkg/transformer/eth_uninstallFilter.go | 4 +- pkg/transformer/transformer.go | 12 ++- 12 files changed, 321 insertions(+), 71 deletions(-) create mode 100644 pkg/eth/filter.go create mode 100644 pkg/transformer/eth_getFilterLogs.go create mode 100644 pkg/transformer/eth_newFilter.go diff --git a/pkg/eth/filter.go b/pkg/eth/filter.go new file mode 100644 index 00000000..c30fa842 --- /dev/null +++ b/pkg/eth/filter.go @@ -0,0 +1,55 @@ +package eth + +import ( + "math/big" + "sync" + "sync/atomic" +) + +type FilterType int + +const ( + NewFilterTy FilterType = iota + NewBlockFilterTy + NewPendingTransactionFilterTy +) + +type Filter struct { + ID uint64 + Type FilterType + Request interface{} + LastBlockNum *big.Int + Data sync.Map +} + +type FilterSimulator struct { + filters sync.Map + maxFilterID *uint64 +} + +func NewFilterSimulator() *FilterSimulator { + id := uint64(0) + return &FilterSimulator{ + maxFilterID: &id, + } +} + +func (f *FilterSimulator) New(ty FilterType, req ...interface{}) *Filter { + id := atomic.AddUint64(f.maxFilterID, 1) + filter := &Filter{ID: id, Type: ty} + if ty == NewFilterTy { + filter.Request = req[0] + } + + f.filters.Store(id, filter) + + return filter +} + +func (f *FilterSimulator) Uninstall(filterID uint64) { + f.filters.Delete(filterID) +} + +func (f *FilterSimulator) Filter(filterID uint64) (value interface{}, ok bool) { + return f.filters.Load(filterID) +} diff --git a/pkg/eth/rpc_types.go b/pkg/eth/rpc_types.go index c030cb12..4edf60fe 100644 --- a/pkg/eth/rpc_types.go +++ b/pkg/eth/rpc_types.go @@ -291,9 +291,7 @@ func (r *UninstallFilterRequest) UnmarshalJSON(data []byte) error { // the filter id type GetFilterChangesRequest string -//For filters created with eth_newBlockFilter the return are block hashes (DATA, 32 Bytes), -// e.g. ["0x3454645634534..."]. -type GetFilterChangesResponse []string +type GetFilterChangesResponse []interface{} func (r *GetFilterChangesRequest) UnmarshalJSON(data []byte) error { var params []string @@ -387,3 +385,35 @@ func (r *GetBlockByNumberRequest) UnmarshalJSON(data []byte) error { return nil } + +// ========== eth_newFilter ============= // + +type NewFilterRequest struct { + FromBlock json.RawMessage `json:"fromBlock"` + ToBlock json.RawMessage `json:"toBlock"` + Address json.RawMessage `json:"address"` + Topics []interface{} `json:"topics"` +} + +func (r *NewFilterRequest) UnmarshalJSON(data []byte) error { + var params []json.RawMessage + if err := json.Unmarshal(data, ¶ms); err != nil { + return err + } + + if len(params) == 0 { + return errors.New("params must be set") + } + type Req NewFilterRequest + var req Req + + if err := json.Unmarshal(params[0], &req); err != nil { + return err + } + + *r = NewFilterRequest(req) + + return nil +} + +type NewFilterResponse string diff --git a/pkg/qtum/method.go b/pkg/qtum/method.go index ccf7667d..952c81b9 100644 --- a/pkg/qtum/method.go +++ b/pkg/qtum/method.go @@ -89,3 +89,10 @@ func (m *Method) Generate(blockNum int, maxTries *int) (resp GenerateResponse, e err = m.Request(MethodGenerate, &req, &resp) return } + +func (m *Method) SearchLogs(req *SearchLogsRequest) (receipts SearchLogsResponse, err error) { + if err := m.Request(MethodSearchLogs, req, &receipts); err != nil { + return nil, err + } + return +} diff --git a/pkg/qtum/rpc_types.go b/pkg/qtum/rpc_types.go index 192196d2..0d1fbc92 100644 --- a/pkg/qtum/rpc_types.go +++ b/pkg/qtum/rpc_types.go @@ -599,6 +599,7 @@ type ( FromBlock *big.Int ToBlock *big.Int Addresses []string + Topics []interface{} } SearchLogsResponse []TransactionReceiptStruct @@ -612,13 +613,21 @@ func (r *SearchLogsRequest) MarshalJSON() ([]byte, error) { 4. "topics" (string, optional) An array of values from which at least one must appear in the log entries. The order is important, if you want to leave topics out use null, e.g. ["null", "0x00..."]. 5. "minconf" (uint, optional, default=0) Minimal number of confirmations before a log is returned */ - return json.Marshal([]interface{}{ + data := []interface{}{ r.FromBlock, r.ToBlock, map[string][]string{ "addresses": r.Addresses, }, - }) + } + + if len(r.Topics) > 0 { + data = append(data, map[string][]interface{}{ + "topics": r.Topics, + }) + } + + return json.Marshal(data) } // ========== GetAccountInfo ============= // diff --git a/pkg/transformer/eth_call.go b/pkg/transformer/eth_call.go index f6d62513..eb13615a 100644 --- a/pkg/transformer/eth_call.go +++ b/pkg/transformer/eth_call.go @@ -57,7 +57,7 @@ func (p *ProxyETHCall) ToRequest(ethreq *eth.CallRequest) (*qtum.CallContractReq return &qtum.CallContractRequest{ To: ethreq.To, - From: ethreq.From, + From: from, Data: ethreq.Data, GasLimit: gasLimit, }, nil diff --git a/pkg/transformer/eth_getFilterChanges.go b/pkg/transformer/eth_getFilterChanges.go index 9c6c999d..ef501c1f 100644 --- a/pkg/transformer/eth_getFilterChanges.go +++ b/pkg/transformer/eth_getFilterChanges.go @@ -1,8 +1,11 @@ package transformer import ( + "encoding/json" "math/big" + "github.com/pkg/errors" + "github.com/dcb9/janus/pkg/eth" "github.com/dcb9/janus/pkg/qtum" "github.com/dcb9/janus/pkg/utils" @@ -12,7 +15,7 @@ import ( // ProxyETHGetFilterChanges implements ETHProxy type ProxyETHGetFilterChanges struct { *qtum.Qtum - blockFilter *BlockFilterSimulator + filter *eth.FilterSimulator } func (p *ProxyETHGetFilterChanges) Method() string { @@ -25,20 +28,38 @@ func (p *ProxyETHGetFilterChanges) Request(rawreq *eth.JSONRPCRequest) (interfac return nil, err } - return p.request(&req) + filterID, err := hexutil.DecodeUint64(string(req)) + if err != nil { + return nil, err + } + + _filter, ok := p.filter.Filter(filterID) + if !ok { + return nil, errors.New("Invalid filter id") + } + filter := _filter.(*eth.Filter) + + switch filter.Type { + case eth.NewFilterTy: + return p.requestFilter(filter) + case eth.NewBlockFilterTy: + return p.requestBlockFilter(filter) + case eth.NewPendingTransactionFilterTy: + fallthrough + default: + + return nil, errors.New("Unknown filter type") + } } -func (p *ProxyETHGetFilterChanges) request(ethreq *eth.GetFilterChangesRequest) (qtumresp eth.GetFilterChangesResponse, err error) { +func (p *ProxyETHGetFilterChanges) requestBlockFilter(filter *eth.Filter) (qtumresp eth.GetFilterChangesResponse, err error) { qtumresp = make(eth.GetFilterChangesResponse, 0) - filterID, err := hexutil.DecodeUint64(string(*ethreq)) - if err != nil { - return qtumresp, err - } - lastBlockNumber, err := p.blockFilter.GetBlockNumber(filterID) - if err != nil { - return qtumresp, err + _lastBlockNumber, ok := filter.Data.Load("lastBlockNumber") + if !ok { + return qtumresp, errors.New("Could not get lastBlockNumber") } + lastBlockNumber := _lastBlockNumber.(uint64) blockCountBigInt, err := p.GetBlockCount() if err != nil { @@ -61,5 +82,92 @@ func (p *ProxyETHGetFilterChanges) request(ethreq *eth.GetFilterChangesRequest) } qtumresp = hashes + filter.Data.Store("lastBlockNumber", blockCount) return } +func (p *ProxyETHGetFilterChanges) requestFilter(filter *eth.Filter) (qtumresp eth.GetFilterChangesResponse, err error) { + qtumresp = make(eth.GetFilterChangesResponse, 0) + + _lastBlockNumber, ok := filter.Data.Load("lastBlockNumber") + if !ok { + return qtumresp, errors.New("Could not get lastBlockNumber") + } + lastBlockNumber := _lastBlockNumber.(uint64) + + blockCountBigInt, err := p.GetBlockCount() + if err != nil { + return qtumresp, err + } + blockCount := blockCountBigInt.Uint64() + + differ := blockCount - lastBlockNumber + + if differ == 0 { + return eth.GetFilterChangesResponse{}, nil + } + + searchLogsReq, err := p.toSearchLogsReq(filter, big.NewInt(int64(lastBlockNumber+1)), big.NewInt(int64(blockCount))) + if err != nil { + return nil, err + } + + return p.doSearchLogs(searchLogsReq) +} + +func (p *ProxyETHGetFilterChanges) doSearchLogs(req *qtum.SearchLogsRequest) (eth.GetFilterChangesResponse, error) { + resp, err := p.SearchLogs(req) + if err != nil { + return nil, err + } + + receiptToResult := func(receipt *qtum.TransactionReceiptStruct) []interface{} { + logs := getEthLogs(receipt) + res := make([]interface{}, len(logs)) + for i, _ := range res { + res[i] = logs[i] + } + return res + } + results := make(eth.GetFilterChangesResponse, 0) + for _, receipt := range resp { + r := qtum.TransactionReceiptStruct(receipt) + results = append(results, receiptToResult(&r)...) + } + + return results, nil +} + +func (p *ProxyETHGetFilterChanges) toSearchLogsReq(filter *eth.Filter, from, to *big.Int) (*qtum.SearchLogsRequest, error) { + ethreq := filter.Request.(*eth.NewFilterRequest) + var err error + var addresses []string + if ethreq.Address != nil { + if isString(ethreq.Address) { + var addr string + if err = json.Unmarshal(ethreq.Address, &addr); err != nil { + return nil, err + } + addresses = append(addresses, addr) + } else { + if err = json.Unmarshal(ethreq.Address, &addresses); err != nil { + return nil, err + } + } + for i, _ := range addresses { + addresses[i] = utils.RemoveHexPrefix(addresses[i]) + } + } + + qtumreq := &qtum.SearchLogsRequest{ + Addresses: addresses, + FromBlock: from, + ToBlock: to, + } + + topics, ok := filter.Data.Load("topics") + if ok { + qtumreq.Topics = topics.([]interface{}) + } + + return qtumreq, nil +} diff --git a/pkg/transformer/eth_getFilterLogs.go b/pkg/transformer/eth_getFilterLogs.go new file mode 100644 index 00000000..1ce4149b --- /dev/null +++ b/pkg/transformer/eth_getFilterLogs.go @@ -0,0 +1,10 @@ +package transformer + +// ProxyETHGetFilterLogs implements ETHProxy +type ProxyETHGetFilterLogs struct { + *ProxyETHGetFilterChanges +} + +func (p *ProxyETHGetFilterLogs) Method() string { + return "eth_getFilterLogs" +} diff --git a/pkg/transformer/eth_getLogs.go b/pkg/transformer/eth_getLogs.go index 90fc2b75..f96bf29f 100644 --- a/pkg/transformer/eth_getLogs.go +++ b/pkg/transformer/eth_getLogs.go @@ -39,8 +39,8 @@ func (p *ProxyETHGetLogs) Request(rawreq *eth.JSONRPCRequest) (interface{}, erro } func (p *ProxyETHGetLogs) request(req *qtum.SearchLogsRequest) (*eth.GetLogsResponse, error) { - var receipts qtum.SearchLogsResponse - if err := p.Qtum.Request(qtum.MethodSearchLogs, req, &receipts); err != nil { + receipts, err := p.SearchLogs(req) + if err != nil { return nil, err } diff --git a/pkg/transformer/eth_newBlockFilter.go b/pkg/transformer/eth_newBlockFilter.go index 094863a2..a5064104 100644 --- a/pkg/transformer/eth_newBlockFilter.go +++ b/pkg/transformer/eth_newBlockFilter.go @@ -2,18 +2,16 @@ package transformer import ( "log" - "sync" "github.com/dcb9/janus/pkg/eth" "github.com/dcb9/janus/pkg/qtum" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/pkg/errors" ) // ProxyETHNewBlockFilter implements ETHProxy type ProxyETHNewBlockFilter struct { *qtum.Qtum - blockFilter *BlockFilterSimulator + filter *eth.FilterSimulator } func (p *ProxyETHNewBlockFilter) Method() string { @@ -38,49 +36,8 @@ func (p *ProxyETHNewBlockFilter) request() (eth.NewBlockFilterResponse, error) { }() } - id := p.blockFilter.New(blockCount.Int.Uint64()) + filter := p.filter.New(eth.NewBlockFilterTy) + filter.Data.Store("lastBlockNumber", blockCount.Uint64()) - return eth.NewBlockFilterResponse(hexutil.EncodeUint64(id)), nil -} - -// BlockFilterSimulatorr a map filter id => last block number -type BlockFilterSimulator struct { - filters sync.Map - filterIDMutex sync.Mutex - maxFilterID uint64 -} - -func (f *BlockFilterSimulator) New(blockNumber uint64) uint64 { - f.filterIDMutex.Lock() - f.maxFilterID++ - id := f.maxFilterID - f.filterIDMutex.Unlock() - - f.filters.Store(id, blockNumber) - - return id -} - -func (f *BlockFilterSimulator) SetBlockNumber(filterID uint64, blockNumber uint64) error { - if _, ok := f.filters.Load(filterID); !ok { - return errors.Errorf("filter id %d does not exist", filterID) - } - - f.filters.Store(filterID, blockNumber) - - return nil -} - -func (f *BlockFilterSimulator) GetBlockNumber(filterID uint64) (uint64, error) { - val, ok := f.filters.Load(filterID) - if !ok { - return 0, errors.Errorf("filter id %d does not exist", filterID) - } - blockNumber := val.(uint64) - - return blockNumber, nil -} - -func (f *BlockFilterSimulator) Uninstall(filterID uint64) { - f.filters.Delete(filterID) + return eth.NewBlockFilterResponse(hexutil.EncodeUint64(filter.ID)), nil } diff --git a/pkg/transformer/eth_newFilter.go b/pkg/transformer/eth_newFilter.go new file mode 100644 index 00000000..5a7423a0 --- /dev/null +++ b/pkg/transformer/eth_newFilter.go @@ -0,0 +1,70 @@ +package transformer + +import ( + "encoding/json" + + "github.com/dcb9/janus/pkg/utils" + + "math/big" + + "github.com/dcb9/go-ethereum/common/hexutil" + "github.com/dcb9/janus/pkg/eth" + "github.com/dcb9/janus/pkg/qtum" +) + +// ProxyETHNewFilter implements ETHProxy +type ProxyETHNewFilter struct { + *qtum.Qtum + filter *eth.FilterSimulator +} + +func (p *ProxyETHNewFilter) Method() string { + return "eth_newFilter" +} + +func (p *ProxyETHNewFilter) Request(rawreq *eth.JSONRPCRequest) (interface{}, error) { + var req eth.NewFilterRequest + if err := json.Unmarshal(rawreq.Params, &req); err != nil { + return nil, err + } + + return p.request(&req) +} + +func (p *ProxyETHNewFilter) request(ethreq *eth.NewFilterRequest) (eth.NewFilterResponse, error) { + var from *big.Int + var err error + if ethreq.FromBlock == nil { + from, err = getQtumBlockNumber([]byte("latest"), 0) + } else { + from, err = getQtumBlockNumber(ethreq.FromBlock, 0) + } + if err != nil { + return "", err + } + + filter := p.filter.New(eth.NewFilterTy, ethreq) + filter.Data.Store("lastBlockNumber", from.Uint64()) + + if len(ethreq.Topics) > 0 { + filter.Data.Store("topics", convertTopics(ethreq.Topics)) + } + + return eth.NewFilterResponse(hexutil.EncodeUint64(filter.ID)), nil +} + +func convertTopics(ethtopics []interface{}) []interface{} { + var topics []interface{} + for _, topic := range ethtopics { + switch topic.(type) { + case []interface{}: + topics = append(topics, convertTopics(topic.([]interface{}))) + case string: + topics = append(topics, utils.RemoveHexPrefix(topic.(string))) + case nil: + topics = append(topics, "null") + } + } + + return topics +} diff --git a/pkg/transformer/eth_uninstallFilter.go b/pkg/transformer/eth_uninstallFilter.go index 8780d249..17c4287f 100644 --- a/pkg/transformer/eth_uninstallFilter.go +++ b/pkg/transformer/eth_uninstallFilter.go @@ -9,7 +9,7 @@ import ( // ProxyETHUninstallFilter implements ETHProxy type ProxyETHUninstallFilter struct { *qtum.Qtum - blockFilter *BlockFilterSimulator + filter *eth.FilterSimulator } func (p *ProxyETHUninstallFilter) Method() string { @@ -32,7 +32,7 @@ func (p *ProxyETHUninstallFilter) request(ethreq *eth.UninstallFilterRequest) (e } // uninstall - p.blockFilter.Uninstall(id) + p.filter.Uninstall(id) return true, nil } diff --git a/pkg/transformer/transformer.go b/pkg/transformer/transformer.go index 0229bc8e..614f2315 100644 --- a/pkg/transformer/transformer.go +++ b/pkg/transformer/transformer.go @@ -74,7 +74,9 @@ func (t *Transformer) getProxy(rpcReq *eth.JSONRPCRequest) (ETHProxy, error) { } func DefaultProxies(qtumRPCClient *qtum.Qtum) []ETHProxy { - blockFilter := new(BlockFilterSimulator) + filter := eth.NewFilterSimulator() + getFilterChanges := &ProxyETHGetFilterChanges{Qtum: qtumRPCClient, filter: filter} + return []ETHProxy{ &ProxyETHCall{Qtum: qtumRPCClient}, &ProxyETHPersonalUnlockAccount{}, @@ -87,9 +89,11 @@ func DefaultProxies(qtumRPCClient *qtum.Qtum) []ETHProxy { &ProxyETHAccounts{Qtum: qtumRPCClient}, &ProxyETHGetCode{Qtum: qtumRPCClient}, - &ProxyETHNewBlockFilter{Qtum: qtumRPCClient, blockFilter: blockFilter}, - &ProxyETHGetFilterChanges{Qtum: qtumRPCClient, blockFilter: blockFilter}, - &ProxyETHUninstallFilter{Qtum: qtumRPCClient, blockFilter: blockFilter}, + &ProxyETHNewFilter{Qtum: qtumRPCClient, filter: filter}, + &ProxyETHNewBlockFilter{Qtum: qtumRPCClient, filter: filter}, + getFilterChanges, + &ProxyETHGetFilterLogs{ProxyETHGetFilterChanges: getFilterChanges}, + &ProxyETHUninstallFilter{Qtum: qtumRPCClient, filter: filter}, &ProxyETHEstimateGas{Qtum: qtumRPCClient}, &ProxyETHGetBlockByNumber{Qtum: qtumRPCClient},