Optimize ConcurrentMergeScheduler for Multi-Tenant Indexing #14335
Conversation
This looks too naive to me: we don't want each index writer to have 1/N of the resources, which would prevent one writer from maxing out resources (e.g. if one index has a heavy write load while other indexes are only rarely updated).
Thank you for the review, @jpountz! I see your concern regarding equal resource distribution across IndexWriter instances potentially leading to inefficiencies when some writers have a significantly heavier workload than others. My initial goal was to ensure fairness in multi-tenant environments, but I agree that a static 1/N allocation might not be ideal. To address this, I’m considering an adaptive resource allocation approach, where:
Would this approach align better with Lucene's design goals?
Merge throttling is now disabled by default, so IMO it's fine to ignore it for now. Regarding thread creation, I'm thinking of a shared fixed-size thread pool (e.g. number of processors / 2) for merge scheduling.
Thank you for the clarification, @jpountz! I'll drop the merge throttling aspect from the changes since it's disabled by default.
Just a gentle reminder.
Apologies, I had missed your reply.
It should be shared; we don't want the total number of threads to scale with the number of index writers. The reasoning behind the numProcessors/2 number is that merging generally should not be more expensive than indexing, so by reserving only half the CPU capacity for merging, it should still be possible to max out the hardware while indexing, with the peak number of threads running merges staying under numProcessors/2.
Idle writers would naturally submit fewer tasks than highly active writers. IMO the fixed allocation is key here.
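The shared, fixed-size pool described above could be sketched as a single JVM-wide executor. This is a minimal illustration, not the PR's code; the class name and the `Math.max` guard (for single-core machines) are my additions:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// A single JVM-wide pool shared by every merge scheduler instance, sized to
// half the available processors so indexing can still saturate the hardware.
public class SharedMergePool {
  // Guard against availableProcessors() == 1, where dividing by 2 would give 0.
  private static final int POOL_SIZE =
      Math.max(1, Runtime.getRuntime().availableProcessors() / 2);

  private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE);

  public static ExecutorService pool() {
    return POOL;
  }

  public static int poolSize() {
    return POOL_SIZE;
  }
}
```

Because the pool is static and fixed, idle writers simply submit nothing, while a single busy writer can occupy all of its threads, which matches the fixed-allocation point above.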
Thanks for the detailed clarification, @jpountz! I have made the necessary changes to the implementation:
@jpountz, I’ve made the requested changes. Please review when you get a chance.
I think it'll be simpler to create a new merge scheduler rather than modify ConcurrentMergeScheduler. Also we'll need tests. |
Thanks for your feedback, @jpountz!
Please review and let me know if any further changes are needed! |
@DivyanshIITB: I appreciate your interest in this issue. A multi-tenant CMS, however, has quite a few nuances that this implementation will need to cover. I'd encourage you to look at the different ways in which IndexWriter interacts with merging. Some examples of functionality I find missing include support for forceMerging, aborting a merge, and waiting till all merges complete.
I can help review future iterations that include these changes.
```java
public class MultiTenantMergeScheduler extends MergeScheduler {

  // Shared global thread pool
  private static final ExecutorService MERGE_THREAD_POOL =
      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
```
This will create a threadpool even when this policy is not used. Let's use lazy static initialization.
Thanks for the suggestion! I've updated the implementation to use lazy static initialization by wrapping the thread pool in a nested static class, so the pool is only created when first accessed and no resources are allocated unnecessarily. Instead of directly initializing MERGE_THREAD_POOL, I'm now using a LazyHolder class with a getMergeThreadPool() method to ensure thread-safe lazy initialization; this prevents the thread pool from being created when the policy is unused. Let me know if you have any further feedback!
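For reference, the initialization-on-demand holder idiom described in this reply looks like the sketch below (names are illustrative): the JVM initializes the nested class, and therefore the pool, only on the first call, and class-loading semantics make this thread-safe without explicit locking.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Initialization-on-demand holder idiom: the nested class is not initialized
// until getMergeThreadPool() is first called, so no pool is created for
// applications that never use this scheduler.
public class LazyPoolExample {
  private static class LazyHolder {
    static final ExecutorService MERGE_THREAD_POOL =
        Executors.newFixedThreadPool(
            Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
  }

  public static ExecutorService getMergeThreadPool() {
    return LazyHolder.MERGE_THREAD_POOL; // triggers LazyHolder init exactly once
  }
}
```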
```java
public void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger trigger) throws IOException {
  while (true) {
    MergePolicy.OneMerge merge = mergeSource.getNextMerge();
    if (merge == null) {
```
Can we use `hasPendingMerges()` instead? I know existing schedulers assume `null` means there are no more merges, but it's not the documented API behavior.
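The suggested loop shape can be illustrated with a minimal stand-in for Lucene's MergeSource (the interface and demo below are simplified mock-ups, not Lucene's real API): drive the loop off `hasPendingMerges()` and treat a `null` merge as a transient condition rather than the end.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Mock-up of the suggested loop: keep going while merges are pending, and
// re-check the condition when getNextMerge() returns null, instead of
// treating null as "no more merges".
public class PendingMergesLoop {
  interface MergeSource {
    boolean hasPendingMerges();
    String getNextMerge(); // stands in for MergePolicy.OneMerge
  }

  static int drain(MergeSource source) {
    int merged = 0;
    while (source.hasPendingMerges()) {
      String merge = source.getNextMerge();
      if (merge == null) {
        continue; // another thread may have taken it; re-check the condition
      }
      merged++; // a real scheduler would submit the merge here
    }
    return merged;
  }

  static int demo() {
    Queue<String> pending = new ArrayDeque<>(List.of("m1", "m2", "m3"));
    return drain(new MergeSource() {
      public boolean hasPendingMerges() { return !pending.isEmpty(); }
      public String getNextMerge() { return pending.poll(); }
    });
  }
}
```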
Thanks for pointing this out! I’ve updated the loop condition to use hasPendingMerges() instead of relying on null. This ensures better adherence to the documented API behavior. Let me know if this aligns with what you were suggesting!
```java
// Submit merge task to the shared thread pool
Future<?> future = MERGE_THREAD_POOL.submit(() -> {
  try {
    mergeSource.merge(merge);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
});

try {
  future.get(); // Ensure the task completes
} catch (Exception e) {
  throw new IOException("Merge operation failed", e);
}
```
I don't think this achieves concurrent background merging like we'd want. I think this code will submit a merge request and block on its future for completion, effectively making all merges sequential.
Thanks for the feedback! You're absolutely right: blocking on future.get() was making the merges sequential. I've removed it so that merges can now proceed asynchronously in the background. Let me know if this aligns with the intended behavior!
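The non-blocking pattern being discussed, submit the merge and remember its Future rather than waiting on it inline, might look like this stand-alone sketch (class and method names are illustrative; a real scheduler would run MergeSource merges instead of plain Runnables):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Submit merges asynchronously and keep their futures so that a later
// sync()/close() can wait for completion; never block at submission time.
public class AsyncMergeSubmit {
  private final ExecutorService pool = Executors.newFixedThreadPool(2);
  private final List<Future<?>> activeMerges = new CopyOnWriteArrayList<>();

  public void submitMerge(Runnable mergeTask) {
    activeMerges.removeIf(Future::isDone); // cleanup loop: drop finished merges
    activeMerges.add(pool.submit(mergeTask));
  }

  public void sync() {
    for (Future<?> f : activeMerges) {
      try {
        f.get(); // block only when the caller explicitly asks to wait
      } catch (Exception e) {
        throw new RuntimeException("Merge failed", e);
      }
    }
    activeMerges.removeIf(Future::isDone);
  }

  public void shutdown() {
    pool.shutdown();
  }

  static int demo() {
    AsyncMergeSubmit scheduler = new AsyncMergeSubmit();
    AtomicInteger done = new AtomicInteger();
    for (int i = 0; i < 4; i++) {
      scheduler.submitMerge(done::incrementAndGet);
    }
    scheduler.sync(); // all four tasks complete before this returns
    scheduler.shutdown();
    return done.get();
  }
}
```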
```java
}

@Override
public void close() {
```
This shouldn't be a no-op. We should wait for all running merges of the calling IndexWriter to complete, similar to `sync()` in ConcurrentMergeScheduler. This could be done by maintaining a per-writer list of futures for running merges. We could run a cleanup loop in `merge()` that removes completed futures.
Also, I get that this thread pool is global and shared, but I don't like the idea of having a thread pool that can never be closed. How about we expose some hooks to close this scheduler once all consuming writers have closed?
Thanks for pointing this out! I’ve updated close() to properly wait for all running merges, similar to sync() in ConcurrentMergeScheduler. Now, merge() tracks active tasks, and I've added cleanup for completed futures. Also, I introduced shutdownThreadPool() to allow safe global shutdown. Let me know if this works!
```java
public class TestMultiTenantMergeScheduler extends LuceneTestCase {

  public void testMultiTenantMergeScheduler() throws Exception {
```
This test is not sufficient. We need to test (and also implement) a lot more scenarios, like multiple writers consuming this scheduler, scheduling different merges, and starting/closing at different times. You can refer to the tests in TestConcurrentMergeScheduler as a starting point.
Thanks for the feedback! I’ve expanded the test suite to include multiple writers consuming the scheduler, different merge scheduling times, and staggered start/close behavior. Also, I’ve referred to TestConcurrentMergeScheduler for structuring test cases to verify concurrency. Let me know if you have additional suggestions!
I have a request: kindly ignore the following two deleted files in the "Files Changed" section. Thanks!
```java
}

@Override
public void close() throws IOException {
```
IIRC `IndexWriter.close()` will call `MergeScheduler.close()`, which means that you would wait for all merges across all indexes using this scheduler to complete. The `IndexWriter` that is currently calling `close()` will stop scheduling new merges, but the other writers may not, which means this may not terminate. To fix this you may need to map active merges back to the `IndexWriter` or `Directory` they belong to.
+1; my suggestion in the previous review was similar. We need a mapping of writer to merges and only block on merges specific to the calling writer.
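The per-writer bookkeeping both reviewers suggest could be sketched as below. This is an illustrative stand-alone example: the `Object` owner token stands in for the IndexWriter (or its Directory), and all names are my own, not the PR's.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Key active merge futures by their owning writer so that close(owner) waits
// only on that owner's merges; other writers' merges are unaffected.
public class PerWriterMergeTracker {
  private final Map<Object, List<Future<?>>> mergesByOwner = new ConcurrentHashMap<>();

  public void track(Object owner, Future<?> merge) {
    mergesByOwner.computeIfAbsent(owner, k -> new CopyOnWriteArrayList<>()).add(merge);
  }

  // Wait only for the calling owner's merges, then drop its bookkeeping.
  public void closeOwner(Object owner) {
    List<Future<?>> merges = mergesByOwner.remove(owner);
    if (merges == null) return;
    for (Future<?> f : merges) {
      try {
        f.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      } catch (ExecutionException e) {
        // a real scheduler would rethrow this as an IOException
      }
    }
  }

  public int trackedOwners() {
    return mergesByOwner.size();
  }

  static int demo() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    PerWriterMergeTracker tracker = new PerWriterMergeTracker();
    Object writer1 = new Object();
    Object writer2 = new Object();
    tracker.track(writer1, pool.submit(() -> {}));
    tracker.track(writer2, pool.submit(() -> {}));
    tracker.closeOwner(writer1); // writer2's merge bookkeeping survives
    pool.shutdown();
    return tracker.trackedOwners();
  }
}
```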
```java
private static class LazyHolder {
  static final ExecutorService MERGE_THREAD_POOL =
      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
}

private static ExecutorService getMergeThreadPool() {
  return LazyHolder.MERGE_THREAD_POOL;
}

// Use getMergeThreadPool() instead of direct access
```
We can skip the `getMergeThreadPool()` function since we only need to access the thread pool internally. Something like:
```java
private static class MergeThreadPool {
  private static final ExecutorService INSTANCE =
      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
}
// and then just use MergeThreadPool.INSTANCE inside merge() and other functions.
```
```java
});

// Cleanup completed merges
activeMerges.removeIf(Future::isDone);
```
Where is this declared and initialized? When are merges added to `activeMerges`?
```java
MultiTenantMergeScheduler.shutdownThreadPool();

// Check if merging took less time than sequential execution would
assertTrue("Merges did not happen concurrently!", (endTime - startTime) < 5000);
```
It's not reliable to assert on or depend on elapsed time. Lucene is run on many different machines and this timing will differ.
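One timing-free alternative (my sketch, not from the PR) is to assert on observed overlap: count how many tasks are running at the same instant and check the peak, which does not depend on machine speed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Track the peak number of simultaneously running tasks instead of asserting
// on wall-clock time; if work overlapped, the peak exceeds 1 regardless of
// how fast the machine is.
public class PeakConcurrency {
  static int measurePeak(int tasks, int threads) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    CountDownLatch started = new CountDownLatch(threads);
    CountDownLatch release = new CountDownLatch(1);
    for (int i = 0; i < tasks; i++) {
      pool.submit(() -> {
        int now = running.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
        started.countDown();
        try {
          release.await(); // hold the task open so overlap is observable
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        running.decrementAndGet();
      });
    }
    started.await(); // `threads` tasks are now running concurrently
    release.countDown();
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    return peak.get();
  }

  static int demo() {
    try {
      return measurePeak(4, 2);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}
```

A test could then assert `peak >= 2` to prove merges ran concurrently, with no dependence on elapsed time.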
Hi @DivyanshIITB,
Thanks for your work here; a concurrent multi-tenant merge scheduler is an ambitious feature. This area of the codebase, especially how IndexWriter interacts with merging, can be quite complex.
I'd suggest the following steps to help things along:
1. First, please ensure your code compiles and all existing tests pass. You can run `./gradlew check` to verify this.
2. A helpful approach would be to try swapping out the existing ConcurrentMergeScheduler (CMS) with your MultiTenantMergeScheduler (MTMS) in your local branch and running the test suite. This can reveal some subtle edge cases that your scheduler will need to handle.
3. You can use the existing tests for CMS as a guide for writing comprehensive tests for your MTMS.
This is a challenging task, and building familiarity with the codebase through smaller contributions can be very beneficial. Would you be interested in exploring some smaller, more focused issues to start?
Appreciate your effort and here to help.
```java
@Override
public void close() throws IOException {
  IndexWriter currentWriter = getCurrentIndexWriter(); // Method to get the calling writer
```
This is not defined anywhere!
```java
    merge.waitForCompletion(); // Only wait for merges related to this writer
  }

  activeMerges.remove(currentWriter); // Cleanup after closing
```
This doesn't line up: `activeMerges` is a List, but you're calling Map APIs on it. Did this compile?
Thank you for your help, @vigyasharma!
This PR has not had activity in the past 2 weeks, labeling it as stale. If the PR is waiting for review, notify the dev@lucene.apache.org list. Thank you for your contribution!
This PR enhances the ConcurrentMergeScheduler by introducing dynamic resource allocation for multi-tenant indexing scenarios.
Key Improvements:
These changes improve efficiency in multi-writer environments by dynamically adapting merge scheduling to system load.
Fixes #13883