Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
125618: roachprod: handle insecure cluster in delete cluster config r=DarrylWong a=nameisbhaskar

If a insecure cluster is created, the corresponding config is created in prometheus. This config is created in a separate location that is different from the secure clusters. When this cluster is destroyed, the ?insecure=true flag has to be passed so that the specific config file gets deleted. This is not done today.

This change passes the ?insecure=true for insecure clusters

Fixes: cockroachdb#125616
Epic: None

125832: streamingccl/logical: dedup code in query construction r=yuzefovich a=stevendanna

This dedups some code during insert query construction and makes use of the `excluded` table in the ON CONFLICT clause rather than referencing the positional arguments again.

I don't expect this to have much of an impact on query performance. Rather, I think this will simplify some other potential changes to query construction.

Interestingly, the microbenchmark does show a small reduction in allocs, but I haven't tested this at larger scales yet:

```
                    │   old.txt    │            new.txt            │
                    │    sec/op    │   sec/op     vs base          │
LastWriteWinsInsert   749.8µ ± 11%   718.4µ ± 5%  ~ (p=0.280 n=10)

                    │   old.txt    │               new.txt               │
                    │     B/op     │     B/op      vs base               │
LastWriteWinsInsert   251.3Ki ± 1%   244.5Ki ± 1%  -2.68% (p=0.000 n=10)

                    │   old.txt   │              new.txt               │
                    │  allocs/op  │  allocs/op   vs base               │
LastWriteWinsInsert   1.881k ± 0%   1.741k ± 0%  -7.42% (p=0.000 n=10)
```

Epic: none
Release note: None

Co-authored-by: Bhaskarjyoti Bora <bhaskar.bora@cockroachlabs.com>
Co-authored-by: Steven Danna <danna@cockroachlabs.com>
  • Loading branch information
3 people committed Jun 18, 2024
3 parents 82a79b0 + 6db5100 + 593421b commit 6c94943
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 48 deletions.
79 changes: 37 additions & 42 deletions pkg/ccl/streamingccl/logical/lww_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,77 +199,72 @@ func makeInsertQueries(
var onConflictUpdateClause strings.Builder
argIdx := 1
seenIds := make(map[catid.ColumnID]struct{})
addColumn := func(colName string, colID catid.ColumnID) {
// We will set crdb_internal_origin_timestamp ourselves from the MVCC timestamp of the incoming datum.
// We should never see this on the rangefeed as a non-null value as that would imply we've looped data around.
if colName == "crdb_internal_origin_timestamp" {
return
}
if _, seen := seenIds[colID]; seen {
return
}

publicColumns := td.PublicColumns()
colOrd := catalog.ColumnIDToOrdinalMap(publicColumns)
addColumnByNameNoCheck := func(colName string) {
if argIdx == 1 {
columnNames.WriteString(colName)
fmt.Fprintf(&valueStrings, "$%d", argIdx)
fmt.Fprintf(&onConflictUpdateClause, "%s = $%d", colName, argIdx)
fmt.Fprintf(&onConflictUpdateClause, "%s = excluded.%[1]s", colName)
} else {
fmt.Fprintf(&columnNames, ", %s", colName)
fmt.Fprintf(&valueStrings, ", $%d", argIdx)
fmt.Fprintf(&onConflictUpdateClause, ",\n%s = $%d", colName, argIdx)
fmt.Fprintf(&onConflictUpdateClause, ",\n%s = excluded.%[1]s", colName)
}
seenIds[colID] = struct{}{}
argIdx++
}

publicColumns := td.PublicColumns()
colOrd := catalog.ColumnIDToOrdinalMap(publicColumns)
primaryIndex := td.GetPrimaryIndex()

for i := 0; i < primaryIndex.NumKeyColumns(); i++ {
colID := primaryIndex.GetKeyColumnID(i)
addColumnByID := func(colID catid.ColumnID) error {
ord, ok := colOrd.Get(colID)
if !ok {
return errors.AssertionFailedf("expected to find column %d", colID)
}
col := publicColumns[ord]
if col.IsComputed() {
continue
return nil
}
addColumn(col.GetName(), col.GetID())
colName := col.GetName()
// We will set crdb_internal_origin_timestamp ourselves from the MVCC timestamp of the incoming datum.
// We should never see this on the rangefeed as a non-null value as that would imply we've looped data around.
if colName == "crdb_internal_origin_timestamp" {
return nil
}
if _, seen := seenIds[colID]; seen {
return nil
}
addColumnByNameNoCheck(colName)
seenIds[colID] = struct{}{}
return nil
}

for i, colName := range family.ColumnNames {
colID := family.ColumnIDs[i]
ord, ok := colOrd.Get(colID)
if !ok {
return errors.AssertionFailedf("expected to find column %d", colID)
}
col := publicColumns[ord]
if col.IsComputed() {
continue
primaryIndex := td.GetPrimaryIndex()
for i := 0; i < primaryIndex.NumKeyColumns(); i++ {
if err := addColumnByID(primaryIndex.GetKeyColumnID(i)); err != nil {
return err
}
addColumn(colName, family.ColumnIDs[i])
}

for i := range family.ColumnNames {
if err := addColumnByID(family.ColumnIDs[i]); err != nil {
return err
}
}
addColumnByNameNoCheck("crdb_internal_origin_timestamp")
var err error
originTSIdx := argIdx
baseQuery := `
INSERT INTO %s (%s, crdb_internal_origin_timestamp)
VALUES (%s, $%d)
const baseQuery = `
INSERT INTO %s AS t (%s)
VALUES (%s)
ON CONFLICT ON CONSTRAINT %s
DO UPDATE SET
%s,
crdb_internal_origin_timestamp=$%[4]d
WHERE (%[1]s.crdb_internal_mvcc_timestamp <= $%[4]d
AND %[1]s.crdb_internal_origin_timestamp IS NULL)
OR (%[1]s.crdb_internal_origin_timestamp <= $%[4]d
AND %[1]s.crdb_internal_origin_timestamp IS NOT NULL)`
%s
WHERE (t.crdb_internal_mvcc_timestamp <= excluded.crdb_internal_origin_timestamp
AND t.crdb_internal_origin_timestamp IS NULL)
OR (t.crdb_internal_origin_timestamp <= excluded.crdb_internal_origin_timestamp
AND t.crdb_internal_origin_timestamp IS NOT NULL)`
queries[family.ID], err = parser.ParseOne(fmt.Sprintf(baseQuery,
fqTableName,
columnNames.String(),
valueStrings.String(),
originTSIdx,
td.GetPrimaryIndex().GetName(),
onConflictUpdateClause.String(),
))
Expand Down
15 changes: 13 additions & 2 deletions pkg/roachprod/cloud/cluster_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"regexp"
"sort"
"strings"
"text/tabwriter"
"time"

Expand Down Expand Up @@ -387,8 +388,18 @@ func DestroyCluster(l *logger.Logger, c *Cluster) error {
if _, ok := promhelperclient.SupportedPromProjects[node.Project]; ok &&
node.Provider == gce.ProviderName {
if err := promhelperclient.NewPromClient().DeleteClusterConfig(context.Background(),
c.Name, false, l); err != nil {
l.Errorf("Failed to delete the cluster config: %v", err)
c.Name, false, false /* insecure */, l); err != nil {
// TODO(bhaskar): Obtain secure cluster information.
// Cluster does not have the information on secure or not. So, we retry as insecure
// if delete fails with cluster as secure
if strings.Contains(err.Error(), "request failed with status 404") {
if err = promhelperclient.NewPromClient().DeleteClusterConfig(context.Background(),
c.Name, false, true /* insecure */, l); err != nil {
l.Errorf("Failed to delete the cluster config with cluster as insecure and secure: %v", err)
}
} else {
l.Errorf("Failed to delete the cluster config with cluster as secure: %v", err)
}
}
break
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/roachprod/promhelperclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (c *PromClient) UpdatePrometheusTargets(

// DeleteClusterConfig deletes the cluster config in the promUrl
func (c *PromClient) DeleteClusterConfig(
ctx context.Context, clusterName string, forceFetchCreds bool, l *logger.Logger,
ctx context.Context, clusterName string, forceFetchCreds, insecure bool, l *logger.Logger,
) error {

if c.disabled {
Expand All @@ -156,6 +156,9 @@ func (c *PromClient) DeleteClusterConfig(
return err
}
url := getUrl(c.promUrl, clusterName)
if insecure {
url = fmt.Sprintf("%s?insecure=true", url)
}
l.Printf("invoking DELETE for URL: %s", url)
h := &http.Header{}
h.Set("Authorization", token)
Expand All @@ -166,7 +169,7 @@ func (c *PromClient) DeleteClusterConfig(
if response.StatusCode != http.StatusNoContent {
defer func() { _ = response.Body.Close() }()
if response.StatusCode == http.StatusUnauthorized && !forceFetchCreds {
return c.DeleteClusterConfig(ctx, clusterName, true, l)
return c.DeleteClusterConfig(ctx, clusterName, true, insecure, l)
}
body, err := io.ReadAll(response.Body)
if err != nil {
Expand Down
15 changes: 13 additions & 2 deletions pkg/roachprod/promhelperclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestDeleteClusterConfig(t *testing.T) {
Body: io.NopCloser(strings.NewReader("failed")),
}, nil
}
err := c.DeleteClusterConfig(ctx, "c1", false, l)
err := c.DeleteClusterConfig(ctx, "c1", false, false, l)
require.NotNil(t, err)
require.Equal(t, "request failed with status 400 and error failed", err.Error())
})
Expand All @@ -119,7 +119,18 @@ func TestDeleteClusterConfig(t *testing.T) {
StatusCode: 204,
}, nil
}
err := c.DeleteClusterConfig(ctx, "c1", false, l)
err := c.DeleteClusterConfig(ctx, "c1", false, false /* insecure */, l)
require.Nil(t, err)
})
t.Run("DeleteClusterConfig insecure succeeds", func(t *testing.T) {
c.httpDelete = func(ctx context.Context, url string, h *http.Header) (
resp *http.Response, err error) {
require.Equal(t, fmt.Sprintf("%s?insecure=true", getUrl(promUrl, "c1")), url)
return &http.Response{
StatusCode: 204,
}, nil
}
err := c.DeleteClusterConfig(ctx, "c1", false, true /* insecure */, l)
require.Nil(t, err)
})
}
Expand Down

0 comments on commit 6c94943

Please sign in to comment.