Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
jotak committed Jun 21, 2024
1 parent caf7ac2 commit 0f501fb
Show file tree
Hide file tree
Showing 25 changed files with 1,069 additions and 430 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/flowlogs-pipeline
/confgenerator
/k8s-cache
/bin/
cover.out
19 changes: 16 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,17 @@ IMAGE_TAG_BASE ?= quay.io/$(IMAGE_ORG)/flowlogs-pipeline

# Image URL to use all building/pushing image targets
IMAGE ?= $(IMAGE_TAG_BASE):$(VERSION)
IMAGE_CACHE ?= $(IMAGE_TAG_BASE)-cache:$(VERSION)
OCI_BUILD_OPTS ?=

# Image building tool (docker / podman) - docker is preferred in CI
OCI_BIN_PATH = $(shell which docker 2>/dev/null || which podman)
OCI_BIN ?= $(shell basename ${OCI_BIN_PATH})
OCI_BIN ?= $(shell basename ${OCI_BIN_PATH} 2>/dev/null)

MIN_GO_VERSION := 1.20.0
FLP_BIN_FILE=flowlogs-pipeline
CG_BIN_FILE=confgenerator
K8S_CACHE_BIN_FILE=k8s-cache
NETFLOW_GENERATOR=nflow-generator
CMD_DIR=./cmd/
FLP_CONF_FILE ?= contrib/kubernetes/flowlogs-pipeline.conf.yaml
Expand All @@ -61,18 +63,21 @@ FORCE: ;
define build_target
echo 'building image for arch $(1)'; \
DOCKER_BUILDKIT=1 $(OCI_BIN) buildx build --load --build-arg TARGETPLATFORM=linux/$(1) --build-arg TARGETARCH=$(1) --build-arg BUILDPLATFORM=linux/amd64 ${OCI_BUILD_OPTS} -t ${IMAGE}-$(1) -f contrib/docker/Dockerfile .;
DOCKER_BUILDKIT=1 $(OCI_BIN) buildx build --load --build-arg TARGETPLATFORM=linux/$(1) --build-arg TARGETARCH=$(1) --build-arg BUILDPLATFORM=linux/amd64 ${OCI_BUILD_OPTS} -t ${IMAGE_CACHE}-$(1) -f contrib/docker/cache.Dockerfile .;
endef

# push a single arch target image
define push_target
echo 'pushing image ${IMAGE}-$(1)'; \
DOCKER_BUILDKIT=1 $(OCI_BIN) push ${IMAGE}-$(1);
DOCKER_BUILDKIT=1 $(OCI_BIN) push ${IMAGE_CACHE}-$(1);
endef

# manifest create a single arch target provided as argument
define manifest_add_target
echo 'manifest add target $(1)'; \
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest add ${IMAGE} ${IMAGE}-$(1);
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest add ${IMAGE_CACHE} ${IMAGE_CACHE}-$(1);
endef

##@ General
Expand Down Expand Up @@ -114,8 +119,12 @@ build_code:
GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${FLP_BIN_FILE}"
GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${CG_BIN_FILE}"

.PHONY: build_k8s_cache
build_k8s_cache:
GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${K8S_CACHE_BIN_FILE}"

.PHONY: build
build: validate_go lint build_code docs ## Build flowlogs-pipeline executable and update the docs
build: validate_go lint build_code build_k8s_cache docs ## Build flowlogs-pipeline executables and update the docs

.PHONY: docs
docs: FORCE ## Update flowlogs-pipeline documentation
Expand Down Expand Up @@ -187,16 +196,20 @@ image-push: ## Push MULTIARCH_TARGETS images
.PHONY: manifest-build
manifest-build: ## Build MULTIARCH_TARGETS manifest
@echo 'building manifest $(IMAGE)'
DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE} -f
DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE} -f || true
DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE_CACHE} -f || true
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest create ${IMAGE} $(foreach target,$(MULTIARCH_TARGETS), --amend ${IMAGE}-$(target));
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest create ${IMAGE_CACHE} $(foreach target,$(MULTIARCH_TARGETS), --amend ${IMAGE_CACHE}-$(target));

.PHONY: manifest-push
manifest-push: ## Push MULTIARCH_TARGETS manifest
@echo 'publish manifest $(IMAGE)'
ifeq (${OCI_BIN}, docker)
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE};
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE_CACHE};
else
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE} docker://${IMAGE};
DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE_CACHE} docker://${IMAGE_CACHE};
endif

include .mk/development.mk
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ General
Develop
lint Lint the code
build Build flowlogs-pipeline executable and update the docs
build Build flowlogs-pipeline executables and update the docs
docs Update flowlogs-pipeline documentation
clean Clean
tests-unit Unit tests
Expand Down
78 changes: 78 additions & 0 deletions cmd/k8s-cache/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"flag"
"fmt"
"os"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)

var (
buildVersion = "unknown"
buildDate = "unknown"
app = "flp-cache"
configPath = flag.String("config", "", "path to a config file")
versionFlag = flag.Bool("v", false, "print version")
log = logrus.WithField("module", "main")
)

type Config struct {
KubeConfigPath string `yaml:"kubeConfigPath"`
KafkaConfig api.EncodeKafka `yaml:"kafkaConfig"`
PProfPort int32 `yaml:"pprofPort"` // TODO: manage pprof
LogLevel string `yaml:"logLevel"`
}

func main() {
flag.Parse()

appVersion := fmt.Sprintf("%s [build version: %s, build date: %s]", app, buildVersion, buildDate)
if *versionFlag {
fmt.Println(appVersion)
os.Exit(0)
}

cfg, err := readConfig(*configPath)
if err != nil {
log.WithError(err).Fatal("error reading config file")
}

lvl, err := logrus.ParseLevel(cfg.LogLevel)
if err != nil {
log.Errorf("Log level %s not recognized, using info", cfg.LogLevel)
lvl = logrus.InfoLevel
}
logrus.SetLevel(lvl)
log.Infof("Starting %s at log level %s", appVersion, lvl)
log.Infof("Configuration: %#v", cfg)

err = kubernetes.InitInformerDatasource(cfg.KubeConfigPath, &cfg.KafkaConfig)
if err != nil {
log.WithError(err).Fatal("error initializing Kubernetes & informers")
}

stopCh := utils.SetupElegantExit()
<-stopCh
}

func readConfig(path string) (*Config, error) {
var cfg Config
if len(path) == 0 {
return &cfg, nil
}
yamlFile, err := os.ReadFile(path)
if err != nil {
return nil, err
}
err = yaml.Unmarshal(yamlFile, &cfg)
if err != nil {
return nil, err
}

return &cfg, err
}
72 changes: 72 additions & 0 deletions cmd/k8s-cache/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (C) 2021 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package main

import (
"encoding/json"
"errors"
"os"
"os/exec"
"testing"

"github.com/stretchr/testify/require"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
)

func TestTheMain(t *testing.T) {
if os.Getenv("BE_CRASHER") == "1" {
main()
return
}
cmd := exec.Command(os.Args[0], "-test.run=TestTheMain")
cmd.Env = append(os.Environ(), "BE_CRASHER=1")
err := cmd.Run()
var castErr *exec.ExitError
if errors.As(err, &castErr) && !castErr.Success() {
return
}
t.Fatalf("process ran with err %v, want exit status 1", err)
}

func TestPipelineConfigSetup(t *testing.T) {
// Kube init mock
kubernetes.MockInformers()

js := `{
"PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]",
"Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"kubernetes\":{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\"},\"type\":\"add_kubernetes\"},{\"kubernetes\":{\"input\":\"DstAddr\",\"output\":\"DstK8S\"},\"type\":\"add_kubernetes\"},{\"add_service\":{\"input\":\"DstPort\",\"output\":\"Service\",\"protocol\":\"Proto\"},\"type\":\"add_service\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]",
"Health": {
"Port": "8080"
},
"Profile": {
"Port": 0
}
}`
var opts config.Options
err := json.Unmarshal([]byte(js), &opts)
require.NoError(t, err)
cfg, err := config.ParseConfig(&opts)
require.NoError(t, err)
require.NotNil(t, cfg)
mainPipeline, err := pipeline.NewPipeline(&cfg)
require.NoError(t, err)
require.NotNil(t, mainPipeline)
}
29 changes: 29 additions & 0 deletions contrib/docker/cache.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# We do not use --platform feature to auto fill this ARG because of incompatibility between podman and docker
ARG TARGETPLATFORM=linux/amd64
ARG BUILDPLATFORM=linux/amd64
FROM --platform=$BUILDPLATFORM docker.io/library/golang:1.22 as builder

ARG TARGETPLATFORM
ARG TARGETARCH=amd64
WORKDIR /app

# Copy source code
COPY go.mod .
COPY go.sum .
COPY Makefile .
COPY .mk/ .mk/
COPY .bingo/ .bingo/
COPY vendor/ vendor/
COPY .git/ .git/
COPY cmd/ cmd/
COPY pkg/ pkg/

RUN git status --porcelain
RUN GOARCH=$TARGETARCH make build_k8s_cache

# final stage
FROM --platform=$TARGETPLATFORM registry.access.redhat.com/ubi9/ubi-minimal:9.4

COPY --from=builder /app/k8s-cache /app/

ENTRYPOINT ["/app/k8s-cache"]
26 changes: 26 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,32 @@ Following is the supported API format for network transformations:
output: entry output field
protocol: entry protocol field
kubeConfigPath: path to kubeconfig file (optional)
kafkaCacheConfig: Kafka config for informers cache (optional)
brokers: list of kafka broker addresses
topic: kafka topic to listen on
groupid: separate groupid for each consumer on specified topic
groupBalancers: list of balancing strategies (range, roundRobin, rackAffinity)
startOffset: FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition
batchReadTimeout: how often (in milliseconds) to process input
decoder: decoder to use (E.g. json or protobuf)
type: (enum) one of the following:
json: JSON decoder
protobuf: Protobuf decoder
batchMaxLen: the number of accumulated flows before being forwarded for processing
pullQueueCapacity: the capacity of the queue use to store pulled flows
pullMaxBytes: the maximum number of bytes being pulled from kafka
commitInterval: the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously.
tls: TLS client configuration (optional)
insecureSkipVerify: skip client verifying the server's certificate chain and host name
caCertPath: path to the CA certificate
userCertPath: path to the user certificate
userKeyPath: path to the user private key
sasl: SASL configuration (optional)
type: SASL type
plain: Plain SASL
scramSHA512: SCRAM/SHA512 SASL
clientIDPath: path to the client ID / SASL username
clientSecretPath: path to the client secret / SASL password
servicesFile: path to services file (optional, default: /etc/services)
protocolsFile: path to protocols file (optional, default: /etc/protocols)
subnetLabels: configure subnet and IPs custom labels
Expand Down
13 changes: 7 additions & 6 deletions pkg/api/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package api

type TransformNetwork struct {
Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
SubnetLabels []NetworkTransformSubnetLabel `yaml:"subnetLabels,omitempty" json:"subnetLabels,omitempty" doc:"configure subnet and IPs custom labels"`
DirectionInfo NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
KafkaCacheConfig *IngestKafka `yaml:"kafkaCacheConfig,omitempty" json:"kafkaCacheConfig,omitempty" doc:"Kafka config for informers cache (optional)"`
ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
SubnetLabels []NetworkTransformSubnetLabel `yaml:"subnetLabels,omitempty" json:"subnetLabels,omitempty" doc:"configure subnet and IPs custom labels"`
DirectionInfo NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
}

func (tn *TransformNetwork) GetServiceFiles() (string, string) {
Expand Down
Loading

0 comments on commit 0f501fb

Please sign in to comment.