From 975b0827a3837de2b17cf846f5ec165c0cd4df44 Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Mon, 18 Mar 2024 12:48:58 -0700 Subject: [PATCH] Add a counter to node stat api to track when a shard goes from idle to non-idle Signed-off-by: Ruirui Zhang --- .../index/search/stats/SearchStats.java | 23 +++++++++++++- .../index/search/stats/ShardSearchStats.java | 9 +++++- .../opensearch/index/shard/IndexShard.java | 4 +++ .../index/shard/SearchOperationListener.java | 16 ++++++++++ .../index/search/stats/SearchStatsTests.java | 7 +++-- .../shard/SearchOperationListenerTests.java | 31 +++++++++++++++++++ 6 files changed, 85 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java index 576e00f8f30d1..1da6faa3d6aa2 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java @@ -163,6 +163,8 @@ public static class Stats implements Writeable, ToXContentFragment { private long pitTimeInMillis; private long pitCurrent; + private long searchIdleWakenUpCount; + @Nullable private RequestStatsLongHolder requestStatsLongHolder; @@ -193,7 +195,8 @@ public Stats( long pitCurrent, long suggestCount, long suggestTimeInMillis, - long suggestCurrent + long suggestCurrent, + long searchIdleWakenUpCount ) { this.requestStatsLongHolder = new RequestStatsLongHolder(); this.queryCount = queryCount; @@ -220,6 +223,8 @@ public Stats( this.pitCount = pitCount; this.pitTimeInMillis = pitTimeInMillis; this.pitCurrent = pitCurrent; + + this.searchIdleWakenUpCount = searchIdleWakenUpCount; } private Stats(StreamInput in) throws IOException { @@ -239,6 +244,8 @@ private Stats(StreamInput in) throws IOException { suggestTimeInMillis = in.readVLong(); suggestCurrent = in.readVLong(); + searchIdleWakenUpCount = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_2_4_0)) { pitCount = in.readVLong(); pitTimeInMillis = in.readVLong(); @@ -282,6 +289,8 @@ public void add(Stats stats) { pitCount += stats.pitCount; pitTimeInMillis += stats.pitTimeInMillis; pitCurrent += stats.pitCurrent; + + searchIdleWakenUpCount += stats.searchIdleWakenUpCount; } public void addForClosingShard(Stats stats) { @@ -306,6 +315,8 @@ public void addForClosingShard(Stats stats) { pitTimeInMillis += stats.pitTimeInMillis; pitCurrent += stats.pitCurrent; queryConcurrency += stats.queryConcurrency; + + searchIdleWakenUpCount += stats.searchIdleWakenUpCount; } public long getQueryCount() { @@ -412,6 +423,10 @@ public long getSuggestCurrent() { return suggestCurrent; } + public long getSearchIdleWakenUpCount() { + return searchIdleWakenUpCount; + } + public static Stats readStats(StreamInput in) throws IOException { return new Stats(in); } @@ -434,6 +449,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(suggestTimeInMillis); out.writeVLong(suggestCurrent); + out.writeVLong(searchIdleWakenUpCount); + if (out.getVersion().onOrAfter(Version.V_2_4_0)) { out.writeVLong(pitCount); out.writeVLong(pitTimeInMillis); @@ -457,6 +474,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(concurrentQueryCurrent); out.writeVLong(queryConcurrency); } + } @Override @@ -486,6 +504,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime()); builder.field(Fields.SUGGEST_CURRENT, suggestCurrent); + builder.field(Fields.SEARCH_IDLE_WAKEN_UP_TOTAL, searchIdleWakenUpCount); + if (requestStatsLongHolder != null) { builder.startObject(Fields.REQUEST); @@ -647,6 +667,7 @@ static final class Fields { static final String PIT_CURRENT = "point_in_time_current"; static final String SUGGEST_TOTAL = "suggest_total"; static final String SUGGEST_TIME = "suggest_time"; + static final String SEARCH_IDLE_WAKEN_UP_TOTAL = "search_idle_waken_up_total"; static final String SUGGEST_TIME_IN_MILLIS = "suggest_time_in_millis"; static final String SUGGEST_CURRENT = "suggest_current"; static final String REQUEST = "request"; diff --git a/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java index 99e3f8465c5db..2002d7a216124 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java @@ -194,6 +194,11 @@ public void onNewScrollContext(ReaderContext readerContext) { totalStats.scrollCurrent.inc(); } + @Override + public void onNewSearchIdleWakenUp() { + totalStats.searchIdleMetric.inc(); + } + @Override public void onFreeScrollContext(ReaderContext readerContext) { totalStats.scrollCurrent.dec(); @@ -220,6 +225,7 @@ public void onFreePitContext(ReaderContext readerContext) { */ static final class StatsHolder { final MeanMetric queryMetric = new MeanMetric(); + final CounterMetric searchIdleMetric = new CounterMetric(); final MeanMetric concurrentQueryMetric = new MeanMetric(); final CounterMetric queryConcurrencyMetric = new CounterMetric(); final MeanMetric fetchMetric = new MeanMetric(); @@ -260,7 +266,8 @@ SearchStats.Stats stats() { pitCurrent.count(), suggestMetric.count(), TimeUnit.NANOSECONDS.toMillis(suggestMetric.sum()), - suggestCurrent.count() + suggestCurrent.count(), + searchIdleMetric.count() ); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 977155a1cbb72..3ed42e3203bb5 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1859,6 +1859,10 @@ public Engine.Searcher acquireSearcher(String source) { } private void markSearcherAccessed() { + if (isSearchIdle()) { + SearchOperationListener searchOperationListener = getSearchOperationListener(); + searchOperationListener.onNewSearchIdleWakenUp(); + } lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis()); } diff --git a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java index 849a4f9c15318..9d0f26199dcb1 100644 --- a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java @@ -114,6 +114,11 @@ default void onFreeReaderContext(ReaderContext readerContext) {} */ default void onNewScrollContext(ReaderContext readerContext) {} + /** + * Executed when a shard goes from idle to non-idle state + */ + default void onNewSearchIdleWakenUp() {} + /** * Executed when a scroll search {@link SearchContext} is freed. * This happens either when the scroll search execution finishes, if the @@ -256,6 +261,17 @@ public void onNewScrollContext(ReaderContext readerContext) { } } + @Override + public void onNewSearchIdleWakenUp() { + for (SearchOperationListener listener : listeners) { + try { + listener.onNewSearchIdleWakenUp(); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onNewSearchIdleWakenUp listener [{}] failed", listener), e); + } + } + } + @Override public void onFreeScrollContext(ReaderContext readerContext) { for (SearchOperationListener listener : listeners) { diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index 5656b77445772..089f16480caa7 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -57,9 +57,9 @@ public void testShardLevelSearchGroupStats() throws Exception { // let's create two dummy search stats with groups Map groupStats1 = new HashMap<>(); Map groupStats2 = new HashMap<>(); - groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)); - SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1); - SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2); + groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)); + SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1); + SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2); // adding these two search stats and checking group stats are correct searchStats1.add(searchStats2); @@ -128,6 +128,7 @@ private static void assertStats(Stats stats, long equalTo) { assertEquals(equalTo, stats.getSuggestCount()); assertEquals(equalTo, stats.getSuggestTimeInMillis()); assertEquals(equalTo, stats.getSuggestCurrent()); + assertEquals(equalTo, stats.getSearchIdleWakenUpCount()); // avg_concurrency is not summed up across stats assertEquals(1, stats.getConcurrentAvgSliceCount(), 0); } diff --git a/server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java b/server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java index 98f86758ea2ca..543546486ee64 100644 --- a/server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java @@ -64,6 +64,7 @@ public void testListenersAreExecuted() { AtomicInteger newScrollContext = new AtomicInteger(); AtomicInteger freeScrollContext = new AtomicInteger(); AtomicInteger validateSearchContext = new AtomicInteger(); + AtomicInteger searchIdleWakenUp = new AtomicInteger(); AtomicInteger timeInNanos = new AtomicInteger(randomIntBetween(0, 10)); SearchOperationListener listener = new SearchOperationListener() { @Override @@ -133,6 +134,11 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest assertNotNull(readerContext); validateSearchContext.incrementAndGet(); } + + @Override + public void onNewSearchIdleWakenUp() { + searchIdleWakenUp.incrementAndGet(); + } }; SearchOperationListener throwingListener = (SearchOperationListener) Proxy.newProxyInstance( @@ -169,6 +175,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleWakenUp.get()); assertEquals(0, validateSearchContext.get()); compositeListener.onFetchPhase(ctx, timeInNanos.get()); @@ -182,6 +189,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleWakenUp.get()); assertEquals(0, validateSearchContext.get()); compositeListener.onPreQueryPhase(ctx); @@ -195,6 +203,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleWakenUp.get()); assertEquals(0, validateSearchContext.get()); compositeListener.onPreFetchPhase(ctx); @@ -208,6 +217,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleWakenUp.get()); assertEquals(0, validateSearchContext.get()); compositeListener.onFailedFetchPhase(ctx); @@ -221,6 +231,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleWakenUp.get()); assertEquals(0, validateSearchContext.get()); compositeListener.onFailedQueryPhase(ctx); @@ -234,6 +245,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleWakenUp.get()); assertEquals(0, validateSearchContext.get()); compositeListener.onNewReaderContext(mock(ReaderContext.class)); @@ -247,6 +259,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleWakenUp.get()); assertEquals(0, validateSearchContext.get()); compositeListener.onNewScrollContext(mock(ReaderContext.class)); @@ -260,6 +273,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest assertEquals(2, newScrollContext.get()); assertEquals(0, freeContext.get()); assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleWakenUp.get()); assertEquals(0, validateSearchContext.get()); compositeListener.onFreeReaderContext(mock(ReaderContext.class)); @@ -273,6 +287,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest assertEquals(2, newScrollContext.get()); assertEquals(2, freeContext.get()); assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleWakenUp.get()); assertEquals(0, validateSearchContext.get()); compositeListener.onFreeScrollContext(mock(ReaderContext.class)); @@ -286,6 +301,21 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest assertEquals(2, newScrollContext.get()); assertEquals(2, freeContext.get()); assertEquals(2, freeScrollContext.get()); + assertEquals(0, searchIdleWakenUp.get()); + assertEquals(0, validateSearchContext.get()); + + compositeListener.onNewSearchIdleWakenUp(); + assertEquals(2, preFetch.get()); + assertEquals(2, preQuery.get()); + assertEquals(2, failedFetch.get()); + assertEquals(2, failedQuery.get()); + assertEquals(2, onQuery.get()); + assertEquals(2, onFetch.get()); + assertEquals(2, newContext.get()); + assertEquals(2, newScrollContext.get()); + assertEquals(2, freeContext.get()); + assertEquals(2, freeScrollContext.get()); + assertEquals(2, searchIdleWakenUp.get()); assertEquals(0, validateSearchContext.get()); if (throwingListeners == 0) { @@ -311,6 +341,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest assertEquals(2, newScrollContext.get()); assertEquals(2, freeContext.get()); assertEquals(2, freeScrollContext.get()); + assertEquals(2, searchIdleWakenUp.get()); assertEquals(2, validateSearchContext.get()); } }