diff --git a/pkg/builder/vector.go b/pkg/builder/vector.go index d084af6..bf7f16f 100644 --- a/pkg/builder/vector.go +++ b/pkg/builder/vector.go @@ -91,16 +91,20 @@ func MakeVectorYaml( role string, groupName string, vectorAggregatorDiscovery string) (string, error) { + vectorAggregatorDiscoveryURI := vectorAggregatorDiscoveryURI(ctx, client, namespace, vectorAggregatorDiscovery) data := map[string]interface{}{ "LogDir": LogDir, "Namespace": namespace, "Cluster": cluster, "Role": role, "GroupName": groupName, - "VectorAggregatorAddress": vectorAggregatorDiscoveryURI(ctx, client, namespace, vectorAggregatorDiscovery), + "VectorAggregatorAddress": vectorAggregatorDiscoveryURI, } - var tmpl = ` -api: + return ParseVectorYaml(data) +} + +func ParseVectorYaml(data map[string]interface{}) (string, error) { + var tmpl = `api: enabled: true data_dir: /zncdata/vector/var log_schema: @@ -110,7 +114,6 @@ sources: type: "file" include: - "{{.LogDir}}/*/*.airlift.json" - transforms: processed_files_airlift: inputs: @@ -124,27 +127,27 @@ transforms: .level = parsed_event.level .thread = parsed_event.thread extended_logs_files: - inputs: - - processed_files_* - type: remap - source: | - . |= parse_regex!(.file, r'^/zncdata/log/(?P.*?)/(?P.*?)$') - del(.source_type) + inputs: + - processed_files_* + type: remap + source: | + . |= parse_regex!(.file, r'^/zncdata/log/(?P.*?)/(?P.*?)$') + del(.source_type) extended_logs: - inputs: - - extended_logs_* - type: remap - source: | - .namespace = "{{.Namespace}}" - .cluster = "{{.Cluster}}" - .role = "{{.Role}}" - .roleGroup = "{{.GroupName}}" + inputs: + - extended_logs_* + type: remap + source: | + .namespace = "{{.Namespace}}" + .cluster = "{{.Cluster}}" + .role = "{{.Role}}" + .roleGroup = "{{.GroupName}}" sinks: aggregator: - inputs: - - extended_logs - type: vector - address: "{{.VectorAggregatorAddress}}" + inputs: + - extended_logs + type: vector + address: "{{.VectorAggregatorAddress}}" ` parser := config.TemplateParser{ Value: data, diff --git a/pkg/builder/vector_test.go b/pkg/builder/vector_test.go index 1bda257..4941371 100644 --- a/pkg/builder/vector_test.go +++ b/pkg/builder/vector_test.go @@ -66,3 +66,61 @@ mkdir -p /zncdata/log/_vector && touch /zncdata/log/_vector/shutdown assert.NoError(t, err) assert.Equal(t, expectedArgs, args) } + +func TestVectorYamlFormatter(t *testing.T) { + actualYaml, err := builder.ParseVectorYaml(map[string]interface{}{ + "LogDir": "zncdata/log", + "Namespace": "default", + "Cluster": "simple-trino", + "Role": "coordinator", + "GroupName": "default", + "VectorAggregatorAddress": "localhost:8080", + }) + expectYaml := `api: + enabled: true +data_dir: /zncdata/vector/var +log_schema: + host_key: "pod" +sources: + files_airlift: + type: "file" + include: + - "zncdata/log/*/*.airlift.json" +transforms: + processed_files_airlift: + inputs: + - files_airlift + type: remap + source: | + parsed_event = parse_json!(string!(.message)) + .message = join!(compact([parsed_event.message, parsed_event.stackTrace]), "\n") + .timestamp = parse_timestamp!(parsed_event.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") + .logger = parsed_event.logger + .level = parsed_event.level + .thread = parsed_event.thread + extended_logs_files: + inputs: + - processed_files_* + type: remap + source: | + . |= parse_regex!(.file, r'^/zncdata/log/(?P.*?)/(?P.*?)$') + del(.source_type) + extended_logs: + inputs: + - extended_logs_* + type: remap + source: | + .namespace = "default" + .cluster = "simple-trino" + .role = "coordinator" + .roleGroup = "default" +sinks: + aggregator: + inputs: + - extended_logs + type: vector + address: "localhost:8080" +` + assert.Equal(t, expectYaml, actualYaml) + assert.NoError(t, err) +}