Skip to content

Commit

Permalink
refactor TCB pending flags; keep working on TCP Close
Browse files Browse the repository at this point in the history
  • Loading branch information
soypat committed Nov 19, 2023
1 parent 6a1bb58 commit 5a61e23
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 85 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ How to use `seqs`
```sh
go mod download github.com/soypat/seqs@latest
```

38 changes: 19 additions & 19 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ type ControlBlock struct {
rcv recvSpace
// When FlagRST is set in pending flags rstPtr will contain the sequence number of the RST segment to make it "believable" (See RFC9293)
rstPtr Value
// pending and state are modified by rcv* methods and Close method.
// The pending flags are only updated if the Recv method finishes with no error.
pending Flags
state State

// pending is the queue of pending flags to be sent in the next 2 segments.
// On a call to Send the queue is advanced and flags set in the segment are unset.
// The second position of the queue is used for FIN segments.
pending [2]Flags
state State
debuglog string
}

Expand Down Expand Up @@ -107,7 +107,7 @@ func (seg *Segment) Last() Value {
// PendingSegment calculates a suitable next segment to send from a payload length.
// It does not modify the ControlBlock state.
func (tcb *ControlBlock) PendingSegment(payloadLen int) (_ Segment, ok bool) {
if (payloadLen == 0 && tcb.pending == 0) || (payloadLen > 0 && tcb.state != StateEstablished) {
if (payloadLen == 0 && tcb.pending[0] == 0) || (payloadLen > 0 && tcb.state != StateEstablished) {
return Segment{}, false // No pending segment.
}
if payloadLen > math.MaxUint16 || Size(payloadLen) > tcb.snd.WND {
Expand All @@ -116,24 +116,24 @@ func (tcb *ControlBlock) PendingSegment(payloadLen int) (_ Segment, ok bool) {

pending := tcb.pending
if payloadLen > 0 {
pending |= FlagPSH
pending[0] |= FlagPSH
}

var ack Value
if tcb.pending.HasAny(FlagACK) {
if tcb.pending[0].HasAny(FlagACK) {
ack = tcb.rcv.NXT
}

var seq Value = tcb.snd.NXT
if tcb.pending.HasAny(FlagRST) {
if tcb.pending[0].HasAny(FlagRST) {
seq = tcb.rstPtr
}

seg := Segment{
SEQ: seq,
ACK: ack,
WND: tcb.rcv.WND,
Flags: pending,
Flags: pending[0],
DATALEN: Size(payloadLen),
}
return seg, true
Expand All @@ -152,7 +152,7 @@ func (tcb *ControlBlock) rcvListen(seg Segment) (pending Flags, err error) {
tcb.resetRcv(tcb.rcv.WND, seg.SEQ)

// We must respond with SYN|ACK frame after receiving SYN in listen state (three way handshake).
tcb.pending = synack
tcb.pending[0] = synack
tcb.state = StateSynRcvd
return synack, nil
}
Expand Down Expand Up @@ -287,24 +287,24 @@ func (tcb *ControlBlock) validateIncomingSegment(seg Segment) (err error) {
// https://www.rfc-editor.org/rfc/rfc9293.html#section-3.10.7.4-2.5.2.2.2.3.2.1
case established && acksOld && !ctlOrDataSegment:
err = errDropSegment
tcb.pending = 0 // Completely ignore duplicate ACKs.
tcb.debuglog += fmt.Sprintf("rcv %s: duplicate ACK %x\n", tcb.state, seg.ACK)
tcb.pending[0] &= FlagFIN // Completely ignore duplicate ACKs but do not erase fin bit.
tcb.debuglog += fmt.Sprintf("rcv %s: duplicate ACK %d\n", tcb.state, seg.ACK)

case established && acksUnsentData:
err = errDropSegment
tcb.pending = FlagACK // Send ACK for unsent data.
tcb.debuglog += fmt.Sprintf("rcv %s: ACK %x of unsent data\n", tcb.state, seg.ACK)
tcb.pending[0] = FlagACK // Send ACK for unsent data.
tcb.debuglog += fmt.Sprintf("rcv %s: ACK %d of unsent data\n", tcb.state, seg.ACK)

case preestablished && (acksOld || acksUnsentData):
err = errDropSegment
tcb.pending = FlagRST
tcb.pending[0] = FlagRST
tcb.rstPtr = seg.ACK
tcb.resetSnd(tcb.snd.ISS, seg.WND)
tcb.debuglog += fmt.Sprintf("rcv %s: RST %x of old data\n", tcb.state, seg.ACK)
tcb.debuglog += fmt.Sprintf("rcv %s: RST %d of old data\n", tcb.state, seg.ACK)

case preestablished && flags.HasAny(FlagRST):
err = errDropSegment
tcb.pending = 0
tcb.pending[0] = 0
tcb.state = StateListen
tcb.resetSnd(tcb.snd.ISS+rstJump, tcb.snd.WND)
tcb.resetRcv(tcb.rcv.WND, 3_14159_2653^tcb.rcv.IRS)
Expand Down Expand Up @@ -338,7 +338,7 @@ func (tcb *ControlBlock) validateOutgoingSegment(seg Segment) (err error) {
// close sets ControlBlock state to closed and resets all sequence numbers and pending flag.
func (tcb *ControlBlock) close() {
tcb.state = StateClosed
tcb.pending = 0
tcb.pending = [2]Flags{}
tcb.resetRcv(0, 0)
tcb.resetSnd(0, 0)
tcb.debuglog += "close tcb\n"
Expand Down
47 changes: 39 additions & 8 deletions control_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,33 @@ func (tcb *ControlBlock) Open(iss Value, wnd Size, state State) (err error) {
tcb.state = state
tcb.resetRcv(wnd, 0)
tcb.resetSnd(iss, 1)
tcb.pending = 0
tcb.pending = [2]Flags{}
if state == StateSynSent {
tcb.pending = FlagSYN
tcb.pending[0] = FlagSYN
}
return nil
}

func (tcb *ControlBlock) Close() (err error) {
// See RFC 9293: 3.10.4 CLOSE call.
switch tcb.state {
case StateClosed:
err = errors.New("connection does not exist")
case StateCloseWait:
tcb.state = StateLastAck
tcb.pending = [2]Flags{FlagFIN, FlagACK}
case StateListen, StateSynSent:
tcb.close()
case StateSynRcvd, StateEstablished:
// We suppose user has no more pending data to send, so we flag FIN to be sent.
// Users of this API should call Close only when they have no more data to send.
tcb.pending[0] = (tcb.pending[0] & FlagACK) | FlagFIN
case StateFinWait2, StateTimeWait:
err = errors.New("connection closing")
}
return err
}

// Send processes a segment that is being sent to the network. It updates the TCB
// if there is no error.
func (tcb *ControlBlock) Send(seg Segment) error {
Expand All @@ -46,13 +66,13 @@ func (tcb *ControlBlock) Send(seg Segment) error {
return err
}

// The segment is valid, we can update TCB state.
seglen := seg.LEN()
tcb.snd.NXT.UpdateForward(seglen)
tcb.rcv.WND = seg.WND
hasFIN := seg.Flags.HasAny(FlagFIN)
hasACK := seg.Flags.HasAny(FlagACK)
switch tcb.state {
case StateSynRcvd:
if hasFIN {
tcb.state = StateFinWait1 // RFC 9293: 3.10.4 CLOSE call.
}
case StateClosing:
if hasACK {
tcb.state = StateTimeWait
Expand All @@ -65,9 +85,20 @@ func (tcb *ControlBlock) Send(seg Segment) error {
if hasFIN {
tcb.state = StateLastAck
} else if hasACK {
tcb.pending = finack
tcb.pending[1] = finack // Queue finack.
}
}

// Advance pending flags queue.
tcb.pending[0] &^= seg.Flags
if tcb.pending[0] == 0 {
tcb.pending = [2]Flags{tcb.pending[1], 0}
}

// The segment is valid, we can update TCB state.
seglen := seg.LEN()
tcb.snd.NXT.UpdateForward(seglen)
tcb.rcv.WND = seg.WND
return nil
}

Expand Down Expand Up @@ -108,7 +139,7 @@ func (tcb *ControlBlock) Recv(seg Segment) (err error) {
return err
}

tcb.pending = pending
tcb.pending[0] = pending
if prevNxt != 0 && tcb.snd.NXT != prevNxt {
tcb.debuglog += fmt.Sprintf("rcv %s: snd.nxt changed from %x to %x on segment %+v\n", tcb.state, prevNxt, tcb.snd.NXT, seg)
}
Expand Down
5 changes: 5 additions & 0 deletions eth/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,11 @@ func (a *ARPv4Header) String() string {
"I have ", net.IP(a.ProtoSender[:]).String(), "! Tell ", net.IP(a.ProtoTarget[:]).String(), ", aka ", net.HardwareAddr(a.HardwareTarget[:]).String())
}

// AssertEtherType returns the ProtoType field of the ARP header as EtherType.
func (a *ARPv4Header) AssertEtherType() EtherType {
return EtherType(a.ProtoType)
}

// DecodeTCPHeader decodes a TCP header from buf and returns the TCPHeader
// and the offset in bytes to the payload. Panics if buf is less than 20 bytes in length.
func DecodeTCPHeader(buf []byte) (tcphdr TCPHeader, payloadOffset uint8) {
Expand Down
39 changes: 35 additions & 4 deletions stack/portstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stack

import (
"bytes"
"context"
"errors"
"fmt"
"io"
Expand All @@ -23,6 +24,7 @@ type PortStackConfig struct {
IP netip.Addr
MaxOpenPortsUDP int
MaxOpenPortsTCP int
Logger *slog.Logger
}

// NewPortStack creates a ready to use TCP/UDP Stack instance.
Expand All @@ -32,6 +34,7 @@ func NewPortStack(cfg PortStackConfig) *PortStack {
s.IP = cfg.IP
s.UDPv4 = make([]udpPort, cfg.MaxOpenPortsUDP)
s.TCPv4 = make([]tcpPort, cfg.MaxOpenPortsTCP)
s.logger = cfg.Logger
return &s
}

Expand All @@ -53,7 +56,8 @@ type PortStack struct {
pendingTCPv4 uint32
droppedPackets uint32
processedPackets uint32
level slog.Level
pendingARP eth.ARPv4Header
logger *slog.Logger
}

// Common errors.
Expand Down Expand Up @@ -111,7 +115,18 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) {
}

if etype == eth.EtherTypeARP {

ahdr := eth.DecodeARPv4Header(payload[eth.SizeEthernetHeader:])
if ps.hasPendingARP() || ahdr.HardwareLength != 6 || ahdr.ProtoLength != 4 || ahdr.HardwareType != 1 || ahdr.AssertEtherType() != eth.EtherTypeIPv4 {
return nil // Ignore ARP replies and unsupported requests.
}
if ahdr.ProtoTarget != ps.IP.As4() {
return nil // Not for us.
}
// We need to respond to this ARP request.
ahdr.HardwareTarget = ps.MACAs6()
ahdr.Operation = 2 // Set as reply. This also flags the packet as pending.
ps.pendingARP = ahdr
return nil
}

// IP parsing block.
Expand Down Expand Up @@ -239,6 +254,17 @@ func (ps *PortStack) HandleEth(dst []byte) (n int, err error) {
return 0, io.ErrShortBuffer
case ps.pendingUDPv4 == 0 && ps.pendingTCPv4 == 0:
return 0, nil // No packets to handle
case ps.hasPendingARP():
// We need to respond to an ARP request.
ehdr := eth.EthernetHeader{
Destination: ps.pendingARP.HardwareSender,
Source: ps.MACAs6(),
SizeOrEtherType: uint16(eth.EtherTypeARP),
}
ehdr.Put(dst)
ps.pendingARP.Put(dst[eth.SizeEthernetHeader:])
ps.pendingARP.Operation = 0 // Clear pending ARP.
return eth.SizeEthernetHeader + eth.SizeARPv4Header, nil
}

ps.info("HandleEth", slog.Int("dstlen", len(dst)))
Expand Down Expand Up @@ -299,6 +325,10 @@ func (ps *PortStack) HandleEth(dst []byte) (n int, err error) {
return n, err
}

func (ps *PortStack) hasPendingARP() bool {
return ps.pendingARP.Operation == 2 // 2 means reply.
}

// OpenUDP opens a UDP port and sets the handler. If the port is already open
// or if there is no socket available it returns an error.
func (ps *PortStack) OpenUDP(port uint16, handler func([]byte, *UDPPacket) (int, error)) error {
Expand Down Expand Up @@ -447,11 +477,12 @@ func (ps *PortStack) debug(msg string, attrs ...slog.Attr) {
}

func (ps *PortStack) logAttrsPrint(level slog.Level, msg string, attrs ...slog.Attr) {
if ps.level <= level {
logAttrsPrint(level, msg, attrs...)
if ps.logger != nil {
ps.logger.LogAttrs(context.Background(), level, msg, attrs...)
}
}

// logAttrsPrint is a hand-rolled slog.Handler implementation for use in memory contrained systems.
func logAttrsPrint(level slog.Level, msg string, attrs ...slog.Attr) {
var levelStr string = level.String()

Expand Down
Loading

0 comments on commit 5a61e23

Please sign in to comment.