From ca8f9f2ee3b3763f344e12dd63fb5fe490213b2e Mon Sep 17 00:00:00 2001 From: Trey Ivy Date: Thu, 24 Oct 2024 17:07:10 +0000 Subject: [PATCH 1/7] remove fk unique constraint from event file --- Makefile | 7 ++++++- ent/gen/ent/migrate/schema.go | 2 +- ent/schema/eventfile.go | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 5559cb43..d2f28d98 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,12 @@ download: .PHONY: lint lint: - golangci-lint --timeout 10m run ./... + bazel mod tidy + bazel run //:gazelle + bazel run @com_github_bazelbuild_buildtools//:buildifier + bazel run @cc_mvdan_gofumpt//:gofumpt -- -w -extra $(CURDIR) + bazel run @org_golang_x_lint//golint -- -set_exit_status $(CURDIR)/... + bazel test //... .PHONY: lint-fix lint-fix: diff --git a/ent/gen/ent/migrate/schema.go b/ent/gen/ent/migrate/schema.go index 53ccb5de..91119c59 100644 --- a/ent/gen/ent/migrate/schema.go +++ b/ent/gen/ent/migrate/schema.go @@ -348,7 +348,7 @@ var ( // EventFilesColumns holds the columns for the "event_files" table. EventFilesColumns = []*schema.Column{ {Name: "id", Type: field.TypeInt, Increment: true}, - {Name: "url", Type: field.TypeString, Unique: true}, + {Name: "url", Type: field.TypeString}, {Name: "mod_time", Type: field.TypeTime}, {Name: "protocol", Type: field.TypeString}, {Name: "mime_type", Type: field.TypeString}, diff --git a/ent/schema/eventfile.go b/ent/schema/eventfile.go index fb98eba7..eb24bcef 100644 --- a/ent/schema/eventfile.go +++ b/ent/schema/eventfile.go @@ -15,7 +15,7 @@ type EventFile struct { // Fields of the EventFile. func (EventFile) Fields() []ent.Field { return []ent.Field{ - field.String("url").Unique().Immutable(), + field.String("url").Immutable(), field.Time("mod_time"), field.String("protocol"), // *.bep, *.log, etc field.String("mime_type"), From a3c937bab4f1612058614c9ff45eab5c1bed6c75 Mon Sep 17 00:00:00 2001 From: Trey Ivy Date: Thu, 24 Oct 2024 17:59:15 +0000 Subject: [PATCH 2/7] add stream id and sequence number to err msg --- internal/api/grpc/bes/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/api/grpc/bes/server.go b/internal/api/grpc/bes/server.go index d2e98379..9e072fdf 100644 --- a/internal/api/grpc/bes/server.go +++ b/internal/api/grpc/bes/server.go @@ -52,7 +52,7 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE StreamId: streamID, SequenceNumber: sequenceNumber, }); err != nil { - slog.ErrorContext(stream.Context(), "Send failed", "err", err) + slog.ErrorContext(stream.Context(), "Send failed", "err", err, "streamid", streamID, "sequenceNumber", sequenceNumber) } } From 3b788476fbc9df6272ae6127743e00a1c2f56a7a Mon Sep 17 00:00:00 2001 From: Trey Ivy Date: Fri, 25 Oct 2024 16:06:10 +0000 Subject: [PATCH 3/7] push database saves to a context background process to avoid failures on larger builds --- .../BazelInvocationsTable/index.tsx | 2 +- internal/api/grpc/bes/channel.go | 34 ++++++++++++--- internal/api/grpc/bes/server.go | 28 +++++++++++-- pkg/processing/save.go | 42 ++++++++++++------- pkg/proto/configuration/bb_portal/BUILD.bazel | 9 +++- 5 files changed, 89 insertions(+), 26 deletions(-) diff --git a/frontend/src/components/BazelInvocationsTable/index.tsx b/frontend/src/components/BazelInvocationsTable/index.tsx index 826f79ae..98f72ab3 100644 --- a/frontend/src/components/BazelInvocationsTable/index.tsx +++ b/frontend/src/components/BazelInvocationsTable/index.tsx @@ -30,7 +30,7 @@ const BazelInvocationsTable: React.FC = ({ height }) => { const { loading, data, previousData, error } = useQuery(FIND_BAZEL_INVOCATIONS_QUERY, { variables, pollInterval: 120000, - fetchPolicy: 'cache-and-network', + fetchPolicy: "network-only", }); const onChange: TableProps['onChange'] = useCallback( diff --git a/internal/api/grpc/bes/channel.go b/internal/api/grpc/bes/channel.go index c03f8533..f0cd8c91 100644 --- a/internal/api/grpc/bes/channel.go +++ b/internal/api/grpc/bes/channel.go @@ -39,7 +39,6 @@ func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { if event.GetBazelEvent() == nil { return nil } - var bazelEvent bes.BuildEvent err := event.GetBazelEvent().UnmarshalTo(&bazelEvent) if err != nil { @@ -56,6 +55,10 @@ func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { // Finalize implements BuildEventChannel.Finalize. func (c *buildEventChannel) Finalize() error { + // defer the ctx so its not reaped when the client closes the connection + ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24) + defer cancel() + summaryReport, err := c.summarizer.FinishProcessing() if err != nil { slog.ErrorContext(c.ctx, "FinishProcessing failed", "err", err) @@ -64,19 +67,38 @@ func (c *buildEventChannel) Finalize() error { // Hack for eventFile being required summaryReport.EventFileURL = fmt.Sprintf( - "grpc://localhost:8082/google.devtools.build.v1/PublishLifecycleEvent?streamID=%s", - c.streamID.String(), + "grpc://localhost:8082/google.devtools.build.v1/PublishLifecycleEvent?invocationId=%s&buildID=%s&component=%s", + c.streamID.GetInvocationId(), c.streamID.GetBuildId(), c.streamID.GetComponent(), ) - slog.InfoContext(c.ctx, "Saving invocation", "id", c.streamID.String()) + slog.InfoContext(c.ctx, "Saving invocation", + "InvocationId", c.streamID.GetInvocationId(), + "BuildId", c.streamID.GetBuildId(), + "Component", c.streamID.GetComponent()) + startTime := time.Now() - invocation, err := c.workflow.SaveSummary(c.ctx, summaryReport) + + // try to get the invocation id + if summaryReport.InvocationID == "" { + summaryReport.InvocationID = c.streamID.GetInvocationId() + slog.WarnContext(c.ctx, "summaryReport was missing invocation ID", + "invocationId", c.streamID.GetInvocationId(), + "buildId", c.streamID.GetBuildId(), + "component", c.streamID.GetComponent()) + } + + invocation, err := c.workflow.SaveSummary(ctx, summaryReport) // will c.workflow still be there? if err != nil { - slog.ErrorContext(c.ctx, "SaveSummary failed", "err", err) + slog.ErrorContext(ctx, "SaveSummary failed", "err", err) + cancel() return err } + + cancel() + endTime := time.Now() elapsedTime := endTime.Sub(startTime) + slog.InfoContext(c.ctx, fmt.Sprintf("Saved invocation in %v", elapsedTime.String()), "id", invocation.InvocationID) return nil } diff --git a/internal/api/grpc/bes/server.go b/internal/api/grpc/bes/server.go index 9e072fdf..0f6ade20 100644 --- a/internal/api/grpc/bes/server.go +++ b/internal/api/grpc/bes/server.go @@ -47,12 +47,32 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE // We'll want to ack these once all events are received, as we don't support resumption. seqNrs := make([]int64, 0) - ack := func(streamID *build.StreamId, sequenceNumber int64) { + ack := func(streamID *build.StreamId, sequenceNumber int64, isClosing bool) { if err := stream.Send(&build.PublishBuildToolEventStreamResponse{ StreamId: streamID, SequenceNumber: sequenceNumber, }); err != nil { - slog.ErrorContext(stream.Context(), "Send failed", "err", err, "streamid", streamID, "sequenceNumber", sequenceNumber) + + // with the option --bes_uplod_mode=fully_async or nowait_for_upload_complete + // its not an error when the send fails. the bes gracefully terminated the close + // i.e. sent an EOF. for long running builds that take a while to save to the db (> 1s) + // the context is processed in the background, so by the time we are acknowledging these + // requests, the client connection may have already timed out and these errors can be + // safely ignored + if isClosing && (err.Error() != "rpc error: code = Unavailable desc = transport is closing") { + return + } + + slog.ErrorContext( + stream.Context(), + "Send failed", + "err", + err, + "streamid", + streamID, + "sequenceNumber", + sequenceNumber, + ) } } @@ -77,7 +97,9 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE case err := <-errCh: if err == io.EOF { slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context()) + if eventCh == nil { + slog.WarnContext(stream.Context(), "No event channel found for stream event", "event", stream.Context()) return nil } @@ -100,7 +122,7 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE // Ack all events for _, seqNr := range seqNrs { - ack(streamID, seqNr) + ack(streamID, seqNr, true) } return nil diff --git a/pkg/processing/save.go b/pkg/processing/save.go index 96842cf9..a55b624e 100644 --- a/pkg/processing/save.go +++ b/pkg/processing/save.go @@ -30,35 +30,35 @@ type SaveActor struct { // SaveSummary saves an invocation summary to the database. func (act SaveActor) SaveSummary(ctx context.Context, summary *summary.Summary) (*ent.BazelInvocation, error) { + // errors := []error{} + if summary.InvocationID == "" { + slog.ErrorContext(ctx, "No Invocation ID Found on summary", "ctx.Err()", ctx.Err()) + return nil, fmt.Errorf("no Invocation ID Found on summary") + } eventFile, err := act.saveEventFile(ctx, summary) if err != nil { slog.ErrorContext(ctx, "failed to save event file", "id", summary.InvocationID, "err", err) - return nil, fmt.Errorf("could not save EventFile: %w", err) } buildRecord, err := act.findOrCreateBuild(ctx, summary) if err != nil { - slog.ErrorContext(ctx, "failed to find or create build", "id", summary.InvocationID, "err", err) - return nil, err + slog.ErrorContext(ctx, "failed to find or create build", "summary.InvocationId", summary.InvocationID, "err", err) } metrics, err := act.saveMetrics(ctx, summary.Metrics) if err != nil { slog.ErrorContext(ctx, "failed to save metrics", "id", summary.InvocationID, "err", err) - return nil, fmt.Errorf("could not save Metrics: %w", err) } targets, err := act.saveTargets(ctx, summary) if err != nil { slog.ErrorContext(ctx, "failed to save targets", "id", summary.InvocationID, "err", err) - return nil, fmt.Errorf("could not save Targets: %w", err) } tests, err := act.saveTests(ctx, summary) if err != nil { slog.ErrorContext(ctx, "failed to save tests", "id", summary.InvocationID, "err", err) - return nil, fmt.Errorf("could not save test results: %w", err) + tests = nil } sourcecontrol, err := act.saveSourceControl(ctx, summary) if err != nil { slog.ErrorContext(ctx, "failed to save source control information", "id", summary.InvocationID, "err", err) - return nil, fmt.Errorf("could not save source control information: %w", err) } bazelInvocation, err := act.saveBazelInvocation(ctx, summary, eventFile, buildRecord, metrics, tests, targets, sourcecontrol) if err != nil { @@ -139,6 +139,9 @@ func (act SaveActor) saveBazelInvocation( targets []*ent.TargetPair, sourcecontrol *ent.SourceControl, ) (*ent.BazelInvocation, error) { + if summary == nil { + return nil, fmt.Errorf("no summary object provided") + } uniqueID, err := uuid.Parse(summary.InvocationID) if err != nil { return nil, err @@ -158,15 +161,24 @@ func (act SaveActor) saveBazelInvocation( SetConfigurationMnemonic(summary.ConfigrationMnemonic). SetPlatformName(summary.PlatformName). SetNumFetches(summary.NumFetches). - SetBuildLogs(summary.BuildLogs.String()). SetUserLdap(summary.UserLDAP). - SetRelatedFiles(summary.RelatedFiles). - SetEventFile(eventFile). - SetMetrics(metrics). - SetSourceControl(sourcecontrol). - AddTestCollection(tests...). - AddTargets(targets...) + SetRelatedFiles(summary.RelatedFiles) + if eventFile != nil { + create = create.SetEventFile(eventFile) + } + if metrics != nil { + create = create.SetMetrics(metrics) + } + if tests != nil { + create = create.AddTestCollection(tests...) + } + if targets != nil { + create = create.AddTargets(targets...) + } + if sourcecontrol != nil { + create = create.SetSourceControl(sourcecontrol) + } if buildRecord != nil { create = create.SetBuild(buildRecord) } @@ -210,7 +222,7 @@ func (act SaveActor) saveTestFiles(ctx context.Context, files []summary.TestFile func (act SaveActor) saveOutputGroup(ctx context.Context, ouputGroup summary.OutputGroup) (*ent.OutputGroup, error) { inlineFiles, err := act.saveTestFiles(ctx, ouputGroup.InlineFiles) if err != nil { - slog.ErrorContext(ctx, "failed to save output group", "id", "err", err) + slog.ErrorContext(ctx, "failed to save output group", "err", err) return nil, err } diff --git a/pkg/proto/configuration/bb_portal/BUILD.bazel b/pkg/proto/configuration/bb_portal/BUILD.bazel index 25ee3845..1864723a 100644 --- a/pkg/proto/configuration/bb_portal/BUILD.bazel +++ b/pkg/proto/configuration/bb_portal/BUILD.bazel @@ -27,7 +27,14 @@ go_proto_library( go_library( name = "bb_portal", - embed = [":bb_portal_go_proto"], + srcs = ["bb_portal.pb.go"], importpath = "github.com/buildbarn/bb-portal/pkg/proto/configuration/bb_portal", visibility = ["//visibility:public"], + deps = [ + "@com_github_buildbarn_bb_storage//pkg/proto/configuration/global", + "@com_github_buildbarn_bb_storage//pkg/proto/configuration/grpc", + "@com_github_buildbarn_bb_storage//pkg/proto/configuration/http", + "@org_golang_google_protobuf//reflect/protoreflect", + "@org_golang_google_protobuf//runtime/protoimpl", + ], ) From d3d4c4974f8013e71740b2eee6d753df6414320d Mon Sep 17 00:00:00 2001 From: Trey Ivy Date: Fri, 25 Oct 2024 16:28:22 +0000 Subject: [PATCH 4/7] add cancel call in err from summaryReport --- internal/api/grpc/bes/channel.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/api/grpc/bes/channel.go b/internal/api/grpc/bes/channel.go index f0cd8c91..a565dc8d 100644 --- a/internal/api/grpc/bes/channel.go +++ b/internal/api/grpc/bes/channel.go @@ -62,6 +62,7 @@ func (c *buildEventChannel) Finalize() error { summaryReport, err := c.summarizer.FinishProcessing() if err != nil { slog.ErrorContext(c.ctx, "FinishProcessing failed", "err", err) + cancel() return err } From 412e999f676845f97cd103f7d423073b559c2a44 Mon Sep 17 00:00:00 2001 From: Trey Ivy Date: Fri, 25 Oct 2024 16:31:33 +0000 Subject: [PATCH 5/7] cleanup --- internal/api/grpc/bes/channel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/api/grpc/bes/channel.go b/internal/api/grpc/bes/channel.go index a565dc8d..f71d2099 100644 --- a/internal/api/grpc/bes/channel.go +++ b/internal/api/grpc/bes/channel.go @@ -88,7 +88,7 @@ func (c *buildEventChannel) Finalize() error { "component", c.streamID.GetComponent()) } - invocation, err := c.workflow.SaveSummary(ctx, summaryReport) // will c.workflow still be there? + invocation, err := c.workflow.SaveSummary(ctx, summaryReport) if err != nil { slog.ErrorContext(ctx, "SaveSummary failed", "err", err) cancel() From f77a177f3e61105ca9323b1d8988aefa59ac2799 Mon Sep 17 00:00:00 2001 From: Trey Ivy Date: Fri, 25 Oct 2024 16:43:10 +0000 Subject: [PATCH 6/7] weird gazelle issue --- internal/api/grpc/bes/channel.go | 9 --------- pkg/proto/configuration/bb_portal/BUILD.bazel | 9 +-------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/internal/api/grpc/bes/channel.go b/internal/api/grpc/bes/channel.go index f71d2099..8e8e3df4 100644 --- a/internal/api/grpc/bes/channel.go +++ b/internal/api/grpc/bes/channel.go @@ -58,27 +58,22 @@ func (c *buildEventChannel) Finalize() error { // defer the ctx so its not reaped when the client closes the connection ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24) defer cancel() - summaryReport, err := c.summarizer.FinishProcessing() if err != nil { slog.ErrorContext(c.ctx, "FinishProcessing failed", "err", err) cancel() return err } - // Hack for eventFile being required summaryReport.EventFileURL = fmt.Sprintf( "grpc://localhost:8082/google.devtools.build.v1/PublishLifecycleEvent?invocationId=%s&buildID=%s&component=%s", c.streamID.GetInvocationId(), c.streamID.GetBuildId(), c.streamID.GetComponent(), ) - slog.InfoContext(c.ctx, "Saving invocation", "InvocationId", c.streamID.GetInvocationId(), "BuildId", c.streamID.GetBuildId(), "Component", c.streamID.GetComponent()) - startTime := time.Now() - // try to get the invocation id if summaryReport.InvocationID == "" { summaryReport.InvocationID = c.streamID.GetInvocationId() @@ -87,19 +82,15 @@ func (c *buildEventChannel) Finalize() error { "buildId", c.streamID.GetBuildId(), "component", c.streamID.GetComponent()) } - invocation, err := c.workflow.SaveSummary(ctx, summaryReport) if err != nil { slog.ErrorContext(ctx, "SaveSummary failed", "err", err) cancel() return err } - cancel() - endTime := time.Now() elapsedTime := endTime.Sub(startTime) - slog.InfoContext(c.ctx, fmt.Sprintf("Saved invocation in %v", elapsedTime.String()), "id", invocation.InvocationID) return nil } diff --git a/pkg/proto/configuration/bb_portal/BUILD.bazel b/pkg/proto/configuration/bb_portal/BUILD.bazel index 1864723a..25ee3845 100644 --- a/pkg/proto/configuration/bb_portal/BUILD.bazel +++ b/pkg/proto/configuration/bb_portal/BUILD.bazel @@ -27,14 +27,7 @@ go_proto_library( go_library( name = "bb_portal", - srcs = ["bb_portal.pb.go"], + embed = [":bb_portal_go_proto"], importpath = "github.com/buildbarn/bb-portal/pkg/proto/configuration/bb_portal", visibility = ["//visibility:public"], - deps = [ - "@com_github_buildbarn_bb_storage//pkg/proto/configuration/global", - "@com_github_buildbarn_bb_storage//pkg/proto/configuration/grpc", - "@com_github_buildbarn_bb_storage//pkg/proto/configuration/http", - "@org_golang_google_protobuf//reflect/protoreflect", - "@org_golang_google_protobuf//runtime/protoimpl", - ], ) From acedc53c0074ac317e42bd89d9fbe465bbf42956 Mon Sep 17 00:00:00 2001 From: Trey Ivy Date: Mon, 28 Oct 2024 13:01:43 +0000 Subject: [PATCH 7/7] check error message code --- internal/api/grpc/bes/server.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/api/grpc/bes/server.go b/internal/api/grpc/bes/server.go index 0f6ade20..e4c36573 100644 --- a/internal/api/grpc/bes/server.go +++ b/internal/api/grpc/bes/server.go @@ -53,13 +53,16 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE SequenceNumber: sequenceNumber, }); err != nil { - // with the option --bes_uplod_mode=fully_async or nowait_for_upload_complete + // with the option --bes_upload_mode=fully_async or nowait_for_upload_complete // its not an error when the send fails. the bes gracefully terminated the close // i.e. sent an EOF. for long running builds that take a while to save to the db (> 1s) // the context is processed in the background, so by the time we are acknowledging these // requests, the client connection may have already timed out and these errors can be // safely ignored - if isClosing && (err.Error() != "rpc error: code = Unavailable desc = transport is closing") { + grpcErr := status.Convert(err) + if isClosing && + grpcErr.Code() == codes.Unavailable && + grpcErr.Message() == "transport is closing" { return }