diff --git a/cmd/soroban-rpc/internal/events/cursor.go b/cmd/soroban-rpc/internal/events/cursor.go index 9f37b5130..3fbfbecb8 100644 --- a/cmd/soroban-rpc/internal/events/cursor.go +++ b/cmd/soroban-rpc/internal/events/cursor.go @@ -20,6 +20,8 @@ type Cursor struct { // Tx is the index of the transaction within the ledger which emitted the event. Tx uint32 // Op is the index of the operation within the transaction which emitted the event. + // Note: Currently, there is no use for it (events are transaction-wide and not operation-specific) + // but we keep it in order to make the API future-proof. Op uint32 // Event is the index of the event within in the operation which emitted the event. Event uint32 diff --git a/cmd/soroban-rpc/internal/events/events.go b/cmd/soroban-rpc/internal/events/events.go index 0c5fdc839..e2c9030bf 100644 --- a/cmd/soroban-rpc/internal/events/events.go +++ b/cmd/soroban-rpc/internal/events/events.go @@ -15,24 +15,16 @@ import ( "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/ledgerbucketwindow" ) -type bucket struct { - ledgerSeq uint32 - ledgerCloseTimestamp int64 - events []event -} - type event struct { - contents xdr.DiagnosticEvent - txIndex uint32 - opIndex uint32 - eventIndex uint32 + diagnosticEventXDR []byte + txIndex uint32 + eventIndex uint32 } func (e event) cursor(ledgerSeq uint32) Cursor { return Cursor{ Ledger: ledgerSeq, Tx: e.txIndex, - Op: e.opIndex, Event: e.eventIndex, } } @@ -129,7 +121,12 @@ func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor, if eventRange.End.Cmp(cur) <= 0 { return lastLedgerInWindow, nil } - if !f(event.contents, cur, timestamp) { + var diagnosticEvent xdr.DiagnosticEvent + err := xdr.SafeUnmarshal(event.diagnosticEventXDR, &diagnosticEvent) + if err != nil { + return 0, err + } + if !f(diagnosticEvent, cur, timestamp) { return lastLedgerInWindow, nil } } @@ -201,7 +198,9 @@ func (m *MemoryStore) IngestEvents(ledgerCloseMeta xdr.LedgerCloseMeta) error { BucketContent: events, } m.lock.Lock() - m.eventsByLedger.Append(bucket) + if _, err = m.eventsByLedger.Append(bucket); err != nil { + return err + } m.lock.Unlock() m.eventsDurationMetric.With(prometheus.Labels{"operation": "ingest"}). Observe(time.Since(startTime).Seconds()) @@ -241,15 +240,14 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) ( return nil, err } for index, e := range txEvents { + diagnosticEventXDR, err := e.MarshalBinary() + if err != nil { + return nil, err + } events = append(events, event{ - contents: e, - txIndex: tx.Index, - // NOTE: we cannot really index by operation since all events - // are provided as part of the transaction. However, - // that shouldn't matter in practice since a transaction - // can only contain a single Host Function Invocation. - opIndex: 0, - eventIndex: uint32(index), + diagnosticEventXDR: diagnosticEventXDR, + txIndex: tx.Index, + eventIndex: uint32(index), }) } } diff --git a/cmd/soroban-rpc/internal/events/events_test.go b/cmd/soroban-rpc/internal/events/events_test.go index 9f6a3fe0d..fc4d3c6b9 100644 --- a/cmd/soroban-rpc/internal/events/events_test.go +++ b/cmd/soroban-rpc/internal/events/events_test.go @@ -1,6 +1,7 @@ package events import ( + "bytes" "testing" "github.com/stellar/go/xdr" @@ -13,24 +14,24 @@ import ( var ( ledger5CloseTime = ledgerCloseTime(5) ledger5Events = []event{ - newEvent(1, 0, 0, 100), - newEvent(1, 0, 1, 200), - newEvent(2, 0, 0, 300), - newEvent(2, 1, 0, 400), + newEvent(1, 0, 100), + newEvent(1, 1, 200), + newEvent(2, 0, 300), + newEvent(2, 1, 400), } ledger6CloseTime = ledgerCloseTime(6) ledger6Events []event = nil ledger7CloseTime = ledgerCloseTime(7) ledger7Events = []event{ - newEvent(1, 0, 0, 500), + newEvent(1, 0, 500), } ledger8CloseTime = ledgerCloseTime(8) ledger8Events = []event{ - newEvent(1, 0, 0, 600), - newEvent(2, 0, 0, 700), - newEvent(2, 0, 1, 800), - newEvent(2, 0, 2, 900), - newEvent(2, 1, 0, 1000), + newEvent(1, 0, 600), + newEvent(2, 0, 700), + newEvent(2, 1, 800), + newEvent(2, 2, 900), + newEvent(2, 3, 1000), } ) @@ -38,43 +39,39 @@ func ledgerCloseTime(seq uint32) int64 { return int64(seq)*25 + 100 } -func newEvent(txIndex, opIndex, eventIndex, val uint32) event { +func newEvent(txIndex, eventIndex, val uint32) event { v := xdr.Uint32(val) - return event{ - contents: xdr.DiagnosticEvent{ - InSuccessfulContractCall: true, - Event: xdr.ContractEvent{ - Type: xdr.ContractEventTypeSystem, - Body: xdr.ContractEventBody{ - V: 0, - V0: &xdr.ContractEventV0{ - Data: xdr.ScVal{ - Type: xdr.ScValTypeScvU32, - U32: &v, - }, + + e := xdr.DiagnosticEvent{ + InSuccessfulContractCall: true, + Event: xdr.ContractEvent{ + Type: xdr.ContractEventTypeSystem, + Body: xdr.ContractEventBody{ + V: 0, + V0: &xdr.ContractEventV0{ + Data: xdr.ScVal{ + Type: xdr.ScValTypeScvU32, + U32: &v, }, }, }, }, - txIndex: txIndex, - opIndex: opIndex, - eventIndex: eventIndex, } -} - -func mustMarshal(e xdr.DiagnosticEvent) string { - result, err := xdr.MarshalBase64(e) + diagnosticEventXDR, err := e.MarshalBinary() if err != nil { panic(err) } - return result + return event{ + diagnosticEventXDR: diagnosticEventXDR, + txIndex: txIndex, + eventIndex: eventIndex, + } } func (e event) equals(other event) bool { return e.txIndex == other.txIndex && - e.opIndex == other.opIndex && e.eventIndex == other.eventIndex && - mustMarshal(e.contents) == mustMarshal(other.contents) + bytes.Equal(e.diagnosticEventXDR, other.diagnosticEventXDR) } func eventsAreEqual(t *testing.T, a, b []event) { @@ -291,7 +288,7 @@ func TestScan(t *testing.T) { }, { Range{ - Start: Cursor{Ledger: 5, Tx: 1, Op: 2}, + Start: Cursor{Ledger: 5, Tx: 2}, ClampStart: false, End: Cursor{Ledger: 9}, ClampEnd: false, @@ -327,7 +324,7 @@ func TestScan(t *testing.T) { }, { Range{ - Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0}, + Start: Cursor{Ledger: 8, Tx: 2, Event: 3}, ClampStart: false, End: MaxCursor, ClampEnd: true, @@ -336,7 +333,7 @@ func TestScan(t *testing.T) { }, { Range{ - Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0}, + Start: Cursor{Ledger: 8, Tx: 2, Event: 3}, ClampStart: false, End: Cursor{Ledger: 9}, ClampEnd: false, @@ -354,9 +351,9 @@ func TestScan(t *testing.T) { }, { Range{ - Start: Cursor{Ledger: 5, Tx: 1, Op: 2}, + Start: Cursor{Ledger: 5, Tx: 2}, ClampStart: false, - End: Cursor{Ledger: 8, Tx: 1, Op: 4}, + End: Cursor{Ledger: 8, Tx: 2}, ClampEnd: false, }, concat(ledger5Events[2:], ledger6Events, ledger7Events, ledger8Events[:1]), @@ -367,11 +364,12 @@ func TestScan(t *testing.T) { iterateAll := true f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64) bool { require.Equal(t, ledgerCloseTime(cursor.Ledger), ledgerCloseTimestamp) + diagnosticEventXDR, err := contractEvent.MarshalBinary() + require.NoError(t, err) events = append(events, event{ - contents: contractEvent, - txIndex: cursor.Tx, - opIndex: cursor.Op, - eventIndex: cursor.Event, + diagnosticEventXDR: diagnosticEventXDR, + txIndex: cursor.Tx, + eventIndex: cursor.Event, }) return iterateAll }