Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support snapshot staker's cumulative earnings #413

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/database/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type Client interface {
FindEpochAPYSnapshots(ctx context.Context, query schema.EpochAPYSnapshotQuery) ([]*schema.EpochAPYSnapshot, error)
SaveEpochAPYSnapshot(ctx context.Context, epochAPYSnapshots *schema.EpochAPYSnapshot) error
FindEpochAPYSnapshotsAverage(ctx context.Context) (decimal.Decimal, error)
FindStakerCumulativeEarningSnapshots(ctx context.Context, query schema.StakerCumulativeEarningSnapshotsQuery) ([]*schema.StakerCumulativeEarningSnapshot, error)
SaveStakerCumulativeEarningSnapshots(ctx context.Context, stakerCumulativeEarningSnapshots []*schema.StakerCumulativeEarningSnapshot) error

FindBridgeTransaction(ctx context.Context, query schema.BridgeTransactionQuery) (*schema.BridgeTransaction, error)
FindBridgeTransactions(ctx context.Context, query schema.BridgeTransactionsQuery) ([]*schema.BridgeTransaction, error)
Expand Down
109 changes: 107 additions & 2 deletions internal/database/dialer/cockroachdb/client_stake.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,12 @@ func (c *client) FindStakeTransactions(ctx context.Context, query schema.StakeTr
databaseClient = databaseClient.Where(`"type" = ?`, query.Type)
}

if query.BlockTimestamp != nil {
databaseClient = databaseClient.Where(`"block_timestamp" >= ?`, query.BlockTimestamp)
if query.AfterBlockTimestamp != nil {
databaseClient = databaseClient.Where(`"block_timestamp" >= ?`, query.AfterBlockTimestamp)
}

if query.BlockNumber != nil {
databaseClient = databaseClient.Where(`"block_number" = ?`, *query.BlockNumber)
}

if query.Finalized != nil {
Expand Down Expand Up @@ -753,6 +757,107 @@ func (c *client) SaveStakerProfitSnapshots(ctx context.Context, snapshots []*sch
return c.database.WithContext(ctx).Clauses(onConflict).Create(&value).Error
}

func (c *client) FindStakerCumulativeEarningSnapshots(ctx context.Context, query schema.StakerCumulativeEarningSnapshotsQuery) ([]*schema.StakerCumulativeEarningSnapshot, error) {
databaseClient := c.database.WithContext(ctx).Table((*table.StakerCumulativeEarningSnapshot).TableName(nil))

if query.Cursor != nil {
databaseClient = databaseClient.Where(`"id" < ?`, query.Cursor)
}

if query.OwnerAddress != nil {
databaseClient = databaseClient.Where(`"owner_address" = ?`, query.OwnerAddress)
}

if query.EpochID != nil {
databaseClient = databaseClient.Where(`"epoch_id" = ?`, query.EpochID)
}

if query.BeforeDate != nil {
databaseClient = databaseClient.Where(`"date" <= ?`, query.BeforeDate)
}

if query.AfterDate != nil {
databaseClient = databaseClient.Where(`"date" >= ?`, query.AfterDate)
}

if query.EpochIDs != nil {
databaseClient = databaseClient.Where(`"epoch_id" IN ?`, query.EpochIDs)
}

if query.Limit != nil {
databaseClient = databaseClient.Limit(*query.Limit)
}

var rows []*table.StakerCumulativeEarningSnapshot

if len(query.Dates) > 0 {
var (
queries []string
values []interface{}
)

for _, date := range query.Dates {
queries = append(queries, `(SELECT * FROM "stake"."cumulative_earning_snapshots" WHERE "date" >= ? AND "owner_address" = ? ORDER BY "date" LIMIT 1)`)
values = append(values, date, query.OwnerAddress)
}

// Combine all queries with UNION ALL
fullQuery := strings.Join(queries, " UNION ALL ")

// Execute the combined query
if err := databaseClient.Raw(fullQuery, values...).Scan(&rows).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, database.ErrorRowNotFound
}

return nil, fmt.Errorf("find rows: %w", err)
}
} else {
if err := databaseClient.Order("epoch_id DESC, id DESC").Find(&rows).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, database.ErrorRowNotFound
}

return nil, fmt.Errorf("find rows: %w", err)
}
}

results := make([]*schema.StakerCumulativeEarningSnapshot, 0, len(rows))

for _, row := range rows {
result, err := row.Export()
if err != nil {
return nil, fmt.Errorf("export row: %w", err)
}

results = append(results, result)
}

return results, nil
}

func (c *client) SaveStakerCumulativeEarningSnapshots(ctx context.Context, snapshots []*schema.StakerCumulativeEarningSnapshot) error {
var value table.StakerCumulativeEarningSnapshots

if err := value.Import(snapshots); err != nil {
return fmt.Errorf("import staker cumulative earning snapshots: %w", err)
}

onConflict := clause.OnConflict{
Columns: []clause.Column{
{
Name: "owner_address",
},
{
Name: "epoch_id",
},
},
UpdateAll: true,
}

return c.database.WithContext(ctx).Clauses(onConflict).Create(&value).Error
}

func (c *client) DeleteStakeTransactionsByBlockNumber(ctx context.Context, blockNumber uint64) error {
return c.database.
WithContext(ctx).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- +goose Up
-- +goose StatementBegin
CREATE TABLE "stake"."cumulative_earning_snapshots"
(
"id" bigint GENERATED BY DEFAULT AS IDENTITY (INCREMENT 1 MINVALUE 0 START 0),
"date" timestamptz NOT NULL,
"epoch_id" bigint NOT NULL,
"owner_address" bytea NOT NULL,
"total_staking_cumulative_earnings" decimal NOT NULL,
"created_at" timestamptz NOT NULL DEFAULT now(),
"updated_at" timestamptz NOT NULL DEFAULT now(),

CONSTRAINT "pkey" PRIMARY KEY ("owner_address", "epoch_id")
);

CREATE INDEX "cumulative_earning_snapshots_date_idx" ON "stake"."cumulative_earning_snapshots" ("date");
CREATE INDEX "cumulative_earning_snapshots_epoch_id_idx" ON "stake"."cumulative_earning_snapshots" ("epoch_id" DESC);
CREATE INDEX "cumulative_earning_snapshots_id_idx" ON "stake"."cumulative_earning_snapshots" ("id" DESC);

-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP TABLE "stake"."cumulative_earning_snapshots";
-- +goose StatementEnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package table

import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/rss3-network/global-indexer/schema"
"github.com/shopspring/decimal"
)

type StakerCumulativeEarningSnapshot struct {
ID uint64 `gorm:"column:id"`
Date time.Time `gorm:"column:date"`
EpochID uint64 `gorm:"column:epoch_id"`
OwnerAddress common.Address `gorm:"column:owner_address"`
CumulativeEarning decimal.Decimal `gorm:"column:cumulative_earning"`
CreatedAt time.Time `gorm:"column:created_at"`
UpdatedAt time.Time `gorm:"column:updated_at"`
}

func (s *StakerCumulativeEarningSnapshot) TableName() string {
return "stake.cumulative_earning_snapshots"
}

func (s *StakerCumulativeEarningSnapshot) Import(snapshot schema.StakerCumulativeEarningSnapshot) error {
s.Date = snapshot.Date
s.EpochID = snapshot.EpochID
s.OwnerAddress = snapshot.OwnerAddress
s.CumulativeEarning = snapshot.CumulativeEarning
s.CreatedAt = snapshot.CreatedAt
s.UpdatedAt = snapshot.UpdatedAt

return nil
}

func (s *StakerCumulativeEarningSnapshot) Export() (*schema.StakerCumulativeEarningSnapshot, error) {
return &schema.StakerCumulativeEarningSnapshot{
ID: s.ID,
Date: s.Date,
EpochID: s.EpochID,
OwnerAddress: s.OwnerAddress,
CumulativeEarning: s.CumulativeEarning,
CreatedAt: s.CreatedAt,
UpdatedAt: s.UpdatedAt,
}, nil
}

type StakerCumulativeEarningSnapshots []StakerCumulativeEarningSnapshot

func (s *StakerCumulativeEarningSnapshots) Import(snapshots []*schema.StakerCumulativeEarningSnapshot) error {
for _, snapshot := range snapshots {
var imported StakerCumulativeEarningSnapshot

if err := imported.Import(*snapshot); err != nil {
return err
}

*s = append(*s, imported)
}

return nil
}

func (s *StakerCumulativeEarningSnapshots) Export() ([]*schema.StakerCumulativeEarningSnapshot, error) {
snapshots := make([]*schema.StakerCumulativeEarningSnapshot, 0)

for _, snapshot := range *s {
exported, err := snapshot.Export()
if err != nil {
return nil, err
}

snapshots = append(snapshots, exported)
}

return snapshots, nil
}
6 changes: 3 additions & 3 deletions internal/service/hub/handler/nta/stake_staking.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ func (n *NTA) findStakerHistoryProfitSnapshots(ctx context.Context, owner common

// Calculate profit changes from staking transactions.
transactions, err := n.databaseClient.FindStakeTransactions(ctx, schema.StakeTransactionsQuery{
User: lo.ToPtr(owner),
BlockTimestamp: lo.ToPtr(blockTimestamp),
Order: "block_timestamp ASC",
User: lo.ToPtr(owner),
AfterBlockTimestamp: lo.ToPtr(blockTimestamp),
Order: "block_timestamp ASC",
})
if err != nil && !errors.Is(err, database.ErrorRowNotFound) {
return nil, fmt.Errorf("find stake transactions: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions internal/service/scheduler/snapshot/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
nodecount "github.com/rss3-network/global-indexer/internal/service/scheduler/snapshot/node_count"
operatorprofit "github.com/rss3-network/global-indexer/internal/service/scheduler/snapshot/operator_profit"
stakercount "github.com/rss3-network/global-indexer/internal/service/scheduler/snapshot/staker_count"
stakercumulativeearnings "github.com/rss3-network/global-indexer/internal/service/scheduler/snapshot/staker_cumulative_earnings"
stakerprofit "github.com/rss3-network/global-indexer/internal/service/scheduler/snapshot/staker_profit"
"github.com/sourcegraph/conc/pool"
)
Expand Down Expand Up @@ -67,6 +68,7 @@ func New(databaseClient database.Client, redis *redis.Client, ethereumClient *et
stakerprofit.New(databaseClient, redis, stakingContract),
operatorprofit.New(databaseClient, redis, stakingContract),
apy.New(databaseClient, redis, stakingContract),
stakercumulativeearnings.New(databaseClient, redis, stakingContract),
},
}, nil
}
Loading
Loading