Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue with unnecessary config reload in static mode #6977

Merged
merged 4 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ v0.41.1 (2024-06-07)

- Updated pyroscope to v0.4.6 introducing `symbols_map_size` and `pid_map_size` configuration. (@simonswine)

### Bugfixes

- Fix an issue which caused the config to be reloaded when a config reload was triggered even though the config had not changed.
The bug only affected the "metrics" and "logs" subsystems in Static mode.

v0.41.0 (2024-05-31)
--------------------
Expand Down
26 changes: 2 additions & 24 deletions cmd/grafana-agent-service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os/exec"
"path/filepath"
"runtime"
"sync"
"testing"

"github.com/go-kit/log"
Expand Down Expand Up @@ -84,7 +83,7 @@ func Test_serviceManager(t *testing.T) {
t.Run("can forward to stdout", func(t *testing.T) {
listenHost := getListenHost(t)

var buf syncBuffer
var buf util.SyncBuffer

mgr := newServiceManager(l, serviceManagerConfig{
Path: serviceBinary,
Expand Down Expand Up @@ -112,7 +111,7 @@ func Test_serviceManager(t *testing.T) {
t.Run("can forward to stderr", func(t *testing.T) {
listenHost := getListenHost(t)

var buf syncBuffer
var buf util.SyncBuffer

mgr := newServiceManager(l, serviceManagerConfig{
Path: serviceBinary,
Expand Down Expand Up @@ -186,24 +185,3 @@ func makeServiceRequest(host string, path string, body []byte) ([]byte, error) {
}
return io.ReadAll(resp.Body)
}

// syncBuffer wraps around a bytes.Buffer and makes it safe to use from
// multiple goroutines. The zero value is an empty buffer ready for use.
type syncBuffer struct {
	mut sync.RWMutex
	buf bytes.Buffer
}

// Bytes returns a copy of the data written to the buffer so far.
//
// bytes.Buffer.Bytes returns a slice that aliases the buffer's internal
// storage and is only valid until the next modification. Handing that slice
// out after the lock is released would let callers touch the storage without
// holding the mutex, so the contents are copied while the read lock is held.
func (sb *syncBuffer) Bytes() []byte {
	sb.mut.RLock()
	defer sb.mut.RUnlock()

	return append([]byte(nil), sb.buf.Bytes()...)
}

// Write appends p to the buffer while holding the write lock. It returns the
// values reported by the underlying bytes.Buffer.Write.
func (sb *syncBuffer) Write(p []byte) (n int, err error) {
	sb.mut.Lock()
	defer sb.mut.Unlock()

	return sb.buf.Write(p)
}
34 changes: 34 additions & 0 deletions internal/util/syncbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package util

import (
"bytes"
"sync"
)

// SyncBuffer wraps around a bytes.Buffer and makes it safe to use from
// multiple goroutines. The zero value is an empty buffer ready for use.
type SyncBuffer struct {
	mut sync.RWMutex
	buf bytes.Buffer
}

// Bytes returns a copy of the data written to the buffer so far.
//
// bytes.Buffer.Bytes returns a slice that aliases the buffer's internal
// storage and is only valid until the next modification. Handing that slice
// out after the lock is released would let callers touch the storage without
// holding the mutex, so the contents are copied while the read lock is held.
func (sb *SyncBuffer) Bytes() []byte {
	sb.mut.RLock()
	defer sb.mut.RUnlock()

	return append([]byte(nil), sb.buf.Bytes()...)
}

// String returns the data written to the buffer so far as a string. The
// result is taken while holding the read lock.
func (sb *SyncBuffer) String() string {
	sb.mut.RLock()
	defer sb.mut.RUnlock()

	return sb.buf.String()
}

// Write appends p to the buffer while holding the write lock. It returns the
// values reported by the underlying bytes.Buffer.Write.
func (sb *SyncBuffer) Write(p []byte) (n int, err error) {
	sb.mut.Lock()
	defer sb.mut.Unlock()

	return sb.buf.Write(p)
}
13 changes: 11 additions & 2 deletions static/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/wal"
"github.com/grafana/loki/pkg/tracing"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"
)

func init() {
Expand Down Expand Up @@ -121,6 +122,8 @@ type Instance struct {
log log.Logger
reg *util.Unregisterer

previousConfig string

promtail *promtail.Promtail
}

Expand Down Expand Up @@ -155,14 +158,20 @@ func (i *Instance) ApplyConfig(c *InstanceConfig, g GlobalConfig, dryRun bool) e
defer i.mut.Unlock()

// No-op if the configs haven't changed.
if util.CompareYAML(c, i.cfg) {
newConfigByteArr, err := yaml.Marshal(c)
if err != nil {
return fmt.Errorf("failed to marshal new logs instance config: %w", err)
}
newConfig := string(newConfigByteArr)
if newConfig == i.previousConfig {
level.Debug(i.log).Log("msg", "instance config hasn't changed, not recreating Promtail")
return nil
}
i.previousConfig = newConfig
i.cfg = c

positionsDir := filepath.Dir(c.PositionsConfig.PositionsFile)
err := os.MkdirAll(positionsDir, 0775)
err = os.MkdirAll(positionsDir, 0775)
if err != nil {
level.Warn(i.log).Log("msg", "failed to create the positions directory. logs may be unable to save their position", "path", positionsDir, "err", err)
}
Expand Down
31 changes: 30 additions & 1 deletion static/logs/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package logs

import (
"bytes"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -30,6 +31,12 @@ func TestLogs_NilConfig(t *testing.T) {
defer l.Stop()
}

// checkConfigReloadLog asserts that the debug line logged when an unchanged
// instance config is skipped appears exactly expectedOccurances times in logs.
func checkConfigReloadLog(t *testing.T, logs string, expectedOccurances int) {
	// Mark this function as a helper so a failed assertion is reported at
	// the caller's line instead of inside this function.
	t.Helper()

	const logLine = `level=debug component=logs logs_config=default msg="instance config hasn't changed, not recreating Promtail"`
	require.Equal(t, expectedOccurances, strings.Count(logs, logLine))
}

func TestLogs(t *testing.T) {
//
// Create a temporary file to tail
Expand Down Expand Up @@ -87,7 +94,8 @@ configs:
dec.SetStrict(true)
require.NoError(t, dec.Decode(&cfg))
require.NoError(t, cfg.ApplyDefaults())
logger := log.NewSyncLogger(log.NewNopLogger())
logBuffer := bytes.Buffer{}
logger := log.NewSyncLogger(log.NewLogfmtLogger(&logBuffer))
l, err := New(prometheus.NewRegistry(), &cfg, logger, false)
require.NoError(t, err)
defer l.Stop()
Expand All @@ -103,6 +111,23 @@ configs:
require.Equal(t, "Hello, world!", req.Streams[0].Entries[0].Line)
}

// The config did change.
// We expect the config reload log line to not be printed.
checkConfigReloadLog(t, logBuffer.String(), 0)

//
// Apply the same config and try reloading.
// Recreate the config struct to make sure it's clean.
//
var sameCfg Config
dec = yaml.NewDecoder(strings.NewReader(cfgText))
dec.SetStrict(true)
require.NoError(t, dec.Decode(&sameCfg))
require.NoError(t, sameCfg.ApplyDefaults())
require.NoError(t, l.ApplyConfig(&sameCfg, false))

checkConfigReloadLog(t, logBuffer.String(), 1)

//
// Apply a new config and write a new line.
//
Expand Down Expand Up @@ -138,6 +163,10 @@ configs:
require.Equal(t, "Hello again!", req.Streams[0].Entries[0].Line)
}

// The config did change this time.
// We expect the config reload log line to not be printed again.
checkConfigReloadLog(t, logBuffer.String(), 1)

t.Run("update to nil", func(t *testing.T) {
// Applying a nil config should remove all instances.
err := l.ApplyConfig(nil, false)
Expand Down
12 changes: 11 additions & 1 deletion static/metrics/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

"github.com/grafana/agent/internal/util"
"github.com/grafana/agent/static/metrics/cluster"
Expand Down Expand Up @@ -150,6 +151,8 @@ type Agent struct {
actor chan func()

initialBootDone atomic.Bool

previousConfig string
}

// New creates and starts a new Agent.
Expand Down Expand Up @@ -227,9 +230,16 @@ func (a *Agent) ApplyConfig(cfg Config) error {
a.mut.Lock()
defer a.mut.Unlock()

if util.CompareYAML(a.cfg, cfg) {
newConfigByteArr, err := yaml.Marshal(cfg)
if err != nil {
return fmt.Errorf("failed to marshal new config: %w", err)
}
newConfig := string(newConfigByteArr)
if newConfig == a.previousConfig {
level.Debug(a.logger).Log("msg", "not recreating metrics instance because config hasn't changed")
return nil
}
a.previousConfig = newConfig

if a.stopped {
return fmt.Errorf("agent stopped")
Expand Down
138 changes: 138 additions & 0 deletions static/metrics/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"sync"
"testing"
"time"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/grafana/agent/internal/util"
"github.com/grafana/agent/static/metrics/instance"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -113,6 +115,142 @@ configs:
require.Greater(t, int64(scrapeConfig.ScrapeInterval), int64(0))
}

// checkConfigReloadLog asserts that the debug line logged when ApplyConfig
// skips an unchanged config appears exactly expectedOccurances times in logs.
func checkConfigReloadLog(t *testing.T, logs string, expectedOccurances int) {
	// Mark this function as a helper so a failed assertion is reported at
	// the caller's line instead of inside this function.
	t.Helper()

	const logLine = `level=debug agent=prometheus msg="not recreating metrics instance because config hasn't changed"`
	require.Equal(t, expectedOccurances, strings.Count(logs, logLine))
}

// TestConfigReload checks how the agent reacts to three successive configs:
// the initial one, an identical one, and one with a different instance name.
// It verifies the agent_metrics_configs_changed_total counter and the number
// of "config hasn't changed" debug log lines after each step.
func TestConfigReload(t *testing.T) {
// NOTE(review): wal_directory is a fixed path under /tmp; whether the fake
// instance factory ever touches it is not visible here - confirm.
cfgText := `
wal_directory: /tmp/wal
configs:
- name: instance_a
scrape_configs:
- job_name: 'node'
static_configs:
- targets: ['localhost:9100']
`
var cfg Config

err := yaml.Unmarshal([]byte(cfgText), &cfg)
require.NoError(t, err)
err = cfg.ApplyDefaults()
require.NoError(t, err)

fact := newFakeInstanceFactory()

// Capture log output in a goroutine-safe buffer so it can be inspected while
// the agent may still be logging.
logBuffer := util.SyncBuffer{}
logger := log.NewSyncLogger(log.NewLogfmtLogger(&logBuffer))

reg := prometheus.NewRegistry()

a, err := newAgent(reg, cfg, logger, fact.factory)
require.NoError(t, err)

// Wait until the factory reports one created instance and the agent lists one.
util.Eventually(t, func(t require.TestingT) {
require.NotNil(t, fact.created)
require.Equal(t, 1, int(fact.created.Load()))
require.Equal(t, 1, len(a.mm.ListInstances()))
})

t.Run("instances should be running", func(t *testing.T) {
for _, mi := range fact.Mocks() {
// Each instance should have wait called on it
util.Eventually(t, func(t require.TestingT) {
require.True(t, mi.running.Load())
})
}
})

// After the initial config only a single "created" event is expected.
util.Eventually(t, func(t require.TestingT) {
if err := testutil.GatherAndCompare(reg,
strings.NewReader(`
# HELP agent_metrics_configs_changed_total Total number of dynamically updated configs
# TYPE agent_metrics_configs_changed_total counter
agent_metrics_configs_changed_total{event="created"} 1
`), "agent_metrics_configs_changed_total"); err != nil {
t.Errorf("mismatch metrics: %v", err)
t.FailNow()
}
})

// The config has changed (it used to be ""). The log line won't be printed.
checkConfigReloadLog(t, logBuffer.String(), 0)

//
// Try the same config.
//
// The config is decoded again from the same text into a fresh struct.
var sameCfg Config

err = yaml.Unmarshal([]byte(cfgText), &sameCfg)
require.NoError(t, err)
err = sameCfg.ApplyDefaults()
require.NoError(t, err)

// NOTE(review): the error returned by ApplyConfig is discarded here; consider
// asserting it with require.NoError.
a.ApplyConfig(sameCfg)

// The counter must be unchanged: still exactly one "created" event.
util.Eventually(t, func(t require.TestingT) {
if err := testutil.GatherAndCompare(reg,
strings.NewReader(`
# HELP agent_metrics_configs_changed_total Total number of dynamically updated configs
# TYPE agent_metrics_configs_changed_total counter
agent_metrics_configs_changed_total{event="created"} 1
`), "agent_metrics_configs_changed_total"); err != nil {
t.Errorf("mismatch metrics: %v", err)
t.FailNow()
}
})

// The config did not change. The log line should be printed.
checkConfigReloadLog(t, logBuffer.String(), 1)

//
// Try a different config.
//
// Only the instance name differs from the first config (instance_b).
cfgText = `
wal_directory: /tmp/wal
configs:
- name: instance_b
scrape_configs:
- job_name: 'node'
static_configs:
- targets: ['localhost:9100']
`
var differentCfg Config

err = yaml.Unmarshal([]byte(cfgText), &differentCfg)
require.NoError(t, err)
err = differentCfg.ApplyDefaults()
require.NoError(t, err)

// NOTE(review): the error returned by ApplyConfig is discarded here as well.
a.ApplyConfig(differentCfg)

// Expect a second "created" event and one "deleted" event.
util.Eventually(t, func(t require.TestingT) {
if err := testutil.GatherAndCompare(reg,
strings.NewReader(`
# HELP agent_metrics_configs_changed_total Total number of dynamically updated configs
# TYPE agent_metrics_configs_changed_total counter
agent_metrics_configs_changed_total{event="created"} 2
agent_metrics_configs_changed_total{event="deleted"} 1
`), "agent_metrics_configs_changed_total"); err != nil {
t.Errorf("mismatch metrics: %v", err)
t.FailNow()
}
})

// The config has changed. The log line won't be printed.
checkConfigReloadLog(t, logBuffer.String(), 1)

// Every mock instance is expected to report a started count of exactly one.
for _, mi := range fact.Mocks() {
util.Eventually(t, func(t require.TestingT) {
require.Equal(t, 1, int(mi.startedCount.Load()))
})
}

// NOTE(review): Stop is not deferred, so it is skipped if any require call
// above aborts the test; consider deferring it right after newAgent succeeds.
a.Stop()
}

func TestAgent(t *testing.T) {
// Launch two instances
cfg := Config{
Expand Down
Loading
Loading