diff --git a/CHANGELOG.md b/CHANGELOG.md index 28edf54749fb..89662ebf9695 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,9 @@ Main (unreleased) - Expose track_timestamps_staleness on Prometheus scraping, to fix the issue where container metrics live for 5 minutes after the container disappears. (@ptodev) +- Introduce the `remotecfg` service that enables loading configuration from a + remote endpoint. (@tpaschalis) + ### Enhancements - Include line numbers in profiles produced by `pyrsocope.java` component. (@korniltsev) diff --git a/cmd/internal/flowmode/cmd_run.go b/cmd/internal/flowmode/cmd_run.go index 82fa10153035..2c143e3989b2 100644 --- a/cmd/internal/flowmode/cmd_run.go +++ b/cmd/internal/flowmode/cmd_run.go @@ -33,6 +33,7 @@ import ( httpservice "github.com/grafana/agent/service/http" "github.com/grafana/agent/service/labelstore" otel_service "github.com/grafana/agent/service/otel" + remotecfgservice "github.com/grafana/agent/service/remotecfg" uiservice "github.com/grafana/agent/service/ui" "github.com/grafana/ckit/advertise" "github.com/grafana/ckit/peer" @@ -243,6 +244,14 @@ func (fr *flowRun) Run(configPath string) error { EnablePProf: fr.enablePprof, }) + remoteCfgService, err := remotecfgservice.New(remotecfgservice.Options{ + Logger: log.With(l, "service", "remotecfg"), + StoragePath: fr.storagePath, + }) + if err != nil { + return fmt.Errorf("failed to create the remotecfg service: %w", err) + } + uiService := uiservice.New(uiservice.Options{ UIPrefix: fr.uiPrefix, Cluster: clusterService.Data().(cluster.Cluster), @@ -267,6 +276,7 @@ func (fr *flowRun) Run(configPath string) error { clusterService, otelService, labelService, + remoteCfgService, }, }) diff --git a/docs/sources/flow/reference/config-blocks/remotecfg.md b/docs/sources/flow/reference/config-blocks/remotecfg.md new file mode 100644 index 000000000000..209cb4648602 --- /dev/null +++ b/docs/sources/flow/reference/config-blocks/remotecfg.md @@ -0,0 +1,97 @@ +--- +aliases: +- /docs/grafana-cloud/agent/flow/reference/config-blocks/remotecfg/ +- /docs/grafana-cloud/monitor-infrastructure/agent/flow/reference/config-blocks/remotecfg/ +- /docs/grafana-cloud/monitor-infrastructure/integrations/agent/flow/reference/config-blocks/remotecfg/ +- /docs/grafana-cloud/send-data/agent/flow/reference/config-blocks/remotecfg/ +canonical: remotecfgs://grafana.com/docs/agent/latest/flow/reference/config-blocks/remotecfg/ +description: Learn about the remotecfg configuration block +menuTitle: remotecfg +title: remotecfg block +--- + +# remotecfg block + +`remotecfg` is an optional configuration block that enables {{< param "PRODUCT_NAME" >}} +to fetch and load the configuration from a remote endpoint. +`remotecfg` is specified without a label and can only be provided once per +configuration file. + +The [API definition][] for managing and fetching configuration that the +`remotecfg` block uses is available under the Apache 2.0 license. + +[API definition]: https://github.com/grafana/agent-remote-config + +## Example + +```river +remotecfg { + url = "SERVICE_URL" + basic_auth { + username = "USERNAME" + password_file = "PASSWORD_FILE" + } + + id = constants.hostname + metadata = {"cluster" = "dev", "namespace" = "otlp-dev"} + poll_frequency = "5m" +} +``` + +## Arguments + +The following arguments are supported: + +Name | Type | Description | Default | Required +-----------------|----------------------|---------------------------------------------------|-------------|--------- +`url` | `string` | The address of the API to poll for configuration. | `""` | no +`id` | `string` | A self-reported ID. | `see below` | no +`metadata` | `map(string)` | A set of self-reported metadata. | `{}` | no +`poll_frequency` | `duration` | How often to poll the API for new configuration. | `"1m"` | no + +If the `url` is not set, then the service block is a no-op. + +If not set, the self-reported `id` that the Agent uses is a randomly generated, +anonymous unique ID (UUID) that is stored as an `agent_seed.json` file in the +Agent's storage path so that it can persist across restarts. + +The `id` and `metadata` fields are used in the periodic request sent to the +remote endpoint so that the API can decide what configuration to serve. + +## Blocks + +The following blocks are supported inside the definition of `remotecfg`: + +Hierarchy | Block | Description | Required +--------- | ----- | ----------- | -------- +basic_auth | [basic_auth][] | Configure basic_auth for authenticating to the endpoint. | no +authorization | [authorization][] | Configure generic authorization to the endpoint. | no +oauth2 | [oauth2][] | Configure OAuth2 for authenticating to the endpoint. | no +oauth2 > tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. | no +tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. | no + +The `>` symbol indicates deeper levels of nesting. For example, +`oauth2 > tls_config` refers to a `tls_config` block defined inside +an `oauth2` block. + +[basic_auth]: #basic_auth-block +[authorization]: #authorization-block +[oauth2]: #oauth2-block +[tls_config]: #tls_config-block + +### basic_auth block + +{{< docs/shared lookup="flow/reference/components/basic-auth-block.md" source="agent" version="" >}} + +### authorization block + +{{< docs/shared lookup="flow/reference/components/authorization-block.md" source="agent" version="" >}} + +### oauth2 block + +{{< docs/shared lookup="flow/reference/components/oauth2-block.md" source="agent" version="" >}} + +### tls_config block + +{{< docs/shared lookup="flow/reference/components/tls-config-block.md" source="agent" version="" >}} + diff --git a/go.mod b/go.mod index c9800748482b..c0c9bbbbc498 100644 --- a/go.mod +++ b/go.mod @@ -608,6 +608,7 @@ require github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab require ( connectrpc.com/connect v1.14.0 github.com/githubexporter/github-exporter v0.0.0-20231025122338-656e7dc33fe7 + github.com/grafana/agent-remote-config v0.0.2 github.com/grafana/jfr-parser/pprof v0.0.0-20240126072739-986e71dc0361 github.com/natefinch/atomic v1.0.1 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter v0.87.0 diff --git a/go.sum b/go.sum index 6710608f0f42..051dd158c864 100644 --- a/go.sum +++ b/go.sum @@ -1044,6 +1044,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/gosnmp/gosnmp v1.36.0 h1:1Si+MImHcKIqFc3/kJEs2LOULP1nlFKlzPFyrMOk5Qk= github.com/gosnmp/gosnmp v1.36.0/go.mod h1:iLcZxN2MxKhH0jPQDVMZaSNypw1ykqVi27O79koQj6w= github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= +github.com/grafana/agent-remote-config v0.0.2 h1:s3FKgVzfY5Ij+xG0wVKgVvtDrh/Bz0ZvB3D5MM7LJxU= +github.com/grafana/agent-remote-config v0.0.2/go.mod h1:amyG3pVNXKcMo+kNN46yhnAXAz/m/9Ew9MRf53n7XBg= github.com/grafana/cadvisor v0.0.0-20231110094609-5f7917925dea h1:Q5f5/nJJ0SbusZjA6F6XkJuHDbl2/PqdTGw6wHsuccA= github.com/grafana/cadvisor v0.0.0-20231110094609-5f7917925dea/go.mod h1:XjiOCFjmxXIWwauV5p39Mr2Yxlpyk72uKQH1UZvd4fQ= github.com/grafana/ckit v0.0.0-20230906125525-c046c99a5c04 h1:tG8Qxq4dN1WqakMmsPaxaH4+OQhYg5HVsarw5acLBX8= diff --git a/pkg/flow/flow_services.go b/pkg/flow/flow_services.go index 46a1c3526128..a1002d29a21a 100644 --- a/pkg/flow/flow_services.go +++ b/pkg/flow/flow_services.go @@ -1,8 +1,11 @@ package flow import ( + "context" + "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/internal/dag" + "github.com/grafana/agent/pkg/flow/internal/worker" "github.com/grafana/agent/service" ) @@ -52,3 +55,38 @@ func serviceConsumersForGraph(graph *dag.Graph, serviceName string, includePeerS return consumers } + +// NewController returns a new, unstarted, isolated Flow controller so that +// services can instantiate their own components. +func (f *Flow) NewController(id string) service.Controller { + return serviceController{ + f: newController(controllerOptions{ + Options: Options{ + ControllerID: id, + Logger: f.opts.Logger, + Tracer: f.opts.Tracer, + DataPath: f.opts.DataPath, + Reg: f.opts.Reg, + Services: f.opts.Services, + OnExportsChange: nil, // NOTE(@tpaschalis, @wildum) The isolated controller shouldn't be able to export any values. + }, + IsModule: true, + ModuleRegistry: newModuleRegistry(), + WorkerPool: worker.NewDefaultWorkerPool(), + }), + } +} + +type serviceController struct { + f *Flow +} + +func (sc serviceController) Run(ctx context.Context) { sc.f.Run(ctx) } +func (sc serviceController) LoadSource(b []byte, args map[string]any) error { + source, err := ParseSource("", b) + if err != nil { + return err + } + return sc.f.LoadSource(source, args) +} +func (sc serviceController) Ready() bool { return sc.f.Ready() } diff --git a/service/http/http_test.go b/service/http/http_test.go index 660e0deee43a..649af93c05b6 100644 --- a/service/http/http_test.go +++ b/service/http/http_test.go @@ -215,3 +215,5 @@ func (fakeHost) ListComponents(moduleID string, opts component.InfoOptions) ([]* } func (fakeHost) GetServiceConsumers(serviceName string) []service.Consumer { return nil } + +func (fakeHost) NewController(id string) service.Controller { return nil } diff --git a/service/remotecfg/noop.go b/service/remotecfg/noop.go new file mode 100644 index 000000000000..493c3b69b762 --- /dev/null +++ b/service/remotecfg/noop.go @@ -0,0 +1,41 @@ +package remotecfg + +import ( + "context" + "errors" + + "connectrpc.com/connect" + agentv1 "github.com/grafana/agent-remote-config/api/gen/proto/go/agent/v1" +) + +type noopClient struct{} + +// GetConfig returns the agent's configuration. +func (c noopClient) GetConfig(context.Context, *connect.Request[agentv1.GetConfigRequest]) (*connect.Response[agentv1.GetConfigResponse], error) { + return nil, errors.New("noop client") +} + +// GetAgent returns information about the agent. +func (c noopClient) GetAgent(context.Context, *connect.Request[agentv1.GetAgentRequest]) (*connect.Response[agentv1.Agent], error) { + return nil, errors.New("noop client") +} + +// ListAgents returns information about all agents. +func (c noopClient) ListAgents(context.Context, *connect.Request[agentv1.ListAgentsRequest]) (*connect.Response[agentv1.Agents], error) { + return nil, errors.New("noop client") +} + +// CreateAgent registers a new agent. +func (c noopClient) CreateAgent(context.Context, *connect.Request[agentv1.CreateAgentRequest]) (*connect.Response[agentv1.Agent], error) { + return nil, errors.New("noop client") +} + +// UpdateAgent updates an existing agent. +func (c noopClient) UpdateAgent(context.Context, *connect.Request[agentv1.UpdateAgentRequest]) (*connect.Response[agentv1.Agent], error) { + return nil, errors.New("noop client") +} + +// DeleteAgent deletes an existing agent. +func (c noopClient) DeleteAgent(context.Context, *connect.Request[agentv1.DeleteAgentRequest]) (*connect.Response[agentv1.DeleteAgentResponse], error) { + return nil, errors.New("noop client") +} diff --git a/service/remotecfg/remotecfg.go b/service/remotecfg/remotecfg.go new file mode 100644 index 000000000000..528a229f7e3e --- /dev/null +++ b/service/remotecfg/remotecfg.go @@ -0,0 +1,332 @@ +package remotecfg + +import ( + "context" + "fmt" + "hash/fnv" + "math" + "os" + "path/filepath" + "reflect" + "sync" + "time" + + "connectrpc.com/connect" + "github.com/go-kit/log" + agentv1 "github.com/grafana/agent-remote-config/api/gen/proto/go/agent/v1" + "github.com/grafana/agent-remote-config/api/gen/proto/go/agent/v1/agentv1connect" + "github.com/grafana/agent/component/common/config" + "github.com/grafana/agent/internal/agentseed" + "github.com/grafana/agent/pkg/flow/logging/level" + "github.com/grafana/agent/service" + "github.com/grafana/river" + commonconfig "github.com/prometheus/common/config" +) + +func getHash(in []byte) string { + fnvHash := fnv.New32() + fnvHash.Write(in) + return fmt.Sprintf("%x", fnvHash.Sum(nil)) +} + +// Service implements a service for remote configuration. +// The default value of ch is nil; this means it will block forever if the +// remotecfg service is not configured. In addition, we're keeping track of +// the ticker so we can avoid leaking goroutines. +// The datapath field is where the service looks for the local cache location. +// It is defined as a hash of the Arguments field. +type Service struct { + opts Options + args Arguments + + ctrl service.Controller + + mut sync.RWMutex + asClient agentv1connect.AgentServiceClient + ch <-chan time.Time + ticker *time.Ticker + dataPath string + currentConfigHash string +} + +// ServiceName defines the name used for the remotecfg service. +const ServiceName = "remotecfg" + +// Options are used to configure the remotecfg service. Options are +// constant for the lifetime of the remotecfg service. +type Options struct { + Logger log.Logger // Where to send logs. + StoragePath string // Where to cache configuration on-disk. +} + +// Arguments holds runtime settings for the remotecfg service. +type Arguments struct { + URL string `river:"url,attr,optional"` + ID string `river:"id,attr,optional"` + Metadata map[string]string `river:"metadata,attr,optional"` + PollFrequency time.Duration `river:"poll_frequency,attr,optional"` + HTTPClientConfig *config.HTTPClientConfig `river:",squash"` +} + +// GetDefaultArguments populates the default values for the Arguments struct. +func GetDefaultArguments() Arguments { + return Arguments{ + ID: agentseed.Get().UID, + Metadata: make(map[string]string), + PollFrequency: 1 * time.Minute, + HTTPClientConfig: config.CloneDefaultHTTPClientConfig(), + } +} + +// SetToDefault implements river.Defaulter. +func (a *Arguments) SetToDefault() { + *a = GetDefaultArguments() +} + +// Validate implements river.Validator. +func (a *Arguments) Validate() error { + // We must explicitly Validate because HTTPClientConfig is squashed and it + // won't run otherwise + if a.HTTPClientConfig != nil { + return a.HTTPClientConfig.Validate() + } + + return nil +} + +// Hash marshals the Arguments and returns a hash representation. +func (a *Arguments) Hash() (string, error) { + b, err := river.Marshal(a) + if err != nil { + return "", fmt.Errorf("failed to marshal arguments: %w", err) + } + return getHash(b), nil +} + +// New returns a new instance of the remotecfg service. +func New(opts Options) (*Service, error) { + basePath := filepath.Join(opts.StoragePath, ServiceName) + err := os.MkdirAll(basePath, 0750) + if err != nil { + return nil, err + } + + return &Service{ + opts: opts, + ticker: time.NewTicker(math.MaxInt64), + }, nil +} + +// Data is a no-op for the remotecfg service. +func (s *Service) Data() any { + return nil +} + +// Definition returns the definition of the remotecfg service. +func (s *Service) Definition() service.Definition { + return service.Definition{ + Name: ServiceName, + ConfigType: Arguments{}, + DependsOn: nil, // remotecfg has no dependencies. + } +} + +var _ service.Service = (*Service)(nil) + +// Run implements [service.Service] and starts the remotecfg service. It will +// run until the provided context is canceled or there is a fatal error. +func (s *Service) Run(ctx context.Context, host service.Host) error { + s.ctrl = host.NewController(ServiceName) + + s.fetch() + + // Run the service's own controller. + go func() { + s.ctrl.Run(ctx) + }() + + for { + select { + case <-s.ch: + err := s.fetchRemote() + if err != nil { + level.Error(s.opts.Logger).Log("msg", "failed to fetch remote configuration from the API", "err", err) + } + case <-ctx.Done(): + s.ticker.Stop() + return nil + } + } +} + +// Update implements [service.Service] and applies settings. +func (s *Service) Update(newConfig any) error { + newArgs := newConfig.(Arguments) + + // We either never set the block on the first place, or recently removed + // it. Make sure we stop everything gracefully before returning. + if newArgs.URL == "" { + s.mut.Lock() + s.ch = nil + s.ticker.Reset(math.MaxInt64) + s.asClient = noopClient{} + s.args.HTTPClientConfig = config.CloneDefaultHTTPClientConfig() + s.mut.Unlock() + + s.setCfgHash("") + return nil + } + + s.mut.Lock() + hash, err := newArgs.Hash() + if err != nil { + return err + } + s.dataPath = filepath.Join(s.opts.StoragePath, ServiceName, hash) + s.ticker.Reset(newArgs.PollFrequency) + s.ch = s.ticker.C + // Update the HTTP client last since it might fail. + if !reflect.DeepEqual(s.args.HTTPClientConfig, newArgs.HTTPClientConfig) { + httpClient, err := commonconfig.NewClientFromConfig(*newArgs.HTTPClientConfig.Convert(), "remoteconfig") + if err != nil { + return err + } + s.asClient = agentv1connect.NewAgentServiceClient( + httpClient, + newArgs.URL, + ) + } + s.args = newArgs // Update the args as the last step to avoid polluting any comparisons + s.mut.Unlock() + + // If we've already called Run, then immediately trigger an API call with + // the updated Arguments, and/or fall back to the updated cache location. + if s.ctrl != nil && s.ctrl.Ready() { + s.fetch() + } + + return nil +} + +// fetch attempts to read configuration from the API and the local cache +// and then parse/load their contents in order of preference. +func (s *Service) fetch() { + if err := s.fetchRemote(); err != nil { + s.fetchLocal() + } +} +func (s *Service) fetchRemote() error { + if !s.isEnabled() { + return nil + } + + b, err := s.getAPIConfig() + if err != nil { + return err + } + + // API return the same configuration, no need to reload. + newConfigHash := getHash(b) + if s.getCfgHash() == newConfigHash { + level.Debug(s.opts.Logger).Log("msg", "skipping over API response since it contained the same hash") + return nil + } + + err = s.parseAndLoad(b) + if err != nil { + return err + } + + // If successful, flush to disk and keep a copy. + s.setCachedConfig(b) + s.setCfgHash(newConfigHash) + return nil +} + +func (s *Service) fetchLocal() { + b, err := s.getCachedConfig() + if err != nil { + level.Error(s.opts.Logger).Log("msg", "failed to read from cache", "err", err) + return + } + + err = s.parseAndLoad(b) + if err != nil { + level.Error(s.opts.Logger).Log("msg", "failed to load from cache", "err", err) + } +} + +func (s *Service) getAPIConfig() ([]byte, error) { + s.mut.RLock() + req := connect.NewRequest(&agentv1.GetConfigRequest{ + Id: s.args.ID, + Metadata: s.args.Metadata, + }) + client := s.asClient + s.mut.RUnlock() + + gcr, err := client.GetConfig(context.Background(), req) + if err != nil { + return nil, err + } + + return []byte(gcr.Msg.GetContent()), nil +} + +func (s *Service) getCachedConfig() ([]byte, error) { + s.mut.RLock() + p := s.dataPath + s.mut.RUnlock() + + return os.ReadFile(p) +} + +func (s *Service) setCachedConfig(b []byte) { + s.mut.RLock() + p := s.dataPath + s.mut.RUnlock() + + err := os.WriteFile(p, b, 0750) + if err != nil { + level.Error(s.opts.Logger).Log("msg", "failed to flush remote configuration contents the on-disk cache", "err", err) + } +} + +func (s *Service) parseAndLoad(b []byte) error { + s.mut.RLock() + ctrl := s.ctrl + s.mut.RUnlock() + + if len(b) == 0 { + return nil + } + + err := ctrl.LoadSource(b, nil) + if err != nil { + return err + } + + s.setCfgHash(getHash(b)) + return nil +} + +func (s *Service) getCfgHash() string { + s.mut.RLock() + defer s.mut.RUnlock() + + return s.currentConfigHash +} + +func (s *Service) setCfgHash(h string) { + s.mut.Lock() + defer s.mut.Unlock() + + s.currentConfigHash = h +} + +func (s *Service) isEnabled() bool { + s.mut.RLock() + defer s.mut.RUnlock() + + return s.args.URL != "" && s.asClient != nil +} diff --git a/service/remotecfg/remotecfg_test.go b/service/remotecfg/remotecfg_test.go new file mode 100644 index 000000000000..90313ee6866e --- /dev/null +++ b/service/remotecfg/remotecfg_test.go @@ -0,0 +1,215 @@ +package remotecfg + +import ( + "context" + "fmt" + "io" + "os" + "testing" + "time" + + "connectrpc.com/connect" + agentv1 "github.com/grafana/agent-remote-config/api/gen/proto/go/agent/v1" + "github.com/grafana/agent/component" + _ "github.com/grafana/agent/component/loki/process" + "github.com/grafana/agent/pkg/flow" + "github.com/grafana/agent/pkg/flow/componenttest" + "github.com/grafana/agent/pkg/flow/logging" + "github.com/grafana/agent/pkg/util" + "github.com/grafana/agent/service" + "github.com/grafana/river" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestOnDiskCache(t *testing.T) { + ctx := componenttest.TestContext(t) + url := "https://example.com/" + + // The contents of the on-disk cache. + cacheContents := `loki.process "default" { forward_to = [] }` + cacheHash := getHash([]byte(cacheContents)) + + // Create a new service. + env := newTestEnvironment(t) + require.NoError(t, env.ApplyConfig(fmt.Sprintf(` + url = "%s" + `, url))) + + client := &agentClient{} + env.svc.asClient = client + + // Mock client to return an unparseable response. + client.getConfigFunc = buildGetConfigHandler("unparseable river config") + + // Write the cache contents, and run the service. + err := os.WriteFile(env.svc.dataPath, []byte(cacheContents), 0644) + require.NoError(t, err) + + go func() { + require.NoError(t, env.Run(ctx)) + }() + + // As the API response was unparseable, verify that the service has loaded + // the on-disk cache contents. + require.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, cacheHash, env.svc.getCfgHash()) + }, time.Second, 10*time.Millisecond) +} + +func TestAPIResponse(t *testing.T) { + ctx := componenttest.TestContext(t) + url := "https://example.com/" + cfg1 := `loki.process "default" { forward_to = [] }` + cfg2 := `loki.process "updated" { forward_to = [] }` + + // Create a new service. + env := newTestEnvironment(t) + require.NoError(t, env.ApplyConfig(fmt.Sprintf(` + url = "%s" + poll_frequency = "10ms" + `, url))) + + client := &agentClient{} + env.svc.asClient = client + + // Mock client to return a valid response. + client.getConfigFunc = buildGetConfigHandler(cfg1) + + // Run the service. + go func() { + require.NoError(t, env.Run(ctx)) + }() + + // As the API response was successful, verify that the service has loaded + // the valid response. + require.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, getHash([]byte(cfg1)), env.svc.getCfgHash()) + }, time.Second, 10*time.Millisecond) + + // Update the response returned by the API. + env.svc.mut.Lock() + client.getConfigFunc = buildGetConfigHandler(cfg2) + env.svc.mut.Unlock() + + // Verify that the service has loaded the updated response. + require.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, getHash([]byte(cfg2)), env.svc.getCfgHash()) + }, time.Second, 10*time.Millisecond) +} + +func buildGetConfigHandler(in string) func(context.Context, *connect.Request[agentv1.GetConfigRequest]) (*connect.Response[agentv1.GetConfigResponse], error) { + return func(context.Context, *connect.Request[agentv1.GetConfigRequest]) (*connect.Response[agentv1.GetConfigResponse], error) { + rsp := &connect.Response[agentv1.GetConfigResponse]{ + Msg: &agentv1.GetConfigResponse{ + Content: in, + }, + } + return rsp, nil + } +} + +type testEnvironment struct { + t *testing.T + svc *Service +} + +func newTestEnvironment(t *testing.T) *testEnvironment { + svc, err := New(Options{ + Logger: util.TestLogger(t), + StoragePath: t.TempDir(), + }) + svc.asClient = nil + require.NoError(t, err) + + return &testEnvironment{ + t: t, + svc: svc, + } +} + +func (env *testEnvironment) ApplyConfig(config string) error { + var args Arguments + if err := river.Unmarshal([]byte(config), &args); err != nil { + return err + } + return env.svc.Update(args) +} + +func (env *testEnvironment) Run(ctx context.Context) error { + return env.svc.Run(ctx, fakeHost{}) +} + +type fakeHost struct{} + +var _ service.Host = (fakeHost{}) + +func (fakeHost) GetComponent(id component.ID, opts component.InfoOptions) (*component.Info, error) { + return nil, fmt.Errorf("no such component %s", id) +} + +func (fakeHost) ListComponents(moduleID string, opts component.InfoOptions) ([]*component.Info, error) { + if moduleID == "" { + return nil, nil + } + return nil, fmt.Errorf("no such module %q", moduleID) +} + +func (fakeHost) GetServiceConsumers(serviceName string) []service.Consumer { return nil } + +func (f fakeHost) NewController(id string) service.Controller { + logger, _ := logging.New(io.Discard, logging.DefaultOptions) + ctrl := flow.New(flow.Options{ + ControllerID: ServiceName, + Logger: logger, + Tracer: nil, + DataPath: "", + Reg: prometheus.NewRegistry(), + OnExportsChange: func(map[string]interface{}) {}, + Services: []service.Service{}, + }) + + return serviceController{ctrl} +} + +type agentClient struct { + getConfigFunc func(context.Context, *connect.Request[agentv1.GetConfigRequest]) (*connect.Response[agentv1.GetConfigResponse], error) +} + +func (ag agentClient) GetConfig(ctx context.Context, req *connect.Request[agentv1.GetConfigRequest]) (*connect.Response[agentv1.GetConfigResponse], error) { + if ag.getConfigFunc != nil { + return ag.getConfigFunc(ctx, req) + } + + panic("getConfigFunc not set") +} +func (ag agentClient) GetAgent(context.Context, *connect.Request[agentv1.GetAgentRequest]) (*connect.Response[agentv1.Agent], error) { + return nil, nil +} +func (ag agentClient) CreateAgent(context.Context, *connect.Request[agentv1.CreateAgentRequest]) (*connect.Response[agentv1.Agent], error) { + return nil, nil +} +func (ag agentClient) UpdateAgent(context.Context, *connect.Request[agentv1.UpdateAgentRequest]) (*connect.Response[agentv1.Agent], error) { + return nil, nil +} +func (ag agentClient) DeleteAgent(context.Context, *connect.Request[agentv1.DeleteAgentRequest]) (*connect.Response[agentv1.DeleteAgentResponse], error) { + return nil, nil +} +func (ag agentClient) ListAgents(context.Context, *connect.Request[agentv1.ListAgentsRequest]) (*connect.Response[agentv1.Agents], error) { + return nil, nil +} + +type serviceController struct { + f *flow.Flow +} + +func (sc serviceController) Run(ctx context.Context) { sc.f.Run(ctx) } +func (sc serviceController) LoadSource(b []byte, args map[string]any) error { + source, err := flow.ParseSource("", b) + if err != nil { + return err + } + return sc.f.LoadSource(source, args) +} +func (sc serviceController) Ready() bool { return sc.f.Ready() } diff --git a/service/service.go b/service/service.go index 564bb23b86b9..abbb6e21ac40 100644 --- a/service/service.go +++ b/service/service.go @@ -54,6 +54,17 @@ type Host interface { // GetServiceConsumers gets the list of services which depend on a service by // name. GetServiceConsumers(serviceName string) []Consumer + + // NewController returns an unstarted, isolated Controller that a Service + // can use to instantiate its own components. + NewController(id string) Controller +} + +// Controller is implemented by flow.Flow. +type Controller interface { + Run(ctx context.Context) + LoadSource(source []byte, args map[string]any) error + Ready() bool } type Consumer struct {