Skip to content

Commit

Permalink
feat(netsim): implement routing (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
bassosimone authored Nov 23, 2024
1 parent 7fb5a7d commit 52508f3
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 11 deletions.
120 changes: 120 additions & 0 deletions netsim/example_router_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// SPDX-License-Identifier: GPL-3.0-or-later

package netsim_test

import (
"context"
"crypto/tls"
"fmt"
"io"
"log"
"net"
"net/http"
"net/netip"
"time"

"github.com/rbmk-project/x/connpool"
"github.com/rbmk-project/x/netsim"
"github.com/rbmk-project/x/netsim/router"
"github.com/rbmk-project/x/netsim/simpki"
)

// This example shows how to use a router to simulate a network
// topology consisting of a client and multiple servers.
func Example_router() {
// Create a pool to close resources when done.
cpool := connpool.New()
defer cpool.Close()

// Create the server stack.
serverAddr := netip.MustParseAddr("8.8.8.8")
serverStack := netsim.NewStack(serverAddr)
cpool.Add(serverStack)

// Create the client stack.
clientAddr := netip.MustParseAddr("130.192.91.211")
clientStack := netsim.NewStack(clientAddr)
cpool.Add(clientStack)

// Create and configure router
r := router.New()
r.Attach(clientStack)
r.Attach(serverStack)
r.AddRoute(clientStack)
r.AddRoute(serverStack)

// Create a context with a watchdog timeout.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

// Create a PKI for the server and obtain the certificate.
pki := simpki.MustNewPKI("testdata")
serverCert := pki.MustNewCert(&simpki.PKICertConfig{
CommonName: "dns.google",
DNSNames: []string{
"dns.google.com",
"dns.google",
},
IPAddrs: []net.IP{
net.IPv4(8, 8, 8, 8),
net.IPv4(8, 8, 4, 4),
},
})

// Create the HTTP server.
serverEndpoint := netip.AddrPortFrom(serverAddr, 443)
listener, err := serverStack.Listen(ctx, "tcp", serverEndpoint.String())
if err != nil {
log.Fatal(err)
}
cpool.Add(listener)
serverHTTP := &http.Server{
Handler: http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Write([]byte("Bonsoir, Elliot!\n"))
}),
TLSConfig: &tls.Config{
Certificates: []tls.Certificate{serverCert},
},
}
go serverHTTP.ServeTLS(listener, "", "")
cpool.Add(serverHTTP)

// Create the HTTP client
clientTxp := &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := clientStack.DialContext(ctx, "tcp", addr)
if err != nil {
return nil, err
}
cpool.Add(conn)
return conn, nil
},
TLSClientConfig: &tls.Config{
RootCAs: pki.CertPool(),
},
}
clientHTTP := &http.Client{Transport: clientTxp}

// Get the response body.
resp, err := clientHTTP.Get("https://8.8.8.8/")
if err != nil {
log.Fatal(err)
}
if resp.StatusCode != http.StatusOK {
log.Fatalf("HTTP request failed: %d", resp.StatusCode)
}
cpool.Add(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}

// Print the response body
fmt.Printf("%s", string(body))

// Explicitly close the connections
cpool.Close()

// Output:
// Bonsoir, Elliot!
}
3 changes: 0 additions & 3 deletions netsim/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package netsim

import (
"log"
"sync"

"github.com/rbmk-project/x/netsim/packet"
Expand Down Expand Up @@ -68,7 +67,6 @@ func (lnk *Link) move(left readableStack, right writableStack) {
case pkt := <-left.Output():

// Write to right stack.
log.Printf("INFLIGHT %s", pkt)
select {
case <-lnk.eof:
return
Expand All @@ -77,7 +75,6 @@ func (lnk *Link) move(left readableStack, right writableStack) {
case right.Input() <- pkt:
// success
}

}
}
}
19 changes: 19 additions & 0 deletions netsim/packet/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ const (

// Packet is a network packet.
type Packet struct {
// TTL is the packet time to live.
TTL uint8

// SrcAddr is the source address.
SrcAddr netip.Addr

Expand Down Expand Up @@ -149,6 +152,10 @@ func (p *Packet) stringTCP() string {
)
}

// DefaultBufferChannel is the required buffering
// for [NetworkDevice] channels.
const DefaultBufferChannel = 128

// NetworkDevice is a network device to read/write [*Packet].
type NetworkDevice interface {
// Addresses returns the device addresses.
Expand All @@ -158,8 +165,20 @@ type NetworkDevice interface {
EOF() <-chan struct{}

// Input returns a channel to send [*Packet] to the device.
//
// The channel must have size >= [DefaultBufferChannel].
Input() chan<- *Packet

// Output returns a channel to receive [*Packet] from the device.
//
// The channel must have size >= [DefaultBufferChannel].
Output() <-chan *Packet
}

// NewNetworkDeviceIOChannels constructs two channels
// with size == [DefaultBufferChannel].
func NewNetworkDeviceIOChannels() (chan *Packet, chan *Packet) {
input := make(chan *Packet, DefaultBufferChannel)
output := make(chan *Packet, DefaultBufferChannel)
return input, output
}
2 changes: 2 additions & 0 deletions netsim/port.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ func (gp *Port) WritePacket(payload []byte, flags TCPFlags, raddr netip.AddrPort
// Build and send the packet.
//
// As documented, copy the payload.
const linuxDefaultTTL = 64
pkt := &Packet{
TTL: linuxDefaultTTL,
SrcAddr: gp.addr.LocalAddr.Addr(),
DstAddr: raddr.Addr(),
IPProtocol: gp.addr.Protocol,
Expand Down
87 changes: 87 additions & 0 deletions netsim/router/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// SPDX-License-Identifier: GPL-3.0-or-later

// Package router provides network routing capabilities for testing.
package router

import (
"errors"
"net/netip"

"github.com/rbmk-project/x/netsim/packet"
)

// Router provides routing capabilities.
type Router struct {
// devs tracks attached [packet.NetworkDevice].
devs []packet.NetworkDevice

// srt is the static routing table.
srt map[netip.Addr]packet.NetworkDevice
}

// New creates a new [*Router].
func New() *Router {
return &Router{
devs: make([]packet.NetworkDevice, 0),
srt: make(map[netip.Addr]packet.NetworkDevice),
}
}

// Attach attaches a [packet.NetworkDevice] to the [*Router].
func (r *Router) Attach(dev packet.NetworkDevice) {
r.devs = append(r.devs, dev)
go r.readLoop(dev)
}

// AddRoute adds routes for all addresses of the given [packet.NetworkDevice].
func (r *Router) AddRoute(dev packet.NetworkDevice) {
for _, addr := range dev.Addresses() {
r.srt[addr] = dev
}
}

// readLoop reads packets from a [packet.NetworkDevice] until EOF.
func (r *Router) readLoop(dev packet.NetworkDevice) {
for {
select {
case <-dev.EOF():
return
case pkt := <-dev.Output():
r.route(pkt)
}
}
}

var (
// errTTLExceeded is returned when a packet's TTL is exceeded.
errTTLExceeded = errors.New("TTL exceeded in transit")

// errNoRouteToHost is returned when there is no route to the host.
errNoRouteToHost = errors.New("no route to host")

// errBufferFull is returned when the buffer is full.
errBufferFull = errors.New("buffer full")
)

// route routes a given packet to its destination.
func (r *Router) route(pkt *packet.Packet) error {
// Decrement TTL.
if pkt.TTL <= 0 {
return errTTLExceeded
}
pkt.TTL--

// Find next hop.
nextHop := r.srt[pkt.DstAddr]
if nextHop == nil {
return errNoRouteToHost
}

// Forward packet (non-blocking)
select {
case nextHop.Input() <- pkt:
return nil
default:
return errBufferFull
}
}
23 changes: 15 additions & 8 deletions netsim/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"

"github.com/rbmk-project/common/runtimex"
"github.com/rbmk-project/x/netsim/packet"
)

// Stack models a network stack.
Expand Down Expand Up @@ -49,26 +50,27 @@ type Stack struct {
// Close to stop any muxing/demuxing goroutine.
func NewStack(addrs ...netip.Addr) *Stack {
const (
// outputBufSize adds some buffering to the output
// channel to allow for nonblocking writes of packets
// containing the RST flag in response to SYN for
// ports that aren't listening.
outputBufSize = 128

// firstEphemeralPort is the first ephemeral port
// to use according to RFC6335.
firstEphemeralPort = 49152
)
// We use buffered channels for I/O because that allows
// routers to use nonblocking writes w/o dropping packets
// unless the receiver is actively ignoring them.
//
// The buffer also allows us to send RST after SYN to
// closed port using nonblocking I/O.
input, output := packet.NewNetworkDeviceIOChannels()
ns := &Stack{
addrs: addrs,
eof: make(chan struct{}),
eofOnce: sync.Once{},
input: make(chan *Packet),
input: input,
nextport: map[IPProtocol]uint16{
IPProtocolTCP: firstEphemeralPort,
IPProtocolUDP: firstEphemeralPort,
},
output: make(chan *Packet, outputBufSize),
output: output,
portmu: sync.RWMutex{},
ports: map[PortAddr]*Port{},
}
Expand Down Expand Up @@ -185,6 +187,11 @@ func (ns *Stack) resetNonblocking(pkt *Packet) {

// demux demuxes a single incoming [*Packet].
func (ns *Stack) demux(pkt *Packet) error {
// Discard packet if the TTL is zero.
if pkt.TTL <= 0 {
return EHOSTUNREACH
}

// Discard packet if the address is not local.
if !ns.isLocalAddr(pkt.DstAddr) {
return EHOSTUNREACH
Expand Down

0 comments on commit 52508f3

Please sign in to comment.