Skip to content

Commit

Permalink
Wrappers (#45)
Browse files Browse the repository at this point in the history
* add new client interface

* tidy go mod

* add tests for new client send functions

* fix SendRaw check for transport phase

* SendRaw share code path with Send

* Revert "SendRaw share code path with Send"

This reverts commit 54d04cd.
  • Loading branch information
ericmillin authored Jun 27, 2022
1 parent 904b1e1 commit 372bc3c
Show file tree
Hide file tree
Showing 7 changed files with 835 additions and 105 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,13 @@ The `record` object must be a `map` or `struct`. Objects that implement the [`ms
record := map[string]interface{}{
"Hello": "World",
}
msg := protocol.NewMessage("tag", record)
err := c.Send(msg)
err := c.SendMessage("tag", record)
```

### Send a byte-encoded message

```go
raw := protocol.RawMessage(myMessageBytes)
err := c.Send(raw)
err := c.SendRaw(myMessageBytes)
```

### Message confirmation
Expand Down
27 changes: 5 additions & 22 deletions cmd/forward/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,48 +85,31 @@ func main() {
},
}

msg := protocol.NewMessage(tagVar, record)
mne := protocol.NewMessageExt(tagVar, record)
fwd := protocol.NewForwardMessage(tagVar, entries)
packedFwd, _ := protocol.NewPackedForwardMessage(tagVar+".packed", entries)
compressed, _ := protocol.NewCompressedPackedForwardMessage(tagVar+".compressed",
fwd.Entries)

err = c.Send(msg)
err = c.SendMessage(tagVar, record)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

err = c.Send(mne)
err = c.SendMessageExt(tagVar, record)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

err = c.Send(fwd)
err = c.SendForward(tagVar, entries)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

err = c.Send(packedFwd)
err = c.SendPacked(tagVar+".packed", entries)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

err = c.Send(compressed)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

_, _ = compressed.Chunk()
b, _ := compressed.MarshalMsg(nil)
rm := protocol.RawMessage(b)

err = c.Send(rm)
err = c.SendCompressed(tagVar+".compressed", entries)
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
147 changes: 109 additions & 38 deletions fluent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ type MessageClient interface {
Disconnect() (err error)
Reconnect() error
Send(e protocol.ChunkEncoder) error
SendCompressed(tag string, entries protocol.EntryList) error
SendCompressedFromBytes(tag string, entries []byte) error
SendForward(tag string, entries protocol.EntryList) error
SendMessage(tag string, record interface{}) error
SendMessageExt(tag string, record interface{}) error
SendPacked(tag string, entries protocol.EntryList) error
SendPackedFromBytes(tag string, entries []byte) error
SendRaw(raw []byte) error
}

Expand Down Expand Up @@ -140,6 +147,62 @@ func (c *Client) connect() error {
return nil
}

// Handshake initiates handshake mode. Users must call this before attempting
// to send any messages when the server is configured with a shared key, otherwise
// the server will reject any message events. Successful completion of the
// handshake puts the connection into message (or forward) mode, at which time
// the client is free to send event messages.
func (c *Client) Handshake() error {
c.sessionLock.RLock()
defer c.sessionLock.RUnlock()

if c.session == nil {
return errors.New("not connected")
}

var helo protocol.Helo

r := msgp.NewReader(c.session.Connection)
err := helo.DecodeMsg(r)

if err != nil {
return err
}

salt := make([]byte, 16)

_, err = rand.Read(salt)
if err != nil {
return err
}

ping, err := protocol.NewPing(c.Hostname, c.AuthInfo.SharedKey, salt, helo.Options.Nonce)
if err != nil {
return err
}

err = msgp.Encode(c.session.Connection, ping)
if err != nil {
return err
}

var pong protocol.Pong

err = pong.DecodeMsg(r)
if err != nil {
return err
}

if err := protocol.ValidatePongDigest(&pong, c.AuthInfo.SharedKey,
helo.Options.Nonce, salt); err != nil {
return err
}

c.session.TransportPhase = true

return nil
}

// Connect initializes the Session and Connection objects by opening
// a client connect to the target configured in the ConnectionFactory
func (c *Client) Connect() error {
Expand Down Expand Up @@ -239,61 +302,69 @@ func (c *Client) Send(e protocol.ChunkEncoder) error {
// is not yet in transport phase, an error is returned,
// and no message is sent.
func (c *Client) SendRaw(m []byte) error {
return c.Send(protocol.RawMessage(m))
}

// Handshake initiates handshake mode. Users must call this before attempting
// to send any messages when the server is configured with a shared key, otherwise
// the server will reject any message events. Successful completion of the
// handshake puts the connection into message (or forward) mode, at which time
// the client is free to send event messages.
func (c *Client) Handshake() error {
c.sessionLock.RLock()
defer c.sessionLock.RUnlock()

if c.session == nil {
return errors.New("not connected")
return errors.New("no active session")
}

var helo protocol.Helo
if !c.session.TransportPhase {
return errors.New("session handshake not completed")
}

r := msgp.NewReader(c.session.Connection)
err := helo.DecodeMsg(r)
_, err := c.session.Connection.Write(m)

if err != nil {
return err
return err
}

func (c *Client) SendPacked(tag string, entries protocol.EntryList) error {
msg, err := protocol.NewPackedForwardMessage(tag, entries)
if err == nil {
err = c.Send(msg)
}

salt := make([]byte, 16)
return err
}

_, err = rand.Read(salt)
if err != nil {
return err
}
func (c *Client) SendPackedFromBytes(tag string, entries []byte) error {
msg := protocol.NewPackedForwardMessageFromBytes(tag, entries)

ping, err := protocol.NewPing(c.Hostname, c.AuthInfo.SharedKey, salt, helo.Options.Nonce)
if err != nil {
return err
}
return c.Send(msg)
}

err = msgp.Encode(c.session.Connection, ping)
if err != nil {
return err
}
func (c *Client) SendMessage(tag string, record interface{}) error {
msg := protocol.NewMessage(tag, record)

var pong protocol.Pong
return c.Send(msg)
}

err = pong.DecodeMsg(r)
if err != nil {
return err
}
func (c *Client) SendMessageExt(tag string, record interface{}) error {
msg := protocol.NewMessageExt(tag, record)

if err := protocol.ValidatePongDigest(&pong, c.AuthInfo.SharedKey,
helo.Options.Nonce, salt); err != nil {
return err
return c.Send(msg)
}

func (c *Client) SendForward(tag string, entries protocol.EntryList) error {
msg := protocol.NewForwardMessage(tag, entries)

return c.Send(msg)
}

func (c *Client) SendCompressed(tag string, entries protocol.EntryList) error {
msg, err := protocol.NewCompressedPackedForwardMessage(tag, entries)
if err == nil {
err = c.Send(msg)
}

c.session.TransportPhase = true
return err
}

return nil
func (c *Client) SendCompressedFromBytes(tag string, entries []byte) error {
msg, err := protocol.NewCompressedPackedForwardMessageFromBytes(tag, entries)
if err == nil {
err = c.Send(msg)
}

return err
}
Loading

0 comments on commit 372bc3c

Please sign in to comment.