Skip to content

Commit

Permalink
Patch delete request based on their uniqness in client requests
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Jun 27, 2024
1 parent a7c3843 commit b2a21be
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 69 deletions.
171 changes: 104 additions & 67 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func relevantOperations(reports []report.ClientReport) []porcupine.Operation {
return ops
}

func patchOperations(operations []porcupine.Operation, clientRequestCount, watchRevision, returnTime, persistedRequestCount map[keyValue]int64) []porcupine.Operation {
func patchOperations(operations []porcupine.Operation, clientRequestCount, watchRevision, returnTime, persistedRequestCount *requestStats) []porcupine.Operation {
newOperations := make([]porcupine.Operation, 0, len(operations))

for _, op := range operations {
Expand All @@ -58,41 +58,62 @@ func patchOperations(operations []porcupine.Operation, clientRequestCount, watch
newOperations = append(newOperations, op)
continue
}
txnPersisted := false
txnCanBeDiscarded := true
txnUniquellyPersisted := false
var txnRevision int64 = 0
for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
switch operation.Type {
case model.PutOperation:
kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value}
if count := clientRequestCount[kv]; count == 1 {
revision, ok := watchRevision[kv]
unique := clientRequestCount.Put[kv] == 1
if unique {
revision, ok := watchRevision.Put[kv]
if ok {
txnRevision = revision
}
if t, ok := returnTime[kv]; ok && t < op.Return {
if t, ok := returnTime.Put[kv]; ok && t < op.Return {
op.Return = t
}
}
_, ok := persistedRequestCount[kv]
_, ok := persistedRequestCount.Put[kv]
if ok {
txnPersisted = true
txnCanBeDiscarded = false
if unique {
txnUniquellyPersisted = true
}
}
case model.DeleteOperation:
unique := clientRequestCount.Delete[operation.Delete] == 1
if unique {
revision, ok := watchRevision.Delete[operation.Delete]
if ok {
txnRevision = revision
}
if t, ok := returnTime.Delete[operation.Delete]; ok && t < op.Return {
op.Return = t
}
}
_, ok := persistedRequestCount.Delete[operation.Delete]
if ok {
txnCanBeDiscarded = false
if unique {
txnUniquellyPersisted = true
}
}
case model.RangeOperation:
default:
panic(fmt.Sprintf("unknown operation type %q", operation.Type))
}
}
if isUniqueTxn(request.Txn, clientRequestCount) {
if !txnPersisted {
// Remove non persisted operations
continue
if txnCanBeDiscarded {
// Remove non persisted operations
continue
}
if txnUniquellyPersisted {
if txnRevision != 0 {
op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: txnRevision}
} else {
if txnRevision != 0 {
op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: txnRevision}
} else {
op.Output = model.MaybeEtcdResponse{Persisted: true}
}
op.Output = model.MaybeEtcdResponse{Persisted: true}
}
}
// Leave operation as it is as we cannot discard it.
Expand All @@ -101,45 +122,31 @@ func patchOperations(operations []porcupine.Operation, clientRequestCount, watch
return newOperations
}

func isUniqueTxn(request *model.TxnRequest, clientRequestCount map[keyValue]int64) bool {
return isUniqueOps(request.OperationsOnSuccess, clientRequestCount) && isUniqueOps(request.OperationsOnFailure, clientRequestCount)
}

func isUniqueOps(ops []model.EtcdOperation, clientRequestCount map[keyValue]int64) bool {
hasUniqueWrite := false
hasWrite := false
for _, operation := range ops {
switch operation.Type {
case model.PutOperation:
hasWrite = true
kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value}
if count := clientRequestCount[kv]; count == 1 {
hasUniqueWrite = true
}
case model.DeleteOperation:
hasWrite = true
case model.RangeOperation:
default:
panic(fmt.Sprintf("unknown operation type %q", operation.Type))
}
func returnTime(allOperations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) *requestStats {
earliestReturnTime := &requestStats{
Put: map[keyValue]int64{},
Delete: map[model.DeleteOptions]int64{},
}
return hasUniqueWrite || !hasWrite
}

func returnTime(allOperations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) map[keyValue]int64 {
earliestReturnTime := map[keyValue]int64{}
var lastReturnTime int64 = 0
for _, op := range allOperations {
request := op.Input.(model.EtcdRequest)
switch request.Type {
case model.Txn:
for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if etcdOp.Type != model.PutOperation {
continue
}
kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value}
if t, ok := earliestReturnTime[kv]; !ok || t > op.Return {
earliestReturnTime[kv] = op.Return
switch etcdOp.Type {
case model.PutOperation:
kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value}
if t, ok := earliestReturnTime.Put[kv]; !ok || t > op.Return {
earliestReturnTime.Put[kv] = op.Return
}
case model.DeleteOperation:
if t, ok := earliestReturnTime.Delete[etcdOp.Delete]; !ok || t > op.Return {
earliestReturnTime.Delete[etcdOp.Delete] = op.Return
}
earliestReturnTime.Delete[etcdOp.Delete] = op.Return
case model.RangeOperation:
default:
panic(fmt.Sprintf("Unknown operation type: %q", etcdOp.Type))
}
}
case model.Range:
Expand All @@ -163,10 +170,14 @@ func returnTime(allOperations []porcupine.Operation, reports []report.ClientRepo
case model.RangeOperation:
case model.PutOperation:
kv := keyValue{Key: event.Key, Value: event.Value}
if t, ok := earliestReturnTime[kv]; !ok || t > resp.Time.Nanoseconds() {
earliestReturnTime[kv] = resp.Time.Nanoseconds()
if t, ok := earliestReturnTime.Put[kv]; !ok || t > resp.Time.Nanoseconds() {
earliestReturnTime.Put[kv] = resp.Time.Nanoseconds()
}
case model.DeleteOperation:
del := model.DeleteOptions{Key: event.Key}
if t, ok := earliestReturnTime.Delete[del]; !ok || t > resp.Time.Nanoseconds() {
earliestReturnTime.Delete[del] = resp.Time.Nanoseconds()
}
default:
panic(fmt.Sprintf("unknown event type %q", event.Type))
}
Expand All @@ -181,14 +192,23 @@ func returnTime(allOperations []porcupine.Operation, reports []report.ClientRepo
case model.Txn:
lastReturnTime--
for _, op := range request.Txn.OperationsOnSuccess {
if op.Type != model.PutOperation {
continue
}
kv := keyValue{Key: op.Put.Key, Value: op.Put.Value}
returnTime, ok := earliestReturnTime[kv]
if ok {
lastReturnTime = min(returnTime, lastReturnTime)
earliestReturnTime[kv] = lastReturnTime
switch op.Type {
case model.PutOperation:
kv := keyValue{Key: op.Put.Key, Value: op.Put.Value}
returnTime, ok := earliestReturnTime.Put[kv]
if ok {
lastReturnTime = min(returnTime, lastReturnTime)
earliestReturnTime.Put[kv] = lastReturnTime
}
case model.DeleteOperation:
returnTime, ok := earliestReturnTime.Delete[op.Delete]
if ok {
lastReturnTime = min(returnTime, lastReturnTime)
earliestReturnTime.Delete[op.Delete] = lastReturnTime
}
case model.RangeOperation:
default:
panic(fmt.Sprintf("Unknown operation type: %q", op.Type))
}
}
case model.LeaseGrant:
Expand All @@ -201,8 +221,11 @@ func returnTime(allOperations []porcupine.Operation, reports []report.ClientRepo
return earliestReturnTime
}

func countClientRequests(reports []report.ClientReport) map[keyValue]int64 {
counter := map[keyValue]int64{}
func countClientRequests(reports []report.ClientReport) *requestStats {
counter := &requestStats{
Put: map[keyValue]int64{},
Delete: map[model.DeleteOptions]int64{},
}
for _, client := range reports {
for _, op := range client.KeyValue {
request := op.Input.(model.EtcdRequest)
Expand All @@ -212,23 +235,27 @@ func countClientRequests(reports []report.ClientReport) map[keyValue]int64 {
return counter
}

func countPersistedRequests(requests []model.EtcdRequest) map[keyValue]int64 {
counter := map[keyValue]int64{}
func countPersistedRequests(requests []model.EtcdRequest) *requestStats {
counter := &requestStats{
Put: map[keyValue]int64{},
Delete: map[model.DeleteOptions]int64{},
}
for _, request := range requests {
countRequest(counter, request)
}
return counter
}

func countRequest(counter map[keyValue]int64, request model.EtcdRequest) {
func countRequest(counter *requestStats, request model.EtcdRequest) {
switch request.Type {
case model.Txn:
for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
switch operation.Type {
case model.PutOperation:
kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value}
counter[kv] += 1
counter.Put[kv] += 1
case model.DeleteOperation:
counter.Delete[operation.Delete] += 1
case model.RangeOperation:
default:
panic(fmt.Sprintf("unknown operation type %q", operation.Type))
Expand All @@ -244,8 +271,11 @@ func countRequest(counter map[keyValue]int64, request model.EtcdRequest) {
}
}

func requestRevision(reports []report.ClientReport) map[keyValue]int64 {
requestRevision := map[keyValue]int64{}
func requestRevision(reports []report.ClientReport) *requestStats {
requestRevision := &requestStats{
Put: map[keyValue]int64{},
Delete: map[model.DeleteOptions]int64{},
}
for _, client := range reports {
for _, watch := range client.Watch {
for _, resp := range watch.Responses {
Expand All @@ -254,8 +284,10 @@ func requestRevision(reports []report.ClientReport) map[keyValue]int64 {
case model.RangeOperation:
case model.PutOperation:
kv := keyValue{Key: event.Key, Value: event.Value}
requestRevision[kv] = event.Revision
requestRevision.Put[kv] = event.Revision
case model.DeleteOperation:
del := model.DeleteOptions{Key: event.Key}
requestRevision.Delete[del] = event.Revision
default:
panic(fmt.Sprintf("unknown event type %q", event.Type))
}
Expand All @@ -266,6 +298,11 @@ func requestRevision(reports []report.ClientReport) map[keyValue]int64 {
return requestRevision
}

type requestStats struct {
Put map[keyValue]int64
Delete map[model.DeleteOptions]int64
}

type keyValue struct {
Key string
Value model.ValueOrHash
Expand Down
53 changes: 51 additions & 2 deletions tests/robustness/validate/patch_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestPatchHistory(t *testing.T) {
},
},
{
name: "failed put with lease remains if there is a matching event, return time untouched",
name: "failed put with lease remains if there is a matching persisted request, return time untouched",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPutWithLease("key", "value", 123, 1, 2, nil, errors.New("failed"))
},
Expand Down Expand Up @@ -220,15 +220,64 @@ func TestPatchHistory(t *testing.T) {
},
},
{
name: "failed delete remains, time untouched regardless of persisted event and watch",
name: "failed delete is dropped",
historyFunc: func(h *model.AppendableHistory) {
h.AppendDelete("key", 1, 2, nil, errors.New("failed"))
},
persistedRequest: []model.EtcdRequest{},
expectedRemainingOperations: []porcupine.Operation{},
},
{
name: "failed delete remains if there is a matching persisted request, time untouched",
historyFunc: func(h *model.AppendableHistory) {
h.AppendDelete("key", 1, 2, nil, errors.New("failed"))
},
persistedRequest: []model.EtcdRequest{
deleteRequest("key"),
},
expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000000, Output: model.MaybeEtcdResponse{Persisted: true}},
},
},
{
name: "failed delete remains if there is a matching persisted request, uniqueness allows return time to be based on next persisted request",
historyFunc: func(h *model.AppendableHistory) {
h.AppendDelete("key", 1, 2, nil, errors.New("failed"))
h.AppendPut("key", "value", 3, 4, &clientv3.PutResponse{}, nil)
},
persistedRequest: []model.EtcdRequest{
deleteRequest("key"),
putRequest("key", "value"),
},
expectedRemainingOperations: []porcupine.Operation{
{Return: 3, Output: model.MaybeEtcdResponse{Persisted: true}},
{Return: 4, Output: putResponse(model.EtcdOperationResult{})},
},
},
{
name: "failed delete remains if there is a matching persisted request, uniqueness allows for revision and return time to be based on watch",
historyFunc: func(h *model.AppendableHistory) {
h.AppendDelete("key", 1, 2, nil, errors.New("failed"))
},
persistedRequest: []model.EtcdRequest{
deleteRequest("key"),
},
watchOperations: watchResponse(3, deleteEvent("key", 2)),
expectedRemainingOperations: []porcupine.Operation{
{Return: 3, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}},
},
},
{
name: "failed delete remains if there is a matching persisted request, lack of uniqueness causes time to be untouched regardless of persisted event and watch",
historyFunc: func(h *model.AppendableHistory) {
h.AppendDelete("key", 1, 2, nil, errors.New("failed"))
h.AppendDelete("key", 3, 4, &clientv3.DeleteResponse{}, nil)
},
persistedRequest: []model.EtcdRequest{
deleteRequest("key"),
deleteRequest("key"),
},
watchOperations: watchResponse(3, deleteEvent("key", 2), deleteEvent("key", 3)),
expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000004, Output: model.MaybeEtcdResponse{Error: "failed"}},
{Return: 4, Output: putResponse(model.EtcdOperationResult{})},
Expand Down

0 comments on commit b2a21be

Please sign in to comment.