Skip to content

Commit f930c36

Browse files
authored
Refactor sending and receiving of packets (#25)
1 parent a52d002 commit f930c36

17 files changed

+651
-666
lines changed

.travis.yml

-5
Original file line number | Diff line number | Diff line change
@@ -11,11 +11,6 @@ go:
1111
- 1.8
1212
- tip
1313

14-
before_install:
15-
- if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then sudo apt-get install -q libpcap-dev ; fi
16-
- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then brew update ; fi
17-
- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then brew install libpcap ; fi
18-
1914
install: make install_ci
2015

2116
script: make test

README.md

+11-7
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@ system. It provides fast and easy active end-to-end functional testing
55
of all the components in Data Center and Cloud infrastructures.
66
Arachne is able to detect intra-DC, inter-DC, DC-to-Cloud, and
77
DC-to-External-Services issues by generating minimal traffic:
8-
8+
99
- Reachability
1010
- Round-trip and 1-way latency
1111
- Silent packet drops and black holes
@@ -14,10 +14,6 @@ DC-to-External-Services issues by generating minimal traffic:
1414
(accidental or not)
1515
- Whether network-level SLAs are met
1616

17-
## Requirements
18-
19-
* libpcap
20-
2117
## Usage
2218

2319
There are two ways to use the Arachne package.
@@ -33,8 +29,8 @@ Import this package and call Arachne from your program/service with
3329
where the option provided above is among the few optional ones.
3430

3531

36-
Below is the list of all the CLI options available, when Arachne is
37-
used as a standalone program. The default options should be good
32+
Below is the list of all the CLI options available, when Arachne is
33+
used as a standalone program. The default options should be good
3834
enough for most users.
3935

4036
```
@@ -73,6 +69,14 @@ as root user, by being granted `CAP_NET_RAW` capability
7369
(see: [capabilities][]).
7470

7571

72+
### Note on BPF filtering
73+
74+
When receiving packets, Arachne attempts to apply a BPF filter to the raw socket
75+
so that processing of packets occurs on a much smaller set (ones destined
76+
specifically for Arachne agent testing). This is currently supported only on
77+
Linux and thus performance will be worse on BSD-based systems where a larger
78+
number of packets must be inspected.
79+
7680
<hr>
7781

7882
Released under the [MIT License](LICENSE.md).

bootstrap.go

+21-7
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/uber/arachne/collector"
3030
"github.com/uber/arachne/config"
3131
d "github.com/uber/arachne/defines"
32+
"github.com/uber/arachne/internal/ip"
3233
"github.com/uber/arachne/internal/log"
3334
"github.com/uber/arachne/internal/tcp"
3435
"github.com/uber/arachne/internal/util"
@@ -86,12 +87,15 @@ func Run(ec *config.Extended, opts ...Option) {
8687
logger.Error("error initializing stats", zap.Error(err))
8788
}
8889

90+
// Hold raw socket connection for IPv4 packets
91+
var connIPv4 *ip.Conn
92+
8993
logger.Info("Starting up arachne")
9094

9195
for {
9296
var (
9397
err error
94-
currentDSCP tcp.DSCPValue
98+
currentDSCP ip.DSCPValue
9599
dnsWg sync.WaitGroup
96100
finishedCycleUpload sync.WaitGroup
97101
)
@@ -120,7 +124,7 @@ func Run(ec *config.Extended, opts ...Option) {
120124
dnsRefresh := time.NewTicker(d.DNSRefreshInterval)
121125
dnsWg.Add(1)
122126
killC.DNSRefresh = make(chan struct{})
123-
config.ResolveDnsTargets(gl.Remotes, gl.RemoteConfig, dnsRefresh, &dnsWg,
127+
config.ResolveDNSTargets(gl.Remotes, gl.RemoteConfig, dnsRefresh, &dnsWg,
124128
killC.DNSRefresh, logger)
125129
dnsWg.Wait()
126130
logger.Debug("Remotes after DNS resolution include",
@@ -132,6 +136,16 @@ func Run(ec *config.Extended, opts ...Option) {
132136
sentC := make(chan tcp.Message, d.ChannelOutBufferSize)
133137
rcvdC := make(chan tcp.Message, d.ChannelInBufferSize)
134138

139+
// Connection for IPv4 packets
140+
if connIPv4 == nil {
141+
connIPv4 = ip.NewConn(
142+
d.AfInet,
143+
gl.RemoteConfig.TargetTCPPort,
144+
gl.RemoteConfig.InterfaceName,
145+
gl.RemoteConfig.SrcAddress,
146+
logger)
147+
}
148+
135149
// Actual echoing is a percentage of the total configured batch cycle duration.
136150
realBatchInterval := time.Duration(float32(gl.RemoteConfig.BatchInterval) *
137151
d.BatchIntervalEchoingPerc)
@@ -150,8 +164,7 @@ func Run(ec *config.Extended, opts ...Option) {
150164
if !*gl.CLI.SenderOnlyMode {
151165
// Listen for responses or probes from other IPv4 arachne agents.
152166
killC.Receiver = make(chan struct{})
153-
err = tcp.Receiver("ip4", &gl.RemoteConfig.SrcAddress, gl.RemoteConfig.TargetTCPPort,
154-
gl.RemoteConfig.InterfaceName, sentC, rcvdC, killC.Receiver, logger)
167+
err = tcp.Receiver(connIPv4, sentC, rcvdC, killC.Receiver, logger)
155168
if err != nil {
156169
logger.Fatal("IPv4 receiver failed to start", zap.Error(err))
157170
}
@@ -163,23 +176,24 @@ func Run(ec *config.Extended, opts ...Option) {
163176
logger.Debug("Echoing...")
164177
// Start echoing all targets.
165178
killC.Echo = make(chan struct{})
166-
tcp.EchoTargets(gl.Remotes, &gl.RemoteConfig.SrcAddress, gl.RemoteConfig.TargetTCPPort,
179+
tcp.EchoTargets(gl.Remotes, connIPv4, gl.RemoteConfig.TargetTCPPort,
167180
gl.RemoteConfig.SrcTCPPortRange, gl.RemoteConfig.QoSEnabled, &currentDSCP,
168181
realBatchInterval, batchEndCycle, sentC, *gl.CLI.SenderOnlyMode,
169182
completeCycleUpload, &finishedCycleUpload, killC.Echo, logger)
170183
}
171184

172185
select {
173186
case <-configRefresh.C:
174-
util.CleanUpRefresh(killC, *gl.CLI.ReceiverOnlyMode, *gl.CLI.SenderOnlyMode, gl.RemoteConfig.ResolveDNS)
187+
util.CleanUpRefresh(killC, *gl.CLI.ReceiverOnlyMode,
188+
*gl.CLI.SenderOnlyMode, gl.RemoteConfig.ResolveDNS)
175189
log.ResetLogFiles(gl.App.Logging.OutputPaths, d.LogFileSizeMaxMB, d.LogFileSizeKeepKB, logger)
176190
logger.Info("Refreshing target list file, if needed")
177191
configRefresh.Stop()
178192
case <-sigC:
179193
logger.Debug("Received SIG")
180194
configRefresh.Stop()
181195
util.CleanUpAll(killC, *gl.CLI.ReceiverOnlyMode, *gl.CLI.SenderOnlyMode,
182-
gl.RemoteConfig.ResolveDNS, gl.App.PIDPath, sr, logger)
196+
gl.RemoteConfig.ResolveDNS, connIPv4, gl.App.PIDPath, sr, logger)
183197
logger.Info("Exiting")
184198
os.Exit(0)
185199
}

collector/collector.go

+33-30
Original file line number | Diff line number | Diff line change
@@ -29,11 +29,13 @@ import (
2929

3030
"github.com/uber/arachne/config"
3131
"github.com/uber/arachne/defines"
32+
"github.com/uber/arachne/internal/ip"
3233
"github.com/uber/arachne/internal/log"
3334
"github.com/uber/arachne/internal/tcp"
3435
"github.com/uber/arachne/metrics"
3536

3637
"github.com/fatih/color"
38+
"github.com/google/gopacket/layers"
3739
"go.uber.org/zap"
3840
"go.uber.org/zap/zapcore"
3941
)
@@ -49,14 +51,14 @@ type report struct {
4951
}
5052

5153
// map[target address string] => *[QOS_DSCP_VALUE] => map[source port]
52-
type resultStore map[string]*[defines.NumQOSDCSPValues]map[uint16]report
54+
type resultStore map[string]*[defines.NumQOSDCSPValues]map[layers.TCPPort]report
5355
type messageStore map[string]*[defines.NumQOSDCSPValues]srcPortScopedMessageStore
5456

5557
type srcPortScopedMessageStore struct {
5658
sent srcPortScopedMessages
5759
rcvd srcPortScopedMessages
5860
}
59-
type srcPortScopedMessages map[uint16]tcp.Message
61+
type srcPortScopedMessages map[layers.TCPPort]tcp.Message
6062

6163
func (ms messageStore) target(target string, QosDSCPIndex uint8) *srcPortScopedMessageStore {
6264
// TODO: validate dscp is in range or create a dscp type alias
@@ -72,19 +74,19 @@ func (ms messageStore) target(target string, QosDSCPIndex uint8) *srcPortScopedM
7274
return &ms[target][QosDSCPIndex]
7375
}
7476

75-
func (spsm srcPortScopedMessages) add(srcPort uint16, message tcp.Message) {
77+
func (spsm srcPortScopedMessages) add(srcPort layers.TCPPort, message tcp.Message) {
7678
spsm[srcPort] = message
7779
}
7880

79-
func (ms messageStore) sentAdd(target string, QosDSCPIndex uint8, srcPort uint16, message tcp.Message) {
81+
func (ms messageStore) sentAdd(target string, QosDSCPIndex uint8, srcPort layers.TCPPort, message tcp.Message) {
8082
ms.target(target, QosDSCPIndex).sent.add(srcPort, message)
8183
}
8284

83-
func (ms messageStore) rcvdAdd(target string, QosDSCPIndex uint8, srcPort uint16, message tcp.Message) {
85+
func (ms messageStore) rcvdAdd(target string, QosDSCPIndex uint8, srcPort layers.TCPPort, message tcp.Message) {
8486
ms.target(target, QosDSCPIndex).rcvd.add(srcPort, message)
8587
}
8688

87-
func (ms messageStore) existsRcvd(target string, QosDSCPIndex uint8, srcPort uint16) (tcp.Message, bool) {
89+
func (ms messageStore) existsRcvd(target string, QosDSCPIndex uint8, srcPort layers.TCPPort) (tcp.Message, bool) {
8890

8991
if _, exists := ms[target]; !exists {
9092
return tcp.Message{}, false
@@ -99,7 +101,7 @@ func (ms messageStore) existsRcvd(target string, QosDSCPIndex uint8, srcPort uin
99101
return matchedMsg, true
100102
}
101103

102-
func (ms messageStore) existsSent(target string, QosDSCPIndex uint8, srcPort uint16) (tcp.Message, bool) {
104+
func (ms messageStore) existsSent(target string, QosDSCPIndex uint8, srcPort layers.TCPPort) (tcp.Message, bool) {
103105

104106
if _, exists := ms[target]; !exists {
105107
return tcp.Message{}, false
@@ -114,23 +116,23 @@ func (ms messageStore) existsSent(target string, QosDSCPIndex uint8, srcPort uin
114116
return matchedMsg, true
115117
}
116118

117-
func (rs resultStore) add(target string, QosDSCPIndex uint8, srcPort uint16, r report) {
119+
func (rs resultStore) add(target string, QosDSCPIndex uint8, srcPort layers.TCPPort, r report) {
118120

119121
if rs[target] == nil {
120-
var resDSCP [defines.NumQOSDCSPValues]map[uint16]report
122+
var resDSCP [defines.NumQOSDCSPValues]map[layers.TCPPort]report
121123
rs[target] = &resDSCP
122124
}
123125
if rs[target][QosDSCPIndex] == nil {
124-
rs[target][QosDSCPIndex] = make(map[uint16]report)
126+
rs[target][QosDSCPIndex] = make(map[layers.TCPPort]report)
125127
}
126128
rs[target][QosDSCPIndex][srcPort] = r
127129
}
128130

129-
type resultWalker func(report, string, string, uint16, bool, *log.Logger)
131+
type resultWalker func(report, string, string, layers.TCPPort, bool, *log.Logger)
130132

131133
func (rs resultStore) walkResults(
132134
remotes config.RemoteStore,
133-
currentDSCP *tcp.DSCPValue,
135+
currentDSCP *ip.DSCPValue,
134136
foreground bool,
135137
logger *log.Logger,
136138
walkerF ...resultWalker) {
@@ -144,9 +146,10 @@ func (rs resultStore) walkResults(
144146

145147
qos := *currentDSCP
146148
if remote.External {
147-
qos = tcp.DSCPBeLow
149+
qos = ip.DSCPBeLow
148150
}
149-
for srcPort, rep := range r[(tcp.GetDSCP).Pos(qos, logger)] {
151+
152+
for srcPort, rep := range r[(ip.GetDSCP).Pos(qos, logger)] {
150153
walkerF[0](rep, remote.Hostname, remote.Location, srcPort, foreground, logger)
151154
}
152155
if len(walkerF) > 1 {
@@ -184,7 +187,7 @@ func (rs resultStore) processResults(
184187

185188
// Store processed report to 'result' data structure for stdout, if needed
186189
if !*(gl.CLI.SenderOnlyMode) {
187-
QosDSCPIndex := (tcp.GetDSCP).Pos(req.QosDSCP, logger)
190+
QosDSCPIndex := (ip.GetDSCP).Pos(req.QosDSCP, logger)
188191
rs.add(target, QosDSCPIndex, req.SrcPort, r)
189192
}
190193

@@ -194,7 +197,7 @@ func (rs resultStore) processResults(
194197
func (rs resultStore) printResults(
195198
gl *config.Global,
196199
remotes config.RemoteStore,
197-
currentDSCP *tcp.DSCPValue,
200+
currentDSCP *ip.DSCPValue,
198201
logger *log.Logger,
199202
) {
200203
foreground := *gl.CLI.Foreground
@@ -210,7 +213,7 @@ func Run(
210213
sentC chan tcp.Message,
211214
rcvdC chan tcp.Message,
212215
remotes config.RemoteStore,
213-
currentDSCP *tcp.DSCPValue,
216+
currentDSCP *ip.DSCPValue,
214217
sr metrics.Reporter,
215218
completeCycleUpload chan bool,
216219
wg *sync.WaitGroup,
@@ -246,7 +249,7 @@ func batchWorker(
246249
remotes config.RemoteStore,
247250
ms messageStore,
248251
rs resultStore,
249-
currentDSCP *tcp.DSCPValue,
252+
currentDSCP *ip.DSCPValue,
250253
sfn statsUploader,
251254
sr metrics.Reporter,
252255
completeCycleUpload chan bool,
@@ -263,7 +266,7 @@ func batchWorker(
263266
zap.Any("type", out.Type))
264267
continue
265268
}
266-
QosDSCPIndex := (tcp.GetDSCP).Pos(out.QosDSCP, logger)
269+
QosDSCPIndex := (ip.GetDSCP).Pos(out.QosDSCP, logger)
267270

268271
// SYN sent
269272
targetKey := out.DstAddr.String()
@@ -285,7 +288,7 @@ func batchWorker(
285288
zap.Any("type", in.Type))
286289
continue
287290
}
288-
QosDSCPIndex := (tcp.GetDSCP).Pos(in.QosDSCP, logger)
291+
QosDSCPIndex := (ip.GetDSCP).Pos(in.QosDSCP, logger)
289292

290293
// SYN+ACK received
291294
targetKey := in.SrcAddr.String()
@@ -359,8 +362,8 @@ type statsUploader func(
359362
sr metrics.Reporter,
360363
target string,
361364
remotes config.RemoteStore,
362-
QOSDSCP tcp.DSCPValue,
363-
srcPort uint16,
365+
QOSDSCP ip.DSCPValue,
366+
srcPort layers.TCPPort,
364367
r *report,
365368
logger *log.Logger,
366369
)
@@ -370,8 +373,8 @@ func statsUpload(
370373
sr metrics.Reporter,
371374
target string,
372375
remotes config.RemoteStore,
373-
QOSDSCP tcp.DSCPValue,
374-
srcPort uint16,
376+
QOSDSCP ip.DSCPValue,
377+
srcPort layers.TCPPort,
375378
r *report,
376379
logger *log.Logger,
377380
) {
@@ -416,12 +419,12 @@ func zeroOutResults(
416419
for targetKey := range ms {
417420
_, existsTarget := rs[targetKey]
418421
if !existsTarget {
419-
var resDSCP [defines.NumQOSDCSPValues]map[uint16]report
422+
var resDSCP [defines.NumQOSDCSPValues]map[layers.TCPPort]report
420423
rs[targetKey] = &resDSCP
421424
}
422425
for qosDSCP := 0; qosDSCP < defines.NumQOSDCSPValues; qosDSCP++ {
423426
if rs[targetKey][qosDSCP] == nil {
424-
rs[targetKey][qosDSCP] = make(map[uint16]report)
427+
rs[targetKey][qosDSCP] = make(map[layers.TCPPort]report)
425428
}
426429
for srcPort := range ms[targetKey][qosDSCP].sent {
427430
if _, existsSrc := rs[targetKey][qosDSCP][srcPort]; existsSrc {
@@ -430,7 +433,7 @@ func zeroOutResults(
430433
rs[targetKey][qosDSCP][srcPort] = timedOutReport
431434

432435
// Upload timed out results
433-
sfn(glr, sr, targetKey, remotes, tcp.GetDSCP[qosDSCP], srcPort, &timedOutReport, logger)
436+
sfn(glr, sr, targetKey, remotes, ip.GetDSCP[qosDSCP], srcPort, &timedOutReport, logger)
434437
time.Sleep(1 * time.Millisecond)
435438
}
436439
}
@@ -465,7 +468,7 @@ func printTableHeader(gl *config.Global, currentDSCP string, logger *log.Logger)
465468
zap.String("version", defines.ArachneVersion),
466469
zap.String("host", gl.RemoteConfig.HostName),
467470
zap.String("host_location", gl.RemoteConfig.Location),
468-
zap.Uint16("target_TCP_port", gl.RemoteConfig.TargetTCPPort),
471+
zap.Any("target_TCP_port", gl.RemoteConfig.TargetTCPPort),
469472
zap.String("QoS_DSCP", currentDSCP),
470473
)
471474
}
@@ -486,7 +489,7 @@ func printTableEntry(
486489
r report,
487490
targetHost string,
488491
targetLocation string,
489-
srcPort uint16,
492+
srcPort layers.TCPPort,
490493
foreground bool,
491494
logger *log.Logger,
492495
) {
@@ -532,7 +535,7 @@ func printTableEntry(
532535
logger.Info("Result",
533536
zap.String("target", targetHost),
534537
zap.String("target_location", targetLocation),
535-
zap.Uint16("source_port", srcPort),
538+
zap.Any("source_port", srcPort),
536539
twoWay,
537540
oneWay,
538541
zap.String("timed_out", timedOut))

0 commit comments

Comments
 (0)