-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15370: ACL changes to support 2PC (KIP-939) #19364
base: trunk
Are you sure you want to change the base?
KAFKA-15370: ACL changes to support 2PC (KIP-939) #19364
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leave a minor comment. Please fix the failed case. Thanks.
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAclInheritance(quorum: String): Unit = {
testImplicationsOfAllow(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS, TWO_PHASE_COMMIT))
testImplicationsOfDeny(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS, TWO_PHASE_COMMIT))
testImplicationsOfAllow(READ, Set(DESCRIBE))
testImplicationsOfAllow(WRITE, Set(DESCRIBE))
testImplicationsOfAllow(DELETE, Set(DESCRIBE))
testImplicationsOfAllow(ALTER, Set(DESCRIBE))
testImplicationsOfDeny(DESCRIBE, Set())
testImplicationsOfAllow(ALTER_CONFIGS, Set(DESCRIBE_CONFIGS))
testImplicationsOfDeny(DESCRIBE_CONFIGS, Set())
}
@@ -1520,6 +1520,10 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) | |||
return | |||
} | |||
if (initProducerIdRequest.enable2Pc() && !authHelper.authorize(request.context, TWO_PHASE_COMMIT, TRANSACTIONAL_ID, transactionalId)) { | |||
requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The TRANSACTIONAL_ID_AUTHORIZATION_FAILED
is returned when client lacks either WRITE
or TWO_PHASE_COMMIT
permissions. Should we include information about which specific permission needs to be granted to the user in order to complete the operation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey, thanks for the comment! I'm unsure if exposing the exact missing ACL is common practice. Let me double-check
This patch adds ACL support for 2PC as a part of KIP-939
A new value will be added to the enum AclOperation: TWO_PHASE_COMMIT ((byte) 15 . When InitProducerId comes with enable2Pc=true, it would have to have both WRITE and TWO_PHASE_COMMIT operation enabled on the transactional id resource.
The kafka-acls.sh tool is going to support a new --operation TwoPhaseCommit.