Skip to content

Commit

Permalink
refactor txStacks->exchangeStacks
Browse files Browse the repository at this point in the history
  • Loading branch information
soypat committed Nov 19, 2023
1 parent fbaec70 commit af8ab03
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 49 deletions.
9 changes: 6 additions & 3 deletions stack/socket_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@ func (r *ring) Buffered() int {
return len(r.buf) - r.Free()
}

func (r *ring) Reset() {
r.off = 0
r.end = 0
}

func (r *ring) Free() int {
if r.end >= r.off {
// start off end len(buf)
Expand All @@ -341,9 +346,7 @@ func (r *ring) midFree() int {

func (r *ring) onReadEnd() {
if r.off == r.end {
// We read everything, reset.
r.off = 0
r.end = 0
r.Reset() // We read everything, reset.
}
}

Expand Down
103 changes: 57 additions & 46 deletions stack/stack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,131 +12,142 @@ import (
"github.com/soypat/seqs/stack"
)

const exchangesToEstablish = 4

func TestStackEstablish(t *testing.T) {
client, server := createTCPClientServerPair(t)

// 3 way handshake needs 3 exchanges to complete.
const maxTransactions = 3
txDone, numBytesSent := txStacks(t, maxTransactions, client.PortStack(), server.PortStack())
const maxTransactions = exchangesToEstablish + 1
txDone, numBytesSent := exchangeStacks(t, maxTransactions, client.PortStack(), server.PortStack())

const expectedData = (eth.SizeEthernetHeader + eth.SizeIPv4Header + eth.SizeTCPHeader) * 4
if numBytesSent < expectedData {
t.Error("too little data exchanged", numBytesSent, " want>=", expectedData)
}
if txDone >= 3 {
t.Error("too many exchanges for a 3 way handshake")
} else if txDone <= 1 {
t.Error("too few exchanges for a 3 way handshake")
if txDone > exchangesToEstablish {
t.Errorf("too many exchanges for a 3 way handshake: got %d want %d", txDone, exchangesToEstablish)
} else if txDone < exchangesToEstablish {
t.Errorf("too few exchanges for a 3 way handshake: got %d want %d", txDone, exchangesToEstablish)
}
if client.State() != seqs.StateEstablished {
t.Error("client not established: got", server.State(), "want", seqs.StateEstablished)
t.Errorf("client not established: got %s want %s", client.State(), seqs.StateEstablished)
}
if server.State() != seqs.StateEstablished {
t.Error("server not established: got", server.State(), "want", seqs.StateEstablished)
t.Errorf("server not established: got %s want %s", server.State(), seqs.StateEstablished)
}
}

func TestStackSendReceive_simplex(t *testing.T) {
client, server := createTCPClientServerPair(t)

// 3 way handshake needs2 exchanges to complete.
txStacks(t, 2, client.PortStack(), server.PortStack())
exchangeStacks(t, 4, client.PortStack(), server.PortStack())
if client.State() != seqs.StateEstablished || server.State() != seqs.StateEstablished {
t.Fatal("not established")
}

// Send data from client to server.
const data = "hello world"
socketSendString(client, data)
txStacks(t, 1, client.PortStack(), server.PortStack())
exchangeStacks(t, 1, client.PortStack(), server.PortStack())
if client.State() != seqs.StateEstablished || server.State() != seqs.StateEstablished {
t.Fatal("not established")
t.Fatalf("not established: client=%s server=%s", client.State(), server.State())
}
got := socketReadAllString(server)
if got != data {
t.Error("got", got, "want", data)
t.Errorf("got %q want %q", got, data)
}
}

func TestStackSendReceive_duplex(t *testing.T) {
client, server := createTCPClientServerPair(t)
cstack, sstack := client.PortStack(), server.PortStack()
// 3 way handshake needs2 exchanges to complete.
txStacks(t, 2, cstack, sstack)
exchangeStacks(t, 4, cstack, sstack)
if client.State() != seqs.StateEstablished || server.State() != seqs.StateEstablished {
t.Fatal("not established")
t.Fatalf("not established: client=%s server=%s", client.State(), server.State())
}

// Send data from client to server.
const data = "hello world"
socketSendString(client, data)
socketSendString(server, data)
tx, bytes := txStacks(t, 2, cstack, sstack)
tx, bytes := exchangeStacks(t, 3, cstack, sstack)
if client.State() != seqs.StateEstablished || server.State() != seqs.StateEstablished {
t.Fatal("not established")
t.Fatalf("not established: client=%s server=%s", client.State(), server.State())
}
t.Logf("tx=%d bytes=%d", tx, bytes)
clientstr := socketReadAllString(client)
serverstr := socketReadAllString(server)
if clientstr != data {
t.Error("got", clientstr, "want", data)
t.Errorf("got %q want %q", clientstr, data)
}
if serverstr != data {
t.Error("got", serverstr, "want", data)
t.Errorf("got %q want %q", serverstr, data)
}
}

func isDroppedPacket(err error) bool {
return err != nil && (errors.Is(err, stack.ErrDroppedPacket) || strings.HasPrefix(err.Error(), "drop"))
}

func txStacks(t *testing.T, maxTransactions int, stacks ...*stack.PortStack) (tx, bytesSent int) {
func exchangeStacks(t *testing.T, maxExchanges int, stacks ...*stack.PortStack) (ex, bytesSent int) {
t.Helper()
var pipe [2048]byte
zeroPipe := func() { pipe = [2048]byte{} }
sprintErr := func(err error) (s string) {
return err.Error()
}
for ; tx <= maxTransactions; tx++ {
pipeN := make([]int, len(stacks))
pipes := make([][2048]byte, len(stacks))
zeroPayload := func(i int) {
pipeN[i] = 0
pipes[i] = [2048]byte{}
}
getPayload := func(i int) []byte { return pipes[i][:pipeN[i]] }
var err error
for ; ex <= maxExchanges; ex++ {
sentInTx := 0
for isend := 0; isend < len(stacks); isend++ {
n, err := stacks[isend].HandleEth(pipe[:])
pipeN[isend], err = stacks[isend].HandleEth(pipes[isend][:])
if err != nil && !isDroppedPacket(err) {
t.Errorf("tx[%d] send[%d]: %s", tx, isend, sprintErr(err))
return tx, bytesSent
t.Errorf("ex[%d] send[%d]: %s", ex, isend, sprintErr(err))
return ex, bytesSent
} else if isDroppedPacket(err) {
t.Logf("tx[%d] send[%d]: %s", tx, isend, sprintErr(err))
t.Logf("ex[%d] send[%d]: %s", ex, isend, sprintErr(err))
}
if n > 0 {
pkt, err := stack.ParseTCPPacket(pipe[:n])
if pipeN[isend] > 0 {
pkt, err := stack.ParseTCPPacket(getPayload(isend))
if err != nil {
t.Errorf("tx[%d] send[%d]: malformed packet: %v", tx, isend, sprintErr(err))
return tx, bytesSent
t.Errorf("ex[%d] send[%d]: malformed packet: %v", ex, isend, sprintErr(err))
return ex, bytesSent
}

t.Logf("tx[%d] send[%d]: %+v", tx, isend, pkt.TCP.Segment(len(pkt.Payload())))
t.Logf("ex[%d] send[%d]: %+v", ex, isend, pkt.TCP.Segment(len(pkt.Payload())))
}
bytesSent += n
sentInTx += n
for irecv := 0; n > 0 && irecv < len(stacks); irecv++ {
if irecv == isend {
continue // Don't send to self!
}
err = stacks[irecv].RecvEth(pipe[:n])
bytesSent += pipeN[isend]
sentInTx += pipeN[isend]
}
if sentInTx == 0 {
break // No more data being sent.
}
for isend := 0; isend < len(stacks); isend++ {
payload := getPayload(isend)
if len(payload) == 0 {
continue
}
for irecv := 0; irecv < len(stacks); irecv++ {
err = stacks[irecv].RecvEth(payload)
if err != nil && !isDroppedPacket(err) {
t.Errorf("tx[%d] recv[%d]: %s", tx, irecv, sprintErr(err))
return tx, bytesSent
t.Errorf("ex[%d] recv[%d]: %s", ex, irecv, sprintErr(err))
return ex, bytesSent
} else if isDroppedPacket(err) {
t.Logf("tx[%d] recv[%d]: %s", tx, irecv, sprintErr(err))
t.Logf("ex[%d] recv[%d]: %s", ex, irecv, sprintErr(err))
}
}
zeroPipe()
}
if sentInTx == 0 {
break // No more data being interchanged.
zeroPayload(isend)
}
}
return tx, bytesSent
return ex, bytesSent
}

func createTCPClientServerPair(t *testing.T) (client, server *stack.TCPSocket) {
Expand Down

0 comments on commit af8ab03

Please sign in to comment.