diff --git a/Makefile b/Makefile index 5559cb43..d2f28d98 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,12 @@ download: .PHONY: lint lint: - golangci-lint --timeout 10m run ./... + bazel mod tidy + bazel run //:gazelle + bazel run @com_github_bazelbuild_buildtools//:buildifier + bazel run @cc_mvdan_gofumpt//:gofumpt -- -w -extra $(CURDIR) + bazel run @org_golang_x_lint//golint -- -set_exit_status $(CURDIR)/... + bazel test //... .PHONY: lint-fix lint-fix: diff --git a/ent/gen/ent/migrate/schema.go b/ent/gen/ent/migrate/schema.go index 53ccb5de..91119c59 100644 --- a/ent/gen/ent/migrate/schema.go +++ b/ent/gen/ent/migrate/schema.go @@ -348,7 +348,7 @@ var ( // EventFilesColumns holds the columns for the "event_files" table. EventFilesColumns = []*schema.Column{ {Name: "id", Type: field.TypeInt, Increment: true}, - {Name: "url", Type: field.TypeString, Unique: true}, + {Name: "url", Type: field.TypeString}, {Name: "mod_time", Type: field.TypeTime}, {Name: "protocol", Type: field.TypeString}, {Name: "mime_type", Type: field.TypeString}, diff --git a/ent/schema/eventfile.go b/ent/schema/eventfile.go index fb98eba7..eb24bcef 100644 --- a/ent/schema/eventfile.go +++ b/ent/schema/eventfile.go @@ -15,7 +15,7 @@ type EventFile struct { // Fields of the EventFile. func (EventFile) Fields() []ent.Field { return []ent.Field{ - field.String("url").Unique().Immutable(), + field.String("url").Immutable(), field.Time("mod_time"), field.String("protocol"), // *.bep, *.log, etc field.String("mime_type"), diff --git a/frontend/src/components/BazelInvocationsTable/index.tsx b/frontend/src/components/BazelInvocationsTable/index.tsx index 826f79ae..98f72ab3 100644 --- a/frontend/src/components/BazelInvocationsTable/index.tsx +++ b/frontend/src/components/BazelInvocationsTable/index.tsx @@ -30,7 +30,7 @@ const BazelInvocationsTable: React.FC = ({ height }) => { const { loading, data, previousData, error } = useQuery(FIND_BAZEL_INVOCATIONS_QUERY, { variables, pollInterval: 120000, - fetchPolicy: 'cache-and-network', + fetchPolicy: "network-only", }); const onChange: TableProps['onChange'] = useCallback( diff --git a/internal/api/grpc/bes/channel.go b/internal/api/grpc/bes/channel.go index c03f8533..8e8e3df4 100644 --- a/internal/api/grpc/bes/channel.go +++ b/internal/api/grpc/bes/channel.go @@ -39,7 +39,6 @@ func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { if event.GetBazelEvent() == nil { return nil } - var bazelEvent bes.BuildEvent err := event.GetBazelEvent().UnmarshalTo(&bazelEvent) if err != nil { @@ -56,25 +55,40 @@ func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { // Finalize implements BuildEventChannel.Finalize. func (c *buildEventChannel) Finalize() error { + // defer the ctx so its not reaped when the client closes the connection + ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24) + defer cancel() summaryReport, err := c.summarizer.FinishProcessing() if err != nil { slog.ErrorContext(c.ctx, "FinishProcessing failed", "err", err) + cancel() return err } - // Hack for eventFile being required summaryReport.EventFileURL = fmt.Sprintf( - "grpc://localhost:8082/google.devtools.build.v1/PublishLifecycleEvent?streamID=%s", - c.streamID.String(), + "grpc://localhost:8082/google.devtools.build.v1/PublishLifecycleEvent?invocationId=%s&buildID=%s&component=%s", + c.streamID.GetInvocationId(), c.streamID.GetBuildId(), c.streamID.GetComponent(), ) - - slog.InfoContext(c.ctx, "Saving invocation", "id", c.streamID.String()) + slog.InfoContext(c.ctx, "Saving invocation", + "InvocationId", c.streamID.GetInvocationId(), + "BuildId", c.streamID.GetBuildId(), + "Component", c.streamID.GetComponent()) startTime := time.Now() - invocation, err := c.workflow.SaveSummary(c.ctx, summaryReport) + // try to get the invocation id + if summaryReport.InvocationID == "" { + summaryReport.InvocationID = c.streamID.GetInvocationId() + slog.WarnContext(c.ctx, "summaryReport was missing invocation ID", + "invocationId", c.streamID.GetInvocationId(), + "buildId", c.streamID.GetBuildId(), + "component", c.streamID.GetComponent()) + } + invocation, err := c.workflow.SaveSummary(ctx, summaryReport) if err != nil { - slog.ErrorContext(c.ctx, "SaveSummary failed", "err", err) + slog.ErrorContext(ctx, "SaveSummary failed", "err", err) + cancel() return err } + cancel() endTime := time.Now() elapsedTime := endTime.Sub(startTime) slog.InfoContext(c.ctx, fmt.Sprintf("Saved invocation in %v", elapsedTime.String()), "id", invocation.InvocationID) diff --git a/internal/api/grpc/bes/server.go b/internal/api/grpc/bes/server.go index d2e98379..e4c36573 100644 --- a/internal/api/grpc/bes/server.go +++ b/internal/api/grpc/bes/server.go @@ -47,12 +47,35 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE // We'll want to ack these once all events are received, as we don't support resumption. seqNrs := make([]int64, 0) - ack := func(streamID *build.StreamId, sequenceNumber int64) { + ack := func(streamID *build.StreamId, sequenceNumber int64, isClosing bool) { if err := stream.Send(&build.PublishBuildToolEventStreamResponse{ StreamId: streamID, SequenceNumber: sequenceNumber, }); err != nil { - slog.ErrorContext(stream.Context(), "Send failed", "err", err) + + // with the option --bes_upload_mode=fully_async or nowait_for_upload_complete + // its not an error when the send fails. the bes gracefully terminated the close + // i.e. sent an EOF. for long running builds that take a while to save to the db (> 1s) + // the context is processed in the background, so by the time we are acknowledging these + // requests, the client connection may have already timed out and these errors can be + // safely ignored + grpcErr := status.Convert(err) + if isClosing && + grpcErr.Code() == codes.Unavailable && + grpcErr.Message() == "transport is closing" { + return + } + + slog.ErrorContext( + stream.Context(), + "Send failed", + "err", + err, + "streamid", + streamID, + "sequenceNumber", + sequenceNumber, + ) } } @@ -77,7 +100,9 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE case err := <-errCh: if err == io.EOF { slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context()) + if eventCh == nil { + slog.WarnContext(stream.Context(), "No event channel found for stream event", "event", stream.Context()) return nil } @@ -100,7 +125,7 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE // Ack all events for _, seqNr := range seqNrs { - ack(streamID, seqNr) + ack(streamID, seqNr, true) } return nil diff --git a/pkg/processing/save.go b/pkg/processing/save.go index 96842cf9..a55b624e 100644 --- a/pkg/processing/save.go +++ b/pkg/processing/save.go @@ -30,35 +30,35 @@ type SaveActor struct { // SaveSummary saves an invocation summary to the database. func (act SaveActor) SaveSummary(ctx context.Context, summary *summary.Summary) (*ent.BazelInvocation, error) { + // errors := []error{} + if summary.InvocationID == "" { + slog.ErrorContext(ctx, "No Invocation ID Found on summary", "ctx.Err()", ctx.Err()) + return nil, fmt.Errorf("no Invocation ID Found on summary") + } eventFile, err := act.saveEventFile(ctx, summary) if err != nil { slog.ErrorContext(ctx, "failed to save event file", "id", summary.InvocationID, "err", err) - return nil, fmt.Errorf("could not save EventFile: %w", err) } buildRecord, err := act.findOrCreateBuild(ctx, summary) if err != nil { - slog.ErrorContext(ctx, "failed to find or create build", "id", summary.InvocationID, "err", err) - return nil, err + slog.ErrorContext(ctx, "failed to find or create build", "summary.InvocationId", summary.InvocationID, "err", err) } metrics, err := act.saveMetrics(ctx, summary.Metrics) if err != nil { slog.ErrorContext(ctx, "failed to save metrics", "id", summary.InvocationID, "err", err) - return nil, fmt.Errorf("could not save Metrics: %w", err) } targets, err := act.saveTargets(ctx, summary) if err != nil { slog.ErrorContext(ctx, "failed to save targets", "id", summary.InvocationID, "err", err) - return nil, fmt.Errorf("could not save Targets: %w", err) } tests, err := act.saveTests(ctx, summary) if err != nil { slog.ErrorContext(ctx, "failed to save tests", "id", summary.InvocationID, "err", err) - return nil, fmt.Errorf("could not save test results: %w", err) + tests = nil } sourcecontrol, err := act.saveSourceControl(ctx, summary) if err != nil { slog.ErrorContext(ctx, "failed to save source control information", "id", summary.InvocationID, "err", err) - return nil, fmt.Errorf("could not save source control information: %w", err) } bazelInvocation, err := act.saveBazelInvocation(ctx, summary, eventFile, buildRecord, metrics, tests, targets, sourcecontrol) if err != nil { @@ -139,6 +139,9 @@ func (act SaveActor) saveBazelInvocation( targets []*ent.TargetPair, sourcecontrol *ent.SourceControl, ) (*ent.BazelInvocation, error) { + if summary == nil { + return nil, fmt.Errorf("no summary object provided") + } uniqueID, err := uuid.Parse(summary.InvocationID) if err != nil { return nil, err @@ -158,15 +161,24 @@ func (act SaveActor) saveBazelInvocation( SetConfigurationMnemonic(summary.ConfigrationMnemonic). SetPlatformName(summary.PlatformName). SetNumFetches(summary.NumFetches). - SetBuildLogs(summary.BuildLogs.String()). SetUserLdap(summary.UserLDAP). - SetRelatedFiles(summary.RelatedFiles). - SetEventFile(eventFile). - SetMetrics(metrics). - SetSourceControl(sourcecontrol). - AddTestCollection(tests...). - AddTargets(targets...) + SetRelatedFiles(summary.RelatedFiles) + if eventFile != nil { + create = create.SetEventFile(eventFile) + } + if metrics != nil { + create = create.SetMetrics(metrics) + } + if tests != nil { + create = create.AddTestCollection(tests...) + } + if targets != nil { + create = create.AddTargets(targets...) + } + if sourcecontrol != nil { + create = create.SetSourceControl(sourcecontrol) + } if buildRecord != nil { create = create.SetBuild(buildRecord) } @@ -210,7 +222,7 @@ func (act SaveActor) saveTestFiles(ctx context.Context, files []summary.TestFile func (act SaveActor) saveOutputGroup(ctx context.Context, ouputGroup summary.OutputGroup) (*ent.OutputGroup, error) { inlineFiles, err := act.saveTestFiles(ctx, ouputGroup.InlineFiles) if err != nil { - slog.ErrorContext(ctx, "failed to save output group", "id", "err", err) + slog.ErrorContext(ctx, "failed to save output group", "err", err) return nil, err }