Skip to content

Commit

Permalink
Start gRPC and HTTP servers using buildbarn lib
Browse files Browse the repository at this point in the history
  • Loading branch information
mortenmj committed Oct 13, 2024
1 parent d93c625 commit 9c4ef54
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 111 deletions.
26 changes: 18 additions & 8 deletions cmd/bb_portal/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,30 @@ go_library(
"//ent/gen/ent",
"//ent/gen/ent/migrate",
"//internal/api",
"//internal/api/grpc",
"//internal/api/grpc/bes",
"//internal/graphql",
"//pkg/cas",
"//pkg/processing",
"//pkg/proto/configuration/bb_portal",
"@com_github_99designs_gqlgen//graphql/handler",
"@com_github_99designs_gqlgen//graphql/handler/debug",
"@com_github_99designs_gqlgen//graphql/playground",
"@com_github_buildbarn_bb_storage//pkg/global",
"@com_github_buildbarn_bb_storage//pkg/grpc",
"@com_github_buildbarn_bb_storage//pkg/http",
"@com_github_buildbarn_bb_storage//pkg/program",
"@com_github_buildbarn_bb_storage//pkg/util",
"@com_github_fsnotify_fsnotify//:fsnotify",
"@com_github_gorilla_mux//:mux",
"@com_github_mattn_go_sqlite3//:go-sqlite3",
"@io_entgo_contrib//entgql",
"@org_golang_google_genproto//googleapis/devtools/build/v1:build",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],
)

go_binary(
name = "bb_portal",
cgo = True, # keep
embed = [":bb_portal_lib"],
visibility = ["//visibility:public"],
)

multiarch_go_image(
name = "bb_portal_container",
binary = ":bb_portal",
Expand All @@ -41,3 +45,9 @@ container_push_official(
component = "bb-portal",
image = ":bb_portal_container",
)

go_binary(
name = "bb_portal",
embed = [":bb_portal_lib"],
visibility = ["//visibility:public"],
)
147 changes: 80 additions & 67 deletions cmd/bb_portal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package main
import (
"context"
"flag"
"log"
"log/slog"
"net"
"net/http"
"net/http/httputil"
"net/url"
Expand All @@ -17,15 +15,26 @@ import (
"github.com/99designs/gqlgen/graphql/handler/debug"
"github.com/99designs/gqlgen/graphql/playground"
"github.com/fsnotify/fsnotify"
"github.com/gorilla/mux"
_ "github.com/mattn/go-sqlite3"
build "google.golang.org/genproto/googleapis/devtools/build/v1"
go_grpc "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/buildbarn/bb-portal/ent/gen/ent"
"github.com/buildbarn/bb-portal/ent/gen/ent/migrate"
"github.com/buildbarn/bb-portal/internal/api"
"github.com/buildbarn/bb-portal/internal/api/grpc"
"github.com/buildbarn/bb-portal/internal/api/grpc/bes"
"github.com/buildbarn/bb-portal/internal/graphql"
"github.com/buildbarn/bb-portal/pkg/cas"
"github.com/buildbarn/bb-portal/pkg/processing"
"github.com/buildbarn/bb-portal/pkg/proto/configuration/bb_portal"
"github.com/buildbarn/bb-storage/pkg/global"
bb_grpc "github.com/buildbarn/bb-storage/pkg/grpc"
bb_http "github.com/buildbarn/bb-storage/pkg/http"
"github.com/buildbarn/bb-storage/pkg/program"
"github.com/buildbarn/bb-storage/pkg/util"
)

const (
Expand All @@ -34,8 +43,6 @@ const (
)

var (
httpBindAddr = flag.String("bind-http", ":8081", "Bind address for the HTTP server.")
grpcBindAddr = flag.String("bind-grpc", ":8082", "Bind address for the gRPC server.")
enableDebug = flag.Bool("debug", false, "Enable debugging mode.")
dsDriver = flag.String("datasource-driver", "sqlite3", "Data source driver to use")
dsURL = flag.String("datasource-url", "file:buildportal.db?_journal=WAL&_fk=1", "Data source URL for the DB")
Expand All @@ -47,62 +54,82 @@ var (
)

func main() {
flag.Parse()
program.RunMain(func(ctx context.Context, siblingsGroup, dependenciesGroup program.Group) error {
flag.Parse()

client, err := ent.Open(
*dsDriver,
*dsURL,
)
if err != nil {
fatal("opening ent client", "err", err)
}
if err = client.Schema.Create(context.Background(), migrate.WithGlobalUniqueID(true)); err != nil {
fatal("running schema migration", "err", err)
}
if len(os.Args) != 2 {
return status.Error(codes.InvalidArgument, "Usage: bb_portal bb_portal.jsonnet")
}

blobArchiver := processing.NewBlobMultiArchiver()
configureBlobArchiving(blobArchiver, *blobArchiveFolder)
var configuration bb_portal.ApplicationConfiguration
if err := util.UnmarshalConfigurationFromFile(os.Args[1], &configuration); err != nil {
return util.StatusWrapf(err, "Failed to read configuration from %s", os.Args[1])
}

// Create new watcher.
watcher, err := fsnotify.NewWatcher()
if err != nil {
fatal("failed to create fsnotify.Watcher", "err", err)
}
defer watcher.Close()
runWatcher(watcher, client, *bepFolder, blobArchiver)
lifecycleState, _, err := global.ApplyConfiguration(configuration.Global)
if err != nil {
return util.StatusWrap(err, "Failed to apply global configuration options")
}

srv := handler.NewDefaultServer(graphql.NewSchema(client))
srv.Use(entgql.Transactioner{TxOpener: client})
if *enableDebug {
srv.Use(&debug.Tracer{})
}
client, err := ent.Open(
*dsDriver,
*dsURL,
)
if err != nil {
return util.StatusWrapf(err, "Failed to open ent client")
}
if err = client.Schema.Create(context.Background(), migrate.WithGlobalUniqueID(true)); err != nil {
return util.StatusWrapf(err, "Failed to run schema migration")
}

fs := frontendServer()
http.Handle("/graphql", srv)
http.Handle("/graphiql",
playground.Handler("GraphQL Playground", "/graphql"),
)
casManager := cas.NewConnectionManager(cas.ManagerParams{
TLSCACertFile: *caFile,
CredentialsHelperCommand: *credentialsHelperCommand,
})
blobArchiver := processing.NewBlobMultiArchiver()
configureBlobArchiving(blobArchiver, *blobArchiveFolder)

// Create new watcher.
watcher, err := fsnotify.NewWatcher()
if err != nil {
return util.StatusWrapf(err, "Failed to create fsnotify.Watcher")
}
defer watcher.Close()
runWatcher(watcher, client, *bepFolder, blobArchiver)

http.Handle("/api/v1/blobs/{blobID}/{name}", api.NewBlobHandler(client, casManager))
http.Handle("POST /api/v1/bep/upload", api.NewBEPUploadHandler(client, blobArchiver))
http.Handle("/", fs)
slog.Info("HTTP listening on", "address", *httpBindAddr)
srv := handler.NewDefaultServer(graphql.NewSchema(client))
srv.Use(entgql.Transactioner{TxOpener: client})
if *enableDebug {
srv.Use(&debug.Tracer{})
}

grpcServer := runGRPCServer(client, *grpcBindAddr, blobArchiver)
defer grpcServer.GracefulStop()
slog.Info("gRPC listening on", "address", *grpcBindAddr)
router := mux.NewRouter()
router.PathPrefix("/graphql").Handler(srv)
router.Handle("/graphiql", playground.Handler("GraphQL Playground", "/graphql"))
casManager := cas.NewConnectionManager(cas.ManagerParams{
TLSCACertFile: *caFile,
CredentialsHelperCommand: *credentialsHelperCommand,
})

router.Handle("/api/v1/blobs/{blobID}/{name}", api.NewBlobHandler(client, casManager))
router.Handle("/api/v1/bep/upload", api.NewBEPUploadHandler(client, blobArchiver)).Methods("POST")
router.PathPrefix("/").Handler(frontendServer())

bb_http.NewServersFromConfigurationAndServe(
configuration.HttpServers,
bb_http.NewMetricsHandler(router, "Diagnostics"),
siblingsGroup,
)

if err := bb_grpc.NewServersFromConfigurationAndServe(
configuration.GrpcServers,
func(s go_grpc.ServiceRegistrar) {
build.RegisterPublishBuildEventServer(s.(*go_grpc.Server), bes.New(client, blobArchiver))
},
siblingsGroup,
); err != nil {
return util.StatusWrap(err, "gRPC server failure")
}

server := &http.Server{
Addr: *httpBindAddr,
ReadHeaderTimeout: readHeaderTimeout,
}
if err = server.ListenAndServe(); err != nil {
slog.Error("http server terminated", "err", err)
}
lifecycleState.MarkReadyAndWait(siblingsGroup)
return nil
})
}

func configureBlobArchiving(blobArchiver processing.BlobMultiArchiver, archiveFolder string) {
Expand All @@ -114,20 +141,6 @@ func configureBlobArchiving(blobArchiver processing.BlobMultiArchiver, archiveFo
blobArchiver.RegisterArchiver("file", localBlobArchiver)
}

func runGRPCServer(db *ent.Client, bindAddr string, blobArchiver processing.BlobMultiArchiver) *grpc.Server {
lis, err := net.Listen("tcp", bindAddr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
srv := grpc.NewServer(db, blobArchiver)
go func() {
if err := srv.Serve(lis); err != nil {
slog.Error("error from gRPC server", "err", err)
}
}()
return srv
}

func runWatcher(watcher *fsnotify.Watcher, client *ent.Client, bepFolder string, blobArchiver processing.BlobMultiArchiver) {
ctx := context.Background()
worker := processing.New(client, blobArchiver)
Expand Down
15 changes: 0 additions & 15 deletions internal/api/grpc/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,15 +0,0 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "grpc",
srcs = ["server.go"],
importpath = "github.com/buildbarn/bb-portal/internal/api/grpc",
visibility = ["//:__subpackages__"],
deps = [
"//ent/gen/ent",
"//internal/api/grpc/bes",
"//pkg/processing",
"@org_golang_google_genproto//googleapis/devtools/build/v1:build",
"@org_golang_google_grpc//:grpc",
],
)
21 changes: 0 additions & 21 deletions internal/api/grpc/server.go

This file was deleted.

0 comments on commit 9c4ef54

Please sign in to comment.