From fa613e0628533e9943e89472996a031ab02e0fd3 Mon Sep 17 00:00:00 2001 From: polebug Date: Mon, 21 Oct 2024 17:25:54 +0800 Subject: [PATCH] feat: support snapshot staker's cumulative earnings --- internal/database/client.go | 2 + .../dialer/cockroachdb/client_stake.go | 109 ++++- ...ker_cumulative_earning_snapshots_table.sql | 25 ++ .../staker_cumulative_earning_snapshot.go | 77 ++++ .../service/hub/handler/nta/stake_staking.go | 6 +- internal/service/scheduler/snapshot/server.go | 2 + .../staker_cumulative_earnings.go | 392 ++++++++++++++++++ schema/stake_transaction.go | 23 +- schema/staker_cumulative_earning_snapshot.go | 29 ++ 9 files changed, 649 insertions(+), 16 deletions(-) create mode 100644 internal/database/dialer/cockroachdb/migration/20241009021234_add_staker_cumulative_earning_snapshots_table.sql create mode 100644 internal/database/dialer/cockroachdb/table/staker_cumulative_earning_snapshot.go create mode 100644 internal/service/scheduler/snapshot/staker_cumulative_earnings/staker_cumulative_earnings.go create mode 100644 schema/staker_cumulative_earning_snapshot.go diff --git a/internal/database/client.go b/internal/database/client.go index 1e6b3b15..fdd5f202 100644 --- a/internal/database/client.go +++ b/internal/database/client.go @@ -64,6 +64,8 @@ type Client interface { FindEpochAPYSnapshots(ctx context.Context, query schema.EpochAPYSnapshotQuery) ([]*schema.EpochAPYSnapshot, error) SaveEpochAPYSnapshot(ctx context.Context, epochAPYSnapshots *schema.EpochAPYSnapshot) error FindEpochAPYSnapshotsAverage(ctx context.Context) (decimal.Decimal, error) + FindStakerCumulativeEarningSnapshots(ctx context.Context, query schema.StakerCumulativeEarningSnapshotsQuery) ([]*schema.StakerCumulativeEarningSnapshot, error) + SaveStakerCumulativeEarningSnapshots(ctx context.Context, stakerCumulativeEarningSnapshots []*schema.StakerCumulativeEarningSnapshot) error FindBridgeTransaction(ctx context.Context, query schema.BridgeTransactionQuery) (*schema.BridgeTransaction, error) FindBridgeTransactions(ctx context.Context, query schema.BridgeTransactionsQuery) ([]*schema.BridgeTransaction, error) diff --git a/internal/database/dialer/cockroachdb/client_stake.go b/internal/database/dialer/cockroachdb/client_stake.go index 28ca4a79..65ac6da4 100644 --- a/internal/database/dialer/cockroachdb/client_stake.go +++ b/internal/database/dialer/cockroachdb/client_stake.go @@ -96,8 +96,12 @@ func (c *client) FindStakeTransactions(ctx context.Context, query schema.StakeTr databaseClient = databaseClient.Where(`"type" = ?`, query.Type) } - if query.BlockTimestamp != nil { - databaseClient = databaseClient.Where(`"block_timestamp" >= ?`, query.BlockTimestamp) + if query.AfterBlockTimestamp != nil { + databaseClient = databaseClient.Where(`"block_timestamp" >= ?`, query.AfterBlockTimestamp) + } + + if query.BlockNumber != nil { + databaseClient = databaseClient.Where(`"block_number" = ?`, *query.BlockNumber) } if query.Finalized != nil { @@ -753,6 +757,107 @@ func (c *client) SaveStakerProfitSnapshots(ctx context.Context, snapshots []*sch return c.database.WithContext(ctx).Clauses(onConflict).Create(&value).Error } +func (c *client) FindStakerCumulativeEarningSnapshots(ctx context.Context, query schema.StakerCumulativeEarningSnapshotsQuery) ([]*schema.StakerCumulativeEarningSnapshot, error) { + databaseClient := c.database.WithContext(ctx).Table((*table.StakerCumulativeEarningSnapshot).TableName(nil)) + + if query.Cursor != nil { + databaseClient = databaseClient.Where(`"id" < ?`, query.Cursor) + } + + if query.OwnerAddress != nil { + databaseClient = databaseClient.Where(`"owner_address" = ?`, query.OwnerAddress) + } + + if query.EpochID != nil { + databaseClient = databaseClient.Where(`"epoch_id" = ?`, query.EpochID) + } + + if query.BeforeDate != nil { + databaseClient = databaseClient.Where(`"date" <= ?`, query.BeforeDate) + } + + if query.AfterDate != nil { + databaseClient = databaseClient.Where(`"date" >= ?`, query.AfterDate) + } + + if query.EpochIDs != nil { + databaseClient = databaseClient.Where(`"epoch_id" IN ?`, query.EpochIDs) + } + + if query.Limit != nil { + databaseClient = databaseClient.Limit(*query.Limit) + } + + var rows []*table.StakerCumulativeEarningSnapshot + + if len(query.Dates) > 0 { + var ( + queries []string + values []interface{} + ) + + for _, date := range query.Dates { + queries = append(queries, `(SELECT * FROM "stake"."cumulative_earning_snapshots" WHERE "date" >= ? AND "owner_address" = ? ORDER BY "date" LIMIT 1)`) + values = append(values, date, query.OwnerAddress) + } + + // Combine all queries with UNION ALL + fullQuery := strings.Join(queries, " UNION ALL ") + + // Execute the combined query + if err := databaseClient.Raw(fullQuery, values...).Scan(&rows).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, database.ErrorRowNotFound + } + + return nil, fmt.Errorf("find rows: %w", err) + } + } else { + if err := databaseClient.Order("epoch_id DESC, id DESC").Find(&rows).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, database.ErrorRowNotFound + } + + return nil, fmt.Errorf("find rows: %w", err) + } + } + + results := make([]*schema.StakerCumulativeEarningSnapshot, 0, len(rows)) + + for _, row := range rows { + result, err := row.Export() + if err != nil { + return nil, fmt.Errorf("export row: %w", err) + } + + results = append(results, result) + } + + return results, nil +} + +func (c *client) SaveStakerCumulativeEarningSnapshots(ctx context.Context, snapshots []*schema.StakerCumulativeEarningSnapshot) error { + var value table.StakerCumulativeEarningSnapshots + + if err := value.Import(snapshots); err != nil { + return fmt.Errorf("import staker cumulative earning snapshots: %w", err) + } + + onConflict := clause.OnConflict{ + Columns: []clause.Column{ + { + Name: "owner_address", + }, + { + Name: "epoch_id", + }, + }, + UpdateAll: true, + } + + return c.database.WithContext(ctx).Clauses(onConflict).Create(&value).Error +} + func (c *client) DeleteStakeTransactionsByBlockNumber(ctx context.Context, blockNumber uint64) error { return c.database. WithContext(ctx). diff --git a/internal/database/dialer/cockroachdb/migration/20241009021234_add_staker_cumulative_earning_snapshots_table.sql b/internal/database/dialer/cockroachdb/migration/20241009021234_add_staker_cumulative_earning_snapshots_table.sql new file mode 100644 index 00000000..33566ad2 --- /dev/null +++ b/internal/database/dialer/cockroachdb/migration/20241009021234_add_staker_cumulative_earning_snapshots_table.sql @@ -0,0 +1,25 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE "stake"."cumulative_earning_snapshots" +( + "id" bigint GENERATED BY DEFAULT AS IDENTITY (INCREMENT 1 MINVALUE 0 START 0), + "date" timestamptz NOT NULL, + "epoch_id" bigint NOT NULL, + "owner_address" bytea NOT NULL, + "total_staking_cumulative_earnings" decimal NOT NULL, + "created_at" timestamptz NOT NULL DEFAULT now(), + "updated_at" timestamptz NOT NULL DEFAULT now(), + + CONSTRAINT "pkey" PRIMARY KEY ("owner_address", "epoch_id") +); + +CREATE INDEX "cumulative_earning_snapshots_date_idx" ON "stake"."cumulative_earning_snapshots" ("date"); +CREATE INDEX "cumulative_earning_snapshots_epoch_id_idx" ON "stake"."cumulative_earning_snapshots" ("epoch_id" DESC); +CREATE INDEX "cumulative_earning_snapshots_id_idx" ON "stake"."cumulative_earning_snapshots" ("id" DESC); + +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE "stake"."cumulative_earning_snapshots"; +-- +goose StatementEnd diff --git a/internal/database/dialer/cockroachdb/table/staker_cumulative_earning_snapshot.go b/internal/database/dialer/cockroachdb/table/staker_cumulative_earning_snapshot.go new file mode 100644 index 00000000..8eaac050 --- /dev/null +++ b/internal/database/dialer/cockroachdb/table/staker_cumulative_earning_snapshot.go @@ -0,0 +1,77 @@ +package table + +import ( + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/rss3-network/global-indexer/schema" + "github.com/shopspring/decimal" +) + +type StakerCumulativeEarningSnapshot struct { + ID uint64 `gorm:"column:id"` + Date time.Time `gorm:"column:date"` + EpochID uint64 `gorm:"column:epoch_id"` + OwnerAddress common.Address `gorm:"column:owner_address"` + CumulativeEarning decimal.Decimal `gorm:"column:cumulative_earning"` + CreatedAt time.Time `gorm:"column:created_at"` + UpdatedAt time.Time `gorm:"column:updated_at"` +} + +func (s *StakerCumulativeEarningSnapshot) TableName() string { + return "stake.cumulative_earning_snapshots" +} + +func (s *StakerCumulativeEarningSnapshot) Import(snapshot schema.StakerCumulativeEarningSnapshot) error { + s.Date = snapshot.Date + s.EpochID = snapshot.EpochID + s.OwnerAddress = snapshot.OwnerAddress + s.CumulativeEarning = snapshot.CumulativeEarning + s.CreatedAt = snapshot.CreatedAt + s.UpdatedAt = snapshot.UpdatedAt + + return nil +} + +func (s *StakerCumulativeEarningSnapshot) Export() (*schema.StakerCumulativeEarningSnapshot, error) { + return &schema.StakerCumulativeEarningSnapshot{ + ID: s.ID, + Date: s.Date, + EpochID: s.EpochID, + OwnerAddress: s.OwnerAddress, + CumulativeEarning: s.CumulativeEarning, + CreatedAt: s.CreatedAt, + UpdatedAt: s.UpdatedAt, + }, nil +} + +type StakerCumulativeEarningSnapshots []StakerCumulativeEarningSnapshot + +func (s *StakerCumulativeEarningSnapshots) Import(snapshots []*schema.StakerCumulativeEarningSnapshot) error { + for _, snapshot := range snapshots { + var imported StakerCumulativeEarningSnapshot + + if err := imported.Import(*snapshot); err != nil { + return err + } + + *s = append(*s, imported) + } + + return nil +} + +func (s *StakerCumulativeEarningSnapshots) Export() ([]*schema.StakerCumulativeEarningSnapshot, error) { + snapshots := make([]*schema.StakerCumulativeEarningSnapshot, 0) + + for _, snapshot := range *s { + exported, err := snapshot.Export() + if err != nil { + return nil, err + } + + snapshots = append(snapshots, exported) + } + + return snapshots, nil +} diff --git a/internal/service/hub/handler/nta/stake_staking.go b/internal/service/hub/handler/nta/stake_staking.go index 6165605a..33c414a3 100644 --- a/internal/service/hub/handler/nta/stake_staking.go +++ b/internal/service/hub/handler/nta/stake_staking.go @@ -119,9 +119,9 @@ func (n *NTA) findStakerHistoryProfitSnapshots(ctx context.Context, owner common // Calculate profit changes from staking transactions. transactions, err := n.databaseClient.FindStakeTransactions(ctx, schema.StakeTransactionsQuery{ - User: lo.ToPtr(owner), - BlockTimestamp: lo.ToPtr(blockTimestamp), - Order: "block_timestamp ASC", + User: lo.ToPtr(owner), + AfterBlockTimestamp: lo.ToPtr(blockTimestamp), + Order: "block_timestamp ASC", }) if err != nil && !errors.Is(err, database.ErrorRowNotFound) { return nil, fmt.Errorf("find stake transactions: %w", err) diff --git a/internal/service/scheduler/snapshot/server.go b/internal/service/scheduler/snapshot/server.go index cbb13368..b7572b32 100644 --- a/internal/service/scheduler/snapshot/server.go +++ b/internal/service/scheduler/snapshot/server.go @@ -14,6 +14,7 @@ import ( nodecount "github.com/rss3-network/global-indexer/internal/service/scheduler/snapshot/node_count" operatorprofit "github.com/rss3-network/global-indexer/internal/service/scheduler/snapshot/operator_profit" stakercount "github.com/rss3-network/global-indexer/internal/service/scheduler/snapshot/staker_count" + stakercumulativeearnings "github.com/rss3-network/global-indexer/internal/service/scheduler/snapshot/staker_cumulative_earnings" stakerprofit "github.com/rss3-network/global-indexer/internal/service/scheduler/snapshot/staker_profit" "github.com/sourcegraph/conc/pool" ) @@ -67,6 +68,7 @@ func New(databaseClient database.Client, redis *redis.Client, ethereumClient *et stakerprofit.New(databaseClient, redis, stakingContract), operatorprofit.New(databaseClient, redis, stakingContract), apy.New(databaseClient, redis, stakingContract), + stakercumulativeearnings.New(databaseClient, redis, stakingContract), }, }, nil } diff --git a/internal/service/scheduler/snapshot/staker_cumulative_earnings/staker_cumulative_earnings.go b/internal/service/scheduler/snapshot/staker_cumulative_earnings/staker_cumulative_earnings.go new file mode 100644 index 00000000..31b30ec7 --- /dev/null +++ b/internal/service/scheduler/snapshot/staker_cumulative_earnings/staker_cumulative_earnings.go @@ -0,0 +1,392 @@ +package stakercumulativeearnings + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/big" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/redis/go-redis/v9" + "github.com/rss3-network/global-indexer/common/ethereum" + stakingv2 "github.com/rss3-network/global-indexer/contract/l2/staking/v2" + "github.com/rss3-network/global-indexer/internal/cronjob" + "github.com/rss3-network/global-indexer/internal/database" + "github.com/rss3-network/global-indexer/internal/service" + "github.com/rss3-network/global-indexer/schema" + "github.com/samber/lo" + "github.com/shopspring/decimal" + "github.com/sourcegraph/conc/pool" + "go.uber.org/zap" +) + +var ( + Name = "staker_cumulative_earnings" + Timeout = 3 * time.Minute +) + +var _ service.Server = (*server)(nil) + +type server struct { + cronJob *cronjob.CronJob + databaseClient database.Client + redisClient *redis.Client + stakingContract *stakingv2.Staking +} + +func (s *server) Name() string { + return Name +} + +func (s *server) Spec() string { + return "0 */1 * * * *" // every minute +} + +func (s *server) Run(ctx context.Context) error { + err := s.cronJob.AddFunc(ctx, s.Spec(), func() { + // Query the latest epoch of the epoch events. + epochEvents, err := s.databaseClient.FindEpochs(ctx, &schema.FindEpochsQuery{Limit: lo.ToPtr(1)}) + if err != nil && !errors.Is(err, database.ErrorRowNotFound) { + zap.L().Error("find epochs", zap.Error(err)) + + return + } + + if len(epochEvents) == 0 { + return + } + + // Find the latest epoch of the staker cumulative earnings snapshots. + latestEpochSnapshot, err := s.findLatestStakerCumulativeEarningsSnapshot(ctx, epochEvents[0].ID) + if err != nil { + zap.L().Error("find latest staker cumulative earnings snapshot", zap.Error(err)) + + return + } + + // Save the staker cumulative earnings snapshots. + if latestEpochSnapshot < epochEvents[0].ID { + if err := s.saveStakerCumulativeEarningsSnapshots(ctx, latestEpochSnapshot, epochEvents[0].ID); err != nil { + zap.L().Error("save staker cumulative earnings snapshots", zap.Error(err)) + } + } + }) + + if err != nil { + return fmt.Errorf("add staker_cumulative_earnings cron job: %w", err) + } + + s.cronJob.Start() + defer s.cronJob.Stop() + + stopchan := make(chan os.Signal, 1) + + signal.Notify(stopchan, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM) + <-stopchan + + return nil +} + +func (s *server) findLatestStakerCumulativeEarningsSnapshot(ctx context.Context, latestEpochID uint64) (uint64, error) { + // Check the latest epoch of the staker cumulative earning snapshots. + for epochID := latestEpochID; epochID > 0; epochID-- { + // Query the epoch of the staker cumulative earnings snapshots. + snapshots, err := s.databaseClient.FindStakerCumulativeEarningSnapshots(ctx, schema.StakerCumulativeEarningSnapshotsQuery{EpochID: lo.ToPtr(epochID)}) + if err != nil && !errors.Is(err, database.ErrorRowNotFound) { + zap.L().Error("find staker cumulative earnings snapshots", zap.Error(err)) + + return 0, err + } + + if len(snapshots) == 0 { + continue + } + + epochItems, err := s.databaseClient.FindEpochTransactions(ctx, epochID, 1, nil) + if err != nil { + zap.L().Error("find epoch transactions", zap.Error(err)) + + return 0, err + } + + stakerCount, err := s.databaseClient.FindStakerCount(ctx, schema.StakeChipsQuery{ + BlockNumber: epochItems[0].BlockNumber, + }) + if err != nil { + zap.L().Error("find staker count", zap.Error(err)) + + return 0, err + } + + if int64(len(snapshots)) < stakerCount { + continue + } + + return epochID, nil + } + + return 0, nil +} + +func (s *server) saveStakerCumulativeEarningsSnapshots(ctx context.Context, latestEpochSnapshot, latestEpochEvent uint64) error { + // Iterate the epoch id from the latest epoch snapshot to the latest epoch event. + for epochID := latestEpochSnapshot + 1; epochID <= latestEpochEvent; epochID++ { + if err := s.saveStakerCumulativeEarningsSnapshotsByEpochID(ctx, epochID); err != nil { + return fmt.Errorf("save staker profit snapshots by epoch id: %w", err) + } + } + + return nil +} + +// saveStakerCumulativeEarningsSnapshotsByEpochID saves the staker cumulative earnings snapshots by the epoch id. +// Cumulative earnings = last epoch cumulative earnings + current chip value changes. +// But it is worth noting that merged chips need to be handled specially. +func (s *server) saveStakerCumulativeEarningsSnapshotsByEpochID(ctx context.Context, epochID uint64) error { + // Fetch the epoch items by the epoch id. + epochItems, err := s.databaseClient.FindEpochTransactions(ctx, epochID, 1, nil) + if err != nil { + return fmt.Errorf("find epoch transactions: %w", err) + } + + if len(epochItems) == 0 { + return nil + } + + var cursor *big.Int + + for { + // Fetch the distinct stakers from the chips table. + findStakeChips := schema.StakeChipsQuery{ + Cursor: cursor, + Limit: lo.ToPtr(500), + DistinctOwner: true, + BlockNumber: epochItems[0].BlockNumber, + } + + stakers, err := s.databaseClient.FindStakeChips(ctx, findStakeChips) + if errors.Is(err, database.ErrorRowNotFound) || len(stakers) == 0 { + break + } + + if err != nil { + return fmt.Errorf("find stake chips: %w", err) + } + + snapshots := make([]*schema.StakerCumulativeEarningSnapshot, 0, len(stakers)) + + // Fetch the chips by the stakers. + for _, staker := range stakers { + staker := staker + + if staker.Owner == ethereum.AddressGenesis { + continue + } + + // Query the staker profit snapshots by the owner address and the epoch id. + exist, _ := s.databaseClient.FindStakerCumulativeEarningSnapshots(ctx, schema.StakerCumulativeEarningSnapshotsQuery{ + OwnerAddress: lo.ToPtr(staker.Owner), + EpochID: lo.ToPtr(epochID), + Limit: lo.ToPtr(1), + }) + if len(exist) > 0 { + continue + } + + data, err := s.buildSaveStakerCumulativeEarningsSnapshot(ctx, epochItems[0], staker.Owner) + if err != nil { + return fmt.Errorf("build staker profit snapshots: %w", err) + } + + snapshots = append(snapshots, data) + } + + // Save the staker profit snapshots. + if len(snapshots) > 0 { + if err := s.databaseClient.SaveStakerCumulativeEarningSnapshots(ctx, snapshots); err != nil { + return fmt.Errorf("save staker profit snapshots: %w", err) + } + } + + cursor = stakers[len(stakers)-1].ID + } + + return nil +} + +func (s *server) buildSaveStakerCumulativeEarningsSnapshot(ctx context.Context, currentEpoch *schema.Epoch, staker common.Address) (*schema.StakerCumulativeEarningSnapshot, error) { + var chipsCursor *big.Int + + profit := &schema.StakerCumulativeEarningSnapshot{ + EpochID: currentEpoch.ID, + OwnerAddress: staker, + Date: time.Unix(currentEpoch.BlockTimestamp, 0), + } + + for { + findStakeChips := schema.StakeChipsQuery{ + Owner: lo.ToPtr(staker), + Cursor: chipsCursor, + BlockNumber: currentEpoch.BlockNumber, + Limit: lo.ToPtr(200), + } + + // Fetch the chips by the stakers. + chips, err := s.databaseClient.FindStakeChips(ctx, findStakeChips) + if errors.Is(err, database.ErrorRowNotFound) || len(chips) == 0 { + break + } + + if err != nil { + return nil, fmt.Errorf("find stake chips: %w", err) + } + + var mutex sync.Mutex + + // Parallel processing the chips. + errorPool := pool.New().WithContext(ctx).WithMaxGoroutines(30).WithCancelOnError().WithFirstError() + + for _, chip := range chips { + chip := chip + + errorPool.Go(func(ctx context.Context) error { + // Query the previous profit by the owner address and the epoch id. + previousProfit, err := s.databaseClient.FindStakerCumulativeEarningSnapshots(ctx, schema.StakerCumulativeEarningSnapshotsQuery{ + OwnerAddress: lo.ToPtr(staker), + EpochID: lo.ToPtr(currentEpoch.ID - 1), + }) + if err != nil && !errors.Is(err, database.ErrorRowNotFound) { + zap.L().Error("find staker cumulative earning snapshots", zap.Error(err)) + + return fmt.Errorf("find staker cumulative earning snapshots: %w", err) + } + + if len(previousProfit) > 0 { + profit.CumulativeEarning = previousProfit[0].CumulativeEarning + } + + // Query the chip value by the current epoch. + chipInfo, err := s.stakingContract.GetChipInfo(&bind.CallOpts{Context: ctx, BlockNumber: currentEpoch.BlockNumber}, chip.ID) + if err != nil { + zap.L().Error("get chip info from chain", zap.Error(err), zap.String("chipID", chip.ID.String()), zap.Uint64("blockNumber", currentEpoch.BlockNumber.Uint64())) + + return fmt.Errorf("get chip info from chain: %w", err) + } + + previousChipTokens, err := s.calculatePreviousChipTokens(ctx, staker, currentEpoch.BlockNumber, chip.ID) + if err != nil { + return fmt.Errorf("calculate previous chip tokens: %w", err) + } + + if previousChipTokens != nil { + mutex.Lock() + profit.CumulativeEarning = profit.CumulativeEarning.Add(decimal.NewFromBigInt(new(big.Int).Sub(chipInfo.Tokens, previousChipTokens), 0)) + mutex.Unlock() + } + + return nil + }) + } + + if err := errorPool.Wait(); err != nil { + return nil, fmt.Errorf("parallel processing the chips: %w", err) + } + + chipsCursor = chips[len(chips)-1].ID + } + + return profit, nil +} + +func (s *server) calculatePreviousChipTokens(ctx context.Context, staker common.Address, blockNumber *big.Int, chipID *big.Int) (*big.Int, error) { + var previousChipTokens *big.Int + + // Query the chip value by the current block number - 1. + previousChipInfo, err := s.stakingContract.GetChipInfo(&bind.CallOpts{Context: ctx, BlockNumber: new(big.Int).Sub(blockNumber, big.NewInt(1))}, chipID) + if err != nil { + zap.L().Error("get chip info from chain", zap.Error(err)) + + return nil, fmt.Errorf("get chip info from chain: %w", err) + } + + if previousChipInfo.Tokens != nil && previousChipInfo.Tokens.Cmp(big.NewInt(0)) != 0 { + return previousChipInfo.Tokens, nil + } + + // If the previous chip value is nil or zero, it is possible that the chip is merged. + mergedChips, err := s.databaseClient.FindStakeTransactions(ctx, schema.StakeTransactionsQuery{ + User: lo.ToPtr(staker), + BlockNumber: lo.ToPtr(blockNumber.Uint64()), + }) + if err != nil && !errors.Is(err, database.ErrorRowNotFound) { + zap.L().Error("find stake transactions", zap.Error(err)) + + return nil, fmt.Errorf("find stake transactions: %w", err) + } + + if len(mergedChips) > 0 { + for _, mergedChip := range mergedChips { + var exist bool + + for _, mergedChipID := range mergedChip.ChipIDs { + if mergedChipID.Cmp(chipID) == 0 { + exist = true + + break + } + } + + if exist { + events, err := s.databaseClient.FindStakeEvents(ctx, schema.StakeEventsQuery{ + IDs: []common.Hash{mergedChip.ID}, + }) + if err != nil && !errors.Is(err, database.ErrorRowNotFound) { + zap.L().Error("find stake events", zap.Error(err)) + + return nil, fmt.Errorf("find stake events: %w", err) + } + + for _, event := range events { + var metadata schema.StakeEventChipsMergedMetadata + + if err := json.Unmarshal(event.Metadata, &metadata); err != nil { + return nil, fmt.Errorf("unmarshal stake event metadata: %w", err) + } + + if metadata.NewTokenID.Cmp(chipID) == 0 { + for _, burnedTokenID := range metadata.BurnedTokenIDs { + chipInfo, err := s.stakingContract.GetChipInfo(&bind.CallOpts{Context: ctx, BlockNumber: new(big.Int).Sub(blockNumber, big.NewInt(1))}, burnedTokenID) + if err != nil { + zap.L().Error("get chip info from chain", zap.Error(err)) + + return nil, fmt.Errorf("get chip info from chain: %w", err) + } + + previousChipTokens = new(big.Int).Add(previousChipTokens, chipInfo.Tokens) + } + + return previousChipTokens, nil + } + } + } + } + } + + return nil, nil +} + +func New(databaseClient database.Client, redisClient *redis.Client, stakingContract *stakingv2.Staking) service.Server { + return &server{ + cronJob: cronjob.New(redisClient, Name, Timeout), + databaseClient: databaseClient, + redisClient: redisClient, + stakingContract: stakingContract, + } +} diff --git a/schema/stake_transaction.go b/schema/stake_transaction.go index a68ac789..94563827 100644 --- a/schema/stake_transaction.go +++ b/schema/stake_transaction.go @@ -53,17 +53,18 @@ type StakeTransactionQuery struct { } type StakeTransactionsQuery struct { - Cursor *common.Hash - IDs []common.Hash - User *common.Address - Node *common.Address - Address *common.Address - Type *StakeTransactionType - BlockTimestamp *time.Time - Pending *bool - Limit int - Order string - Finalized *bool + Cursor *common.Hash + IDs []common.Hash + User *common.Address + Node *common.Address + Address *common.Address + Type *StakeTransactionType + AfterBlockTimestamp *time.Time + BlockNumber *uint64 + Pending *bool + Limit int + Order string + Finalized *bool } type StakeRecentCount struct { diff --git a/schema/staker_cumulative_earning_snapshot.go b/schema/staker_cumulative_earning_snapshot.go new file mode 100644 index 00000000..bd39152a --- /dev/null +++ b/schema/staker_cumulative_earning_snapshot.go @@ -0,0 +1,29 @@ +package schema + +import ( + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/shopspring/decimal" +) + +type StakerCumulativeEarningSnapshot struct { + ID uint64 `json:"id"` + Date time.Time `json:"date"` + EpochID uint64 `json:"epoch_id"` + OwnerAddress common.Address `json:"owner_address"` + CumulativeEarning decimal.Decimal `json:"cumulative_earning"` + CreatedAt time.Time `json:"-"` + UpdatedAt time.Time `json:"-"` +} + +type StakerCumulativeEarningSnapshotsQuery struct { + Cursor *string `json:"cursor"` + Limit *int `json:"limit"` + OwnerAddress *common.Address `json:"owner_address"` + EpochID *uint64 `json:"epoch_id"` + EpochIDs []uint64 `json:"epoch_ids"` + Dates []time.Time `json:"dates"` + BeforeDate *time.Time `json:"before_date"` + AfterDate *time.Time `json:"after_date"` +}