Skip to content

Commit

Permalink
Support partial snapshots
Browse files Browse the repository at this point in the history
stress tested with up to 4 parts on a local ddnet server
with lots of projectiles and dbg dummies
  • Loading branch information
ChillerDragon committed Jul 4, 2024
1 parent dae44a1 commit a650957
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 14 deletions.
1 change: 1 addition & 0 deletions object7/snap_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func NewObject(typeId int, itemId int, u *packer.Unpacker) SnapObject {
unknown := &Unknown{
ItemId: itemId,
ItemType: typeId,
ItemSize: u.GetInt(),
}
return unknown
}
1 change: 0 additions & 1 deletion object7/unknown.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func (o *Unknown) Pack() []byte {
}

func (o *Unknown) Unpack(u *packer.Unpacker) error {
o.ItemSize = u.GetInt()
o.Fields = make([]int, o.Size())

for i := 0; i < o.Size(); i++ {
Expand Down
17 changes: 5 additions & 12 deletions snapshot7/snapshot7.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import (
)

const (
MaxType = 0x7fff
MaxId = 0xffff
MaxParts = 64
MaxSize = MaxParts * 1024
MaxType = 0x7fff
MaxId = 0xffff
MaxParts = 64
MaxSize = MaxParts * 1024
MaxPackSize = 900
)

type Snapshot struct {
Expand Down Expand Up @@ -233,14 +234,10 @@ func UnpackDelata(from *Snapshot, u *packer.Unpacker) (*Snapshot, error) {
//
// See also snapshot7.UnpackDelta()
func (snap *Snapshot) Unpack(u *packer.Unpacker) error {
// TODO: add all the error checking the C++ reference implementation has

snap.NumRemovedItems = u.GetInt()
snap.NumItemDeltas = u.GetInt()
u.GetInt() // _zero

// TODO: copy non deleted items from a delta snapshot

for i := 0; i < snap.NumRemovedItems; i++ {
deleted := u.GetInt()
slog.Debug("deleted item", "key", deleted)
Expand All @@ -252,17 +249,13 @@ func (snap *Snapshot) Unpack(u *packer.Unpacker) error {
itemType := u.GetInt()
itemId := u.GetInt()

slog.Debug("unpack item snap item ", "num", i, "total", snap.NumItemDeltas, "type", itemType, "id", itemId)

item := object7.NewObject(itemType, itemId, u)
err := item.Unpack(u)
if err != nil {
return err
}

snap.Items = append(snap.Items, item)

// TODO: update old items
}

if u.RemainingSize() > 0 {
Expand Down
31 changes: 31 additions & 0 deletions snapshot7/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,46 @@ type Storage struct {
holder map[int]*holder
OldestTick int
NewestTick int

// use to store and concatinate data
// of multi part snapshots
// use AddIncomingData() and IncomingData() to read and write
multiPartIncomingData []byte
}

func NewStorage() *Storage {
s := &Storage{}
s.holder = make(map[int]*holder)
s.OldestTick = -1
s.NewestTick = -1
s.multiPartIncomingData = make([]byte, 0, MaxSize)
return s
}
func (s *Storage) AddIncomingData(part int, numParts int, data []byte) error {
if part == 0 {
// reset length if we get a new snapshot
s.multiPartIncomingData = s.multiPartIncomingData[:0]
}
if part != numParts-1 {
// a snapshot payload can be 900 at most
// so unless it is the last part it should fill
// all 900 bytes
if len(data) != MaxPackSize {
return fmt.Errorf("part=%d num_parts=%d expected_size=900 got_size=%d", part, numParts, len(data))
}
}
if len(s.multiPartIncomingData)+len(data) > MaxSize {
return errors.New("reached the maximum amount of snapshot data")
}

s.multiPartIncomingData = append(s.multiPartIncomingData, data...)

return nil
}

func (s *Storage) IncomingData() []byte {
return s.multiPartIncomingData
}

func (s *Storage) First() (*Snapshot, error) {
if s.OldestTick == -1 {
Expand Down
21 changes: 21 additions & 0 deletions snapshot7/storage_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,33 @@
package snapshot7_test

import (
"slices"
"testing"

"github.com/teeworlds-go/protocol/internal/testutils/require"
"github.com/teeworlds-go/protocol/snapshot7"
)

func TestMultiPartStorage(t *testing.T) {
t.Parallel()

storage := snapshot7.NewStorage()
err := storage.AddIncomingData(0, 2, []byte{0xaa, 0xbb})
require.NotNil(t, err)

err = storage.AddIncomingData(0, 1, []byte{0xff, 0xdd})
require.NoError(t, err)
require.Equal(t, []byte{0xff, 0xdd}, storage.IncomingData())

zeros := []byte{899: 0}
err = storage.AddIncomingData(0, 2, zeros)
require.NoError(t, err)
require.Equal(t, zeros, storage.IncomingData())
err = storage.AddIncomingData(1, 2, zeros)
require.NoError(t, err)
require.Equal(t, slices.Concat(zeros, zeros), storage.IncomingData())
}

func TestStorage(t *testing.T) {
t.Parallel()

Expand Down
56 changes: 55 additions & 1 deletion teeworlds7/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,61 @@ func (client *Client) processSystem(netMsg messages7.NetMessage, response *proto
})
case *messages7.Snap:
userMsgCallback(client.Callbacks.SysSnap, msg, func() {
response.Messages = append(response.Messages, &messages7.CtrlKeepAlive{})
deltaTick := msg.GameTick - msg.DeltaTick
slog.Debug("got snap", "delta_tick", deltaTick, "raw_delta_tick", msg.DeltaTick, "game_tick", msg.GameTick, "part", msg.Part, "num_parts", msg.NumParts)

err := client.SnapshotStorage.AddIncomingData(msg.Part, msg.NumParts, msg.Data)
if err != nil {
// TODO: dont panic
panic(err)
}

// TODO: this is as naive as it gets
// we should check if we actually received all the previous parts
// teeworlds does some fancy bit stuff here
// m_SnapshotParts |= 1<<Part;
if msg.Part != msg.NumParts-1 {
// TODO: remove this print
slog.Info("storing partial snap", "part", msg.Part, "num_parts", msg.NumParts)
return
}

prevSnap, err := client.SnapshotStorage.Get(deltaTick)

if err != nil {
// couldn't find the delta snapshots that the server used
// to compress this snapshot. force the server to resync
slog.Error("error, couldn't find the delta snapshot", "error", err)

// ack snapshot
// TODO:
// m_AckGameTick = -1;
return
}

u := &packer.Unpacker{}
u.Reset(client.SnapshotStorage.IncomingData())

newFullSnap, err := snapshot7.UnpackDelata(prevSnap, u)
if err != nil {
slog.Error("delta unpack failed!", "error", err)
return
}
err = client.SnapshotStorage.Add(msg.GameTick, newFullSnap)
if err != nil {
slog.Error("failed to store snap", "error", err)
}
client.SnapshotStorage.PurgeUntil(deltaTick)

for _, callback := range client.Callbacks.Snapshot {
callback(newFullSnap, func() {})
}

client.Game.Input.AckGameTick = msg.GameTick
client.Game.Input.PredictionTick = client.SnapshotStorage.NewestTick
client.Game.Snap.fill(newFullSnap)

response.Messages = append(response.Messages, client.Game.Input)
})
case *messages7.SnapSingle:
userMsgCallback(client.Callbacks.SysSnapSingle, msg, func() {
Expand Down

0 comments on commit a650957

Please sign in to comment.