Skip to content

Commit

Permalink
x-pack/filebeat/input/cel: add evaluation coverage collection support (
Browse files Browse the repository at this point in the history
…#41884) (#41949)

This allows collecting the execution coverage data from a running input.
Coverage data is logged as JSON data at debug level when this option is
enabled. Currently no tooling is available for rendering the data in a
more human-friendly format, but the backing infrastructure for this
exists in the mito repository.

(cherry picked from commit db21309)

Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
  • Loading branch information
mergify[bot] and efd6 authored Dec 8, 2024
1 parent 43aaf04 commit 424e2b5
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add field redaction package. {pull}40997[40997]
- Add support for marked redaction to x-pack/filebeat/input/internal/private {pull}41212[41212]
- Add support for collecting Okta role and factor data for users with filebeat entityanalytics input. {pull}41044[41044]
- Add CEL input program evaluation coverage collection support. {pull}41884[41884]

==== Deprecated

Expand Down
7 changes: 7 additions & 0 deletions x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,13 @@ dump is configured, it is recommended that data input sizes be reduced to avoid
making dumps that are intractable to analysis. To delete existing failure dumps, set `failure_dump.enabled` to
false without unsetting the filename option.

[[cel-record-coverage]]
[float]
==== `record_coverage`

This specifies that CEL code evaluation coverage should be recorded and logged as JSON in debug logs. Coverage is
only collected when debug logging is enabled for the input. This is a developer-only option.

[float]
=== Metrics

Expand Down
12 changes: 10 additions & 2 deletions x-pack/filebeat/input/cel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type config struct {

// FailureDump configures failure dump behaviour.
FailureDump *dumpConfig `config:"failure_dump"`

// RecordCoverage indicates whether a program should
// record and log execution coverage.
RecordCoverage bool `config:"record_coverage"`
}

type redact struct {
Expand All @@ -86,9 +90,13 @@ func (t *dumpConfig) enabled() bool {
}

func (c config) Validate() error {
if c.RecordCoverage {
logp.L().Named("input.cel").Warn("execution coverage enabled: " +
"see documentation for details: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-cel.html#cel-record-coverage")
}
if c.Redact == nil {
logp.L().Named("input.cel").Warn("missing recommended 'redact' configuration: " +
"see documentation for details: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-cel.html#_redact")
"see documentation for details: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-cel.html#cel-state-redact")
}
if c.Interval <= 0 {
return errors.New("interval must be greater than 0")
Expand All @@ -106,7 +114,7 @@ func (c config) Validate() error {
patterns = map[string]*regexp.Regexp{".": nil}
}
wantDump := c.FailureDump.enabled() && c.FailureDump.Filename != ""
_, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil, wantDump)
_, _, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil, wantDump, false)
if err != nil {
return fmt.Errorf("failed to check program: %w", err)
}
Expand Down
32 changes: 24 additions & 8 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
}
}
wantDump := cfg.FailureDump.enabled() && cfg.FailureDump.Filename != ""
prg, ast, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, limiter, auth, patterns, cfg.XSDs, log, trace, wantDump)
doCov := cfg.RecordCoverage && log.IsDebug()
prg, ast, cov, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, limiter, auth, patterns, cfg.XSDs, log, trace, wantDump, doCov)
if err != nil {
return err
}
Expand Down Expand Up @@ -228,6 +229,14 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
)
// Keep track of whether CEL is degraded for this periodic run.
var isDegraded bool
if doCov {
defer func() {
// If doCov is true, log the updated coverage details.
// Updates are a running aggregate for each call to run
// as cov is shared via the program compilation.
log.Debugw("coverage", "details", cov.Details())
}()
}
for {
if wait := time.Until(waitUntil); wait > 0 {
// We have a special-case wait for when we have a zero limit.
Expand Down Expand Up @@ -1039,10 +1048,10 @@ func getEnv(allowed []string) map[string]string {
return env
}

func newProgram(ctx context.Context, src, root string, vars map[string]string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper, details bool) (cel.Program, *cel.Ast, error) {
func newProgram(ctx context.Context, src, root string, vars map[string]string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper, details, coverage bool) (cel.Program, *cel.Ast, *lib.Coverage, error) {
xml, err := lib.XML(nil, xsd)
if err != nil {
return nil, nil, fmt.Errorf("failed to build xml type hints: %w", err)
return nil, nil, nil, fmt.Errorf("failed to build xml type hints: %w", err)
}
opts := []cel.EnvOption{
cel.Declarations(decls.NewVar(root, decls.Dyn)),
Expand Down Expand Up @@ -1070,23 +1079,30 @@ func newProgram(ctx context.Context, src, root string, vars map[string]string, c
}
env, err := cel.NewEnv(opts...)
if err != nil {
return nil, nil, fmt.Errorf("failed to create env: %w", err)
return nil, nil, nil, fmt.Errorf("failed to create env: %w", err)
}

ast, iss := env.Compile(src)
if iss.Err() != nil {
return nil, nil, fmt.Errorf("failed compilation: %w", iss.Err())
return nil, nil, nil, fmt.Errorf("failed compilation: %w", iss.Err())
}

var progOpts []cel.ProgramOption
var (
progOpts []cel.ProgramOption
cov *lib.Coverage
)
if coverage {
	// Collect evaluation coverage for the compiled AST. The returned
	// cov is shared with the caller, which logs its details.
	cov = lib.NewCoverage(ast)
	progOpts = append(progOpts, cov.ProgramOption())
}
if details {
	// Append rather than assign so that requesting evaluation state
	// tracking does not discard the coverage option added above when
	// both coverage and details are requested.
	progOpts = append(progOpts, cel.EvalOptions(cel.OptTrackState))
}
prg, err := env.Program(ast, progOpts...)
if err != nil {
return nil, nil, fmt.Errorf("failed program instantiation: %w", err)
return nil, nil, nil, fmt.Errorf("failed program instantiation: %w", err)
}
return prg, ast, nil
return prg, ast, cov, nil
}

func debug(log *logp.Logger, trace *httplog.LoggingRoundTripper) func(string, any) {
Expand Down
43 changes: 43 additions & 0 deletions x-pack/filebeat/input/cel/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1769,6 +1769,49 @@ var inputTests = []struct {
},
},

// Coverage
{
name: "coverage",
config: map[string]interface{}{
"interval": 1,
"program": `int(state.n).as(n, {
"events": [{"n": n+1}],
"n": n+1,
"want_more": n+1 < 5,
"probe": n < 2 ?
"little"
:
"big",
"fail_probe": n < 0 ?
"negative"
:
"non-negative",
})`,
"record_coverage": true,
"state": map[string]any{"n": 0},
"resource": map[string]interface{}{
"url": "",
},
},
time: func() time.Time { return time.Date(2010, 2, 9, 0, 0, 0, 0, time.UTC) },
// The program will be evaluated five times in the first periodic
// run and then once in each subsequent periodic run. We depend here
// on the test construction, which asks that we get at least as many
// results from the input as there are elements in the want slice
// and then stops.
want: []map[string]interface{}{
// First periodic run.
{"n": float64(1)},
{"n": float64(2)},
{"n": float64(3)},
{"n": float64(4)},
{"n": float64(5)},
// Second and subsequent periodic runs.
{"n": float64(6)},
{"n": float64(7)},
},
},

// not yet done from httpjson (some are redundant since they are compositional products).
//
// cursor/pagination (place above auth test block)
Expand Down

0 comments on commit 424e2b5

Please sign in to comment.