diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 895dba6a3f8..4c6d4794ff4 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -26,9 +26,10 @@ import ( func patchLinearizableOperations(reports []report.ClientReport, persistedRequests []model.EtcdRequest) []porcupine.Operation { allOperations := relevantOperations(reports) putRevision := putRevision(reports) - putReturnTimeFromWatch := putReturnTimeFromWatch(reports) - putReturnTimeFromPersisted := putReturnTimeFromPersistedOperations(allOperations, persistedRequests) - return patchOperations(allOperations, putRevision, putReturnTimeFromWatch, putReturnTimeFromPersisted) + putReturnTime := putReturnTime(allOperations, reports, persistedRequests) + clientPutCount := countClientPuts(reports) + persistedPutCount := countPersistedPuts(persistedRequests) + return patchOperations(allOperations, putRevision, putReturnTime, clientPutCount, persistedPutCount) } func relevantOperations(reports []report.ClientReport) []porcupine.Operation { @@ -46,30 +47,6 @@ func relevantOperations(reports []report.ClientReport) []porcupine.Operation { return ops } -func putReturnTimeFromWatch(reports []report.ClientReport) map[keyValue]int64 { - earliestTime := map[keyValue]int64{} - for _, client := range reports { - for _, watch := range client.Watch { - for _, resp := range watch.Responses { - for _, event := range resp.Events { - switch event.Type { - case model.RangeOperation: - case model.PutOperation: - kv := keyValue{Key: event.Key, Value: event.Value} - if t, ok := earliestTime[kv]; !ok || t > resp.Time.Nanoseconds() { - earliestTime[kv] = resp.Time.Nanoseconds() - } - case model.DeleteOperation: - default: - panic(fmt.Sprintf("unknown event type %q", event.Type)) - } - } - } - } - } - return earliestTime -} - func putRevision(reports []report.ClientReport) map[keyValue]int64 { requestRevision := map[keyValue]int64{} for _, client := range reports { @@ -92,7 +69,7 @@ func putRevision(reports []report.ClientReport) map[keyValue]int64 { return requestRevision } -func patchOperations(operations []porcupine.Operation, watchRevision, putReturnTimeFromWatch, putReturnTimeFromPersisted map[keyValue]int64) []porcupine.Operation { +func patchOperations(operations []porcupine.Operation, watchRevision, putReturnTime, clientPutCount, persistedPutCount map[keyValue]int64) []porcupine.Operation { newOperations := make([]porcupine.Operation, 0, len(operations)) for _, op := range operations { @@ -109,14 +86,16 @@ func patchOperations(operations []porcupine.Operation, watchRevision, putReturnT switch etcdOp.Type { case model.PutOperation: kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value} + if _, ok := persistedPutCount[kv]; ok { + persisted = true + } + if count := clientPutCount[kv]; count != 1 { + continue + } if revision, ok := watchRevision[kv]; ok { txnRevision = revision } - if returnTime, ok := putReturnTimeFromWatch[kv]; ok { - op.Return = min(op.Return, returnTime) - } - if returnTime, ok := putReturnTimeFromPersisted[kv]; ok { - persisted = true + if returnTime, ok := putReturnTime[kv]; ok { op.Return = min(op.Return, returnTime) } case model.DeleteOperation: @@ -125,7 +104,7 @@ func patchOperations(operations []porcupine.Operation, watchRevision, putReturnT panic(fmt.Sprintf("unknown operation type %q", etcdOp.Type)) } } - if isUniqueTxn(request.Txn) { + if isUniqueTxn(request.Txn, clientPutCount) { if !persisted { // Remove non persisted operations continue @@ -143,12 +122,12 @@ func patchOperations(operations []porcupine.Operation, watchRevision, putReturnT return newOperations } -func isUniqueTxn(request *model.TxnRequest) bool { - return isUniqueOps(request.OperationsOnSuccess) && isUniqueOps(request.OperationsOnFailure) +func isUniqueTxn(request *model.TxnRequest, clientRequestCount map[keyValue]int64) bool { + return isUniqueOps(request.OperationsOnSuccess, clientRequestCount) && isUniqueOps(request.OperationsOnFailure, clientRequestCount) } -func isUniqueOps(ops []model.EtcdOperation) bool { - return hasUniqueWriteOperation(ops) || !hasWriteOperation(ops) +func isUniqueOps(ops []model.EtcdOperation, clientRequestCount map[keyValue]int64) bool { + return hasUniqueWriteOperation(ops, clientRequestCount) || !hasWriteOperation(ops) } func hasWriteOperation(ops []model.EtcdOperation) bool { @@ -160,70 +139,88 @@ func hasWriteOperation(ops []model.EtcdOperation) bool { return false } -func hasUniqueWriteOperation(ops []model.EtcdOperation) bool { - for _, etcdOp := range ops { - if etcdOp.Type == model.PutOperation { - return true +func hasUniqueWriteOperation(ops []model.EtcdOperation, clientRequestCount map[keyValue]int64) bool { + for _, operation := range ops { + switch operation.Type { + case model.PutOperation: + kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value} + if count := clientRequestCount[kv]; count == 1 { + return true + } + case model.DeleteOperation: + case model.RangeOperation: + default: + panic(fmt.Sprintf("unknown operation type %q", operation.Type)) } } return false } -func putReturnTimeFromPersistedOperations(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest) map[keyValue]int64 { - putReturnTimes := putReturnTime(allOperations) - persisted := map[keyValue]int64{} - - lastReturnTime := maxReturnTime(putReturnTimes) - - for i := len(persistedRequests) - 1; i >= 0; i-- { - request := persistedRequests[i] +func putReturnTime(allOperations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) map[keyValue]int64 { + earliestReturnTime := map[keyValue]int64{} + var lastReturnTime int64 + for _, op := range allOperations { + request := op.Input.(model.EtcdRequest) switch request.Type { case model.Txn: - hasPut := false - lastReturnTime-- - for _, op := range request.Txn.OperationsOnSuccess { - if op.Type != model.PutOperation { + for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { + if etcdOp.Type != model.PutOperation { continue } - kv := keyValue{Key: op.Put.Key, Value: op.Put.Value} - if _, found := persisted[kv]; found { - panic(fmt.Sprintf("Unexpected duplicate event in persisted requests. %d %+v", i, op)) - } - hasPut = true - persisted[kv] = lastReturnTime - } - if hasPut { - newReturnTime := returnTimeFromRequest(putReturnTimes, request) - if newReturnTime != -1 { - lastReturnTime = min(lastReturnTime, newReturnTime) + kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value} + if returnTime, ok := earliestReturnTime[kv]; !ok || returnTime > op.Return { + earliestReturnTime[kv] = op.Return } + earliestReturnTime[kv] = op.Return } + case model.Range: case model.LeaseGrant: case model.LeaseRevoke: case model.Compact: default: panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } + if op.Return > lastReturnTime { + lastReturnTime = op.Return + } } - return persisted -} -func putReturnTime(operations []porcupine.Operation) map[model.EtcdOperation]int64 { - newOperations := map[model.EtcdOperation]int64{} - for _, op := range operations { - request := op.Input.(model.EtcdRequest) + for _, client := range reports { + for _, watch := range client.Watch { + for _, resp := range watch.Responses { + for _, event := range resp.Events { + switch event.Type { + case model.RangeOperation: + case model.PutOperation: + kv := keyValue{Key: event.Key, Value: event.Value} + if t, ok := earliestReturnTime[kv]; !ok || t > resp.Time.Nanoseconds() { + earliestReturnTime[kv] = resp.Time.Nanoseconds() + } + case model.DeleteOperation: + default: + panic(fmt.Sprintf("unknown event type %q", event.Type)) + } + } + } + } + } + + for i := len(persistedRequests) - 1; i >= 0; i-- { + request := persistedRequests[i] switch request.Type { case model.Txn: - for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { - if etcdOp.Type != model.PutOperation { + lastReturnTime-- + for _, op := range request.Txn.OperationsOnSuccess { + if op.Type != model.PutOperation { continue } - if _, found := newOperations[etcdOp]; found { - panic("Unexpected duplicate event in persisted requests.") + kv := keyValue{Key: op.Put.Key, Value: op.Put.Value} + returnTime, ok := earliestReturnTime[kv] + if ok { + lastReturnTime = min(returnTime, lastReturnTime) + earliestReturnTime[kv] = lastReturnTime } - newOperations[etcdOp] = op.Return } - case model.Range: case model.LeaseGrant: case model.LeaseRevoke: case model.Compact: @@ -231,33 +228,49 @@ func putReturnTime(operations []porcupine.Operation) map[model.EtcdOperation]int panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } } - return newOperations + return earliestReturnTime } -func maxReturnTime(operationTime map[model.EtcdOperation]int64) int64 { - var maxReturnTime int64 - for _, returnTime := range operationTime { - if returnTime > maxReturnTime { - maxReturnTime = returnTime +func countClientPuts(reports []report.ClientReport) map[keyValue]int64 { + counter := map[keyValue]int64{} + for _, client := range reports { + for _, op := range client.KeyValue { + request := op.Input.(model.EtcdRequest) + countPuts(counter, request) } } - return maxReturnTime + return counter +} + +func countPersistedPuts(requests []model.EtcdRequest) map[keyValue]int64 { + counter := map[keyValue]int64{} + for _, request := range requests { + countPuts(counter, request) + } + return counter } -func returnTimeFromRequest(putReturnTimes map[model.EtcdOperation]int64, request model.EtcdRequest) int64 { +func countPuts(counter map[keyValue]int64, request model.EtcdRequest) { switch request.Type { case model.Txn: - for _, op := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { - if op.Type != model.PutOperation { - continue - } - if time, found := putReturnTimes[op]; found { - return time + for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { + switch operation.Type { + case model.PutOperation: + kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value} + counter[kv]++ + case model.DeleteOperation: + case model.RangeOperation: + default: + panic(fmt.Sprintf("unknown operation type %q", operation.Type)) } } - return -1 + case model.LeaseGrant: + case model.LeaseRevoke: + case model.Compact: + case model.Defragment: + case model.Range: default: - panic(fmt.Sprintf("Unknown request type: %q", request.Type)) + panic(fmt.Sprintf("unknown request type %q", request.Type)) } } diff --git a/tests/robustness/validate/patch_history_test.go b/tests/robustness/validate/patch_history_test.go index 559c70b82d5..bb104b0125f 100644 --- a/tests/robustness/validate/patch_history_test.go +++ b/tests/robustness/validate/patch_history_test.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//nolint:unparam package validate import ( @@ -73,7 +74,7 @@ func TestPatchHistory(t *testing.T) { }, }, { - name: "failed put remains if there is a matching event, return time based on next persisted request", + name: "failed put remains if there is a matching event, uniqueness allows for return time to be based on next persisted request", historyFunc: func(h *model.AppendableHistory) { h.AppendPut("key1", "value", 100, infinite, nil, errors.New("failed")) h.AppendPut("key2", "value", 300, 400, &clientv3.PutResponse{}, nil) @@ -88,7 +89,7 @@ func TestPatchHistory(t *testing.T) { }, }, { - name: "failed put remains if there is a matching event, revision and return time based on watch", + name: "failed put remains if there is a matching persisted request, uniqueness allows for revision and return time to be based on watch", historyFunc: func(h *model.AppendableHistory) { h.AppendPut("key", "value", 100, infinite, nil, errors.New("failed")) }, @@ -100,6 +101,22 @@ func TestPatchHistory(t *testing.T) { {Return: 300, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}}, }, }, + { + name: "failed put remains if there is a matching persisted request, lack of uniqueness causes time to be untouched regardless of persisted event and watch", + historyFunc: func(h *model.AppendableHistory) { + h.AppendPut("key", "value", 1, 2, nil, errors.New("failed")) + h.AppendPut("key", "value", 3, 4, &clientv3.PutResponse{}, nil) + }, + persistedRequest: []model.EtcdRequest{ + putRequest("key", "value"), + putRequest("key", "value"), + }, + watchOperations: watchResponse(3, putEvent("key", "value", 2), putEvent("key", "value", 3)), + expectedRemainingOperations: []porcupine.Operation{ + {Return: 1000000004, Output: model.MaybeEtcdResponse{Error: "failed"}}, + {Return: 4, Output: putResponse(model.EtcdOperationResult{})}, + }, + }, { name: "failed put is dropped if event has different key", historyFunc: func(h *model.AppendableHistory) { @@ -139,7 +156,7 @@ func TestPatchHistory(t *testing.T) { }, }, { - name: "failed put with lease remains if there is a matching event, return time based on next persisted request", + name: "failed put with lease remains if there is a matching event, uniqueness allows return time to be based on next persisted request", historyFunc: func(h *model.AppendableHistory) { h.AppendPutWithLease("key1", "value", 123, 100, infinite, nil, errors.New("failed")) h.AppendPutWithLease("key2", "value", 234, 300, 400, &clientv3.PutResponse{}, nil) @@ -153,6 +170,35 @@ func TestPatchHistory(t *testing.T) { {Return: 400, Output: putResponse(model.EtcdOperationResult{})}, }, }, + { + name: "failed put with lease remains if there is a matching event, uniqueness allows for revision and return time to be based on watch", + historyFunc: func(h *model.AppendableHistory) { + h.AppendPutWithLease("key", "value", 123, 1, 2, nil, errors.New("failed")) + }, + persistedRequest: []model.EtcdRequest{ + putRequestWithLease("key", "value", 123), + }, + watchOperations: watchResponse(3, putEvent("key", "value", 2)), + expectedRemainingOperations: []porcupine.Operation{ + {Return: 3, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}}, + }, + }, + { + name: "failed put with lease remains if there is a matching persisted request, lack of uniqueness causes time to be untouched regardless of persisted event and watch", + historyFunc: func(h *model.AppendableHistory) { + h.AppendPutWithLease("key", "value", 123, 1, 2, nil, errors.New("failed")) + h.AppendPutWithLease("key", "value", 321, 3, 4, &clientv3.PutResponse{}, nil) + }, + persistedRequest: []model.EtcdRequest{ + putRequestWithLease("key", "value", 123), + putRequestWithLease("key", "value", 321), + }, + watchOperations: watchResponse(3, putEvent("key", "value", 2), putEvent("key", "value", 3)), + expectedRemainingOperations: []porcupine.Operation{ + {Return: 1000000004, Output: model.MaybeEtcdResponse{Error: "failed"}}, + {Return: 4, Output: putResponse(model.EtcdOperationResult{})}, + }, + }, { name: "failed put is dropped", historyFunc: func(h *model.AppendableHistory) { diff --git a/tests/robustness/validate/validate.go b/tests/robustness/validate/validate.go index e119ea4c574..5918ec0df83 100644 --- a/tests/robustness/validate/validate.go +++ b/tests/robustness/validate/validate.go @@ -59,11 +59,7 @@ type Config struct { } func checkValidationAssumptions(reports []report.ClientReport, persistedRequests []model.EtcdRequest) error { - err := validatePutOperationUnique(reports) - if err != nil { - return err - } - err = validateEmptyDatabaseAtStart(reports) + err := validateEmptyDatabaseAtStart(reports) if err != nil { return err } @@ -79,36 +75,6 @@ func checkValidationAssumptions(reports []report.ClientReport, persistedRequests return nil } -func validatePutOperationUnique(reports []report.ClientReport) error { - type KV struct { - Key string - Value model.ValueOrHash - } - putValue := map[KV]struct{}{} - for _, r := range reports { - for _, op := range r.KeyValue { - request := op.Input.(model.EtcdRequest) - if request.Type != model.Txn { - continue - } - for _, op := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { - if op.Type != model.PutOperation { - continue - } - kv := KV{ - Key: op.Put.Key, - Value: op.Put.Value, - } - if _, ok := putValue[kv]; ok { - return fmt.Errorf("non unique put %v, required to patch operation history", kv) - } - putValue[kv] = struct{}{} - } - } - } - return nil -} - func validateEmptyDatabaseAtStart(reports []report.ClientReport) error { for _, r := range reports { for _, op := range r.KeyValue {