Skip to content

Commit

Permalink
update components
Browse files Browse the repository at this point in the history
update nst_post_handler, use balance to replace balance-change
remove nst balance change cap, allowing 'unlimited' (uint64) balance
recovery for 2nd phase data
  • Loading branch information
leonz789 committed Feb 27, 2025
1 parent 44c6f77 commit acfabb2
Show file tree
Hide file tree
Showing 22 changed files with 1,761 additions and 278 deletions.
4 changes: 2 additions & 2 deletions local_node.sh
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ imua:
grpc: 127.0.0.1:9090
ws: !!str ws://127.0.0.1:26657/websocket
rpc: !!str http://127.0.0.1:26657
debugger:
grpc: !!str :50051
#debugger:
# grpc: !!str :50051
EOF
)

Expand Down
5 changes: 3 additions & 2 deletions proto/imuachain/oracle/v1/rawdata_nst.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ message NSTKV {
// staker index for a nst defined on imuachain side
uint32 staker_index = 1;
// latest balance of the staker (replaces the former balance-change delta)
int64 balance_change = 2;
int64 balance = 2;
}

// RawDataNST represents balance changes of all stakers for a NST
message RawDataNST {
uint64 version = 1;
// NSTKV uses an array to describe {staker_index: balance} for all stakers whose balance has changed
repeated NSTKV nst_balance_changes = 1;
repeated NSTKV nst_balance_changes = 2;
}
33 changes: 33 additions & 0 deletions proto/imuachain/oracle/v1/two_phases.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
syntax = "proto3";
package imuachain.oracle.v1;

option go_package = "github.com/imua-xyz/imuachain/x/oracle/types";

// ValidatorIndex records the next expected index for a single validator
// during the second-phase (raw-data) aggregation.
message ValidatorIndex {
  // validator is the address identifying the validator this entry belongs to
  string validator = 1;
  // next_index is the next piece index expected from this validator
  // (assumed from the keeper's SetNextPieceIndexForFeeder usage — TODO confirm)
  uint32 next_index = 2;
}

// FeederValidatorsIndex groups the per-validator index entries for one feeder
message FeederValidatorsIndex{
  // validator_index_list is the list of ValidatorIndex entries, one per validator
  repeated ValidatorIndex validator_index_list= 2;
}

// HashNode is a single node of a Merkle tree, addressed by its index
message HashNode {
  // index is the position of this node within the flattened tree
  uint32 index = 1;
  // hash is the hash value stored at this node
  bytes hash = 2;
}

// FlattenTree represents all hash nodes of a Merkle tree with their indexes
message FlattenTree{
  // nodes lists every hash node of the tree together with its index
  repeated HashNode nodes = 1;
}

// TreeInfo summarizes a Merkle tree by its leaf count and root hash
message TreeInfo {
  // leaf_count is the number of leaves in the tree
  uint32 leaf_count = 1;
  // root_hash is the root hash of the tree
  bytes root_hash = 2;
}
12 changes: 12 additions & 0 deletions x/oracle/keeper/common/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
sdkmath "cosmossdk.io/math"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/log"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
dogfoodkeeper "github.com/imua-xyz/imuachain/x/dogfood/keeper"
Expand Down Expand Up @@ -71,6 +72,17 @@ type KeeperOracle interface {
SetNonce(ctx sdk.Context, nonce types.ValidatorNonce)
GetSpecifiedAssetsPrice(ctx sdk.Context, assetID string) (types.Price, error)
GetMultipleAssetsPrices(ctx sdk.Context, assetIDs map[string]interface{}) (map[string]types.Price, error)

Setup2ndPhase(ctx sdk.Context, feederID uint64, validators []string, leafCount uint32, rootHash []byte)
Clear2ndPhase(ctx sdk.Context, feederID uint64, rootIndex uint32)
AddNodesToMerkleTree(ctx sdk.Context, feederID uint64, proof []*types.HashNode)
SetNextPieceIndexForFeeder(ctx sdk.Context, feederID uint64, pieceIndex uint32)
GetPostAggregation(feederID int64) (handler PostAggregationHandler, found bool)
SetRawDataPiece(ctx sdk.Context, feederID uint64, pieceIndex uint32, rawData []byte)
GetRawDataPieces(ctx sdk.Context, feederID uint64) ([][]byte, error)
GetFeederTreeInfo(ctx sdk.Context, feederID uint64) (uint32, []byte)
GetNodesFromMerkleTree(ctx sdk.Context, feederID uint64) []*types.HashNode
MustUnmarshal(bz []byte, ptr codec.ProtoMarshaler)
}

var _ KeeperDogfood = dogfoodkeeper.Keeper{}
Expand Down
3 changes: 2 additions & 1 deletion x/oracle/keeper/common/two_phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ import (
)

// the input data could be either rawData bytes of data with big size for non-price scenarios or 'price' info

Check failure on line 7 in x/oracle/keeper/common/two_phases.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

`senarios` is a misspelling of `scenarios` (misspell)
type PostAggregationHandler func(data []byte, ctx sdk.Context, k KeeperOracle) error
// type PostAggregationHandler func(data []byte, ctx sdk.Context, k KeeperOracle) error
type PostAggregationHandler func(ctx sdk.Context, data []byte, feederID, roundID uint64, k KeeperOracle) error
3 changes: 1 addition & 2 deletions x/oracle/keeper/feedermanagement/caches.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/ethereum/go-ethereum/common"
"github.com/imua-xyz/imuachain/x/oracle/types"
oracletypes "github.com/imua-xyz/imuachain/x/oracle/types"
)

Expand Down Expand Up @@ -127,7 +126,7 @@ func (c *caches) IsRule2PhasesByFeederID(feederID uint64) bool {
return c.isRule2PhasesByRule(rule)
}

func (c *caches) isRule2PhasesByRule(rule *types.RuleSource) bool {
func (c *caches) isRule2PhasesByRule(rule *oracletypes.RuleSource) bool {
// just check the format and don't care the verification here, the verification should be done by 'params' not in this memory calculator(feedermanager)
if len(rule.SourceIDs) == 1 && rule.SourceIDs[0] == 0 &&
rule.Nom != nil && len(rule.Nom.SourceIDs) == 1 {
Expand Down
140 changes: 119 additions & 21 deletions x/oracle/keeper/feedermanagement/feedermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ func (f *FeederManager) BeginBlock(ctx sdk.Context) (recovered bool) {
// if the cache is nil and we are not in recovery mode, init the caches
if f.cs == nil {
var err error
recovered, err = f.recovery(ctx)
// it's safe to panic since this will only happen when the node is starting with something wrong in the store
recovered, err = f.recovery(ctx) // it's safe to panic since this will only happen when the node is starting with something wrong in the store

Check warning

Code scanning / CodeQL

Panic in BeginBock or EndBlock consensus methods Warning

path flow from Begin/EndBlock to a panic call
path flow from Begin/EndBlock to a panic call
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -284,19 +283,57 @@ func (f *FeederManager) commitRounds(ctx sdk.Context) {

// #nosec G115 // tokenID is index of slice
if updated := f.k.AppendPriceTR(ctx, uint64(r.tokenID), *priceCommit, finalPrice.DetID); !updated {
// failed to append price due to roundID gap, and this is a 'should-not-happen' case
f.k.GrowRoundID(ctx, uint64(r.tokenID), uint64(r.roundID))
// this is an 'impossible' case, we should not reach here
latestPrice, latestRoundID := f.k.GrowRoundID(ctx, uint64(r.tokenID), uint64(r.roundID))
logger.Error("failed to append price due to roundID gap and update this round with GrowRoundID", "feederID", r.feederID, "try-to-update-roundID", r.roundID, "try-to-update-price", priceCommit, "restul-latestPrice", latestPrice, "result-latestRoundID", latestRoundID)
} else {
fstr := strconv.FormatInt(feederID, 10)
successFeederIDs = append(successFeederIDs, fstr)

// set up for 2-phases aggregation
if r.twoPhases {
// no more validation check, they've all been done by previous process
lc, _ := strconv.ParseUint(finalPrice.DetID, 10, 32)
// set up mem-round for 2nd phase aggregation
r.m = oracletypes.NewMT(f.cs.RawDataPieceSize(), uint32(lc), []byte(finalPrice.Price))
// set up state for 2nd phase aggregation
// #nosec G115
f.k.Setup2ndPhase(ctx, uint64(r.feederID), f.cs.GetValidators(), uint32(lc), []byte(finalPrice.Price))
}
}

fstr := strconv.FormatInt(feederID, 10)
successFeederIDs = append(successFeederIDs, fstr) // there's no valid price for any round yet
} else {
logger.Error("We currently only support rules under oracle V1: only allow price from source Chainlink", "feederID", r.feederID)
}
}
// keep aggregator for possible 'handlQuotingMisBehavior' at quotingWindowEnd
r.status = roundStatusClosed
} else if r.twoPhases {
// check if r is 2-phases and rawData is completed, for 2nd-phase, the status of round must be closed
if r.m.CollectingRawData() {
if len(r.cachedProofForBlock) > 0 {
// #nosec G115
f.k.AddNodesToMerkleTree(ctx, uint64(r.feederID), r.cachedProofForBlock)
// reset cachedProofForBlock after commit to state
r.cachedProofForBlock = nil
}
if LatestLeafIndex, ok := r.m.LatestLeafIndex(); ok {
// #nosec G115
f.k.SetNextPieceIndexForFeeder(ctx, uint64(r.feederID), LatestLeafIndex+1)
}
} else if rawData, ok := r.m.CompleteRawData(); ok {
// execute postHandler with rawData
if err := r.h(ctx, rawData, uint64(r.feederID), uint64(r.roundID), f.k); err != nil {
// just log the error and wait for next round to update
// TODO(leonz): this suits the NST case, where we can just wait for the next round to update, but does it suit the common case? Should we do some other postHandling for this failure when it's not the NST case?
logger.Error("failed to execute postHandler for 2phases aggregation on consensus price", "feederID", r.feederID, "roundID", r.roundID, "consensus 1st-phase hash:%s", hex.EncodeToString(r.m.RootHash()))
}
// reset related cache from state
// #nosec G115
f.k.Clear2ndPhase(ctx, uint64(r.feederID), r.m.RootIndex())
r.m = nil
}
}

// close all quotingWindow to skip current rounds' 'handlQuotingMisBehavior'
if f.forceSeal {
r.closeQuotingWindow()
Expand Down Expand Up @@ -532,7 +569,8 @@ func (f *FeederManager) updateRoundsParamsAndAddNewRounds(ctx sdk.Context) {
logger.Info("[mem] add new round", "feederID", feederID, "height", height)
f.sortedFeederIDs = append(f.sortedFeederIDs, feederID)
twoPhases := f.cs.IsRule2PhasesByFeederID(uint64(feederID))
f.rounds[feederID] = newRound(feederID, tokenFeeder, int64(params.MaxNonce), f.cs, NewAggMedian(), twoPhases)
ph, _ := f.k.GetPostAggregation(feederID)
f.rounds[feederID] = newRound(feederID, tokenFeeder, int64(params.MaxNonce), f.cs, NewAggMedian(), twoPhases, ph)
}
}
f.sortedFeederIDs.sort()
Expand Down Expand Up @@ -663,6 +701,9 @@ func (f *FeederManager) ValidateMsg(msg *oracletypes.MsgCreatePrice) error {
}
}

if f.cs.IsRule2PhasesByFeederID(msg.FeederID) && msg.IsNotTwoPhases() {
return fmt.Errorf("feederID:%d is configured for 2-phases aggregation, but the message is not of 2-phases", msg.FeederID)
}
// extra check for message as 1st phase for 2-phases aggregation
if msg.IsPhaseOne() {
if len(msg.Prices) != 1 {
Expand All @@ -687,10 +728,11 @@ func (f *FeederManager) ValidateMsg(msg *oracletypes.MsgCreatePrice) error {
}

// we wait one more maxNonce blocks to make sure proposer getting expected txs in their mempool
// we don't use the last block of current round(which is the baseBlock of the next round), so the quotingWindow for 2nd-phase message is from [baseBlock+2*maxNonce, nextBaseBlock-1]
// #nosec G115 // maxNonce is positive
windowForPhaseTwo := f.cs.IntervalForFeederID(msg.FeederID) - uint64(f.cs.GetMaxNonce())*2
if leafCount > windowForPhaseTwo {
return fmt.Errorf("2-phases aggregation for feederID:%d, should have detID less than or equal to %d", msg.FeederID, windowForPhaseTwo)
if leafCount < 1 || leafCount > windowForPhaseTwo {
return fmt.Errorf("2-phases aggregation for feederID:%d, should have detID less than or equal to %d and be at least 1, got%d", msg.FeederID, windowForPhaseTwo, leafCount)
}
}
return nil
Expand All @@ -713,6 +755,26 @@ func (f *FeederManager) ProcessQuote(ctx sdk.Context, msg *oracletypes.MsgCreate
return nil, fmt.Errorf("round not exists for feederID:%d, proposer:%s", msgItem.FeederID, msgItem.Validator)
}

// TODO(leonz): remove this ?
if !r.twoPhases != msg.IsNotTwoPhases() {
// this should not happen, since the message itself has been checked in 'validateMsg'; reaching here means something is wrong with the 'round' initialization
return nil, fmt.Errorf("the 2phases status of round and message is mismatched, there's got something wrong with mem-round initialzation, feederID:%d", msg.FeederID)
}

if msg.IsPhaseTwo() {
// either there's no consensus price from 1st phase or the 2nd phase had collected all pieces, this condition will be true and we will reject the transaction
// also we don't record any 'miss' count under this same condition
if r.m == nil || r.m.Completed() {
return nil, fmt.Errorf("message with 2-nd phase for feederID:%d of round_%d is reject since that round is not collecting raw data", msg.FeederID, r.roundID)
}

// #nosec G115
if uint64(ctx.BlockHeight()) < r.roundPhaseTwoStartBlock {
return nil, fmt.Errorf("message with 2-nd phase for feederID:%d of round_%d can only be accept at block height of at least %d", msg.FeederID, r.roundID, r.roundPhaseTwoStartBlock)
}

}

// #nosec G115 // baseBlock is block height which is not negative
if valid := r.ValidQuotingBaseBlock(int64(msg.BasedBlock)); !valid {
return nil, fmt.Errorf("failed to process price-feed msg for feederID:%d, round is quoting:%t,quotingWindow is open:%t, expected baseBlock:%d, got baseBlock:%d", msgItem.FeederID, r.IsQuoting(), r.IsQuotingWindowOpen(), r.roundBaseBlock, msg.BasedBlock)
Expand Down Expand Up @@ -839,8 +901,10 @@ func (f *FeederManager) recovery(ctx sdk.Context) (bool, error) {
continue
}
tfID := int64(tfID)
// #nosec G115 // safe conversion
twoPhases := f.cs.IsRule2PhasesByFeederID(uint64(tfID))
f.rounds[tfID] = newRound(tfID, tf, int64(params.MaxNonce), f.cs, NewAggMedian(), twoPhases)
postHandler, _ := f.k.GetPostAggregation(tfID)
f.rounds[tfID] = newRound(tfID, tf, int64(params.MaxNonce), f.cs, NewAggMedian(), twoPhases, postHandler)
f.sortedFeederIDs.add(tfID)
}
f.prepareRounds(ctxReplay)
Expand Down Expand Up @@ -878,10 +942,36 @@ func (f *FeederManager) recovery(ctx sdk.Context) (bool, error) {

f.cs.SkipCommit()

pieceSize := f.cs.RawDataPieceSize()
// recovery for 2nd-phase state
for _, r := range f.rounds {
if r.twoPhases {
// reset r.m from state

Check failure on line 949 in x/oracle/keeper/feedermanagement/feedermanager.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

commentFormatting: put a space between `//` and comment text (gocritic)
// #nosec G115
feederID := uint64(r.feederID)
leafCount, rootHash := f.k.GetFeederTreeInfo(ctx, uint64(r.feederID))
if leafCount == 0 {
continue
}
r.m = oracletypes.NewMT(pieceSize, leafCount, rootHash)
// rawdata
rawDataPieces, err := f.k.GetRawDataPieces(ctx, feederID)
if err != nil {
return false, err
}
r.m.SetRawDataPieces(rawDataPieces)
// proof nodes
// #nosec G115
nodes := f.k.GetNodesFromMerkleTree(ctx, uint64(r.feederID))
r.m.SetProofNodes(nodes)
}
}

Check warning

Code scanning / CodeQL

Iteration over map Warning

Iteration over map may be a possible source of non-determinism

return true, nil
}

func (f *FeederManager) RoundIDToBaseBlock(feederID, roundID uint64) (uint64, bool) {
// #nosec G115
r, ok := f.rounds[int64(feederID)]
if !ok {
return 0, false
Expand All @@ -891,7 +981,7 @@ func (f *FeederManager) RoundIDToBaseBlock(feederID, roundID uint64) (uint64, bo

// BaseBlockToRoundID returns the roundID which the input baseblock indicates to, it is different to the roundID of which this baseBlock BelongsTo (+1)
func (f *FeederManager) BaseBlockToNextRoundID(feederID, baseBlock uint64) (uint64, bool) {
//TODO(leonz): use uint64 as f.rounds key
// TODO(leonz): use uint64 as f.rounds key
// #nosec G115
r, ok := f.rounds[int64(feederID)]
if !ok {
Expand Down Expand Up @@ -996,34 +1086,42 @@ func getProtoMsgItemFromQuote(msg *oracletypes.MsgCreatePrice) *oracletypes.MsgI
}
}

func (f *FeederManager) ProcessRawData(msg *oracletypes.MsgCreatePrice) error {
// ProcessRawData verifies the submitted piece of rawData with proof against the expected root and caches the result if it passed the verification
// return (cached rawData piece, error)
func (f *FeederManager) ProcessRawData(msg *oracletypes.MsgCreatePrice) ([]byte, error) {
if err := f.ValidateMsg(msg); err != nil {
return nil, oracletypes.ErrInvalidMsg.Wrap(err.Error())
}
piece, ok := f.GetPieceWithProof(msg)
if !ok {
return errors.New("failed to parse rawdata piece from message")
return nil, errors.New("failed to parse rawdata piece from message")
}
// #nosec G115
r, ok := f.rounds[int64(msg.FeederID)]
if !ok {
// this should not happen
return fmt.Errorf("round for feederID:%d not exists", msg.FeederID)
return nil, fmt.Errorf("round for feederID:%d not exists", msg.FeederID)
}
if r.m != nil {
return fmt.Errorf("feederID %d is not collecting rawData", msg.FeederID)
if r.m == nil {
return nil, fmt.Errorf("feederID %d is not collecting rawData", msg.FeederID)
}
// we don't check the 1st return value to see if this input proof is of the minimal, that's the duty of anteHandler, and 'verified' pieceWithProof will not fail the tx execution
_, ok = r.m.VerifyAndCache(piece.Index, piece.RawData, piece.Proof)
cachedProof, ok := r.m.VerifyAndCache(piece.Index, piece.RawData, piece.Proof)
if !ok {
return fmt.Errorf("failed to verify piece of index %d provided within message for feederID:%d against root:%s", piece.Index, msg.FeederID, hex.EncodeToString(r.m.RootHash()))
return nil, fmt.Errorf("failed to verify piece of index %d provided within message for feederID:%d against root:%s", piece.Index, msg.FeederID, hex.EncodeToString(r.m.RootHash()))
}
// we don't need to cache the proof for state updating if the merkle tree have collected all rawData
if !r.m.Completed() {
r.cachedProofForBlock = append(r.cachedProofForBlock, cachedProof...)
}
// we don't do any state update in tx executing; the postHandler and all state updates will be handled in EndBlock
// // post handle rawData registered for the feederID
// // clear all caching pieces from stateDB
//
// // remove/reset merkleTree
// // remove merkleTree
// persist piece for recovery (with memory-cache update into merkleTree)
// save this piece and proof to db for recovery, for nodes without running,
// this process only causes additional: two write to stateDB(piece, proof), one read from the stateDB(piece)

return nil
return piece.RawData, nil
}
4 changes: 2 additions & 2 deletions x/oracle/keeper/feedermanagement/feedermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ func TestFeederManagement(t *testing.T) {
ps2 := ps1
fm2 := *fm

fm.rounds[1] = newRound(1, testdata.DefaultParamsForTest().TokenFeeders[1], 3, c, defaultAggMedian)
fm.rounds[1] = newRound(1, testdata.DefaultParamsForTest().TokenFeeders[1], 3, c, defaultAggMedian, false, nil)
fm.rounds[1].PrepareForNextBlock(20)
fm.sortedFeederIDs.add(1)
fm.rounds[1].a.ds.AddPriceSource(&ps1, big.NewInt(1), "v1")

fm2.rounds = make(map[int64]*round)
fm2.sortedFeederIDs = make([]int64, 0)
fm2.rounds[1] = newRound(1, testdata.DefaultParamsForTest().TokenFeeders[1], 3, c, defaultAggMedian)
fm2.rounds[1] = newRound(1, testdata.DefaultParamsForTest().TokenFeeders[1], 3, c, defaultAggMedian, false, nil)
fm2.rounds[1].PrepareForNextBlock(20)
fm2.sortedFeederIDs.add(1)
fm2.rounds[1].a.ds.AddPriceSource(&ps2, big.NewInt(1), "v1")
Expand Down
4 changes: 2 additions & 2 deletions x/oracle/keeper/feedermanagement/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ func (t *Test) NewAggregator(filled bool) *aggregator {

func (t *Test) NewRound(cs CacheReader) *round {
feederID := r.Intn(len(params.TokenFeeders)-1) + 1
round := newRound(int64(feederID), params.TokenFeeders[feederID], int64(params.MaxNonce), cs, defaultAggMedian, false)
round := newRound(int64(feederID), params.TokenFeeders[feederID], int64(params.MaxNonce), cs, defaultAggMedian, false, nil)
return round
}

func (t *Test) NewRoundWithFeederID(cs CacheReader, feederID int64) *round {
round := newRound(feederID, params.TokenFeeders[feederID], int64(params.MaxNonce), cs, defaultAggMedian, false)
round := newRound(feederID, params.TokenFeeders[feederID], int64(params.MaxNonce), cs, defaultAggMedian, false, nil)
return round
}

Expand Down
Loading

0 comments on commit acfabb2

Please sign in to comment.