Skip to content

Commit

Permalink
final changes for sticky-dequeue-1 support
Browse files Browse the repository at this point in the history
  • Loading branch information
s-ourabh committed Feb 4, 2025
1 parent 2bf7677 commit dcfe305
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,7 @@ record = new ConsumerRecord<>("", -1, -1, -1, TimestampType.NO_TIMESTAMP_TYPE, n
new LeaderAndEpoch(Optional.empty(), Optional.empty())));
} catch (IllegalStateException isE) {
TopicTeqParameters teqParam = metadata.topicParaMap.get(topic);
int stickyDeqParam = teqParam != null ? teqParam.getStickyDeq(): null;
int stickyDeqParam = teqParam != null ? teqParam.getStickyDeq(): 2;
if (metadata.getDBMajorVersion() < 23 || stickyDeqParam == 1) {
// Partition assigned by TEQ Server not through JoinGroup/Sync
subscriptions.assignFromSubscribed(Collections.singleton(tp));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,8 @@ public void onComplete(ClientResponse response) {
log.debug("Fetch Records for topic " + poll.getValue() + " from host " + node );
String topic = poll.getValue();
TopicTeqParameters teqParam = metadata.topicParaMap.get(topic);
int stickyDeqParam = teqParam != null ? teqParam.getStickyDeq(): null;
int stickyDeqParam = teqParam != null ? teqParam.getStickyDeq(): 2;
if(stickyDeqParam == 0) {
System.out.println(metadata.topicParaMap.get(topic).getStickyDeq());
String errMsg = "Topic " + topic + " is not an Oracle kafka topic, Please drop and re-create topic"
+" using Admin.createTopics() or dbms_aqadm.create_database_kafka_topic procedure";
throw new InvalidTopicException(errMsg);
Expand Down Expand Up @@ -282,7 +281,7 @@ public void onComplete(ClientResponse response) {

return this.messages;
}

/**
*
* @return map of <node , topic> . Every node is leader for its corresponding topic.
Expand Down

0 comments on commit dcfe305

Please sign in to comment.