Skip to content
This repository has been archived by the owner on Aug 12, 2024. It is now read-only.

Commit

Permalink
Merge pull request #110 from DataDog/kevin.georges/no-b64
Browse files Browse the repository at this point in the history
store data as raw instead of b64
  • Loading branch information
d33d33 authored Aug 25, 2021
2 parents 11f521d + 7e32b96 commit 6bf55fe
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 59 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
KVEXPRESS_VERSION="1.15"
KVEXPRESS_VERSION="1.16"
GIT_COMMIT=$(shell git rev-parse HEAD)
COMPILE_DATE=$(shell date -u +%Y%m%d.%H%M%S)
BUILD_FLAGS=-X main.CompileDate=$(COMPILE_DATE) -X main.GitCommit=$(GIT_COMMIT) -X main.Version=$(KVEXPRESS_VERSION)
Expand Down
44 changes: 27 additions & 17 deletions commands/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ package commands

import (
"fmt"
consul "github.com/hashicorp/consul/api"
"strings"
"time"

consul "github.com/hashicorp/consul/api"
)

const (
Expand Down Expand Up @@ -49,13 +50,23 @@ func cleanupToken(token string) string {

// Get the value from a key in the Consul KV store.
func Get(c *consul.Client, key string) string {
var str string
data, _ := GetRaw(c, key)
return string(data)
}

// Get the raw value + flags from a key in the Consul KV store.
func GetRaw(c *consul.Client, key string) ([]byte, uint64) {
var data []byte
var flags uint64
Retry(func() error {
var err error
str, err = consulGet(c, key)
pair, err := consulGet(c, key)
if pair != nil {
data = pair.Value[:]
flags = pair.Flags
}
return err
}, consulTries)
return str
return data, flags
}

// Retry loops through the callback func and tries several times to do the thing.
Expand All @@ -77,29 +88,28 @@ func Retry(callback func() error, tries int) {
}

// consulGet the value from a key in the Consul KV store.
func consulGet(c *consul.Client, key string) (string, error) {
var value string
func consulGet(c *consul.Client, key string) (*consul.KVPair, error) {
kv := c.KV()
key = strings.TrimPrefix(key, "/")
pair, _, err := kv.Get(key, nil)
if err != nil {
return "", err
}
if pair != nil {
value = string(pair.Value[:])
} else {
value = ""
return nil, err
}
Log(fmt.Sprintf("action='consulGet' key='%s'", key), "debug")
return value, err
return pair, err
}

// Set the value for a key in the Consul KV store.
func Set(c *consul.Client, key string, value string) bool {
return SetRaw(c, key, []byte(value), 0)
}

// Set the value for a key in the Consul KV store.
func SetRaw(c *consul.Client, key string, value []byte, flags uint64) bool {
var success bool
Retry(func() error {
var err error
success, err = consulSet(c, key, value)
success, err = consulSet(c, key, value, flags)
if success != true {
StatsdConsul(key, "set")
}
Expand All @@ -109,9 +119,9 @@ func Set(c *consul.Client, key string, value string) bool {
}

// consulSet a value for a key in the Consul KV store.
func consulSet(c *consul.Client, key string, value string) (bool, error) {
func consulSet(c *consul.Client, key string, value []byte, flags uint64) (bool, error) {
key = strings.TrimPrefix(key, "/")
p := &consul.KVPair{Key: key, Value: []byte(value)}
p := &consul.KVPair{Key: key, Value: value, Flags: flags}
kv := c.KV()
_, err := kv.Put(p, nil)
if err != nil {
Expand Down
21 changes: 14 additions & 7 deletions commands/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ package commands

import (
"fmt"
"github.com/spf13/cobra"
"gopkg.in/zorkian/go-datadog-api.v1"
"os"
"time"

"github.com/spf13/cobra"
"gopkg.in/zorkian/go-datadog-api.v1"
)

var copyCmd = &cobra.Command{
Expand Down Expand Up @@ -39,11 +40,14 @@ func copyRun(cmd *cobra.Command, args []string) {
}

// Get the KV data out of Consul.
KVData := Get(c, KeyData)
KVRaw, KVFlags := GetRaw(c, KeyData)

// Decompress here if necessary.
var KVData string
if Compress {
KVData = DecompressData(KVData)
KVData = DecompressData(KVRaw, KVFlags)
} else {
KVData = string(KVRaw)
}

// Get the Checksum data out of Consul.
Expand All @@ -60,16 +64,19 @@ func copyRun(cmd *cobra.Command, args []string) {
// If the data is long enough and the checksum matches, save to the new key location.
if longEnough && checksumMatch {
Log(fmt.Sprintf("copy='true' keyFrom='%s' keyTo='%s'", KeyFrom, KeyTo), "info")
var cFlags uint64
if Compress {
KVData = CompressData(KVData)
KVRaw, cFlags = CompressData(KVData)
} else {
KVRaw = []byte(KVData)
}
// New destination key Locations
KeyData = KeyPath(KeyTo, "data")
KeyChecksum = KeyPath(KeyTo, "checksum")
// Save it.
saved := Set(c, KeyData, KVData)
saved := SetRaw(c, KeyData, KVRaw, cFlags)
if saved {
KVDataBytes := len(KVData)
KVDataBytes := len(KVRaw)
Log(fmt.Sprintf("consul KeyData='%s' saved='true' size='%d'", KeyData, KVDataBytes), "info")
Set(c, KeyChecksum, Checksum)
if DatadogAPIKey != "" && DatadogAPPKey != "" {
Expand Down
5 changes: 0 additions & 5 deletions commands/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ func StatsdIn(key string, dataLength int, data string) {
tags := makeTags(key, "complete")
statsd.Incr("kvexpress.in", tags)
statsd.Gauge("kvexpress.bytes", float64(dataLength), tags)
// If the data is compressed - then LineCount will always return 1.
// That's not useful or accurate, so let's decompress and count that.
if Compress {
data = DecompressData(data)
}
statsd.Gauge("kvexpress.lines", float64(LineCount(data)), tags)
}
}
Expand Down
15 changes: 10 additions & 5 deletions commands/in.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ package commands

import (
"fmt"
"github.com/spf13/cobra"
"gopkg.in/zorkian/go-datadog-api.v1"
"os"
"time"

"github.com/spf13/cobra"
"gopkg.in/zorkian/go-datadog-api.v1"
)

var inCmd = &cobra.Command{
Expand Down Expand Up @@ -134,12 +135,16 @@ func inRun(cmd *cobra.Command, args []string) {
if CurrentChecksum != CompareChecksum {
Log("consul checksum='different' update='true'", "info")
// Compress data here.
var CompareRaw []byte
var cFlags uint64
if Compress {
CompareData = CompressData(CompareData)
CompareRaw, cFlags = CompressData(CompareData)
} else {
CompareRaw = []byte(CompareData)
}
saved := Set(c, KeyData, CompareData)
saved := SetRaw(c, KeyData, CompareRaw, cFlags)
if saved {
CompareDataBytes := len(CompareData)
CompareDataBytes := len(CompareRaw)
Log(fmt.Sprintf("consul KeyData='%s' saved='true' size='%d'", KeyData, CompareDataBytes), "info")
Set(c, KeyChecksum, CompareChecksum)
if DatadogAPIKey != "" && DatadogAPPKey != "" {
Expand Down
10 changes: 7 additions & 3 deletions commands/out.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ package commands

import (
"fmt"
"github.com/spf13/cobra"
"os"
"time"

"github.com/spf13/cobra"
)

var outCmd = &cobra.Command{
Expand Down Expand Up @@ -57,11 +58,14 @@ func outRun(cmd *cobra.Command, args []string) {
}

// Get the KV data out of Consul.
KVData := Get(c, KeyData)
KVRaw, KVFlags := GetRaw(c, KeyData)

// Decompress here if necessary.
var KVData string
if Compress {
KVData = DecompressData(KVData)
KVData = DecompressData(KVRaw, KVFlags)
} else {
KVData = string(KVRaw)
}

// Get the Checksum data out of Consul.
Expand Down
57 changes: 36 additions & 21 deletions commands/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,21 @@ import (
"compress/gzip"
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"os/user"
"strconv"
"strings"
"time"

"github.com/andybalholm/brotli"
)

const (
GZIPb64 = iota
BROTLI
)

// ReturnCurrentUTC returns the current UTC time in RFC3339 format.
Expand Down Expand Up @@ -134,45 +142,52 @@ func GetGroupID(owner string) int {
return int(gidInt)
}

// CompressData compresses and base64 encodes a string to place into Consul's KV store.
func CompressData(data string) string {
// CompressData compresses a string to place into Consul's KV store.
func CompressData(data string) ([]byte, uint64) {
var compressed bytes.Buffer
gz, _ := gzip.NewWriterLevel(&compressed, gzip.BestCompression)
gz := brotli.NewWriterLevel(&compressed, brotli.BestCompression)
gz.Write([]byte(data))
gz.Flush()
gz.Close()
encoded := base64.StdEncoding.EncodeToString(compressed.Bytes())
encoded := compressed.Bytes()
Log(fmt.Sprintf("compressing='true' full_size='%d' compressed_size='%d'", len(data), len(encoded)), "info")
return encoded
return encoded, BROTLI
}

// DecompressData base64 decodes and decompresses a string taken from Consul's KV store.
func DecompressData(data string) string {
if data != "" {
// If it's been compressed, it's been base64 encoded.
raw, err := base64.StdEncoding.DecodeString(data)
// DecompressData decompresses a string taken from Consul's KV store.
func DecompressData(data []byte, compressFlags uint64) string {
if data == nil {
return ""
}

var r io.Reader
switch compressFlags {
case GZIPb64:
raw, err := base64.StdEncoding.DecodeString(string(data))
if err != nil {
Log("function='DecompressData' panic='true' method='base64.StdEncoding.DecodeString'", "info")
fmt.Println("Panic: Could not base64 decode string.")
StatsdPanic("key", "DecompressData")
}
data = raw
// gunzip the string.
unzipped, err := gzip.NewReader(strings.NewReader(string(raw)))
unzipped, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
Log("function='DecompressData' panic='true' method='gzip.NewReader'", "info")
fmt.Println("Panic: Could not gunzip string.")
StatsdPanic("key", "DecompressData")
}
uncompressed, err := ioutil.ReadAll(unzipped)
if err != nil {
Log("function='DecompressData' panic='true' method='ioutil.ReadAll'", "info")
fmt.Println("Panic: Could not ioutil.ReadAll string.")
StatsdPanic("key", "DecompressData")
}
Log(fmt.Sprintf("decompressing='true' size='%d'", len(uncompressed)), "info")
return string(uncompressed)
r = unzipped
case BROTLI:
r = brotli.NewReader(bytes.NewReader(data))
}
uncompressed, err := ioutil.ReadAll(r)
if err != nil {
Log("function='DecompressData' panic='true' method='ioutil.ReadAll'", "info")
fmt.Println("Panic: Could not ioutil.ReadAll string.")
StatsdPanic("key", "DecompressData")
}
return ""
Log(fmt.Sprintf("decompressing='true' size='%d'", len(uncompressed)), "info")
return string(uncompressed)
}

// GetHostname returns the hostname.
Expand Down

0 comments on commit 6bf55fe

Please sign in to comment.