Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ctlong committed Sep 20, 2024
1 parent e04800e commit 1371eef
Show file tree
Hide file tree
Showing 13 changed files with 1,451 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/integration/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package integration contains integration tests for Log Cache components.
package integration
170 changes: 170 additions & 0 deletions src/integration/gateway_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package integration_test

import (
"encoding/json"
"fmt"
"io"
"net/http"
"os/exec"
"time"

logcache "code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1"
"code.cloudfoundry.org/log-cache/integration/integrationfakes"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gexec"
)

var _ = Describe("Gateway", func() {
var (
fakeLogCache *integrationfakes.FakeLogCache

gatewayPort int
gateway *gexec.Session
)

BeforeEach(func() {
port := 8000 + GinkgoParallelProcess()
fakeLogCache = integrationfakes.NewFakeLogCache(port, nil)

gatewayPort = 8081 + GinkgoParallelProcess()
})

JustBeforeEach(func() {
fakeLogCache.Start()

command := exec.Command(componentPaths.Gateway)
envVars := map[string]string{
"ADDR": fmt.Sprintf(":%d", gatewayPort),
"LOG_CACHE_ADDR": fakeLogCache.Address(),
"METRICS_PORT": "0",
}
for k, v := range envVars {
command.Env = append(command.Env, fmt.Sprintf("%s=%s", k, v))
}

var err error
gateway, err = gexec.Start(command, GinkgoWriter, GinkgoWriter)
Expect(err).ShouldNot(HaveOccurred())
})

JustAfterEach(func() {
gateway.Interrupt().Wait(2 * time.Second)
fakeLogCache.Stop()
})

Context("/api/v1/info endpoint", func() {
var resp *http.Response

JustBeforeEach(func() {
u := fmt.Sprintf("http://localhost:%d/api/v1/info", gatewayPort)
Eventually(func() error {
var err error
resp, err = http.Get(u)
return err
}, "5s").ShouldNot(HaveOccurred())
})

AfterEach(func() {
resp.Body.Close()
})

It("returns 200", func() {
Expect(resp.StatusCode).To(Equal(http.StatusOK))
})

It("sets Content-Type header", func() {
Expect(resp.Header.Get("Content-Type")).To(Equal("application/json"))
})

It("sets Content-Length header", func() {
Expect(resp.Header.Get("Content-Length")).To(MatchRegexp("\\d+"))
})

Context("response body", func() {
var body []byte

JustBeforeEach(func() {
var err error
body, err = io.ReadAll(resp.Body)
Expect(err).ToNot(HaveOccurred())
})

It("is a JSON with version and uptime information", func() {
result := struct {
Version string `json:"version"`
VMUptime string `json:"vm_uptime"`
}{}
err := json.Unmarshal(body, &result)
Expect(err).ToNot(HaveOccurred())
Expect(result.Version).To(Equal("1.2.3"))
Expect(result.VMUptime).To(MatchRegexp("\\d+"))
})

It("has a newline at the end", func() {
Expect(string(body)).To(MatchRegexp(".*\\n$"))
})
})
})

Context("api/v1/read/:sourceID endpoint", func() {
DescribeTableSubtree("with valid source IDs", func(sourceID string) {
var resp *http.Response

JustBeforeEach(func() {
u := fmt.Sprintf("http://localhost:%d/api/v1/read/%s", gatewayPort, sourceID)
Eventually(func() error {
var err error
resp, err = http.Get(u)
return err
}, "5s").ShouldNot(HaveOccurred())
})

AfterEach(func() {
resp.Body.Close()
})

It("returns 200", func() {
Expect(resp.StatusCode).To(Equal(http.StatusOK))
})

It("sets Content-Type header", func() {
Expect(resp.Header.Get("Content-Type")).To(Equal("application/json"))
})

It("sets Content-Length header", func() {
Expect(resp.Header.Get("Content-Length")).To(MatchRegexp("\\d+"))
})

PIt("forwards the request to Log Cache", func() {
// Expect(fakeLogCache.requestCount).To(Equal(1))
})

Context("response body", func() {
var body []byte

JustBeforeEach(func() {
var err error
body, err = io.ReadAll(resp.Body)
Expect(err).ToNot(HaveOccurred())
})

PIt("is a JSON with envelopes", func() {
var rr logcache.ReadResponse
err := json.Unmarshal(body, &rr)
Expect(err).ToNot(HaveOccurred())
Expect(rr.Envelopes).To(HaveLen(0))
})

It("has a newline at the end", func() {
Expect(string(body)).To(MatchRegexp(".*\\n$"))
})
})
},
Entry("regular", "myid"),
Entry("URL encoded", "my%2Fid"),
Entry("with slash", "my/id"),
Entry("with dash", "my-id"),
)
})
})
63 changes: 63 additions & 0 deletions src/integration/integration_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package integration_test

import (
"encoding/json"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gexec"
)

type ComponentPaths struct {
Gateway string `json:"gateway_path"`
CFAuthProxy string `json:"cf_auth_proxy_path"`
LogCache string `json:"log_cache_path"`
SyslogServer string `json:"syslog_server_path"`
}

func NewComponentPaths() ComponentPaths {
cps := ComponentPaths{}

path, err := gexec.Build("code.cloudfoundry.org/log-cache/cmd/gateway", "-ldflags", "-X main.buildVersion=1.2.3")
Expect(err).NotTo(HaveOccurred())
cps.Gateway = path

path, err = gexec.Build("code.cloudfoundry.org/log-cache/cmd/cf-auth-proxy")
Expect(err).NotTo(HaveOccurred())
cps.CFAuthProxy = path

path, err = gexec.Build("code.cloudfoundry.org/log-cache/cmd/log-cache")
Expect(err).NotTo(HaveOccurred())
cps.LogCache = path

path, err = gexec.Build("code.cloudfoundry.org/log-cache/cmd/syslog-server")
Expect(err).NotTo(HaveOccurred())
cps.SyslogServer = path

return cps
}

func (cps *ComponentPaths) Marshal() []byte {
data, err := json.Marshal(cps)
Expect(err).NotTo(HaveOccurred())
return data
}

func TestIntegration(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Integration Suite")
}

var componentPaths ComponentPaths

var _ = SynchronizedBeforeSuite(func() []byte {
cps := NewComponentPaths()
return cps.Marshal()
}, func(data []byte) {
Expect(json.Unmarshal(data, &componentPaths)).To(Succeed())
})

var _ = SynchronizedAfterSuite(func() {}, func() {
gexec.CleanupBuildArtifacts()
})
81 changes: 81 additions & 0 deletions src/integration/integrationfakes/fakelogcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package integrationfakes

import (
"context"
"crypto/tls"
"fmt"
"net"

logcache "code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// FakeLogCache is a fake implementation of log-cache.
type FakeLogCache struct {
port int
addr string
c *tls.Config
s *grpc.Server

serveCh chan error

logcache.UnimplementedEgressServer
logcache.UnimplementedIngressServer
logcache.UnimplementedPromQLQuerierServer
}

// NewFakeLogCache creates a new instance of FakeLogCache with the specified
// port and TLS configuration.
func NewFakeLogCache(port int, c *tls.Config) *FakeLogCache {
return &FakeLogCache{
port: port,
c: c,
serveCh: make(chan error),
}
}

// Start attempts to claim a net listener on FakeLogCache's port and
// start a log-cache gRPC server in a separate goroutine. The server uses
// FakeLogCache's TLS config if it was provided. This is a non-blocking
// operation and returns an error if it fails.
//
// If FakeLogCache is started, it must be stopped with Stop().
func (f *FakeLogCache) Start() error {
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", f.port))
if err != nil {
return err
}
f.addr = lis.Addr().String()

f.s = grpc.NewServer()
if f.c != nil {
f.s = grpc.NewServer(grpc.Creds(credentials.NewTLS(f.c)))
}

logcache.RegisterEgressServer(f.s, f)
logcache.RegisterIngressServer(f.s, f)
logcache.RegisterPromQLQuerierServer(f.s, f)

go func() {
f.serveCh <- f.s.Serve(lis)
}()

return nil
}

// Address returns the address of the FakeLogCache.
func (f *FakeLogCache) Address() string {
return f.addr
}

func (f *FakeLogCache) Read(ctx context.Context, req *logcache.ReadRequest) (*logcache.ReadResponse, error) {
fmt.Printf("Read: %#v\n", req)
return nil, nil
}

// Stop tells the FakeLogCache server to stop and waits for it to shutdown.
func (f *FakeLogCache) Stop() error {
f.s.Stop()
return <-f.serveCh
}
7 changes: 6 additions & 1 deletion src/internal/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"net"
"net/http"
"strconv"
"time"

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
Expand Down Expand Up @@ -179,7 +180,11 @@ func (g *Gateway) listenAndServe() {
}

func (g *Gateway) handleInfoEndpoint(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte(fmt.Sprintf(`{"version":"%s","vm_uptime":"%d"}`+"\n", g.logCacheVersion, g.uptimeFn())))
b := []byte(fmt.Sprintf(`{"version":"%s","vm_uptime":"%d"}`+"\n", g.logCacheVersion, g.uptimeFn()))

w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Length", strconv.Itoa(len(b)))
_, err := w.Write(b)
if err != nil {
g.log.Println("Cannot send result for the info endpoint")
}
Expand Down
Loading

0 comments on commit 1371eef

Please sign in to comment.