Skip to content

Commit

Permalink
Draft implement compaction support into robustness tests
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Apr 2, 2024
1 parent caca515 commit d51fd97
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 27 deletions.
8 changes: 5 additions & 3 deletions tests/robustness/failpoint/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ var (
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
BackendAfterWritebackBufPanic,
//CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
//CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
//CompactAfterCommitBatchPanic,
RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
BeforeApplyOneConfChangeSleep,
Expand Down
18 changes: 9 additions & 9 deletions tests/robustness/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
report.Client = s.run(ctx, t, lg, report.Cluster)
forcestopCluster(report.Cluster)

watchProgressNotifyEnabled := report.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, report.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
//watchProgressNotifyEnabled := report.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
//validateGotAtLeastOneProgressNotify(t, report.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()}
report.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, report.Client)

Expand Down Expand Up @@ -121,19 +121,19 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
lg.Info("Finished injecting failures")
return nil
})
maxRevisionChan := make(chan int64, 1)
//maxRevisionChan := make(chan int64, 1)
g.Go(func() error {
defer close(maxRevisionChan)
//defer close(maxRevisionChan)
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, finishTraffic, baseTime, ids)
maxRevision := operationsMaxRevision(operationReport)
maxRevisionChan <- maxRevision
//maxRevisionChan <- maxRevision
lg.Info("Finished simulating traffic", zap.Int64("max-revision", maxRevision))
return nil
})
g.Go(func() error {
watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids)
return nil
})
//g.Go(func() error {
// watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids)
// return nil
//})
g.Wait()
return append(operationReport, watchReport...)
}
Expand Down
4 changes: 3 additions & 1 deletion tests/robustness/model/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
return fmt.Sprintf("%s, rev: %d", describeRangeResponse(request.Range.RangeOptions, *response.Range), response.Revision)
case Txn:
return fmt.Sprintf("%s, rev: %d", describeTxnResponse(request.Txn, response.Txn), response.Revision)
case LeaseGrant, LeaseRevoke, Defragment:
case LeaseGrant, LeaseRevoke, Defragment, Compact:
if response.Revision == 0 {
return "ok"
}
Expand Down Expand Up @@ -63,6 +63,8 @@ func describeEtcdRequest(request EtcdRequest) string {
return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID)
case Defragment:
return fmt.Sprintf("defragment()")
case Compact:
return fmt.Sprintf("compact(%d)", request.Compact.Revision)
default:
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
}
Expand Down
22 changes: 18 additions & 4 deletions tests/robustness/model/deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ var DeterministicModel = porcupine.Model{
}

type EtcdState struct {
Revision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
Revision int64
CompactRevision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}

func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
Expand Down Expand Up @@ -178,6 +179,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}}
case Defragment:
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: s.Revision}}
case Compact:
s.CompactRevision = request.Compact.Revision
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: s.Revision}}
default:
panic(fmt.Sprintf("Unknown request type: %v", request.Type))
}
Expand Down Expand Up @@ -234,6 +238,7 @@ type RequestType string
const (
Range RequestType = "range"
Txn RequestType = "txn"
Compact RequestType = "compact"
LeaseGrant RequestType = "leaseGrant"
LeaseRevoke RequestType = "leaseRevoke"
Defragment RequestType = "defragment"
Expand All @@ -246,6 +251,7 @@ type EtcdRequest struct {
Range *RangeRequest
Txn *TxnRequest
Defragment *DefragmentRequest
Compact *CompactRequest
}

type RangeRequest struct {
Expand All @@ -269,6 +275,13 @@ type DeleteOptions struct {
Key string
}

type CompactResponse struct {
}

type CompactRequest struct {
Revision int64
}

type TxnRequest struct {
Conditions []EtcdCondition
OperationsOnSuccess []EtcdOperation
Expand Down Expand Up @@ -322,6 +335,7 @@ type EtcdResponse struct {
LeaseGrant *LeaseGrantReponse
LeaseRevoke *LeaseRevokeResponse
Defragment *DefragmentResponse
Compact *CompactResponse
Revision int64
}

Expand Down
27 changes: 27 additions & 0 deletions tests/robustness/model/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,25 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r
})
}

func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) {
request := compactRequest(rev)
if err != nil {
h.appendFailed(request, start.Nanoseconds(), err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamID,
Input: request,
Call: start.Nanoseconds(),
Output: compactResponse(revision),
Return: end.Nanoseconds(),
})
}

func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, clientOnFailure []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
conds := []EtcdCondition{}
for _, cmp := range cmp {
Expand Down Expand Up @@ -405,6 +424,10 @@ func putResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{}}}, Revision: revision}}
}

func compactRequest(rev int64) EtcdRequest {
return EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: rev}}
}

func deleteRequest(key string) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: DeleteOperation, Delete: DeleteOptions{Key: key}}}}}
}
Expand All @@ -413,6 +436,10 @@ func deleteResponse(deleted int64, revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}}
}

func compactResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: revision}}
}

func compareRevisionAndPutRequest(key string, expectedRevision int64, value string) EtcdRequest {
return txnRequestSingleOperation(compareRevision(key, expectedRevision), putOperation(key, value), nil)
}
Expand Down
14 changes: 14 additions & 0 deletions tests/robustness/traffic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package traffic
import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -135,6 +136,19 @@ func (c *RecordingClient) Delete(ctx context.Context, key string) (*clientv3.Del
return resp, err
}

func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
callTime := time.Since(c.baseTime)
fmt.Printf("Compact %d\n", rev)
resp, err := c.client.Compact(ctx, rev)
returnTime := time.Since(c.baseTime)
if err == nil || !strings.Contains(err.Error(), "mvcc: required revision has been compacted") {
c.kvOperations.AppendCompact(rev, callTime, returnTime, resp, err)
}
return resp, err
}

func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) {
txn := c.client.Txn(ctx).If(
conditions...,
Expand Down
20 changes: 11 additions & 9 deletions tests/robustness/traffic/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ var (
resource: "pods",
namespace: "default",
writeChoices: []choiceWeight[KubernetesRequestType]{
{choice: KubernetesUpdate, weight: 90},
{choice: KubernetesUpdate, weight: 89},
{choice: KubernetesDelete, weight: 5},
{choice: KubernetesCreate, weight: 5},
{choice: KubernetesCompact, weight: 1},
},
}
)
Expand Down Expand Up @@ -166,6 +167,8 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev)
case KubernetesCreate:
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
case KubernetesCompact:
err = kc.Compact(writeCtx, rev)
default:
panic(fmt.Sprintf("invalid choice: %q", op))
}
Expand Down Expand Up @@ -208,9 +211,10 @@ func (t kubernetesTraffic) generateKey() string {
type KubernetesRequestType string

const (
KubernetesDelete KubernetesRequestType = "delete"
KubernetesUpdate KubernetesRequestType = "update"
KubernetesCreate KubernetesRequestType = "create"
KubernetesDelete KubernetesRequestType = "delete"
KubernetesUpdate KubernetesRequestType = "update"
KubernetesCreate KubernetesRequestType = "create"
KubernetesCompact KubernetesRequestType = "compact"
)

type kubernetesClient struct {
Expand Down Expand Up @@ -242,11 +246,9 @@ func (k kubernetesClient) OptimisticCreate(ctx context.Context, key, value strin
return err
}

func (k kubernetesClient) RequestProgress(ctx context.Context) error {
// Kubernetes makes RequestProgress calls by requiring a leader to be
// present in the cluster:
// https://github.com/kubernetes/kubernetes/blob/2016fab3085562b4132e6d3774b6ded5ba9939fd/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go#L87
return k.client.RequestProgress(clientv3.WithRequireLeader(ctx))
func (k kubernetesClient) Compact(ctx context.Context, rev int64) error {
_, err := k.client.Compact(ctx, rev)
return err
}

// Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing.
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/traffic/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (
Name: "LowTraffic",
MinimalQPS: 100,
MaximalQPS: 200,
ClientCount: 8,
ClientCount: 6,
MaxNonUniqueRequestConcurrency: 3,
}
HighTrafficProfile = Profile{
Expand Down

0 comments on commit d51fd97

Please sign in to comment.