Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push Database Finish Processing Tasks to context background process #51

Merged
merged 7 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ download:

.PHONY: lint
lint:
golangci-lint --timeout 10m run ./...
bazel mod tidy
bazel run //:gazelle
bazel run @com_github_bazelbuild_buildtools//:buildifier
bazel run @cc_mvdan_gofumpt//:gofumpt -- -w -extra $(CURDIR)
bazel run @org_golang_x_lint//golint -- -set_exit_status $(CURDIR)/...
bazel test //...

.PHONY: lint-fix
lint-fix:
Expand Down
2 changes: 1 addition & 1 deletion ent/gen/ent/migrate/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ent/schema/eventfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type EventFile struct {
// Fields of the EventFile.
func (EventFile) Fields() []ent.Field {
return []ent.Field{
field.String("url").Unique().Immutable(),
field.String("url").Immutable(),
field.Time("mod_time"),
field.String("protocol"), // *.bep, *.log, etc
field.String("mime_type"),
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/components/BazelInvocationsTable/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const BazelInvocationsTable: React.FC<Props> = ({ height }) => {
const { loading, data, previousData, error } = useQuery(FIND_BAZEL_INVOCATIONS_QUERY, {
variables,
pollInterval: 120000,
fetchPolicy: 'cache-and-network',
fetchPolicy: "network-only",
});

const onChange: TableProps<BazelInvocationNodeFragment>['onChange'] = useCallback(
Expand Down
30 changes: 22 additions & 8 deletions internal/api/grpc/bes/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
if event.GetBazelEvent() == nil {
return nil
}

var bazelEvent bes.BuildEvent
err := event.GetBazelEvent().UnmarshalTo(&bazelEvent)
if err != nil {
Expand All @@ -56,25 +55,40 @@ func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {

// Finalize implements BuildEventChannel.Finalize.
func (c *buildEventChannel) Finalize() error {
// defer the ctx so its not reaped when the client closes the connection
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24)
defer cancel()
summaryReport, err := c.summarizer.FinishProcessing()
if err != nil {
slog.ErrorContext(c.ctx, "FinishProcessing failed", "err", err)
cancel()
return err
}

// Hack for eventFile being required
summaryReport.EventFileURL = fmt.Sprintf(
"grpc://localhost:8082/google.devtools.build.v1/PublishLifecycleEvent?streamID=%s",
c.streamID.String(),
"grpc://localhost:8082/google.devtools.build.v1/PublishLifecycleEvent?invocationId=%s&buildID=%s&component=%s",
c.streamID.GetInvocationId(), c.streamID.GetBuildId(), c.streamID.GetComponent(),
)

slog.InfoContext(c.ctx, "Saving invocation", "id", c.streamID.String())
slog.InfoContext(c.ctx, "Saving invocation",
"InvocationId", c.streamID.GetInvocationId(),
"BuildId", c.streamID.GetBuildId(),
"Component", c.streamID.GetComponent())
startTime := time.Now()
invocation, err := c.workflow.SaveSummary(c.ctx, summaryReport)
// try to get the invocation id
if summaryReport.InvocationID == "" {
summaryReport.InvocationID = c.streamID.GetInvocationId()
slog.WarnContext(c.ctx, "summaryReport was missing invocation ID",
"invocationId", c.streamID.GetInvocationId(),
"buildId", c.streamID.GetBuildId(),
"component", c.streamID.GetComponent())
}
invocation, err := c.workflow.SaveSummary(ctx, summaryReport)
if err != nil {
slog.ErrorContext(c.ctx, "SaveSummary failed", "err", err)
slog.ErrorContext(ctx, "SaveSummary failed", "err", err)
cancel()
return err
}
cancel()
endTime := time.Now()
elapsedTime := endTime.Sub(startTime)
slog.InfoContext(c.ctx, fmt.Sprintf("Saved invocation in %v", elapsedTime.String()), "id", invocation.InvocationID)
Expand Down
31 changes: 28 additions & 3 deletions internal/api/grpc/bes/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,35 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE
// We'll want to ack these once all events are received, as we don't support resumption.
seqNrs := make([]int64, 0)

ack := func(streamID *build.StreamId, sequenceNumber int64) {
ack := func(streamID *build.StreamId, sequenceNumber int64, isClosing bool) {
if err := stream.Send(&build.PublishBuildToolEventStreamResponse{
StreamId: streamID,
SequenceNumber: sequenceNumber,
}); err != nil {
slog.ErrorContext(stream.Context(), "Send failed", "err", err)

// with the option --bes_upload_mode=fully_async or nowait_for_upload_complete
// its not an error when the send fails. the bes gracefully terminated the close
// i.e. sent an EOF. for long running builds that take a while to save to the db (> 1s)
// the context is processed in the background, so by the time we are acknowledging these
// requests, the client connection may have already timed out and these errors can be
// safely ignored
grpcErr := status.Convert(err)
if isClosing &&
grpcErr.Code() == codes.Unavailable &&
grpcErr.Message() == "transport is closing" {
return
}

slog.ErrorContext(
stream.Context(),
"Send failed",
"err",
err,
"streamid",
streamID,
"sequenceNumber",
sequenceNumber,
)
}
}

Expand All @@ -77,7 +100,9 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE
case err := <-errCh:
if err == io.EOF {
slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context())

if eventCh == nil {
slog.WarnContext(stream.Context(), "No event channel found for stream event", "event", stream.Context())
return nil
}

Expand All @@ -100,7 +125,7 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE

// Ack all events
for _, seqNr := range seqNrs {
ack(streamID, seqNr)
ack(streamID, seqNr, true)
}

return nil
Expand Down
42 changes: 27 additions & 15 deletions pkg/processing/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,35 @@ type SaveActor struct {

// SaveSummary saves an invocation summary to the database.
func (act SaveActor) SaveSummary(ctx context.Context, summary *summary.Summary) (*ent.BazelInvocation, error) {
// errors := []error{}
if summary.InvocationID == "" {
slog.ErrorContext(ctx, "No Invocation ID Found on summary", "ctx.Err()", ctx.Err())
return nil, fmt.Errorf("no Invocation ID Found on summary")
}
eventFile, err := act.saveEventFile(ctx, summary)
if err != nil {
slog.ErrorContext(ctx, "failed to save event file", "id", summary.InvocationID, "err", err)
return nil, fmt.Errorf("could not save EventFile: %w", err)
}
buildRecord, err := act.findOrCreateBuild(ctx, summary)
if err != nil {
slog.ErrorContext(ctx, "failed to find or create build", "id", summary.InvocationID, "err", err)
return nil, err
slog.ErrorContext(ctx, "failed to find or create build", "summary.InvocationId", summary.InvocationID, "err", err)
}
metrics, err := act.saveMetrics(ctx, summary.Metrics)
if err != nil {
slog.ErrorContext(ctx, "failed to save metrics", "id", summary.InvocationID, "err", err)
return nil, fmt.Errorf("could not save Metrics: %w", err)
}
targets, err := act.saveTargets(ctx, summary)
if err != nil {
slog.ErrorContext(ctx, "failed to save targets", "id", summary.InvocationID, "err", err)
return nil, fmt.Errorf("could not save Targets: %w", err)
}
tests, err := act.saveTests(ctx, summary)
if err != nil {
slog.ErrorContext(ctx, "failed to save tests", "id", summary.InvocationID, "err", err)
return nil, fmt.Errorf("could not save test results: %w", err)
tests = nil
}
sourcecontrol, err := act.saveSourceControl(ctx, summary)
if err != nil {
slog.ErrorContext(ctx, "failed to save source control information", "id", summary.InvocationID, "err", err)
return nil, fmt.Errorf("could not save source control information: %w", err)
}
bazelInvocation, err := act.saveBazelInvocation(ctx, summary, eventFile, buildRecord, metrics, tests, targets, sourcecontrol)
if err != nil {
Expand Down Expand Up @@ -139,6 +139,9 @@ func (act SaveActor) saveBazelInvocation(
targets []*ent.TargetPair,
sourcecontrol *ent.SourceControl,
) (*ent.BazelInvocation, error) {
if summary == nil {
return nil, fmt.Errorf("no summary object provided")
}
uniqueID, err := uuid.Parse(summary.InvocationID)
if err != nil {
return nil, err
Expand All @@ -158,15 +161,24 @@ func (act SaveActor) saveBazelInvocation(
SetConfigurationMnemonic(summary.ConfigrationMnemonic).
SetPlatformName(summary.PlatformName).
SetNumFetches(summary.NumFetches).
SetBuildLogs(summary.BuildLogs.String()).
SetUserLdap(summary.UserLDAP).
SetRelatedFiles(summary.RelatedFiles).
SetEventFile(eventFile).
SetMetrics(metrics).
SetSourceControl(sourcecontrol).
AddTestCollection(tests...).
AddTargets(targets...)
SetRelatedFiles(summary.RelatedFiles)

if eventFile != nil {
create = create.SetEventFile(eventFile)
}
if metrics != nil {
create = create.SetMetrics(metrics)
}
if tests != nil {
create = create.AddTestCollection(tests...)
}
if targets != nil {
create = create.AddTargets(targets...)
}
if sourcecontrol != nil {
create = create.SetSourceControl(sourcecontrol)
}
if buildRecord != nil {
create = create.SetBuild(buildRecord)
}
Expand Down Expand Up @@ -210,7 +222,7 @@ func (act SaveActor) saveTestFiles(ctx context.Context, files []summary.TestFile
func (act SaveActor) saveOutputGroup(ctx context.Context, ouputGroup summary.OutputGroup) (*ent.OutputGroup, error) {
inlineFiles, err := act.saveTestFiles(ctx, ouputGroup.InlineFiles)
if err != nil {
slog.ErrorContext(ctx, "failed to save output group", "id", "err", err)
slog.ErrorContext(ctx, "failed to save output group", "err", err)
return nil, err
}

Expand Down
Loading