Skip to content

Commit

Permalink
add createTCPClientServerPair testing helper
Browse files Browse the repository at this point in the history
  • Loading branch information
soypat committed Nov 19, 2023
1 parent e59351f commit 3594fb1
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 114 deletions.
4 changes: 3 additions & 1 deletion control.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ func (tcb *ControlBlock) validateIncomingSegment(seg Segment) (err error) {
err = errors.New(errPfx + "last not in receive window")

case checkSEQ && seg.SEQ != tcb.rcv.NXT:
// This part diverts from TCB as described in RFC 9293. We want to support
// only sequential segments to keep implementation simple and maintainable.
err = errors.New(errPfx + "seq != rcv.nxt (use sequential segments)")
}
if err != nil {
Expand Down Expand Up @@ -305,7 +307,7 @@ func (tcb *ControlBlock) validateIncomingSegment(seg Segment) (err error) {
tcb.pending = 0
tcb.state = StateListen
tcb.resetSnd(tcb.snd.ISS+rstJump, tcb.snd.WND)
tcb.resetRcv(tcb.rcv.WND, 3_14159_2653)
tcb.resetRcv(tcb.rcv.WND, 3_14159_2653^tcb.rcv.IRS)
tcb.debuglog += fmt.Sprintf("rcv %s: remote RST\n", tcb.state)
}
return err
Expand Down
3 changes: 3 additions & 0 deletions stack/port_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ func (pkt *TCPPacket) InvertSrcDest() {

func (pkt *TCPPacket) CalculateHeaders(seg seqs.Segment, payload []byte) {
const ipLenInWords = 5
if int(seg.DATALEN) != len(payload) {
panic("seg.DATALEN != len(payload)")
}
// Ethernet frame.
pkt.Eth.SizeOrEtherType = uint16(eth.EtherTypeIPv4)

Expand Down
12 changes: 8 additions & 4 deletions stack/portstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"

Check failure on line 8 in stack/portstack.go

View workflow job for this annotation

GitHub Actions / build

package log/slog is not in GOROOT (/opt/hostedtoolcache/go/1.19.13/x64/src/log/slog)
"net"
"net/netip"
"slices"

Check failure on line 11 in stack/portstack.go

View workflow job for this annotation

GitHub Actions / build

package slices is not in GOROOT (/opt/hostedtoolcache/go/1.19.13/x64/src/slices)
"time"

"github.com/soypat/seqs/eth"
Expand All @@ -18,7 +19,7 @@ const (
)

type PortStackConfig struct {
MAC net.HardwareAddr
MAC [6]byte
IP netip.Addr
MaxOpenPortsUDP int
MaxOpenPortsTCP int
Expand All @@ -27,7 +28,7 @@ type PortStackConfig struct {
// NewPortStack creates a ready to use TCP/UDP Stack instance.
func NewPortStack(cfg PortStackConfig) *PortStack {
var s PortStack
s.MAC = cfg.MAC
s.mac = cfg.MAC
s.IP = cfg.IP
s.UDPv4 = make([]udpPort, cfg.MaxOpenPortsUDP)
s.TCPv4 = make([]tcpPort, cfg.MaxOpenPortsTCP)
Expand All @@ -42,7 +43,7 @@ func NewPortStack(cfg PortStackConfig) *PortStack {
type PortStack struct {
lastRx time.Time
lastRxSuccess time.Time
MAC net.HardwareAddr
mac [6]byte
// Set IP to non-nil to ignore packets not meant for us.
IP netip.Addr
UDPv4 []udpPort
Expand Down Expand Up @@ -72,6 +73,9 @@ var (
errIPVersion = errors.New("IP version not supported")
)

func (ps *PortStack) MAC() net.HardwareAddr { return slices.Clone(ps.mac[:]) }
func (ps *PortStack) MACAs6() [6]byte { return ps.mac }

// RecvEth validates an ethernet+ipv4 frame in payload. If it is OK then it
// defers response handling of the packets during a call to [Stack.HandleEth].
//
Expand Down Expand Up @@ -100,7 +104,7 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) {
// Ethernet parsing block
ehdr = eth.DecodeEthernetHeader(payload)
etype := ehdr.AssertType()
if ps.MAC != nil && !eth.IsBroadcastHW(ehdr.Destination[:]) && !bytes.Equal(ehdr.Destination[:], ps.MAC) {
if !eth.IsBroadcastHW(ehdr.Destination[:]) && !bytes.Equal(ehdr.Destination[:], ps.mac[:]) {
return nil // Ignore packet, is not for us.
} else if etype != eth.EtherTypeIPv4 && etype != eth.EtherTypeARP {
return nil // Ignore Non-IPv4 packets.
Expand Down
80 changes: 59 additions & 21 deletions stack/socket_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"cmp"

Check failure on line 4 in stack/socket_tcp.go

View workflow job for this annotation

GitHub Actions / build

package cmp is not in GOROOT (/opt/hostedtoolcache/go/1.19.13/x64/src/cmp)
"errors"
"io"
"log/slog"
"net"
"net/netip"
"time"

Expand All @@ -12,7 +14,7 @@ import (

const defaultSocketSize = 2048

type tcp struct {
type TCPSocket struct {
stack *PortStack
scb seqs.ControlBlock
localPort uint16
Expand All @@ -23,33 +25,56 @@ type tcp struct {
remoteMAC [6]byte
tx ring
rx ring
abortErr error
}

func (t *tcp) State() seqs.State {
func (t *TCPSocket) PortStack() *PortStack {
return t.stack
}

func (t *TCPSocket) AddrPort() netip.AddrPort {
return netip.AddrPortFrom(t.stack.IP, t.localPort)
}

func (t *TCPSocket) MAC() net.HardwareAddr {
return t.stack.MAC()
}

func (t *TCPSocket) State() seqs.State {
return t.scb.State()
}

func (t *tcp) Send(b []byte) error {
func (t *TCPSocket) Send(b []byte) error {
if t.scb.State() != seqs.StateEstablished {
return errors.New("connection not established")
}
if len(b) == 0 {
return nil
}
if t.tx.buf == nil {
t.tx = ring{
buf: make([]byte, max(defaultSocketSize, len(b))),
}
}
_, err := t.tx.Write(b)
err := t.stack.FlagTCPPending(t.localPort)
if err != nil {
return err
}
_, err = t.tx.Write(b)
if err != nil {
return err
}
return nil
}

func (t *tcp) Recv(b []byte) (int, error) {
func (t *TCPSocket) Recv(b []byte) (int, error) {
n, err := t.rx.Read(b)
return n, err
}

// DialTCP opens an active TCP connection to the given remote address.
func DialTCP(stack *PortStack, localPort uint16, remoteMAC [6]byte, remote netip.AddrPort, iss seqs.Value, window seqs.Size) (*tcp, error) {
t := tcp{
func DialTCP(stack *PortStack, localPort uint16, remoteMAC [6]byte, remote netip.AddrPort, iss seqs.Value, window seqs.Size) (*TCPSocket, error) {
t := TCPSocket{
stack: stack,
localPort: localPort,
remote: remote,
Expand All @@ -73,8 +98,8 @@ func DialTCP(stack *PortStack, localPort uint16, remoteMAC [6]byte, remote netip

// ListenTCP opens a passive TCP connection that listens on the given port.
// ListenTCP only handles one connection at a time, so API may change in future to accomodate multiple connections.
func ListenTCP(stack *PortStack, port uint16, iss seqs.Value, window seqs.Size) (*tcp, error) {
t := tcp{
func ListenTCP(stack *PortStack, port uint16, iss seqs.Value, window seqs.Size) (*TCPSocket, error) {
t := TCPSocket{
stack: stack,
localPort: port,
}
Expand All @@ -89,7 +114,14 @@ func ListenTCP(stack *PortStack, port uint16, iss seqs.Value, window seqs.Size)
return &t, nil
}

func (t *tcp) handleMain(response []byte, pkt *TCPPacket) (n int, err error) {
func (t *TCPSocket) handleMain(response []byte, pkt *TCPPacket) (n int, err error) {
defer func() {
if err != nil && t.abortErr == nil {
err = nil // Only close socket if socket is aborted.
} else if err != nil {
t.stack.error("tcp socket", slog.Int("port", int(t.localPort)), slog.String("err", err.Error()))
}
}()
hasPacket := pkt.HasPacket()
if !hasPacket && t.mustSendSyn() {
// Connection is still closed, we need to establish
Expand All @@ -110,7 +142,7 @@ func (t *tcp) handleMain(response []byte, pkt *TCPPacket) (n int, err error) {
return t.handleUser(response, pkt)
}

func (t *tcp) handleRecv(response []byte, pkt *TCPPacket) (n int, err error) {
func (t *TCPSocket) handleRecv(response []byte, pkt *TCPPacket) (n int, err error) {
// By this point we know that the packet is valid and contains data, we process it.
payload := pkt.Payload()
segIncoming := pkt.TCP.Segment(len(payload))
Expand Down Expand Up @@ -154,7 +186,7 @@ func (t *tcp) handleRecv(response []byte, pkt *TCPPacket) (n int, err error) {
return 54, nil
}

func (t *tcp) handleUser(response []byte, pkt *TCPPacket) (n int, err error) {
func (t *TCPSocket) handleUser(response []byte, pkt *TCPPacket) (n int, err error) {
available := t.tx.Buffered()
if available == 0 {
return 0, nil // No data to send.
Expand All @@ -168,18 +200,18 @@ func (t *tcp) handleUser(response []byte, pkt *TCPPacket) (n int, err error) {
return 0, err
}
t.setSrcDest(pkt)
pkt.PutHeaders(response)
payloadPlace := response[54:]
n, err = t.tx.Read(payloadPlace[:seg.DATALEN])
if err != nil || n != int(seg.DATALEN) {
panic("bug in handleUser") // This is a bug in ring buffer or a race condition.
}

pkt.CalculateHeaders(seg, payloadPlace[:seg.DATALEN])
pkt.PutHeaders(response)
return 54 + n, err
}

func (t *tcp) setSrcDest(pkt *TCPPacket) {
copy(pkt.Eth.Source[:], t.stack.MAC)
func (t *TCPSocket) setSrcDest(pkt *TCPPacket) {
pkt.Eth.Source = t.stack.mac
pkt.IP.Source = t.stack.IP.As4()
pkt.TCP.SourcePort = t.localPort

Expand All @@ -188,30 +220,30 @@ func (t *tcp) setSrcDest(pkt *TCPPacket) {
pkt.Eth.Destination = t.remoteMAC
}

func (t *tcp) handleInitSyn(response []byte, pkt *TCPPacket) (n int, err error) {
func (t *TCPSocket) handleInitSyn(response []byte, pkt *TCPPacket) (n int, err error) {
// Uninitialized TCB, we start the handshake.
t.setSrcDest(pkt)
pkt.CalculateHeaders(t.synsentSegment(), nil)
pkt.PutHeaders(response)
return 54, nil
}

func (t *tcp) awaitingSyn() bool {
func (t *TCPSocket) awaitingSyn() bool {
return t.scb.State() == seqs.StateSynSent && t.remote != (netip.AddrPort{})
}

func (t *tcp) mustSendSyn() bool {
func (t *TCPSocket) mustSendSyn() bool {
return t.awaitingSyn() && time.Since(t.lastTx) > 3*time.Second
}

func (t *tcp) close() {
func (t *TCPSocket) close() {
t.remote = netip.AddrPort{}
t.scb = seqs.ControlBlock{}
t.lastTx = time.Time{}
t.lastRx = time.Time{}
}

func (t *tcp) synsentSegment() seqs.Segment {
func (t *TCPSocket) synsentSegment() seqs.Segment {
return seqs.Segment{
SEQ: t.scb.ISS(),
ACK: 0,
Expand All @@ -220,6 +252,12 @@ func (t *tcp) synsentSegment() seqs.Segment {
}
}

func (t *TCPSocket) abort(err error) error {
t.abortErr = err
t.close()
return err
}

type ring struct {
buf []byte
off int
Expand Down
Loading

0 comments on commit 3594fb1

Please sign in to comment.