Skip to content

Commit

Permalink
Merge pull request #567 from alfred-bratterud/master
Browse files Browse the repository at this point in the history
TCP hotfix cherry-picks
  • Loading branch information
alfreb committed Jun 8, 2016
2 parents dcecc01 + 5ec9b7f commit 9be49d8
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 52 deletions.
18 changes: 14 additions & 4 deletions api/net/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ namespace net {
return *this;
}

inline bool isset(TCP::Flag f) { return ntohs(header().offset_flags.whole) & f; }
inline bool isset(TCP::Flag f) const { return ntohs(header().offset_flags.whole) & f; }

//TCP::Flag flags() const { return (htons(header().offset_flags.whole) << 8) & 0xFF; }

Expand Down Expand Up @@ -439,9 +439,11 @@ namespace net {
return total;
}

bool is_acked_by(const Seq ack) const {
return ack >= (seq() + data_length());
}
bool is_acked_by(const Seq ack) const
{ return ack >= (seq() + data_length()); }

bool should_rtx() const
{ return has_data() or isset(SYN) or isset(FIN); }

inline std::string to_string() {
std::ostringstream os;
Expand Down Expand Up @@ -1723,6 +1725,14 @@ namespace net {
Number of retransmission attempts on the packet first in RT-queue
*/
size_t rto_attempt = 0;
// number of retransmitted SYN packets.
size_t syn_rtx_ = 0;

/*
Retransmission timeout limit reached
*/
inline bool rto_limit_reached() const
{ return rto_attempt >= 15 or syn_rtx_ >= 5; };

/*
Remove all packets acknowledge by ACK in retransmission queue
Expand Down
5 changes: 4 additions & 1 deletion src/net/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ void TCP::bottom(net::Packet_ptr packet_ptr) {
conn_it->second->segment_arrived(packet);
}
// No connection found
else {
else if(packet->isset(SYN)) {
// Is there a listener?
auto listen_conn_it = listeners_.find(packet->dst_port());
debug("<TCP::bottom> No connection found - looking for listener..\n");
Expand All @@ -208,6 +208,9 @@ void TCP::bottom(net::Packet_ptr packet_ptr) {
drop(packet);
}
}
else {
drop(packet);
}
}

void TCP::process_writeq(size_t packets) {
Expand Down
53 changes: 46 additions & 7 deletions src/net/tcp_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ void Connection::limited_tx() {
void Connection::writeq_reset() {
debug2("<Connection::writeq_reset> Reseting.\n");
writeq.reset();
if(rtx_timer.active)
rtx_stop();
}

void Connection::open(bool active) {
Expand Down Expand Up @@ -362,7 +364,7 @@ void Connection::transmit(TCP::Packet_ptr packet) {
debug2("<TCP::Connection::transmit> TX %s\n", packet->to_string().c_str());

host_.transmit(packet);
if(packet->has_data() and !rtx_timer.active) {
if(packet->should_rtx() and !rtx_timer.active) {
rtx_start();
}
}
Expand Down Expand Up @@ -420,8 +422,8 @@ bool Connection::handle_ack(TCP::Packet_ptr in) {
size_t bytes_acked = in->ack() - cb.SND.UNA;
cb.SND.UNA = in->ack();

// ack everything in write queue
if(!writeq.empty())
// ack everything in rtx queue
if(rtx_timer.active)
rtx_ack(in->ack());

// update cwnd when congestion avoidance?
Expand Down Expand Up @@ -564,6 +566,7 @@ void Connection::on_dup_ack() {
*/
void Connection::rtx_ack(const Seq ack) {
auto acked = ack - prev_highest_ack_;
// what if ack is from handshake / fin?
writeq.acknowledge(acked);
/*
When all outstanding data has been acknowledged, turn off the
Expand All @@ -587,12 +590,41 @@ void Connection::rtx_ack(const Seq ack) {
// x-rtx_q.size(), rtx_q.size());
}

/*
Assumption
Retransmission will only occur when one of the following are true:
* There is data to be sent (!SYN-SENT || !SYN-RCV) <- currently not supports
* Last packet had SYN (SYN-SENT || SYN-RCV)
* Last packet had FIN (FIN-WAIT-1 || LAST-ACK)
*/
void Connection::retransmit() {
auto packet = create_outgoing_packet();
auto& buf = writeq.una();
fill_packet(packet, (char*)buf.pos(), buf.remaining, cb.SND.UNA);
packet->set_flag(ACK);
// If not retransmission of a pure SYN packet, add ACK
if(!is_state(SynSent::instance())) {
packet->set_flag(ACK);
}
// If retransmission from either SYN-SENT or SYN-RCV, add SYN
if(is_state(SynSent::instance()) or is_state(SynReceived::instance())) {
packet->set_flag(SYN);
packet->set_seq(cb.SND.UNA);
syn_rtx_++;
}
// If not, check if there is data and retransmit
else if(writeq.size()) {
auto& buf = writeq.una();
fill_packet(packet, (char*)buf.pos(), buf.remaining, cb.SND.UNA);
}
// if no data
else {
packet->set_seq(cb.SND.UNA);
}

// If retransmission of a FIN packet
if(is_state(FinWait1::instance()) or is_state(LastAck::instance())) {
packet->set_flag(FIN);
}

//printf("<TCP::Connection::retransmit> rseq=%u \n", packet->seq() - cb.ISS);
debug("<TCP::Connection::retransmit> RT %s\n", packet->to_string().c_str());
host_.transmit(packet);
Expand All @@ -602,7 +634,7 @@ void Connection::retransmit() {
so that it will expire after RTO seconds (for the current value
of RTO).
*/
if(packet->has_data() and !rtx_timer.active) {
if(packet->should_rtx() and !rtx_timer.active) {
rtx_start();
}
}
Expand Down Expand Up @@ -654,6 +686,13 @@ void Connection::rtx_clear() {
begins (i.e., after the three-way handshake completes).
*/
void Connection::rtx_timeout() {
// experimental
if(rto_limit_reached()) {
printf("<TCP::Connection::rtx_timeout> RTX attempt limit reached, closing.\n");
close();
return;
}

// retransmit SND.UNA
retransmit();

Expand Down
6 changes: 4 additions & 2 deletions src/net/tcp_connection_states.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,8 @@ State::Result Connection::SynSent::handle(Connection& tcp, TCP::Packet_ptr in) {
tcb.IRS = in->seq();
tcb.SND.UNA = in->ack();

//tcp.rtx_ack(in->ack());
if(tcp.rtx_timer.active)
tcp.rtx_stop();

// (our SYN has been ACKed)
if(tcb.SND.UNA > tcb.ISS) {
Expand Down Expand Up @@ -1008,7 +1009,8 @@ State::Result Connection::SynReceived::handle(Connection& tcp, TCP::Packet_ptr i
tcb.SND.UNA = in->ack();
if(tcp.rttm.active)
tcp.rttm.stop();
//tcp.rtx_ack(in->ack());
if(tcp.rtx_timer.active)
tcp.rtx_stop();

// 7. proccess the segment text
if(in->has_data()) {
Expand Down
73 changes: 35 additions & 38 deletions test/tcp/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ void FINISH_TEST() {
INFO("TEST", "Started 3 x MSL timeout.");
hw::PIT::instance().onTimeout(3 * MSL_TEST, [] {
INFO("TEST", "Verify release of resources");
CHECK(inet->tcp().activeConnections() == 0,
"tcp.activeConnections() == 0");
CHECK(inet->buffers_available() == buffers_available,
"inet->buffers_available() == buffers_available");
CHECKSERT(inet->tcp().activeConnections() == 0,
"No (0) active connections");
CHECKSERT(inet->buffers_available() == buffers_available,
"No hogged buffer (%u available)", buffers_available);
printf("# TEST SUCCESS #\n");
});
}
Expand All @@ -74,8 +74,9 @@ void FINISH_TEST() {
*/
void OUTGOING_TEST_INTERNET(const HostAddress& address) {
auto port = address.second;
// This needs correct setup to work
INFO("TEST", "Outgoing Internet Connection (%s:%u)", address.first.c_str(), address.second);
inet->resolve(address.first,
inet->resolve(address.first,
[port](auto ip_address) {
CHECK(ip_address != 0, "Resolved host");

Expand All @@ -84,7 +85,7 @@ void OUTGOING_TEST_INTERNET(const HostAddress& address) {
->onConnect([](Connection_ptr conn) {
CHECK(true, "Connected");
conn->read(1024, [](buffer_t, size_t n) {
CHECK(n > 0, "Received data");
CHECK(n > 0, "Received a response");
});
})
.onError([](Connection_ptr, TCP::TCPException err) {
Expand All @@ -103,12 +104,13 @@ void OUTGOING_TEST(TCP::Socket outgoing) {
->onConnect([](Connection_ptr conn) {
conn->write(small.data(), small.size());
conn->read(small.size(), [](buffer_t buffer, size_t n) {
CHECK(std::string((char*)buffer.get(), n) == small, "conn->read() == small");
CHECKSERT(std::string((char*)buffer.get(), n) == small, "Received SMALL");
});
})
.onDisconnect([](Connection_ptr, TCP::Connection::Disconnect) {
.onDisconnect([](Connection_ptr conn, TCP::Connection::Disconnect) {
CHECK(true, "Connection closed by server");

CHECKSERT(conn->is_state({"CLOSE-WAIT"}), "State: CLOSE-WAIT");
conn->close();
OUTGOING_TEST_INTERNET(TEST_ADDR_TIME);
});
}
Expand All @@ -118,16 +120,16 @@ void OUTGOING_TEST(TCP::Socket outgoing) {
auto* eth = reinterpret_cast<net::Ethernet::header*>(p->buffer());
if (eth->type == net::Ethernet::ETH_IP4) {
auto ip4 = net::view_packet_as<PacketIP4>(p);
auto& hdr = reinterpret_cast<net::IP4::full_header*>(p->buffer())->ip_hdr;
if (hdr.protocol == net::IP4::IP4_TCP) {
auto tcp = net::view_packet_as<TCP::Packet>(p);
printf("%s\n", tcp->to_string().c_str());
printf("%s\n", tcp->to_string().c_str());
}
}
}*/

// Used to send big data
Expand All @@ -154,7 +156,7 @@ void print_stuff()
void Service::start()
{
//hw::PIT::on_timeout(5.0, print_stuff);

IP4::addr A1 (255, 255, 255, 255);
IP4::addr B1 ( 0, 255, 255, 255);
IP4::addr C1 ( 0, 0, 255, 255);
Expand All @@ -166,7 +168,7 @@ void Service::start()
printf("D: %s\n", D1.str().c_str());
printf("E: %s\n", E1.str().c_str());
printf("D & A: %s\n", (D1 & A1).str().c_str());

for(int i = 0; i < S; i++) small += TEST_STR;

big += "start-";
Expand All @@ -184,7 +186,7 @@ void Service::start()
{ 255,255,255, 0 }, // Netmask
{ 10, 0, 0, 1 }, // Gateway
{ 8, 8, 8, 8 } );// DNS

buffers_available = inet->buffers_available();
INFO("Buffers available", "%u", inet->buffers_available());
auto& tcp = inet->tcp();
Expand All @@ -199,25 +201,22 @@ void Service::start()
/*
TEST: Nothing should be allocated.
*/
CHECK(tcp.openPorts() == 0, "tcp.openPorts() == 0");
CHECK(tcp.activeConnections() == 0, "tcp.activeConnections() == 0");
CHECK(tcp.openPorts() == 0, "No (0) open ports (listening connections)");
CHECK(tcp.activeConnections() == 0, "No (0) active connections");

tcp.bind(TEST1).onConnect([](Connection_ptr conn) {
INFO("TEST", "SMALL string (%u)", small.size());
conn->read(small.size(), [conn](buffer_t buffer, size_t n) {
CHECK(inet->buffers_available() < buffers_available,
"inet->buffers_available() < buffers_available");
CHECK(std::string((char*)buffer.get(), n) == small, "conn.read() == small");
CHECKSERT(std::string((char*)buffer.get(), n) == small, "Received SMALL");
conn->close();
});
conn->write(small.data(), small.size());
INFO("Buffers available", "%u", inet->buffers_available());
});

/*
TEST: Server should be bound.
*/
CHECK(tcp.openPorts() == 1, "tcp.openPorts() == 1");
CHECK(tcp.openPorts() == 1, "One (1) open port");

/*
TEST: Send and receive big string.
Expand All @@ -229,12 +228,11 @@ void Service::start()
*response += std::string((char*)buffer.get(), n);
if(response->size() == big.size()) {
bool OK = (*response == big);
CHECK(OK, "conn.read() == big");
CHECKSERT(OK, "Received BIG");
conn->close();
}
});
conn->write(big.data(), big.size());
INFO("Buffers available", "%u", inet->buffers_available());
});

/*
Expand All @@ -250,45 +248,44 @@ void Service::start()
// when all expected data is read
if(temp->written == huge.size()) {
bool OK = (temp->str() == huge);
CHECK(OK, "conn.read() == huge");
CHECKSERT(OK, "Received HUGE");
conn->close();
}
});
conn->write(huge.data(), huge.size(), [](size_t n) {
printf("Finished write request! %u bytes written\n", n);
CHECKSERT(n == huge.size(), "Wrote HUGE (%u bytes)", n);
}, true);
INFO("Buffers available", "%u", inet->buffers_available());
});

/*
TEST: More servers should be bound.
*/
CHECK(tcp.openPorts() == 3, "tcp.openPorts() == 3");
CHECK(tcp.openPorts() == 3, "Three (3) open ports");

/*
TEST: Connection (Status etc.) and Active Close
*/
tcp.bind(TEST4).onConnect([](Connection_ptr conn) {
INFO("TEST","Connection");
INFO("TEST","Connection/TCP state");
// There should be at least one connection.
CHECK(inet->tcp().activeConnections() > 0, "tcp.activeConnections() > 0");
CHECKSERT(inet->tcp().activeConnections() > 0, "There is (>0) open connection(s)");
// Test if connected.
CHECK(conn->is_connected(), "conn.is_connected()");
CHECKSERT(conn->is_connected(), "Is connected");
// Test if writable.
CHECK(conn->is_writable(), "conn.is_writable()");
CHECKSERT(conn->is_writable(), "Is writable");
// Test if state is ESTABLISHED.
CHECK(conn->is_state({"ESTABLISHED"}), "conn.is_state(ESTABLISHED)");
CHECKSERT(conn->is_state({"ESTABLISHED"}), "State: ESTABLISHED");

INFO("TEST", "Active close");
// Test for active close.
conn->close();
CHECK(!conn->is_writable(), "!conn->is_writable()");
CHECK(conn->is_state({"FIN-WAIT-1"}), "conn.is_state(FIN-WAIT-1)");
CHECKSERT(!conn->is_writable(), "Is NOT writable");
CHECKSERT(conn->is_state({"FIN-WAIT-1"}), "State: FIN-WAIT-1");
})
.onDisconnect([](Connection_ptr conn, TCP::Connection::Disconnect) {
CHECK(conn->is_state({"FIN-WAIT-2"}), "conn.is_state(FIN-WAIT-2)");
CHECKSERT(conn->is_state({"FIN-WAIT-2"}), "State: FIN-WAIT-2");
hw::PIT::instance().onTimeout(1s,[conn]{
CHECK(conn->is_state({"TIME-WAIT"}), "conn.is_state(TIME-WAIT)");
CHECKSERT(conn->is_state({"TIME-WAIT"}), "State: TIME-WAIT");

OUTGOING_TEST({inet->router(), TEST5});
});
Expand Down

0 comments on commit 9be49d8

Please sign in to comment.