Threadpool merge scheduler #120869
Conversation
Looks good, though I wonder if there is a test instability problem, and I'd like to avoid the static variables in the new test.
public class ThreadPoolMergeSchedulerStressTestIT extends ESSingleNodeTestCase {

    private static final AtomicReference<ThreadPoolMergeExecutorService> MERGE_EXECUTOR_SERVICE_REFERENCE = new AtomicReference<>();
    private static final Set<OneMerge> ENQUEUED_MERGES_SET = ConcurrentCollections.newConcurrentSet();
I think we can avoid the statics here by putting them on the plugin instead. You can grab the plugin using getInstance and then get to the variables. I prefer to avoid non-constant statics.
I agree... when I embarked on the static variables strategy here there were 1-2 variables... I'll use getInstance for the plugin instance.
Refactored it in e4216d2
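For reference, a minimal sketch of the pattern discussed above; the TestEnginePlugin name, the import paths, and the plugin lookup shown in the trailing comment are assumptions, not necessarily what e4216d2 actually does:

import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.lucene.index.MergePolicy.OneMerge;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.plugins.Plugin;

public class TestEnginePlugin extends Plugin {
    // instance fields replacing the former static variables; the test reaches them via the plugin instance
    final AtomicReference<ThreadPoolMergeExecutorService> mergeExecutorServiceReference = new AtomicReference<>();
    final Set<OneMerge> enqueuedMergesSet = ConcurrentCollections.newConcurrentSet();
}

// in the test, hypothetically (the exact plugin lookup depends on the test base class):
// var plugin = getInstanceFromNode(PluginsService.class).filterPlugins(TestEnginePlugin.class).findFirst().orElseThrow();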
assertAllSuccessful(indicesAdmin().prepareRefresh("index").get());
var segmentsAfter = getSegmentsCountForAllShards("index");
// there should be way fewer segments after merging completed
assertThat(segmentsBefore, greaterThan(segmentsAfter));
I am not exactly sure we are guaranteed this. In the worst case, all merging concluded before we grab segmentsBefore?
In the worst case, all merging concluded before we grab segmentsBefore?
No, I don't think that's possible. There should be at least 50 merges blocked (because we grab segmentsBefore before the semaphore release), where each merge covers between 2 and 3 segments, and the segments covered by these blocked merges at the semaphore are not available to any other merges (IndexWriter handles this). Do you think it's possible that the merge policy selects merges of 2 segments which are then not reduced down to 1 segment? (There are no deletes.)
I think the frequently if-statement around acquire means that we risk no acquire at all, or just a few, like the up to 5 initially released. But we could still have enough merge build-up that the assertBusy wait above (waiting for enough of ENQUEUED_MERGES_SET) is satisfied.
Or did I misunderstand that?
Ah, I see: you're thinking that it is possible that ENQUEUED_MERGES_SET builds up (to at least 50) before the thread pool's merge threads are blocked at the SEMAPHORE. I will also assertBusy that the SEMAPHORE is queueing threads.
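A sketch of that extra wait, assuming the test's SEMAPHORE is a plain java.util.concurrent.Semaphore and using the threshold of 50 from this discussion:

// wait until merges have both piled up and the merge threads are actually parked at the semaphore
assertBusy(() -> {
    assertThat(ENQUEUED_MERGES_SET.size(), greaterThanOrEqualTo(50));
    assertTrue(SEMAPHORE.hasQueuedThreads());
});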
Pushed 0e300a7
I am not convinced it does, since the threads may have no more iterations/work to do regardless of whether you capture it before or after they complete - and merging may complete between the assertBusy and capturing the set of segments.
I wonder if we could instead check that we merged as far as we should. I.e., once all merging is done, grab the segments. Then do a force-merge (with no options/no max-segments, i.e., just trigger maybeMerge) and check that it did not do anything, i.e., did not invoke the scheduler (make it fail in that case).
I've pushed a3f73b7.
Here's how I thought about it: the test design is to accumulate (enqueue or backlog) outstanding merges, but without actually pausing merging completely (because I think this is more realistic). So some amount of merging completes all the while most merges are stopped at a semaphore. Once a specific limit of enqueued/backlogged merges is reached, new merges won't stop at the semaphore anymore. In this state, the number of enqueued/backlogged merges will oscillate below that limit. I don't think it's worth pausing merging completely at the limit: it's artificial and not elegant to code.
It is true that, as you pointed out, the "oscillation" can in theory be so large that at some point in time the number of enqueued/backlogged merges is actually 0. I think this is very unlikely (given the limit value of 50), but if we measure the number of segments at exactly that point the test will fail.
In order to account for this "oscillation", I've put the sampling of the number of segments in an assertBusy and asserted a minimum value for it (given the limit of outstanding merges and the number of segments in a single merge). This way, we know that we're looking at a plausible number of segments to be merged. Later, when the test releases the merging semaphore, we expect these segments to be merged away.
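A hypothetical sketch of that assertBusy-wrapped sampling (assuming getSegmentsCountForAllShards returns an int, and using 50 outstanding merges times at least 2 segments per merge as the minimum):

AtomicInteger segmentsBefore = new AtomicInteger();
assertBusy(() -> {
    // only accept the sample once it reflects a plausible merge backlog
    int count = getSegmentsCountForAllShards("index");
    assertThat(count, greaterThanOrEqualTo(50 * 2));
    segmentsBefore.set(count);
});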
I wonder if we could instead check that we merged as far as we should. I.e., once all merging is done, grab the segments. Then do a force-merge (with no options/no max-segments, i.e., just trigger maybeMerge) and check that it did not do anything, i.e., did not invoke the scheduler (make it fail in that case).
I've pushed b187e1c
As discussed, I've removed the merge count taken before merging caught up: 1a7db64.
LGTM.
Left one final comment on the stress test that needs sorting.
assertAllSuccessful(indicesAdmin().prepareRefresh("index").get());
var segmentsAfter = getSegmentsCountForAllShards("index");
// there should be way fewer segments after merging completed
assertThat(segmentsBefore, greaterThan(segmentsAfter));
I wonder if we could instead check that we merged as far as we should. I.e., once all merging is done, grab the segments. Then do a force-merge (with no options/no max-segments, i.e., just trigger maybeMerge) and check that it did not do anything, i.e., did not invoke the scheduler (make it fail in that case).
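A hypothetical sketch of that check; failOnNewMerges is an assumed flag on the test's merge scheduler that makes any further merge submission throw, which would fail the force-merge below:

// once merging has fully caught up, a force-merge without max_num_segments only triggers
// IndexWriter#maybeMerge, so it should find nothing left to merge
failOnNewMerges.set(true);
assertAllSuccessful(indicesAdmin().prepareForceMerge("index").get());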
// NEVER do this on a merge thread since we acquire some locks blocking here and if we
// concurrently rollback the writer we deadlock on engine#close for instance.
engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
If the queue in the target thread pool is full, we will wait for other flushes to complete. Does it make sense to queue multiple flush requests if they are all identical? Maybe we should schedule a flush only when no other flush has been scheduled yet, so I wonder why we actually need a thread pool for this (a simple thread checking a precondition and flushing from time to time would serve just as well).
Scheduling a new flush will not wait; the underlying queue is unbounded. Also, this path is not a hot path at all, since it is only entered for write-idle shards. If multiple flushes enter the queue, only the first one is likely to do real work.
We do prefer a thread pool so that we can keep running other merges while the flush happens.
This behavior is unchanged by this PR.
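As a generic illustration of the "unbounded queue" point (this is not the actual FLUSH thread pool wiring in Elasticsearch): with an unbounded work queue, execute() simply enqueues the task and returns, so the submitting merge thread is never blocked.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {
    public static void main(String[] args) {
        // a single flush-like thread with an unbounded queue: execute() never blocks the caller
        var flushLike = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 100; i++) {
            // duplicate submissions just pile up cheaply; only the first is likely to do real work
            flushLike.execute(() -> { /* flush work would go here */ });
        }
        flushLike.shutdown();
    }
}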
…ep up with the merge load (#125654): Fixes an issue where indexing throttling kicks in while disk IO is still being throttled. Instead, disk IO should first unthrottle, and only then, if we still can't keep up with the merging load, should indexing throttling start. Fixes elastic/elasticsearch-benchmarks#2437. Relates #120869
This adds a new merge scheduler implementation that uses a (new) dedicated thread pool to run the merges. This way the number of concurrent merges is limited to the number of threads in the pool (i.e. the number of processors allocated to the ES JVM).
It implements dynamic IO throttling (the same target IO rate for all merges, roughly, with caveats) that's adjusted based on the number of currently active (queued + running) merges.
Smaller merges are always preferred to larger ones, irrespective of the index shard that they're coming from.
The implementation also supports the per-shard "max thread count" and "max merge count" settings, the latter being used today for indexing throttling.
Note that IO throttling, max merge count, and max thread count work similarly, but not identically, to their siblings in the ConcurrentMergeScheduler.
The per-shard merge statistics are not affected, and the thread-pool statistics should reflect the merge ones (i.e. the completed thread pool stats reflect the total number of merges, across shards, per node).
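A conceptual sketch of the core idea described above (class and field names are made up; this is not the PR's actual implementation): merges from all shards feed one node-wide pool whose thread count caps concurrency, with a priority queue so that the smallest pending merge runs first, and a backlog size that a dynamic IO throttling policy could key off.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;

class NodeWideMergeExecutor {
    // one task per merge, ordered so that smaller merges run first regardless of the shard
    record MergeTask(String shard, long estimatedMergeSizeInBytes, Runnable doMerge) implements Comparable<MergeTask> {
        @Override
        public int compareTo(MergeTask other) {
            return Long.compare(estimatedMergeSizeInBytes, other.estimatedMergeSizeInBytes);
        }
    }

    private final PriorityBlockingQueue<MergeTask> queue = new PriorityBlockingQueue<>();
    private final ExecutorService mergeThreads;

    NodeWideMergeExecutor(int allocatedProcessors) {
        // concurrency is capped by the pool size, i.e. the number of allocated processors
        this.mergeThreads = Executors.newFixedThreadPool(allocatedProcessors);
        for (int i = 0; i < allocatedProcessors; i++) {
            mergeThreads.execute(this::runLoop);
        }
    }

    void submit(MergeTask task) {
        // never blocks; the backlog size (queued + running merges) can drive IO throttling decisions
        queue.add(task);
    }

    private void runLoop() {
        try {
            while (true) {
                queue.take().doMerge().run(); // always the smallest pending merge across all shards
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Compared with the per-shard ConcurrentMergeScheduler, a single node-wide queue like this is what makes both the concurrency cap and the backlog-driven throttling global across shards.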