
Threadpool merge scheduler #120869

Conversation

@albertzaharovits (Contributor) commented Jan 26, 2025

This adds a new merge scheduler implementation that uses a (new) dedicated thread pool to run the merges. This way the number of concurrent merges is limited to the number of threads in the pool (i.e. the number of processors allocated to the ES JVM).

It implements dynamic IO throttling (roughly the same target IO rate for all merges, with caveats) that is adjusted based on the number of currently active (queued + running) merges.
Smaller merges are always preferred to larger ones, irrespective of the index shard they come from.
The implementation also supports the per-shard "max thread count" and "max merge count" settings, the latter being used today for indexing throttling.
Note that IO throttling, max merge count, and max thread count work similarly, but not identically, to their siblings in the ConcurrentMergeScheduler.

The per-shard merge statistics are not affected, and the thread-pool statistics should reflect the merge ones (i.e. the completed thread-pool stats reflect the total number of merges, across shards, per node).
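For a rough illustration of the two mechanisms above (this is a sketch, not the actual scheduler code; the class, record, and constants below are made up), smaller merges can be taken first from a single node-wide priority queue, and the shared target IO rate can be nudged up or down as the merge backlog grows or shrinks:

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

class MergeSchedulingSketch {
    record PendingMerge(String shard, long estimatedSizeInBytes) {}

    // Node-wide queue: the smallest merge runs next, regardless of which shard it comes from.
    final PriorityBlockingQueue<PendingMerge> queue =
        new PriorityBlockingQueue<>(64, Comparator.comparingLong(PendingMerge::estimatedSizeInBytes));

    static final double MIN_IO_RATE_MB_PER_SEC = 5.0;      // hypothetical bounds
    static final double MAX_IO_RATE_MB_PER_SEC = 10240.0;

    // Nudge the shared target IO rate based on how many merges are active (queued + running).
    static double adjustTargetIORate(double currentMBPerSec, int activeMerges, int maxConcurrentMerges) {
        if (activeMerges > maxConcurrentMerges * 2) {
            return Math.min(currentMBPerSec * 1.1, MAX_IO_RATE_MB_PER_SEC); // falling behind: throttle less
        } else if (activeMerges < maxConcurrentMerges) {
            return Math.max(currentMBPerSec * 0.9, MIN_IO_RATE_MB_PER_SEC); // keeping up: throttle more
        }
        return currentMBPerSec;
    }
}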

@henningandersen (Contributor) left a comment

Looks good, though I wonder if there is a test instability problem, and I'd like to avoid the static variables in the new test.

public class ThreadPoolMergeSchedulerStressTestIT extends ESSingleNodeTestCase {

private static final AtomicReference<ThreadPoolMergeExecutorService> MERGE_EXECUTOR_SERVICE_REFERENCE = new AtomicReference<>();
private static final Set<OneMerge> ENQUEUED_MERGES_SET = ConcurrentCollections.newConcurrentSet();
Contributor:

I think we can avoid the statics here by putting them on the plugin instead. You can grab the plugin using getInstance and then get to the variables. I prefer to avoid non-constant statics.

Contributor Author:

I agree... when I embarked on the static-variables strategy here there were only 1-2 variables... I'll use getInstance to grab the plugin instance.
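For illustration, a minimal sketch of that direction (names are hypothetical, and the exact plugin-lookup helpers depend on the test framework version):

// Hypothetical test plugin that owns the shared test state instead of static fields.
public static class TestEnginePlugin extends Plugin {
    final AtomicReference<ThreadPoolMergeExecutorService> mergeExecutorServiceReference = new AtomicReference<>();
    final Set<OneMerge> enqueuedMergesSet = ConcurrentCollections.newConcurrentSet();
}

// In the test, look the plugin instance up rather than touching statics
// (assuming PluginsService#filterPlugins returns a stream in this version).
TestEnginePlugin testPlugin = getInstanceFromNode(PluginsService.class)
    .filterPlugins(TestEnginePlugin.class)
    .findFirst()
    .orElseThrow();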

Contributor Author:

Refactored it in e4216d2

assertAllSuccessful(indicesAdmin().prepareRefresh("index").get());
var segmentsAfter = getSegmentsCountForAllShards("index");
// there should be way fewer segments after merging completed
assertThat(segmentsBefore, greaterThan(segmentsAfter));
Contributor:

I am not exactly sure we are guaranteed this. In the worst case, all merging concluded before we grab segmentsBefore?

Contributor Author:

> In the worst case, all merging concluded before we grab segmentsBefore?

No, I don't think that's possible. There should be at least 50 merges blocked (because we grab segmentsBefore before the semaphore release), where each merge covers between 2 and 3 segments, and the segments covered by these blocked merges are not available to any other merges (IndexWriter handles this). Do you think it's possible that the merge policy selects merges of 2 segments which are then not reduced down to 1 segment? (There are no deletes.)

Contributor:

I think the "frequently" if-statement around the acquire means that we risk no acquire at all - or just a few, like the up to 5 initially released. But we could still have enough merges build up that the assertBusy wait above (waiting for ENQUEUED_MERGES_SET to be large enough) is satisfied.

Or did I misunderstand that?

Contributor Author:

Ah, I see - you're thinking that it is possible for ENQUEUED_MERGES_SET to build up (to at least 50) before the thread pool's merge threads are blocked at the SEMAPHORE. I will also assertBusy that the SEMAPHORE is queueing threads.
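Something along these lines, for instance (a sketch of the extra assertion only, not the change itself):

// Wait until merge threads are actually parked at the test's merge-blocking semaphore
// before relying on the enqueued-merges count.
assertBusy(() -> assertThat(SEMAPHORE.getQueueLength(), greaterThan(0)));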

Contributor Author:

Pushed 0e300a7

Contributor:

I am not convinced it does, since the threads may have no more iterations/work to do regardless of whether you capture it before or after they complete - and merging may complete between the assertBusy and capturing the set of segments.

Contributor:

I wonder if we could instead check that we merged as far as we should. I.e., once all merging is done, grab the segments. Then do a force-merge (with no options/no max-segments, i.e., just trigger maybeMerge) and check that it did not do anything, i.e., did not invoke the scheduler (make it fail in that case).
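Sketched out, the suggested check might look roughly like this (the scheduler-invocation flag is hypothetical test plumbing):

// After merging has fully caught up, a bare force-merge should find nothing to merge,
// so the merge scheduler must not be invoked again.
mergeSchedulerInvoked.set(false); // hypothetical AtomicBoolean flipped by the test's merge scheduler
assertAllSuccessful(indicesAdmin().prepareForceMerge("index").get());
assertThat("force-merge should not have triggered any merge", mergeSchedulerInvoked.get(), is(false));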

Contributor Author:

I've pushed a3f73b7.

Here's how I thought about it: the test is designed to accumulate (enqueue or backlog) outstanding merges, but without pausing merging completely (because I think this is more realistic). So some merging keeps completing while most merges are stopped at a semaphore. Once a specific limit of enqueued/backlogged merges is reached, new merges no longer stop at the semaphore. In this state, the number of enqueued/backlogged merges oscillates below that limit. I don't think it's worth pausing merging entirely at the limit: it's artificial and not elegant to code.

It is true that, as you pointed out, the "oscillation" can in theory be so large that at some point in time the number of enqueued/backlogged merges is actually 0. I think this is very unlikely (given the limit value of 50), but if we measure the number of segments at exactly that point the test will fail.
In order to account for this "oscillation", I've put the sampling of the number of segments in an assertBusy and asserted a minimum value for it (given the limit of outstanding merges and the number of segments in a single merge). This way, we know that we're looking at a plausible number of segments to be merged. Later, when the test releases the merging semaphore, we expect these segments to be merged away.
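Roughly like this, as an illustration (the constant names are hypothetical, and the actual a3f73b7 change may differ):

// Sample the number of segments inside an assertBusy and require a plausible minimum
// (~50 outstanding merges of at least 2 segments each), so we know real merge work is pending.
assertBusy(() -> {
    long segmentsBeforeUnblockingMerges = getSegmentsCountForAllShards("index");
    assertThat(segmentsBeforeUnblockingMerges,
        greaterThanOrEqualTo((long) OUTSTANDING_MERGES_LIMIT * MIN_SEGMENTS_PER_MERGE));
});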

Contributor Author:

> I wonder if we could instead check that we merged as far as we should. I.e., once all merging is done, grab the segments. Then do a force-merge (with no options/no max-segments, i.e., just trigger maybeMerge) and check that it did not do anything, i.e., did not invoke the scheduler (make it fail in that case).

I've pushed b187e1c

Contributor Author:

As discussed, I've removed the merge count taken before merging caught up, in 1a7db64.

@henningandersen (Contributor) left a comment

LGTM.

Left one final comment on the stress test that needs sorting.

assertAllSuccessful(indicesAdmin().prepareRefresh("index").get());
var segmentsAfter = getSegmentsCountForAllShards("index");
// there should be way fewer segments after merging completed
assertThat(segmentsBefore, greaterThan(segmentsAfter));
Contributor:

I wonder if we could instead check that we merged as far as we should. I.e., once all merging is done, grab the segments. Then do a force-merge (with no options/no max-segments, i.e., just trigger maybeMerge) and check that it did not do anything, i.e., did not invoke the scheduler (make it fail in that case).

@albertzaharovits merged commit fa46b87 into elastic:main on Mar 18, 2025 (16 of 17 checks passed).
@albertzaharovits deleted the threadpool-merge-scheduler-sort-all-merges-take-2 branch on March 19, 2025 at 13:44.
// NEVER do this on a merge thread since we acquire some locks blocking here and if we
// concurrently rollback the writer we deadlock on engine#close for instance.
engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {


If the queue in the target thread pool is full, we will wait for other flushes to complete. Does it make sense to queue multiple flush requests if they are all identical? Maybe we should schedule a flush only when no other flush has been scheduled yet, so I wonder whether we actually need a thread pool for this at all (a simple thread that checks the pre-condition and flushes from time to time would serve as well).
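For reference, the kind of deduplication being suggested could look something like this (a sketch only, with hypothetical names; the reply below notes why it isn't necessary here):

// Only hand a flush off to the FLUSH executor if one is not already pending.
private final AtomicBoolean flushPending = new AtomicBoolean();

void maybeScheduleFlush() {
    if (flushPending.compareAndSet(false, true)) {
        engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(() -> {
            try {
                flush(); // hypothetical helper doing the actual flush work
            } finally {
                flushPending.set(false);
            }
        });
    }
}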

Contributor:

Scheduling a new flush will not wait; the underlying queue is unbounded. Also, this is not a hot path at all, since it is only entered for write-idle shards. If multiple flushes enter the queue, only the first one is likely to do real work.

We do prefer a thread pool to ensure we can do other merges while the flush happens.

This behavior is unchanged by this PR.

smalyshev pushed a commit to smalyshev/elasticsearch that referenced this pull request Mar 21, 2025
omricohenn pushed a commit to omricohenn/elasticsearch that referenced this pull request Mar 28, 2025
albertzaharovits added a commit that referenced this pull request Mar 30, 2025
…ep up with the merge load (#125654)

Fixes an issue where indexing throttling kicks in while disk IO is still being throttled.
Instead, disk IO should first unthrottle, and only then, if we still can't keep up with the merge load, should indexing throttling kick in.

Fixes elastic/elasticsearch-benchmarks#2437
Relates #120869
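As a rough sketch of that ordering (hypothetical names, not the actual #125654 code):

// When merges fall behind, first drop the disk IO throttle; only if merges still
// cannot keep up does indexing throttling kick in.
static boolean shouldThrottleIndexing(int queuedAndRunningMerges, int maxOutstandingMerges, boolean ioThrottleActive) {
    if (queuedAndRunningMerges <= maxOutstandingMerges) {
        return false; // merging keeps up, nothing to do
    }
    if (ioThrottleActive) {
        return false; // unthrottle IO first and give merges full disk bandwidth
    }
    return true; // IO already unthrottled and still behind: throttle indexing
}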
Labels
:Distributed Indexing/Engine, >feature, serverless-linked, Team:Distributed Indexing, v9.1.0