Optimize ConcurrentMergeScheduler for Multi-Tenant Indexing #14335

Open
wants to merge 11 commits into main

Conversation

DivyanshIITB
Contributor

This PR enhances the ConcurrentMergeScheduler by introducing dynamic resource allocation for multi-tenant indexing scenarios.

Key Improvements:

  1. Global Counter for Active IndexWriters: Introduced an AtomicInteger to track active IndexWriter instances dynamically.
  2. Dynamic Thread Allocation: Adjusted maxThreadCount based on the number of active writers to optimize merge performance.
  3. Adaptive Merge Throttling: Distributed merge speed limits among active writers to prevent resource contention and ensure fair indexing performance.

These changes improve efficiency in multi-writer environments by dynamically adapting merge scheduling to system load.
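
To make this concrete, here is a minimal, hypothetical sketch of the bookkeeping implied by points 1 and 2 above (class and method names are illustrative only, not the code in this PR):

  import java.util.concurrent.atomic.AtomicInteger;

  // Illustrative sketch: a global counter of active writers and a naive 1/N split of merge threads.
  final class WriterAwareMergeLimits {
    private static final AtomicInteger ACTIVE_WRITERS = new AtomicInteger();
    private static final int TOTAL_MERGE_THREADS =
        Math.max(1, Runtime.getRuntime().availableProcessors() / 2);

    static void registerWriter() {
      ACTIVE_WRITERS.incrementAndGet();
    }

    static void unregisterWriter() {
      ACTIVE_WRITERS.decrementAndGet();
    }

    // Each writer gets roughly 1/N of the merge threads; the review below discusses why
    // this static split is problematic when load is skewed across writers.
    static int maxThreadsForWriter() {
      int writers = Math.max(1, ACTIVE_WRITERS.get());
      return Math.max(1, TOTAL_MERGE_THREADS / writers);
    }
  }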

Fixes #13883

@jpountz
Contributor

jpountz commented Mar 8, 2025

This looks too naive to me: we don't want each index writer to have 1/N of the resources, as that would prevent one writer from maxing out resources (e.g. if one index has a heavy write load while other indexes are only rarely updated).

@DivyanshIITB
Contributor Author

Thank you for the review, @jpountz!

I see your concern regarding equal resource distribution across IndexWriter instances potentially leading to inefficiencies when some writers have a significantly heavier workload than others. My initial goal was to ensure fairness in multi-tenant environments, but I agree that a static 1/N allocation might not be ideal.

To address this, I’m considering an adaptive resource allocation approach, where:

  • Recent Merge Activity Tracking: Each IndexWriter's recent merge activity is monitored.
  • Dynamic Thread Allocation: Instead of an equal split, threads are assigned based on recent workload.
  • Merge Throttling Balance: Prevents a single heavy writer from starving others while ensuring underutilized writers don’t hold
    excessive resources.

Would this approach align better with Lucene’s design goals?

@jpountz
Contributor

jpountz commented Mar 11, 2025

Merge throttling is now disabled by default, so IMO it's fine to ignore it for now. Regarding thread creation, I'm thinking of a shared fixed-size thread pool (e.g. number of processors / 2) for merge scheduling.

@DivyanshIITB
Contributor Author

Thank you for the clarification, @jpountz!

I'll drop the merge throttling aspect from the changes since it's disabled by default.
Regarding the fixed thread pool approach (numProcessors / 2), should this be a shared global pool across all IndexWriters, or should each writer have its own pool?
If it's a shared pool, how should we handle cases where a few writers are highly active while others are idle? Should we allow active writers to take more resources dynamically, or keep a strict fixed allocation?
Let me know your thoughts, and I'll adjust the implementation accordingly!

@DivyanshIITB
Contributor Author

Just a gentle reminder

@jpountz
Contributor

jpountz commented Mar 14, 2025

Apologies, I had missed your reply.

should this be a shared global pool across all IndexWriters, or should each writer have its own pool?

It should be shared; we don't want the total number of threads to scale with the number of index writers. The reasoning behind the numProcessors/2 number is that merging generally should not be more expensive than indexing, so by reserving only half the CPU capacity for merging it should still be possible to max out the hardware while indexing, while also keeping the peak number of threads running merges under numProcessors/2.

If it's a shared pool, how should we handle cases where a few writers are highly active while others are idle? Should we allow active writers to take more resources dynamically, or keep a strict fixed allocation?

Idle writers would naturally submit fewer tasks than highly active writers. IMO the fixed allocation is key here.

@DivyanshIITB
Contributor Author

Thanks for the detailed clarification, @jpountz!

I have made the necessary changes to the implementation:

  • Implemented a shared global thread pool with a fixed size of numProcessors / 2.
  • Ensured that each IndexWriter submits merge tasks to the shared pool without dynamic reallocation, as idle writers naturally
    submit fewer tasks.
  • Updated the logic in MergeScheduler.java to align with these changes.

@jpountz, I’ve made the requested changes. Please review when you get a chance.

@jpountz
Contributor

jpountz commented Mar 15, 2025

I think it'll be simpler to create a new merge scheduler rather than modify ConcurrentMergeScheduler. Also we'll need tests.

@DivyanshIITB
Contributor Author

DivyanshIITB commented Mar 16, 2025

Thanks for your feedback, @jpountz!

  • I have created a new MultiTenantMergeScheduler as suggested, instead of modifying ConcurrentMergeScheduler.
  • I have restored ConcurrentMergeScheduler to its original state.
  • Added a test case (TestMultiTenantMergeScheduler) to validate the new scheduler's behavior.

Please review and let me know if any further changes are needed!

@vigyasharma left a comment

@DivyanshIITB: I appreciate your interest in this issue. A multi-tenant CMS, however, has quite a few nuances that this implementation will need to cover. I'd encourage you to look at the different ways in which IndexWriter interacts with merging. Some examples of functionality I find missing include support for forceMerge, aborting a merge, and waiting until all merges complete.

I can help review future iterations that include these changes.

public class MultiTenantMergeScheduler extends MergeScheduler {

  // Shared global thread pool
  private static final ExecutorService MERGE_THREAD_POOL =
      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
Contributor

This will create a threadpool even when this policy is not used. Let's use lazy static initialization.

Contributor Author

Thanks for the suggestion! I've updated the implementation to use lazy static initialization by wrapping the thread pool in a nested static class, so the pool is only created when first accessed and no resources are allocated while the policy is unused. Instead of directly initializing MERGE_THREAD_POOL, I'm now using a LazyHolder class with a getMergeThreadPool() method to ensure thread-safe lazy initialization. Let me know if you have any further feedback!

public void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger trigger) throws IOException {
  while (true) {
    MergePolicy.OneMerge merge = mergeSource.getNextMerge();
    if (merge == null) {
Contributor

Can we use hasPendingMerges() instead? I know existing schedulers assume null means there are no more merges, but it's not the documented API behavior.

Contributor Author

Thanks for pointing this out! I’ve updated the loop condition to use hasPendingMerges() instead of relying on null. This ensures better adherence to the documented API behavior. Let me know if this aligns with what you were suggesting!
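
For reference, a minimal sketch of what such a loop might look like, assuming the MergeSource API (hasPendingMerges(), getNextMerge(), merge(OneMerge)) and the shared pool discussed earlier; this is an illustration, not the exact code in the PR:

  @Override
  public void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger trigger) throws IOException {
    while (mergeSource.hasPendingMerges()) {
      MergePolicy.OneMerge merge = mergeSource.getNextMerge();
      if (merge == null) {
        break; // defensive: nothing was handed out despite pending merges
      }
      // Submit to the shared pool and return without blocking, so merges run in the background.
      // In the real scheduler the returned Future would be tracked per writer (see below).
      getMergeThreadPool().submit(() -> {
        try {
          mergeSource.merge(merge);
        } catch (IOException e) {
          throw new java.io.UncheckedIOException(e);
        }
      });
    }
  }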

Comment on lines 23 to 36
// Submit merge task to the shared thread pool
Future<?> future = MERGE_THREAD_POOL.submit(() -> {
  try {
    mergeSource.merge(merge);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
});

try {
  future.get(); // Ensure the task completes
} catch (Exception e) {
  throw new IOException("Merge operation failed", e);
}
Contributor

I don't think this achieves concurrent background merging like we'd want. I think this code will submit a merge request and block on its future for completion, effectively making all merges sequential.

Contributor Author

Thanks for the feedback! You're absolutely right—blocking on future.get(); was making the merges sequential. I've removed it so that merges can now proceed asynchronously in the background. Let me know if this aligns with the intended behavior!

}

@Override
public void close() {
Contributor

This shouldn't be a no-op. We should wait for all running merges of the calling IndexWriter to complete, similar to sync() in ConcurrentMergeScheduler. This could be done by maintaining a per writer list of futures for running merges. We could run a clean up loop in merge() that removes completed futures.

Also, I get that this thread pool is global and shared, but I don't like the idea of having a threadpool that can never be closed. How about we expose some hooks to close this scheduler if all consuming writers have closed?
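
One possible shape for that per-writer bookkeeping, sketched under the assumption that each IndexWriter gets its own scheduler instance (the field name runningMerges is hypothetical, not from the PR):

  // Per-scheduler (i.e. per-writer) list of futures for merges that are still running.
  private final List<Future<?>> runningMerges =
      Collections.synchronizedList(new ArrayList<>());

  // In merge(): remember each submitted task and opportunistically drop finished ones, e.g.
  //   Future<?> future = getMergeThreadPool().submit(task);
  //   runningMerges.add(future);
  //   runningMerges.removeIf(Future::isDone);

  @Override
  public void close() throws IOException {
    // Wait only for this writer's merges, similar in spirit to ConcurrentMergeScheduler.sync().
    for (Future<?> f : new ArrayList<>(runningMerges)) {
      try {
        f.get();
      } catch (Exception e) {
        throw new IOException("Exception while waiting for merges to finish", e);
      }
    }
    runningMerges.clear();
  }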

Contributor Author

Thanks for pointing this out! I’ve updated close() to properly wait for all running merges, similar to sync() in ConcurrentMergeScheduler. Now, merge() tracks active tasks, and I've added cleanup for completed futures. Also, I introduced shutdownThreadPool() to allow safe global shutdown. Let me know if this works!
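
A sketch of one way such a global-shutdown hook could work, by reference-counting the schedulers that use the shared pool (identifiers are hypothetical, and this is only an illustration of the idea):

  private static final AtomicInteger OPEN_SCHEDULERS = new AtomicInteger();

  // Called when a scheduler instance starts being used by a writer.
  static void acquireSharedPool() {
    OPEN_SCHEDULERS.incrementAndGet();
  }

  // Called from close(): shut the shared pool down once the last consumer is gone.
  // Caveat: with a plain static holder the pool cannot be re-created afterwards, so this
  // only works if no new writers are opened after the last one closes.
  static void releaseSharedPool() {
    if (OPEN_SCHEDULERS.decrementAndGet() == 0) {
      getMergeThreadPool().shutdown();
    }
  }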


public class TestMultiTenantMergeScheduler extends LuceneTestCase {

  public void testMultiTenantMergeScheduler() throws Exception {
Contributor

This test is not sufficient. We need to test (and also implement) a lot more scenarios, like multiple writers consuming this scheduler, scheduling different merges and starting/closing at different times. You can refer to the tests in TestConcurrentMergeScheduler as a starting point.

Contributor Author

Thanks for the feedback! I’ve expanded the test suite to include multiple writers consuming the scheduler, different merge scheduling times, and staggered start/close behavior. Also, I’ve referred to TestConcurrentMergeScheduler for structuring test cases to verify concurrency. Let me know if you have additional suggestions!

@DivyanshIITB force-pushed the fix-multitenancy-merge-scheduler-clean branch from 71ac56f to b6e7556 on March 17, 2025 12:29
@DivyanshIITB force-pushed the fix-multitenancy-merge-scheduler-clean branch from 6c356ff to 13b914c on March 17, 2025 12:33
@DivyanshIITB
Contributor Author

DivyanshIITB commented Mar 17, 2025

I have a request: kindly ignore the following two deleted files in the "Files Changed" section:
"KeepOnlyLastCommitDeletionPolicy.java"
"ConcurrentMergeScheduler.java"

Thanks!

}

@Override
public void close() throws IOException {


IIRC IndexWriter.close() will call MergeScheduler.close(), which means that you would wait for all merges across all indexes using this scheduler to complete. The IndexWriter that is currently calling close() will stop scheduling new merges, but the other writers may not, which means this may not terminate. To fix this you may need to map active merges back to the IndexWriter or Directory they belong to.

Contributor

+1; my suggestion in the previous review was similar. We need a mapping of writer to merges and only block on merges specific to the calling writer.

Comment on lines 14 to 23
private static class LazyHolder {
  static final ExecutorService MERGE_THREAD_POOL =
      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
}

private static ExecutorService getMergeThreadPool() {
  return LazyHolder.MERGE_THREAD_POOL;
}

// Use getMergeThreadPool() instead of direct access
Contributor

We can skip the getMergeThreadPool() function since we only need to access the threadpool internally. Something like:

  private static class MergeThreadPool {
      private static final ExecutorService INSTANCE = 
          Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
  }
  // and then just use MergeThreadPool.INSTANCE inside merge() and other functions.
  // ...
}

});

// Cleanup completed merges
activeMerges.removeIf(Future::isDone);
Contributor

Where is this declared and initialized? When are merges added to activeMerges?

}

@Override
public void close() throws IOException {
Contributor

+1; my suggestion in the previous review was similar. We need a mapping of writer to merges and only block on merges specific to the calling writer.

MultiTenantMergeScheduler.shutdownThreadPool();

// Check if merging took less time than sequential execution would
assertTrue("Merges did not happen concurrently!", (endTime - startTime) < 5000);
Contributor

It's not reliable to assert or depend on elapsed time. Lucene is run on many different machines, and this timing will differ.
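
One timing-free alternative, sketched as a hypothetical test helper (it assumes MergeScheduler.MergeSource exposes getNextMerge, onMergeFinished, hasPendingMerges, and merge): wrap the MergeSource handed to the scheduler, count in-flight merges, and assert on the observed peak overlap instead of wall-clock time.

  static final class CountingMergeSource implements MergeScheduler.MergeSource {
    private final MergeScheduler.MergeSource delegate;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    CountingMergeSource(MergeScheduler.MergeSource delegate) {
      this.delegate = delegate;
    }

    @Override
    public MergePolicy.OneMerge getNextMerge() {
      return delegate.getNextMerge();
    }

    @Override
    public void onMergeFinished(MergePolicy.OneMerge merge) {
      delegate.onMergeFinished(merge);
    }

    @Override
    public boolean hasPendingMerges() {
      return delegate.hasPendingMerges();
    }

    @Override
    public void merge(MergePolicy.OneMerge merge) throws IOException {
      peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
      try {
        delegate.merge(merge);
      } finally {
        inFlight.decrementAndGet();
      }
    }

    int peakConcurrentMerges() {
      return peak.get();
    }
  }

  // In the test: assertTrue(countingSource.peakConcurrentMerges() > 1);
  // This still needs enough merge pressure to overlap, but no longer depends on machine speed.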

@vigyasharma left a comment

Hi @DivyanshIITB,

Thanks for your work here; a concurrent multi-tenant merge scheduler is an ambitious feature. This area of the codebase, especially dealing with how IndexWriter interacts with merging, can be quite complex.

I'd suggest the following steps to help things along:

  1. First, please ensure your code compiles and all existing tests pass. You can run ./gradlew check to verify this.
  2. A helpful approach would be to try swapping out the existing ConcurrentMergeScheduler (CMS) with your MultiTenantMergeScheduler (MTMS) in your local branch and running the test suite. This can reveal some subtle edge cases that your scheduler will need to handle.
  3. You can use the existing tests for CMS as a guide for writing comprehensive tests for your MTMS.
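
For step 2 above, a minimal sketch of how the swap could look in a local scratch program, using the standard IndexWriterConfig.setMergeScheduler hook (MultiTenantMergeScheduler being the class added in this PR):

  import org.apache.lucene.analysis.standard.StandardAnalyzer;
  import org.apache.lucene.document.Document;
  import org.apache.lucene.document.Field;
  import org.apache.lucene.document.TextField;
  import org.apache.lucene.index.IndexWriter;
  import org.apache.lucene.index.IndexWriterConfig;
  import org.apache.lucene.store.ByteBuffersDirectory;
  import org.apache.lucene.store.Directory;

  public class MtmsSmokeTest {
    public static void main(String[] args) throws Exception {
      Directory dir = new ByteBuffersDirectory();
      IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
      iwc.setMergeScheduler(new MultiTenantMergeScheduler()); // instead of ConcurrentMergeScheduler
      try (IndexWriter writer = new IndexWriter(dir, iwc)) {
        for (int i = 0; i < 10_000; i++) {
          Document doc = new Document();
          doc.add(new TextField("body", "doc " + i, Field.Store.NO));
          writer.addDocument(doc);
        }
        writer.forceMerge(1); // exercises the scheduler's merge path
      }
    }
  }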

This is a challenging task, and building familiarity with the codebase through smaller contributions can be very beneficial. Would you be interested in exploring some smaller, more focused issues to start?

I appreciate your effort and am here to help.


@Override
public void close() throws IOException {
IndexWriter currentWriter = getCurrentIndexWriter(); // Method to get the calling writer
Contributor

This is not defined anywhere!

merge.waitForCompletion(); // Only wait for merges related to this writer
}

activeMerges.remove(currentWriter); // Cleanup after closing
Contributor

This doesn't line up, activeMerges is a List, you're calling Map APIs on it. Did this compile?

@DivyanshIITB
Contributor Author

Thank you for your help, @vigyasharma! I would love to explore smaller, more focused issues to start with!


github-actions bot commented Apr 3, 2025

This PR has not had activity in the past 2 weeks, labeling it as stale. If the PR is waiting for review, notify the dev@lucene.apache.org list. Thank you for your contribution!

The github-actions bot added the Stale label on Apr 3, 2025
Successfully merging this pull request may close these issues.

A multi-tenant ConcurrentMergeScheduler