Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add integration tests #503

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
code.cloudfoundry.org/go-log-cache/v3 v3.0.3
code.cloudfoundry.org/go-loggregator/v10 v10.0.1
github.com/go-chi/chi/v5 v5.1.0
github.com/google/go-cmp v0.6.0
github.com/shirou/gopsutil/v4 v4.24.8
)

Expand All @@ -52,7 +53,6 @@ require (
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/lufia/plan9stats v0.0.0-20240909124753-873cd0166683 // indirect
Expand Down
2 changes: 2 additions & 0 deletions src/integration/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package integration contains integration tests for Log Cache components.
package integration
179 changes: 179 additions & 0 deletions src/integration/gateway_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package integration_test

import (
"encoding/json"
"fmt"
"io"
"net/http"
"os/exec"
"time"

logcache "code.cloudfoundry.org/go-log-cache/v3/rpc/logcache_v1"
"github.com/google/go-cmp/cmp/cmpopts"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gexec"

"code.cloudfoundry.org/log-cache/integration/integrationfakes"
)

var _ = Describe("Gateway", func() {
var (
fakeLogCache *integrationfakes.FakeLogCache

gatewayPort int
gateway *gexec.Session
)

BeforeEach(func() {
port := 8000 + GinkgoParallelProcess()
fakeLogCache = integrationfakes.NewFakeLogCache(port, nil)
fakeLogCache.Start()

gatewayPort = 8080 + GinkgoParallelProcess()
})

JustBeforeEach(func() {
command := exec.Command(componentPaths.Gateway)
envVars := map[string]string{
"ADDR": fmt.Sprintf(":%d", gatewayPort),
"LOG_CACHE_ADDR": fakeLogCache.Address(),
"METRICS_PORT": "0",
}
for k, v := range envVars {
command.Env = append(command.Env, fmt.Sprintf("%s=%s", k, v))
}

var err error
gateway, err = gexec.Start(command, GinkgoWriter, GinkgoWriter)
Expect(err).ShouldNot(HaveOccurred())
})

JustAfterEach(func() {
gateway.Interrupt().Wait(2 * time.Second)
})

AfterEach(func() {
fakeLogCache.Stop()
})

Context("/api/v1/info endpoint", func() {
var resp *http.Response

JustBeforeEach(func() {
u := fmt.Sprintf("http://localhost:%d/api/v1/info", gatewayPort)
Eventually(func() error {
var err error
resp, err = http.Get(u)
return err
}, "5s").ShouldNot(HaveOccurred())
})

AfterEach(func() {
resp.Body.Close()
})

It("returns 200", func() {
Expect(resp.StatusCode).To(Equal(http.StatusOK))
})

It("sets Content-Type header", func() {
Expect(resp.Header.Get("Content-Type")).To(Equal("application/json"))
})

It("sets Content-Length header", func() {
Expect(resp.Header.Get("Content-Length")).To(MatchRegexp("\\d+"))
})

Context("response body", func() {
var body []byte

JustBeforeEach(func() {
var err error
body, err = io.ReadAll(resp.Body)
Expect(err).ToNot(HaveOccurred())
})

It("is a JSON with version and uptime information", func() {
result := struct {
Version string `json:"version"`
VMUptime string `json:"vm_uptime"`
}{}
err := json.Unmarshal(body, &result)
Expect(err).ToNot(HaveOccurred())
Expect(result.Version).To(Equal("1.2.3"))
Expect(result.VMUptime).To(MatchRegexp("\\d+"))
})

It("has a newline at the end", func() {
Expect(string(body)).To(MatchRegexp(".*\\n$"))
})
})
})

Context("api/v1/read/:sourceID endpoint", func() {
DescribeTableSubtree("with valid source IDs",
func(sourceID string) {
var resp *http.Response

JustBeforeEach(func() {
u := fmt.Sprintf("http://localhost:%d/api/v1/read/%s", gatewayPort, sourceID)
Eventually(func() error {
var err error
resp, err = http.Get(u)
return err
}, "5s").ShouldNot(HaveOccurred())
})

AfterEach(func() {
resp.Body.Close()
})

It("returns 200", func() {
Expect(resp.StatusCode).To(Equal(http.StatusOK))
})

It("sets Content-Type header", func() {
Expect(resp.Header.Get("Content-Type")).To(Equal("application/json"))
})

It("sets Content-Length header", func() {
Expect(resp.Header.Get("Content-Length")).To(MatchRegexp("\\d+"))
})

It("forwards the request to Log Cache", func() {
reqs := fakeLogCache.ReadRequests()
Expect(len(reqs)).To(Equal(1))
Expect(reqs[0]).To(BeComparableTo(&logcache.ReadRequest{
SourceId: sourceID,
}, cmpopts.IgnoreUnexported(logcache.ReadRequest{})))
})

Context("response body", func() {
var body []byte

JustBeforeEach(func() {
var err error
body, err = io.ReadAll(resp.Body)
Expect(err).ToNot(HaveOccurred())
})

PIt("is a JSON with envelopes", func() {
var rr logcache.ReadResponse
err := json.Unmarshal(body, &rr)
Expect(err).ToNot(HaveOccurred())
Expect(rr.Envelopes).To(HaveLen(0))
})

It("has a newline at the end", func() {
Expect(string(body)).To(MatchRegexp(".*\\n$"))
})
})
},
Entry("regular", "myid"),
Entry("URL encoded", "my%2Fid"),
Entry("with slash", "my/id"),
Entry("with dash", "my-id"),
)
})
})
63 changes: 63 additions & 0 deletions src/integration/integration_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package integration_test

import (
"encoding/json"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gexec"
)

type ComponentPaths struct {
Gateway string `json:"gateway_path"`
CFAuthProxy string `json:"cf_auth_proxy_path"`
LogCache string `json:"log_cache_path"`
SyslogServer string `json:"syslog_server_path"`
}

func NewComponentPaths() ComponentPaths {
cps := ComponentPaths{}

path, err := gexec.Build("code.cloudfoundry.org/log-cache/cmd/gateway", "-ldflags", "-X main.buildVersion=1.2.3")
Expect(err).NotTo(HaveOccurred())
cps.Gateway = path

path, err = gexec.Build("code.cloudfoundry.org/log-cache/cmd/cf-auth-proxy")
Expect(err).NotTo(HaveOccurred())
cps.CFAuthProxy = path

path, err = gexec.Build("code.cloudfoundry.org/log-cache/cmd/log-cache")
Expect(err).NotTo(HaveOccurred())
cps.LogCache = path

path, err = gexec.Build("code.cloudfoundry.org/log-cache/cmd/syslog-server")
Expect(err).NotTo(HaveOccurred())
cps.SyslogServer = path

return cps
}

func (cps *ComponentPaths) Marshal() []byte {
data, err := json.Marshal(cps)
Expect(err).NotTo(HaveOccurred())
return data
}

func TestIntegration(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Integration Suite")
}

var componentPaths ComponentPaths

var _ = SynchronizedBeforeSuite(func() []byte {
cps := NewComponentPaths()
return cps.Marshal()
}, func(data []byte) {
Expect(json.Unmarshal(data, &componentPaths)).To(Succeed())
})

var _ = SynchronizedAfterSuite(func() {}, func() {
gexec.CleanupBuildArtifacts()
})
98 changes: 98 additions & 0 deletions src/integration/integrationfakes/fakelogcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package integrationfakes

import (
"context"
"crypto/tls"
"fmt"
"net"
"sync"

logcache "code.cloudfoundry.org/go-log-cache/v3/rpc/logcache_v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// FakeLogCache is a fake implementation of log-cache.
type FakeLogCache struct {
port int // Port to listen on.
addr string // Address of the net listener.
c *tls.Config // TLS config to apply to gRPC; no TLS if nil.
s *grpc.Server // gRPC server responding to Log Cache gRPC requests.
serveCh chan error // Channel to catch errors when the serve errors from the gRPC server.

readMu sync.Mutex // Mutex to prevent race conditions with FakeLogCache.Read().
readRequests []*logcache.ReadRequest // Slice of requests made to FakeLogCache.Read().
readResponse *logcache.ReadResponse
readErr error

logcache.UnimplementedEgressServer
logcache.UnimplementedIngressServer
logcache.UnimplementedPromQLQuerierServer
}

// NewFakeLogCache creates a new instance of FakeLogCache with the specified
// port and TLS configuration.
func NewFakeLogCache(port int, c *tls.Config) *FakeLogCache {
return &FakeLogCache{
port: port,
c: c,
serveCh: make(chan error),
}
}

// Start attempts to claim a net listener on FakeLogCache's port and
// start a log-cache gRPC server in a separate goroutine. The server uses
// FakeLogCache's TLS config if it was provided. This is a non-blocking
// operation and returns an error if it fails.
//
// If FakeLogCache is started, it must be stopped with Stop().
func (f *FakeLogCache) Start() error {
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", f.port))
if err != nil {
return err
}
f.addr = lis.Addr().String()

var opts []grpc.ServerOption
if f.c != nil {
opts = append(opts, grpc.Creds(credentials.NewTLS(f.c)))
}
f.s = grpc.NewServer(opts...)

logcache.RegisterEgressServer(f.s, f)
logcache.RegisterIngressServer(f.s, f)
logcache.RegisterPromQLQuerierServer(f.s, f)

go func() {
f.serveCh <- f.s.Serve(lis)
}()

return nil
}

// Address returns the address of the FakeLogCache.
func (f *FakeLogCache) Address() string {
return f.addr
}

// Read accepts incoming gRPC requests to read from Log Cache, captures the
// requests and returns a fake response.
func (f *FakeLogCache) Read(ctx context.Context, req *logcache.ReadRequest) (*logcache.ReadResponse, error) {
fmt.Printf("Read: %#v\n", req)
f.readMu.Lock()
defer f.readMu.Unlock()
f.readRequests = append(f.readRequests, req)
return f.readResponse, f.readErr
}

func (f *FakeLogCache) ReadRequests() []*logcache.ReadRequest {
f.readMu.Lock()
defer f.readMu.Unlock()
return f.readRequests
}

// Stop tells the FakeLogCache server to stop and waits for it to shutdown.
func (f *FakeLogCache) Stop() error {
f.s.Stop()
return <-f.serveCh
}
12 changes: 10 additions & 2 deletions src/internal/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"net"
"net/http"
"strconv"
"time"

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
Expand Down Expand Up @@ -179,14 +180,21 @@ func (g *Gateway) listenAndServe() {
}

func (g *Gateway) handleInfoEndpoint(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte(fmt.Sprintf(`{"version":"%s","vm_uptime":"%d"}`+"\n", g.logCacheVersion, g.uptimeFn())))
b := []byte(fmt.Sprintf(`{"version":"%s","vm_uptime":"%d"}`+"\n", g.logCacheVersion, g.uptimeFn()))

w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Length", strconv.Itoa(len(b)))
_, err := w.Write(b)
if err != nil {
g.log.Println("Cannot send result for the info endpoint")
}
}

func uptimeInSeconds() int64 {
hostStats, _ := host.Info()
hostStats, err := host.Info()
if err != nil {
return -1
}
return int64(hostStats.Uptime) //#nosec G115
}

Expand Down
Loading
Loading