Skip to content

Commit

Permalink
minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
kmrdhruv committed Dec 15, 2024
1 parent 54c2164 commit e6d3f26
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ public List<TopicPartitions<PulsarStorageTopic>> shardTopic(
PulsarStorageTopic topic, InternalQueueCategory category
) {
List<TopicPartitions<PulsarStorageTopic>> topicPartitions = new ArrayList<>();
int shardCount = topicPlanner.getShardCount(topic.getCapacity(), category);
shardCount = Math.min(shardCount, topic.getPartitionCount());
int shardCount = topicPlanner.getShardCount(topic, category);
int partitionsPerShard = topic.getPartitionCount() / shardCount;
for (int shardId = 0; shardId < shardCount; shardId++) {
topicPartitions.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.flipkart.varadhi.entities.InternalQueueCategory;
import com.flipkart.varadhi.entities.TopicCapacityPolicy;
import com.flipkart.varadhi.pulsar.config.PulsarConfig;
import com.flipkart.varadhi.pulsar.entities.PulsarStorageTopic;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -29,16 +30,24 @@ public int getPartitionCount(TopicCapacityPolicy ask, InternalQueueCategory cate
partitionCount = partitionCount * fanOutMultiplier;
// need to ensure that partitions are equally distributed across the shards and hence
// partitionCount will be in multiple of shardCount.
int shardCount = getShardCount(ask, category);
int shardCount = getShardCount(ask, partitionCount, category);
int deltaForMultiple = shardCount - (partitionCount % shardCount);
partitionCount = deltaForMultiple == shardCount ? partitionCount : partitionCount + deltaForMultiple;
int boundedPartitionCount =
Math.max(config.getMinPartitionPerTopic(), Math.min(partitionCount, config.getMaxPartitionPerTopic()));
if (0 != boundedPartitionCount % shardCount) {
log.error("Capacity ask:{} Suggested Partition(s):{} Suggested Shard(s):{}", ask, boundedPartitionCount, shardCount);
throw new IllegalArgumentException("Couldn't partition topic equally into shards.");
}
log.debug("Suggested PartitionCount:{} for capacity:{}", boundedPartitionCount, ask);
return boundedPartitionCount;
}

public int getShardCount(TopicCapacityPolicy ask, InternalQueueCategory category) {
public int getShardCount(PulsarStorageTopic topic, InternalQueueCategory category) {
return getShardCount(topic.getCapacity(), topic.getPartitionCount(), category);
}

private int getShardCount(TopicCapacityPolicy ask, int topicPartitionCount, InternalQueueCategory category) {
if (category != InternalQueueCategory.MAIN) {
return 1;
}
Expand All @@ -48,7 +57,8 @@ public int getShardCount(TopicCapacityPolicy ask, InternalQueueCategory category
int shardCount = Math.max(countFromQps, countFromKBps);
int deltaForMultiple = config.getShardMultiples() - (shardCount % config.getShardMultiples());
shardCount = deltaForMultiple == config.getShardMultiples() ? shardCount : shardCount + deltaForMultiple;
shardCount = Math.min(shardCount, config.getMaxShardPerSubscription());
// Limit max shardCount to topic's partition count and max configured shard per subscription.
shardCount = Math.min(Math.min(shardCount, topicPartitionCount), config.getMaxShardPerSubscription());
log.debug("Suggested ShardCount:{} for capacity:{}", shardCount, ask);
return shardCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ public VaradhiSubscription get(SubscriptionResource subscriptionResource, Projec
);
}

/*
* TODO::
* Push sharding logic further down to messaging stack as Vardhi can't decide if messaging stack
* allows sharding or not based on topic's capacity.
*/

private SubscriptionShards getSubscriptionShards(
String subName, VaradhiTopic topic, Project subProject, ConsumptionPolicy consumptionPolicy,
RetryPolicy retryPolicy
Expand Down Expand Up @@ -165,8 +159,11 @@ private InternalCompositeSubscription getInternalSub(
TopicCapacityPolicy errCapacity =
capacity.from(consumptionPolicy.getMaxErrorThreshold(), READ_FAN_OUT_FOR_INTERNAL_QUEUE);
StorageTopic st = topicFactory.getTopic(itTopicName, project, errCapacity, queueType.getCategory());
TopicPartitions<StorageTopic> tp = TopicPartitions.byPartitions(st, new int[]{1});
StorageSubscription<StorageTopic> ss = subscriptionFactory.get(itSubName, tp, project);
List<TopicPartitions<StorageTopic>> topicPartitions = topicService.shardTopic(st, queueType.getCategory());
if (topicPartitions.size() != 1) {
throw new IllegalArgumentException("Multi shard internal topics are unsupported for now.");
}
StorageSubscription<StorageTopic> ss = subscriptionFactory.get(itSubName, topicPartitions.get(0), project);
return InternalCompositeSubscription.of(ss, queueType);
}

Expand Down

0 comments on commit e6d3f26

Please sign in to comment.