diff --git a/pkg/ccl/streamingccl/logical/lww_row_processor.go b/pkg/ccl/streamingccl/logical/lww_row_processor.go index 8a305db64871..1f491cbd3755 100644 --- a/pkg/ccl/streamingccl/logical/lww_row_processor.go +++ b/pkg/ccl/streamingccl/logical/lww_row_processor.go @@ -199,77 +199,72 @@ func makeInsertQueries( var onConflictUpdateClause strings.Builder argIdx := 1 seenIds := make(map[catid.ColumnID]struct{}) - addColumn := func(colName string, colID catid.ColumnID) { - // We will set crdb_internal_origin_timestamp ourselves from the MVCC timestamp of the incoming datum. - // We should never see this on the rangefeed as a non-null value as that would imply we've looped data around. - if colName == "crdb_internal_origin_timestamp" { - return - } - if _, seen := seenIds[colID]; seen { - return - } + publicColumns := td.PublicColumns() + colOrd := catalog.ColumnIDToOrdinalMap(publicColumns) + addColumnByNameNoCheck := func(colName string) { if argIdx == 1 { columnNames.WriteString(colName) fmt.Fprintf(&valueStrings, "$%d", argIdx) - fmt.Fprintf(&onConflictUpdateClause, "%s = $%d", colName, argIdx) + fmt.Fprintf(&onConflictUpdateClause, "%s = excluded.%[1]s", colName) } else { fmt.Fprintf(&columnNames, ", %s", colName) fmt.Fprintf(&valueStrings, ", $%d", argIdx) - fmt.Fprintf(&onConflictUpdateClause, ",\n%s = $%d", colName, argIdx) + fmt.Fprintf(&onConflictUpdateClause, ",\n%s = excluded.%[1]s", colName) } - seenIds[colID] = struct{}{} argIdx++ } - - publicColumns := td.PublicColumns() - colOrd := catalog.ColumnIDToOrdinalMap(publicColumns) - primaryIndex := td.GetPrimaryIndex() - - for i := 0; i < primaryIndex.NumKeyColumns(); i++ { - colID := primaryIndex.GetKeyColumnID(i) + addColumnByID := func(colID catid.ColumnID) error { ord, ok := colOrd.Get(colID) if !ok { return errors.AssertionFailedf("expected to find column %d", colID) } col := publicColumns[ord] if col.IsComputed() { - continue + return nil } - addColumn(col.GetName(), col.GetID()) + colName := col.GetName() + // We will set crdb_internal_origin_timestamp ourselves from the MVCC timestamp of the incoming datum. + // We should never see this on the rangefeed as a non-null value as that would imply we've looped data around. + if colName == "crdb_internal_origin_timestamp" { + return nil + } + if _, seen := seenIds[colID]; seen { + return nil + } + addColumnByNameNoCheck(colName) + seenIds[colID] = struct{}{} + return nil } - for i, colName := range family.ColumnNames { - colID := family.ColumnIDs[i] - ord, ok := colOrd.Get(colID) - if !ok { - return errors.AssertionFailedf("expected to find column %d", colID) - } - col := publicColumns[ord] - if col.IsComputed() { - continue + primaryIndex := td.GetPrimaryIndex() + for i := 0; i < primaryIndex.NumKeyColumns(); i++ { + if err := addColumnByID(primaryIndex.GetKeyColumnID(i)); err != nil { + return err } - addColumn(colName, family.ColumnIDs[i]) } + for i := range family.ColumnNames { + if err := addColumnByID(family.ColumnIDs[i]); err != nil { + return err + } + } + addColumnByNameNoCheck("crdb_internal_origin_timestamp") var err error - originTSIdx := argIdx - baseQuery := ` -INSERT INTO %s (%s, crdb_internal_origin_timestamp) -VALUES (%s, $%d) + const baseQuery = ` +INSERT INTO %s AS t (%s) +VALUES (%s) ON CONFLICT ON CONSTRAINT %s DO UPDATE SET -%s, -crdb_internal_origin_timestamp=$%[4]d -WHERE (%[1]s.crdb_internal_mvcc_timestamp <= $%[4]d - AND %[1]s.crdb_internal_origin_timestamp IS NULL) - OR (%[1]s.crdb_internal_origin_timestamp <= $%[4]d - AND %[1]s.crdb_internal_origin_timestamp IS NOT NULL)` +%s +WHERE (t.crdb_internal_mvcc_timestamp <= excluded.crdb_internal_origin_timestamp + AND t.crdb_internal_origin_timestamp IS NULL) + OR (t.crdb_internal_origin_timestamp <= excluded.crdb_internal_origin_timestamp + AND t.crdb_internal_origin_timestamp IS NOT NULL)` queries[family.ID], err = parser.ParseOne(fmt.Sprintf(baseQuery, fqTableName, columnNames.String(), valueStrings.String(), - originTSIdx, td.GetPrimaryIndex().GetName(), onConflictUpdateClause.String(), )) diff --git a/pkg/roachprod/cloud/cluster_cloud.go b/pkg/roachprod/cloud/cluster_cloud.go index 5feb4cc35334..abb9b8aecfd8 100644 --- a/pkg/roachprod/cloud/cluster_cloud.go +++ b/pkg/roachprod/cloud/cluster_cloud.go @@ -16,6 +16,7 @@ import ( "fmt" "regexp" "sort" + "strings" "text/tabwriter" "time" @@ -387,8 +388,18 @@ func DestroyCluster(l *logger.Logger, c *Cluster) error { if _, ok := promhelperclient.SupportedPromProjects[node.Project]; ok && node.Provider == gce.ProviderName { if err := promhelperclient.NewPromClient().DeleteClusterConfig(context.Background(), - c.Name, false, l); err != nil { - l.Errorf("Failed to delete the cluster config: %v", err) + c.Name, false, false /* insecure */, l); err != nil { + // TODO(bhaskar): Obtain secure cluster information. + // Cluster does not have the information on secure or not. So, we retry as insecure + // if delete fails with cluster as secure + if strings.Contains(err.Error(), "request failed with status 404") { + if err = promhelperclient.NewPromClient().DeleteClusterConfig(context.Background(), + c.Name, false, true /* insecure */, l); err != nil { + l.Errorf("Failed to delete the cluster config with cluster as insecure and secure: %v", err) + } + } else { + l.Errorf("Failed to delete the cluster config with cluster as secure: %v", err) + } } break } diff --git a/pkg/roachprod/promhelperclient/client.go b/pkg/roachprod/promhelperclient/client.go index b864be16f792..24c04d53dc48 100644 --- a/pkg/roachprod/promhelperclient/client.go +++ b/pkg/roachprod/promhelperclient/client.go @@ -145,7 +145,7 @@ func (c *PromClient) UpdatePrometheusTargets( // DeleteClusterConfig deletes the cluster config in the promUrl func (c *PromClient) DeleteClusterConfig( - ctx context.Context, clusterName string, forceFetchCreds bool, l *logger.Logger, + ctx context.Context, clusterName string, forceFetchCreds, insecure bool, l *logger.Logger, ) error { if c.disabled { @@ -156,6 +156,9 @@ func (c *PromClient) DeleteClusterConfig( return err } url := getUrl(c.promUrl, clusterName) + if insecure { + url = fmt.Sprintf("%s?insecure=true", url) + } l.Printf("invoking DELETE for URL: %s", url) h := &http.Header{} h.Set("Authorization", token) @@ -166,7 +169,7 @@ func (c *PromClient) DeleteClusterConfig( if response.StatusCode != http.StatusNoContent { defer func() { _ = response.Body.Close() }() if response.StatusCode == http.StatusUnauthorized && !forceFetchCreds { - return c.DeleteClusterConfig(ctx, clusterName, true, l) + return c.DeleteClusterConfig(ctx, clusterName, true, insecure, l) } body, err := io.ReadAll(response.Body) if err != nil { diff --git a/pkg/roachprod/promhelperclient/client_test.go b/pkg/roachprod/promhelperclient/client_test.go index e5c927067862..d750b44c9a66 100644 --- a/pkg/roachprod/promhelperclient/client_test.go +++ b/pkg/roachprod/promhelperclient/client_test.go @@ -107,7 +107,7 @@ func TestDeleteClusterConfig(t *testing.T) { Body: io.NopCloser(strings.NewReader("failed")), }, nil } - err := c.DeleteClusterConfig(ctx, "c1", false, l) + err := c.DeleteClusterConfig(ctx, "c1", false, false, l) require.NotNil(t, err) require.Equal(t, "request failed with status 400 and error failed", err.Error()) }) @@ -119,7 +119,18 @@ func TestDeleteClusterConfig(t *testing.T) { StatusCode: 204, }, nil } - err := c.DeleteClusterConfig(ctx, "c1", false, l) + err := c.DeleteClusterConfig(ctx, "c1", false, false /* insecure */, l) + require.Nil(t, err) + }) + t.Run("DeleteClusterConfig insecure succeeds", func(t *testing.T) { + c.httpDelete = func(ctx context.Context, url string, h *http.Header) ( + resp *http.Response, err error) { + require.Equal(t, fmt.Sprintf("%s?insecure=true", getUrl(promUrl, "c1")), url) + return &http.Response{ + StatusCode: 204, + }, nil + } + err := c.DeleteClusterConfig(ctx, "c1", false, true /* insecure */, l) require.Nil(t, err) }) }