Skip to content

Commit

Permalink
Revert "Include tablet process details (hostname and port) in messages"
Browse files Browse the repository at this point in the history
This reverts commit a136b86.

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 4, 2025
1 parent be74e0c commit 55e2a5e
Showing 1 changed file with 15 additions and 20 deletions.
35 changes: 15 additions & 20 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,14 +547,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
if err != nil {
return tabletPickerErr(err)
}
var tabletPortStr string
if tablet.PortMap["grpc"] != 0 {
tabletPortStr = fmt.Sprintf(":%d", tablet.PortMap["grpc"])
}
tabletMsgDetails := fmt.Sprintf("%s (currently at %s%s)",
topoproto.TabletAliasString(tablet.Alias), tablet.Hostname, tabletPortStr)
tabletAliasString := topoproto.TabletAliasString(tablet.Alias)
log.Infof("Picked %s tablet %s for VStream in %s/%s within the %s cell(s)",
vs.tabletType.String(), tabletMsgDetails, sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))
vs.tabletType.String(), tabletAliasString, sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))

target := &querypb.Target{
Keyspace: sgtid.Keyspace,
Expand All @@ -565,7 +560,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
tabletConn, err := vs.vsm.resolver.GetGateway().QueryServiceByAlias(ctx, tablet.Alias, target)
if err != nil {
log.Errorf(err.Error())
return vterrors.Wrapf(err, "failed to get tablet connection to %s", tabletMsgDetails)
return vterrors.Wrapf(err, "failed to get tablet connection to %s", tabletAliasString)
}

errCh := make(chan error, 1)
Expand All @@ -574,9 +569,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var err error
switch {
case ctx.Err() != nil:
err = vterrors.Wrapf(ctx.Err(), "context ended while streaming tablet health from %s", tabletMsgDetails)
err = vterrors.Wrapf(ctx.Err(), "context ended while streaming tablet health from %s", tabletAliasString)
case shr == nil || shr.RealtimeStats == nil || shr.Target == nil:
err = fmt.Errorf("health check failed on %s", tabletMsgDetails)
err = fmt.Errorf("health check failed on %s", tabletAliasString)
case vs.tabletType != shr.Target.TabletType:
err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream",
topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType)
Expand All @@ -589,7 +584,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
if err != nil {
log.Warningf("Tablet state changed: %s, attempting to restart", err)
err = vterrors.Wrapf(err, "error streaming tablet health from %s", tabletMsgDetails)
err = vterrors.Wrapf(err, "error streaming tablet health from %s", tabletAliasString)
errCh <- err
return err
}
Expand All @@ -614,7 +609,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
Options: options,
}
var vstreamCreatedOnce sync.Once
log.Infof("Starting to vstream from %s, with req %+v", tabletMsgDetails, req)
log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req)
err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0
Expand All @@ -628,10 +623,10 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
select {
case <-ctx.Done():
return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s",
tabletMsgDetails, sgtid.Keyspace, sgtid.Shard)
tabletAliasString, sgtid.Keyspace, sgtid.Shard)
case streamErr := <-errCh:
return vterrors.Wrapf(streamErr, "error streaming from tablet %s in %s/%s",
tabletMsgDetails, sgtid.Keyspace, sgtid.Shard)
tabletAliasString, sgtid.Keyspace, sgtid.Shard)
case <-journalDone:
// Unreachable.
// This can happen if a server misbehaves and does not end
Expand All @@ -641,7 +636,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}

aligningStreamsErr := fmt.Sprintf("error aligning streams across %s/%s", sgtid.Keyspace, sgtid.Shard)
sendingEventsErr := fmt.Sprintf("error sending event batch from tablet %s", tabletMsgDetails)
sendingEventsErr := fmt.Sprintf("error sending event batch from tablet %s", tabletAliasString)

sendevents := make([]*binlogdatapb.VEvent, 0, len(events))
for i, event := range events {
Expand Down Expand Up @@ -721,7 +716,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
je, err := vs.getJournalEvent(ctx, sgtid, journal)
if err != nil {
return vterrors.Wrapf(err, "error getting journal event for shard GTID %+v on tablet %s",
sgtid, tabletMsgDetails)
sgtid, tabletAliasString)
}
if je != nil {
var endTimer *time.Timer
Expand All @@ -742,7 +737,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
select {
case <-ctx.Done():
return vterrors.Wrapf(ctx.Err(), "context ended while waiting for journal event for shard GTID %+v on tablet %s",
sgtid, tabletMsgDetails)
sgtid, tabletAliasString)
case <-journalDone:
if endTimer != nil {
<-endTimer.C
Expand Down Expand Up @@ -770,14 +765,14 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
if err == nil {
// Unreachable.
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly on tablet %s in %s/%s",
tabletMsgDetails, sgtid.Keyspace, sgtid.Shard)
tabletAliasString, sgtid.Keyspace, sgtid.Shard)
}

retry, ignoreTablet := vs.shouldRetry(err)
if !retry {
log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err)
return vterrors.Wrapf(err, "error in vstream for %s/%s on tablet %s",
sgtid.Keyspace, sgtid.Shard, tabletMsgDetails)
sgtid.Keyspace, sgtid.Shard, tabletAliasString)
}
if ignoreTablet {
ignoreTablets = append(ignoreTablets, tablet.GetAlias())
Expand All @@ -788,7 +783,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err)
return vterrors.Wrapf(err, "persistent error in vstream for %s/%s on tablet %s; giving up",
sgtid.Keyspace, sgtid.Shard, tabletMsgDetails)
sgtid.Keyspace, sgtid.Shard, tabletAliasString)
}
log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err)
}
Expand Down

0 comments on commit 55e2a5e

Please sign in to comment.