Skip to content

Commit

Permalink
Added precomputation logic for the stats calculation
Browse files Browse the repository at this point in the history
Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
  • Loading branch information
vinaykpud committed Jan 27, 2025
1 parent 90b96a8 commit 7f465a0
Show file tree
Hide file tree
Showing 13 changed files with 167 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception {
assertTrue(replicationStats.getTotalBytesBehind() == 0);
assertTrue(replicationStats.getMaxReplicationLag() == 0);
}
// replica nodes - should hold empty replication statistics
// replica nodes - should hold replication statistics
if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) {
assertTrue(replicationStats.getMaxBytesBehind() > 0);
assertTrue(replicationStats.getTotalBytesBehind() > 0);
Expand Down
5 changes: 2 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -656,7 +655,7 @@ public IndexService newIndexService(
recoverySettings,
remoteStoreSettings,
(s) -> {},
null
shardId -> ReplicationStats.empty()
);
}

Expand All @@ -683,7 +682,7 @@ public IndexService newIndexService(
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Consumer<IndexShard> replicator,
BiFunction<ShardId, ReplicationCheckpoint, ReplicationStats> segmentReplicationStatsProvider
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down
7 changes: 3 additions & 4 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.plugins.IndexStorePlugin;
Expand Down Expand Up @@ -198,7 +197,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;
private final BiFunction<ShardId, ReplicationCheckpoint, ReplicationStats> segmentReplicationStatsProvider;
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -238,7 +237,7 @@ public IndexService(
FileCache fileCache,
CompositeIndexSettings compositeIndexSettings,
Consumer<IndexShard> replicator,
BiFunction<ShardId, ReplicationCheckpoint, ReplicationStats> segmentReplicationStatsProvider
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -400,7 +399,7 @@ public IndexService(
null,
null,
s -> {},
null
(shardId) -> ReplicationStats.empty()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public ReplicationStats(StreamInput in) throws IOException {
this.maxReplicationLag = in.readVLong();
}

/**
 * Factory for a {@link ReplicationStats} built through the no-argument constructor.
 *
 * @return a newly allocated {@link ReplicationStats} instance
 */
public static ReplicationStats empty() {
    final ReplicationStats emptyStats = new ReplicationStats();
    return emptyStats;
}

/**
 * No-argument constructor. The body is intentionally empty, so no field is
 * assigned explicitly here.
 */
public ReplicationStats() {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ Runnable getGlobalCheckpointSyncer() {
*/
private final ShardMigrationState shardMigrationState;
private DiscoveryNodes discoveryNodes;
private final BiFunction<ShardId, ReplicationCheckpoint, ReplicationStats> segmentReplicationStatsProvider;
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;

public IndexShard(
final ShardRouting shardRouting,
Expand Down Expand Up @@ -393,7 +393,7 @@ public IndexShard(
final RemoteStoreSettings remoteStoreSettings,
boolean seedRemote,
final DiscoveryNodes discoveryNodes,
final BiFunction<ShardId, ReplicationCheckpoint, ReplicationStats> segmentReplicationStatsProvider
final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -3233,9 +3233,9 @@ public Set<SegmentReplicationShardStats> getReplicationStatsForTrackedReplicas()

/**
 * Returns the replication statistics for this shard.
 *
 * NOTE(review): this span is taken from a diff view without +/- markers, so each
 * changed statement appears twice: the previous form first, then its replacement.
 */
public ReplicationStats getReplicationStats() {
// The provider is consulted only when indexSettings.isSegRepEnabledOrRemoteNode() holds
// and the routing entry is not a primary.
if (indexSettings.isSegRepEnabledOrRemoteNode() && !routingEntry().primary()) {
// Previous form: provider called with the shard id and the latest replication checkpoint.
return segmentReplicationStatsProvider.apply(shardId, this.getLatestReplicationCheckpoint());
// Replacement: provider called with the shard id only.
return segmentReplicationStatsProvider.apply(shardId);
}
// Previous form: stats constructed explicitly with three zero arguments.
return new ReplicationStats(0, 0, 0);
// Replacement: the ReplicationStats.empty() factory.
return ReplicationStats.empty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;
private final BiFunction<ShardId, ReplicationCheckpoint, ReplicationStats> segmentReplicationStatsProvider;
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;
private volatile int maxSizeInRequestCache;

@Override
Expand Down Expand Up @@ -403,7 +403,7 @@ public IndicesService(
FileCache fileCache,
CompositeIndexSettings compositeIndexSettings,
Consumer<IndexShard> replicator,
BiFunction<ShardId, ReplicationCheckpoint, ReplicationStats> segmentReplicationStatsProvider
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void getSegmentFiles(
return;
}
logger.debug("Downloading segment files from remote store {}", filesToFetch);

slowByStringOperations(16);
if (remoteMetadataExists()) {
final Directory storeDirectory = indexShard.store().directory();
final Collection<String> directoryFiles = List.of(storeDirectory.listAll());
Expand All @@ -136,6 +136,15 @@ public void getSegmentFiles(
}
}

/**
 * Burns CPU time by repeatedly appending a counter to a {@link StringBuilder} and
 * reversing the buffer, then prints a completion message to standard output.
 *
 * NOTE(review): this looks like a temporary artificial delay rather than production
 * logic - confirm whether it should be removed before merging.
 *
 * @param intensity work multiplier; the loop body runs {@code intensity * 10000} times
 *                  (not at all when that product is zero or negative)
 */
public static void slowByStringOperations(int intensity) {
    final StringBuilder scratch = new StringBuilder();
    // Same int arithmetic as the loop bound it replaces, evaluated once.
    final int rounds = intensity * 10000;
    int counter = 0;
    while (counter < rounds) {
        scratch.append(counter);
        scratch.reverse();
        counter++;
    }
    System.out.println("Slow Operation finished");
}

@Override
public void cancel() {
this.cancellableThreads.cancel("Canceled by target");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
Expand Down Expand Up @@ -163,7 +162,7 @@ public void writeFileChunk(
*
* @param listener {@link ActionListener} listener.
*/
public void startReplication(ActionListener<Void> listener, BiConsumer<ReplicationCheckpoint, ShardId> checkpointUpdater) {
public void startReplication(ActionListener<Void> listener, BiConsumer<ReplicationCheckpoint, IndexShard> checkpointUpdater) {
cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
throw new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]");
});
Expand All @@ -179,7 +178,7 @@ public void startReplication(ActionListener<Void> listener, BiConsumer<Replicati
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);

checkpointInfoListener.whenComplete(checkpointInfo -> {
checkpointUpdater.accept(checkpointInfo.getCheckpoint(), this.indexShard.shardId());
checkpointUpdater.accept(checkpointInfo.getCheckpoint(), this.indexShard);

final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo);
state.setStage(SegmentReplicationState.Stage.GET_FILES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
return;
}
updateLatestReceivedCheckpoint(receivedCheckpoint, replicaShard);
replicator.updatePrimaryLastRefreshedCheckpoint(receivedCheckpoint, replicaShard.shardId());
replicator.updateReplicationCheckpointStats(receivedCheckpoint, replicaShard);
// Checks if replica shard is in the correct STARTED state to process checkpoints (avoids parallel replication events taking place)
// This check ensures we do not try to process a received checkpoint while the shard is still recovering, yet we stored the latest
// checkpoint to be replayed once the shard is Active.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -47,8 +49,8 @@ public class SegmentReplicator {

private final ReplicationCollection<SegmentReplicationTarget> onGoingReplications;
private final Map<ShardId, SegmentReplicationState> completedReplications = ConcurrentCollections.newConcurrentMap();
private final Map<ShardId, ReplicationCheckpoint> primaryLastRefreshedCheckpoint = ConcurrentCollections.newConcurrentMap();
private final Map<ShardId, ReplicationCheckpoint> lastOnGoingReplicationCheckpoint = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, ConcurrentNavigableMap<Long, ReplicationCheckpointStats>> replicationCheckpointStats = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, ReplicationCheckpoint> latestCheckpoint = ConcurrentCollections.newConcurrentMap();
private final ThreadPool threadPool;
private final SetOnce<SegmentReplicationSourceFactory> sourceFactory;

Expand Down Expand Up @@ -107,47 +109,87 @@ SegmentReplicationTarget startReplication(
return target;
}

/*
 * NOTE(review): this span is taken from a diff view without +/- markers. The previous
 * two-argument implementation and its one-argument replacement are interleaved below;
 * the comments mark which statements belong to which version.
 */
// Previous signature and body start.
public ReplicationStats getSegmentReplicationStats(ShardId shardId, ReplicationCheckpoint indexReplicationCheckPoint) {
assert shardId != null : "shardId cannot be null";
assert indexReplicationCheckPoint != null : "indexReplicationCheckPoint cannot be null";
;
final Map<String, StoreFileMetadata> indexStoreFileMetadata = indexReplicationCheckPoint.getMetadataMap();
// If primaryLastRefreshedCheckpoint is null, we will default to indexReplicationCheckPoint
// so that we can avoid any failures
final ReplicationCheckpoint primaryLastRefreshedCheckpoint = Objects.requireNonNullElse(
this.primaryLastRefreshedCheckpoint.get(shardId),
indexReplicationCheckPoint
);
final Map<String, StoreFileMetadata> storeFileMetadata = primaryLastRefreshedCheckpoint.getMetadataMap();
// Replacement signature: the stats are looked up from the per-shard map by shard id only.
public ReplicationStats getSegmentReplicationStats(final ShardId shardId) {
final ConcurrentNavigableMap<Long, ReplicationCheckpointStats> existingCheckpointStats = replicationCheckpointStats.get(shardId);
// Nothing recorded for this shard: report empty stats.
if (existingCheckpointStats == null || existingCheckpointStats.isEmpty()) {
return ReplicationStats.empty();
}

// Previous implementation: bytes behind is the summed length of the files the diff reports as missing.
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(storeFileMetadata, indexStoreFileMetadata);
long bytesBehindSum = diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
// Replacement: use the first and last entries of the navigable map (lowest and highest key).
Map.Entry<Long, ReplicationCheckpointStats> lowestEntry = existingCheckpointStats.firstEntry();
Map.Entry<Long, ReplicationCheckpointStats> highestEntry = existingCheckpointStats.lastEntry();

// Previous implementation: lag measured from the last ongoing replication checkpoint's created timestamp.
final ReplicationCheckpoint lastOnGoingReplicationCheckpoint = this.lastOnGoingReplicationCheckpoint.get(shardId);
final long replicationLag = lastOnGoingReplicationCheckpoint != null
? TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastOnGoingReplicationCheckpoint.getCreatedTimeStamp())
: 0;
// Replacement: bytes behind comes from the highest entry; lag is measured from the lowest
// entry's timestamp and is reported as 0 unless bytes behind is positive.
long bytesBehind = highestEntry.getValue().getBytesBehind();
long replicationLag = bytesBehind > 0L ?
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lowestEntry.getValue().getTimestamp()) : 0;

// Previous return statement.
return new ReplicationStats(bytesBehindSum, bytesBehindSum, bytesBehindSum > 0L ? replicationLag : 0);
// Replacement return statement.
return new ReplicationStats(bytesBehind, bytesBehind, replicationLag);
}

public void updatePrimaryLastRefreshedCheckpoint(ReplicationCheckpoint replicationCheckpoint, ShardId shardId) {
updateCheckpointIfAhead(primaryLastRefreshedCheckpoint, replicationCheckpoint, shardId);
/**
 * Records a checkpoint received for a shard and, when it is new, the number of bytes
 * the local shard copy is missing relative to it.
 *
 * <p>The checkpoint is tracked only when no checkpoint is tracked yet for the shard or
 * when it is ahead of the tracked one; otherwise the call is a no-op. This keeps a stale
 * or out-of-order checkpoint from replacing a newer one, which would make later
 * bytes-behind recalculations compare against outdated metadata.
 *
 * <p>For an accepted checkpoint whose segment infos version has no stats entry yet, the
 * bytes behind are computed against {@code indexShard.getLatestReplicationCheckpoint()}
 * and stored (together with the checkpoint's created timestamp) only when positive.
 *
 * @param latestCheckPoint the checkpoint that was just received
 * @param indexShard       the local shard the checkpoint applies to
 */
public void updateReplicationCheckpointStats(final ReplicationCheckpoint latestCheckPoint, final IndexShard indexShard) {
    final ShardId shardId = indexShard.shardId();
    final ReplicationCheckpoint trackedCheckpoint = latestCheckpoint.get(shardId);
    // Only ever move the tracked checkpoint forward.
    if (trackedCheckpoint != null && latestCheckPoint.isAheadOf(trackedCheckpoint) == false) {
        return;
    }
    latestCheckpoint.put(shardId, latestCheckPoint);

    final ConcurrentNavigableMap<Long, ReplicationCheckpointStats> existingCheckpointStats = replicationCheckpointStats.computeIfAbsent(
        shardId,
        k -> new ConcurrentSkipListMap<>()
    );

    final long segmentInfosVersion = latestCheckPoint.getSegmentInfosVersion();
    if (existingCheckpointStats.containsKey(segmentInfosVersion)) {
        return;
    }

    final ReplicationCheckpoint replicationCheckpoint = indexShard.getLatestReplicationCheckpoint();
    final long bytesBehind = calculateBytesBehind(latestCheckPoint, replicationCheckpoint);
    // A checkpoint the shard is not behind on contributes no entry.
    if (bytesBehind > 0) {
        existingCheckpointStats.put(
            segmentInfosVersion,
            new ReplicationCheckpointStats(bytesBehind, latestCheckPoint.getCreatedTimeStamp())
        );
    }
}

public void updateReplicationCheckpoints(ReplicationCheckpoint replicationCheckpoint, ShardId shardId) {
updateCheckpointIfAhead(lastOnGoingReplicationCheckpoint, replicationCheckpoint, shardId);
updatePrimaryLastRefreshedCheckpoint(replicationCheckpoint, shardId);
/**
 * Drops the stats entries the shard has caught up with and refreshes the bytes-behind
 * value of the newest remaining entry.
 *
 * <p>Entries whose key is less than or equal to the segment infos version of
 * {@code indexShard.getLatestReplicationCheckpoint()} are removed. If an entry remains,
 * its bytes behind are recomputed from the tracked latest checkpoint and the shard's
 * current checkpoint.
 *
 * <p>The method does nothing when no stats map exists for the shard, and skips the
 * recomputation when no latest checkpoint is tracked, instead of failing with a
 * {@link NullPointerException}.
 *
 * @param indexShard the shard whose replication just finished
 */
protected void pruneCheckpointsUpToLastSync(final IndexShard indexShard) {
    final ConcurrentNavigableMap<Long, ReplicationCheckpointStats> existingCheckpointStats = replicationCheckpointStats.get(
        indexShard.shardId()
    );
    if (existingCheckpointStats == null) {
        // Nothing was ever recorded for this shard, so there is nothing to prune.
        return;
    }

    final ReplicationCheckpoint indexReplicationCheckPoint = indexShard.getLatestReplicationCheckpoint();
    final long segmentInfoVersion = indexReplicationCheckPoint.getSegmentInfosVersion();
    // headMap(..., true) is inclusive: the version the shard is now on is removed as well.
    existingCheckpointStats.headMap(segmentInfoVersion, true).clear();

    final ReplicationCheckpoint latestCheckpoint = this.latestCheckpoint.get(indexShard.shardId());
    final Map.Entry<Long, ReplicationCheckpointStats> lastEntry = existingCheckpointStats.lastEntry();
    if (lastEntry != null && latestCheckpoint != null) {
        lastEntry.getValue().setBytesBehind(calculateBytesBehind(latestCheckpoint, indexReplicationCheckPoint));
    }
}

private void updateCheckpointIfAhead(
Map<ShardId, ReplicationCheckpoint> checkpointMap,
ReplicationCheckpoint newCheckpoint,
ShardId shardId
) {
final ReplicationCheckpoint existingCheckpoint = checkpointMap.get(shardId);
if (existingCheckpoint == null || newCheckpoint.isAheadOf(existingCheckpoint)) {
checkpointMap.put(shardId, newCheckpoint);
/**
 * Sums the lengths of the files that {@code Store.segmentReplicationDiff} reports as
 * missing when the first checkpoint's metadata map is compared with the second's.
 *
 * @param latestCheckPoint      checkpoint whose metadata map is passed as the first diff argument
 * @param replicationCheckpoint checkpoint whose metadata map is passed as the second diff argument
 * @return total length of all files listed in the diff's {@code missing} collection
 */
private long calculateBytesBehind(final ReplicationCheckpoint latestCheckPoint, final ReplicationCheckpoint replicationCheckpoint) {
    final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
        latestCheckPoint.getMetadataMap(),
        replicationCheckpoint.getMetadataMap()
    );

    long totalMissingBytes = 0L;
    for (final StoreFileMetadata missingFile : recoveryDiff.missing) {
        totalMissingBytes += missingFile.length();
    }
    return totalMissingBytes;
}

/**
 * Holder for the statistics recorded for one checkpoint: how many bytes the local shard
 * is behind and the timestamp supplied when the entry was created.
 *
 * <p>{@code bytesBehind} is mutable and is updated through {@link #setBytesBehind(long)}
 * after the instance has been stored in a concurrent map. It is declared {@code volatile}
 * so that an update is visible to other threads and so that the 64-bit value is always
 * read and written atomically.
 * NOTE(review): presumably the stats reader and the replication thread differ - confirm.
 */
private static class ReplicationCheckpointStats {
    private volatile long bytesBehind;
    private final long timestamp;

    /**
     * @param bytesBehind initial number of bytes behind
     * @param timestamp   timestamp associated with the checkpoint; never changes afterwards
     */
    public ReplicationCheckpointStats(long bytesBehind, long timestamp) {
        this.bytesBehind = bytesBehind;
        this.timestamp = timestamp;
    }

    /** @return the most recently stored number of bytes behind */
    public long getBytesBehind() {
        return bytesBehind;
    }

    /** @param bytesBehind new number of bytes behind, replacing the stored value */
    public void setBytesBehind(long bytesBehind) {
        this.bytesBehind = bytesBehind;
    }

    /** @return the timestamp given at construction */
    public long getTimestamp() {
        return timestamp;
    }
}

Expand Down Expand Up @@ -187,6 +229,7 @@ private void start(final long replicationId) {
@Override
public void onResponse(Void o) {
logger.debug(() -> new ParameterizedMessage("Finished replicating {} marking as done.", target.description()));
pruneCheckpointsUpToLastSync(target.indexShard());
onGoingReplications.markAsDone(replicationId);
if (target.state().getIndex().recoveredFileCount() != 0 && target.state().getIndex().recoveredBytes() != 0) {
completedReplications.put(target.shardId(), target.state());
Expand All @@ -202,7 +245,7 @@ public void onFailure(Exception e) {
}
onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), false);
}
}, this::updateReplicationCheckpoints);
}, this::updateReplicationCheckpointStats);
}

// pkg-private for integration tests
Expand All @@ -229,7 +272,7 @@ private void fetchPrimaryLastRefreshedCheckpoint(SegmentReplicationTarget target
sourceFactory.get().get(target.indexShard()).getCheckpointMetadata(target.getId(), target.getCheckpoint(), new ActionListener<>() {
@Override
public void onResponse(CheckpointInfoResponse checkpointInfoResponse) {
updatePrimaryLastRefreshedCheckpoint(checkpointInfoResponse.getCheckpoint(), target.indexShard().shardId());
updateReplicationCheckpointStats(checkpointInfoResponse.getCheckpoint(), target.indexShard());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.lucene.tests.analysis.MockAnalyzer;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.Version;
import org.junit.Assert;
import org.mockito.Mockito;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -44,7 +46,6 @@
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.DummyShardLock;
import org.opensearch.test.IndexSettingsModule;
import org.junit.Assert;

import java.io.FileNotFoundException;
import java.io.IOException;
Expand All @@ -56,8 +57,6 @@
import java.util.Random;
import java.util.function.BiConsumer;

import org.mockito.Mockito;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -177,9 +176,9 @@ public void onFailure(Exception e) {
logger.error("Unexpected onFailure", e);
Assert.fail();
}
}, (ReplicationCheckpoint checkpoint, ShardId shardId) -> {
}, (ReplicationCheckpoint checkpoint, IndexShard indexShard) -> {
assertEquals(repCheckpoint, checkpoint);
assertEquals(shardId, spyIndexShard.shardId());
assertEquals(indexShard, spyIndexShard);
});
}

Expand Down
Loading

0 comments on commit 7f465a0

Please sign in to comment.