diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 4b0b4ba812b..3ee77965b4f 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -547,14 +547,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha if err != nil { return tabletPickerErr(err) } - var tabletPortStr string - if tablet.PortMap["grpc"] != 0 { - tabletPortStr = fmt.Sprintf(":%d", tablet.PortMap["grpc"]) - } - tabletMsgDetails := fmt.Sprintf("%s (currently at %s%s)", - topoproto.TabletAliasString(tablet.Alias), tablet.Hostname, tabletPortStr) + tabletAliasString := topoproto.TabletAliasString(tablet.Alias) log.Infof("Picked %s tablet %s for VStream in %s/%s within the %s cell(s)", - vs.tabletType.String(), tabletMsgDetails, sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ",")) + vs.tabletType.String(), tabletAliasString, sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ",")) target := &querypb.Target{ Keyspace: sgtid.Keyspace, @@ -565,7 +560,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha tabletConn, err := vs.vsm.resolver.GetGateway().QueryServiceByAlias(ctx, tablet.Alias, target) if err != nil { log.Errorf(err.Error()) - return vterrors.Wrapf(err, "failed to get tablet connection to %s", tabletMsgDetails) + return vterrors.Wrapf(err, "failed to get tablet connection to %s", tabletAliasString) } errCh := make(chan error, 1) @@ -574,9 +569,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var err error switch { case ctx.Err() != nil: - err = vterrors.Wrapf(ctx.Err(), "context ended while streaming tablet health from %s", tabletMsgDetails) + err = vterrors.Wrapf(ctx.Err(), "context ended while streaming tablet health from %s", tabletAliasString) case shr == nil || shr.RealtimeStats == nil || shr.Target == nil: - err = fmt.Errorf("health check failed on %s", tabletMsgDetails) + err = fmt.Errorf("health check failed on %s", tabletAliasString) case vs.tabletType != shr.Target.TabletType: err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream", topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType) @@ -589,7 +584,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } if err != nil { log.Warningf("Tablet state changed: %s, attempting to restart", err) - err = vterrors.Wrapf(err, "error streaming tablet health from %s", tabletMsgDetails) + err = vterrors.Wrapf(err, "error streaming tablet health from %s", tabletAliasString) errCh <- err return err } @@ -614,7 +609,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha Options: options, } var vstreamCreatedOnce sync.Once - log.Infof("Starting to vstream from %s, with req %+v", tabletMsgDetails, req) + log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req) err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error { // We received a valid event. Reset error count. errCount = 0 @@ -628,10 +623,10 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha select { case <-ctx.Done(): return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s", - tabletMsgDetails, sgtid.Keyspace, sgtid.Shard) + tabletAliasString, sgtid.Keyspace, sgtid.Shard) case streamErr := <-errCh: return vterrors.Wrapf(streamErr, "error streaming from tablet %s in %s/%s", - tabletMsgDetails, sgtid.Keyspace, sgtid.Shard) + tabletAliasString, sgtid.Keyspace, sgtid.Shard) case <-journalDone: // Unreachable. // This can happen if a server misbehaves and does not end @@ -641,7 +636,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } aligningStreamsErr := fmt.Sprintf("error aligning streams across %s/%s", sgtid.Keyspace, sgtid.Shard) - sendingEventsErr := fmt.Sprintf("error sending event batch from tablet %s", tabletMsgDetails) + sendingEventsErr := fmt.Sprintf("error sending event batch from tablet %s", tabletAliasString) sendevents := make([]*binlogdatapb.VEvent, 0, len(events)) for i, event := range events { @@ -721,7 +716,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha je, err := vs.getJournalEvent(ctx, sgtid, journal) if err != nil { return vterrors.Wrapf(err, "error getting journal event for shard GTID %+v on tablet %s", - sgtid, tabletMsgDetails) + sgtid, tabletAliasString) } if je != nil { var endTimer *time.Timer @@ -742,7 +737,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha select { case <-ctx.Done(): return vterrors.Wrapf(ctx.Err(), "context ended while waiting for journal event for shard GTID %+v on tablet %s", - sgtid, tabletMsgDetails) + sgtid, tabletAliasString) case <-journalDone: if endTimer != nil { <-endTimer.C @@ -770,14 +765,14 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha if err == nil { // Unreachable. err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly on tablet %s in %s/%s", - tabletMsgDetails, sgtid.Keyspace, sgtid.Shard) + tabletAliasString, sgtid.Keyspace, sgtid.Shard) } retry, ignoreTablet := vs.shouldRetry(err) if !retry { log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) return vterrors.Wrapf(err, "error in vstream for %s/%s on tablet %s", - sgtid.Keyspace, sgtid.Shard, tabletMsgDetails) + sgtid.Keyspace, sgtid.Shard, tabletAliasString) } if ignoreTablet { ignoreTablets = append(ignoreTablets, tablet.GetAlias()) @@ -788,7 +783,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha if errCount >= 3 { log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) return vterrors.Wrapf(err, "persistent error in vstream for %s/%s on tablet %s; giving up", - sgtid.Keyspace, sgtid.Shard, tabletMsgDetails) + sgtid.Keyspace, sgtid.Shard, tabletAliasString) } log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err) }