Skip to content

Commit

Permalink
Add integ test
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
  • Loading branch information
mch2 committed Feb 17, 2025
1 parent b6b0e72 commit 74f65f6
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public void testSearchReplicaWithDedicatedNodesAndAwarenessAttributes() throws E
System.out.println(shardRouting);
}
System.out.println(allocatedWriters);
assertEquals(1, allocatedWriters.size());
}

private IndexShardRoutingTable getRoutingTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("index", shardRouting.getIndexName());
builder.field("shard", shardRouting.getId());
builder.field("primary", shardRouting.primary());
// builder.field("search", shardRouting.isSearchOnly());
builder.field("current_state", shardRouting.state().toString().toLowerCase(Locale.ROOT));
if (shardRouting.unassignedInfo() != null) {
unassignedInfoToXContent(shardRouting.unassignedInfo(), builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,14 +391,14 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
if (FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings)) {
addAllocationDecider(deciders, new SearchReplicaAllocationDecider(settings, clusterSettings));
}
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
if (FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings)) {
addAllocationDecider(deciders, new SearchReplicaAllocationDecider(settings, clusterSettings));
}
addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new TargetPoolAllocationDecider());
addAllocationDecider(deciders, new RemoteStoreMigrationAllocationDecider(settings, clusterSettings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ public GroupShardsIterator<ShardIterator> searchShards(
preference = Preference.PRIMARY_FIRST.type();
}

// FIXME: Need another metadata setting to force write only so we don't fall back
if (isReaderWriterSplitEnabled) {
if (preference == null || preference.isEmpty()) {
if (indexMetadataForShard.getNumberOfSearchOnlyReplicas() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.cluster.routing.allocation.decider;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -150,6 +152,8 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
return underCapacity(shardRouting, node, allocation, false);
}

public static Logger logger = LogManager.getLogger(AwarenessAllocationDecider.class);

private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
if (awarenessAttributes.isEmpty()) {
return allocation.decision(
Expand Down Expand Up @@ -179,6 +183,7 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
// build attr_value -> nodes map

Set<String> nodesPerAttribute = getNodesPerAttributeSet(shardRouting, allocation, awarenessAttribute);
logger.info("NODES PER ATTR {}", nodesPerAttribute);
int numberOfAttributes = nodesPerAttribute.size(); // number of nodes that contain the "zone" attr
List<String> fullValues = forcedAwarenessAttributes.get(awarenessAttribute);

Expand All @@ -193,8 +198,9 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout

// TODO should we remove ones that are not part of full list?
final int maximumNodeCount = (shardCount + numberOfAttributes - 1) / numberOfAttributes; // ceil(shardCount/numberOfAttributes)
logger.info("Current node count {} {} {} {} - MAX - CURRENT {}", shardRouting.isSearchOnly(), currentNodeCount, nodesPerAttribute.size(), maximumNodeCount, currentNodeCount);
if (currentNodeCount > maximumNodeCount) {
return allocation.decision(
Decision decision = allocation.decision(
Decision.NO,
NAME,
"there are too many copies of the shard allocated to nodes with attribute [%s], there are [%d] total configured "
Expand All @@ -206,6 +212,15 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
currentNodeCount,
maximumNodeCount
);
logger.warn( "there are too many copies of the shard allocated to nodes with attribute [{}], there are [{}] total configured "
+ "shard copies for this shard id and [{}] total attribute values, expected the allocated shard count per "
+ "attribute [{}] to be less than or equal to the upper bound of the required number of shards per attribute [{}]",
awarenessAttribute, // the attribute (zone)
shardCount, // total shard count
numberOfAttributes, // num of unique attribute values
currentNodeCount, // num of assigned shard copies with the same attr value as the node in consideration
maximumNodeCount);
return decision;
}
}

Expand Down Expand Up @@ -233,6 +248,8 @@ private int getCurrentNodeCountForAttribute(
final List<ShardRouting> shardRoutings = allocation.routingNodes().assignedShards(shardRouting.shardId());
final List<ShardRouting> assignedShards = shardRouting.isSearchOnly() ? shardRoutings.stream().filter(ShardRouting::isSearchOnly).collect(Collectors.toList()) :
shardRoutings.stream().filter(s -> s.isSearchOnly() == false).collect(Collectors.toList());
logger.info("{} SSIGNED SHARDS {}", shardRouting.isSearchOnly(), assignedShards);
// .stream().filter(s -> s.isSearchOnly() && shardRouting.isSearchOnly()).collect(Collectors.toList());

for (ShardRouting assignedShard : assignedShards) {
if (assignedShard.started() || assignedShard.initializing()) {
Expand All @@ -259,11 +276,15 @@ private int getCurrentNodeCountForAttribute(
++currentNodeCount;
}
} else {
if (shardRouting.isSearchOnly() == SearchReplicaAllocationDecider.isSearchOnlyNode(node.node())) {
logger.info("Shard not assigned, isSearchOnly: {}, node isSearchOnly: {}",
shardRouting.isSearchOnly(),
SearchReplicaAllocationDecider.isSearchOnlyNode(node.node())); if (shardRouting.isSearchOnly() == SearchReplicaAllocationDecider.isSearchOnlyNode(node.node())) {
++currentNodeCount;
}
// logger.info("ELSE Increment {} {} {}", currentNodeCount, shardRouting.isSearchOnly(), SearchReplicaAllocationDecider.isSearchOnlyNode(node.node()));
}
}

return currentNodeCount;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.cluster.routing.allocation.decider;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeFilters;
import org.opensearch.cluster.routing.RoutingNode;
Expand All @@ -32,7 +30,6 @@
* The filter behaves similar to an include for any defined node attribute.
* A search replica can be allocated to only nodes with one of the specified attributes while
* other shard types will be rejected from nodes with any of the attributes.
*
* @opensearch.internal
*/
public class SearchReplicaAllocationDecider extends AllocationDecider {
Expand All @@ -51,8 +48,7 @@ public SearchReplicaAllocationDecider(Settings settings, ClusterSettings cluster
clusterSettings.addAffixMapUpdateConsumer(
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING,
this::setSearchReplicaIncludeFilters,
(a, b) -> {
}
(a, b) -> {}
);
}

Expand All @@ -66,14 +62,9 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
return shouldFilter(shardRouting, node.node(), allocation);
}

public static Logger logger = LogManager.getLogger(SearchReplicaAllocationDecider.class);

private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
if (searchReplicaIncludeFilters != null) {
logger.info("Should filter");
final boolean match = searchReplicaIncludeFilters.match(node);
logger.info("Node {} match {} routing {}", node.getName(), match, shardRouting.isSearchOnly());

if (match == false && shardRouting.isSearchOnly()) {
return allocation.decision(
Decision.NO,
Expand All @@ -93,8 +84,9 @@ private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, Rou
searchReplicaIncludeFilters
);
}
} else if (shardRouting.isSearchOnly()) {
return allocation.decision(Decision.NO, NAME, "There are no nodes designated with node attribute [%s] for search replicas", SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX);
}
if (shardRouting.isSearchOnly()) {
return allocation.decision(Decision.NO, NAME, "There are no nodes designated with node attribute [%] for search replicas", SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX);
}
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
}
Expand Down

0 comments on commit 74f65f6

Please sign in to comment.