Skip to content

Commit a4ae763

Browse files
committed
Remove page alignment in ExchangeSink
1 parent 6036a1e commit a4ae763

File tree

6 files changed

+10
-23
lines changed

6 files changed

+10
-23
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java

+4-9
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import java.io.IOException;
2525
import java.util.Objects;
26-
import java.util.function.Function;
2726
import java.util.function.Supplier;
2827

2928
/**
@@ -32,17 +31,14 @@
3231
public class ExchangeSinkOperator extends SinkOperator {
3332

3433
private final ExchangeSink sink;
35-
private final Function<Page, Page> transformer;
3634
private int pagesReceived;
3735
private long rowsReceived;
3836

39-
public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks, Function<Page, Page> transformer)
40-
implements
41-
SinkOperatorFactory {
37+
public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks) implements SinkOperatorFactory {
4238

4339
@Override
4440
public SinkOperator get(DriverContext driverContext) {
45-
return new ExchangeSinkOperator(exchangeSinks.get(), transformer);
41+
return new ExchangeSinkOperator(exchangeSinks.get());
4642
}
4743

4844
@Override
@@ -51,9 +47,8 @@ public String describe() {
5147
}
5248
}
5349

54-
public ExchangeSinkOperator(ExchangeSink sink, Function<Page, Page> transformer) {
50+
public ExchangeSinkOperator(ExchangeSink sink) {
5551
this.sink = sink;
56-
this.transformer = transformer;
5752
}
5853

5954
@Override
@@ -84,7 +79,7 @@ public boolean needsInput() {
8479
protected void doAddInput(Page page) {
8580
pagesReceived++;
8681
rowsReceived += page.getPositionCount();
87-
sink.addPage(transformer.apply(page));
82+
sink.addPage(page);
8883
}
8984

9085
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import java.util.concurrent.CyclicBarrier;
4646
import java.util.concurrent.TimeUnit;
4747
import java.util.concurrent.atomic.AtomicInteger;
48-
import java.util.function.Function;
4948
import java.util.function.LongSupplier;
5049

5150
import static org.hamcrest.Matchers.either;
@@ -328,7 +327,7 @@ public void testEarlyTermination() {
328327
final int maxAllowedRows = between(1, 100);
329328
final AtomicInteger processedRows = new AtomicInteger(0);
330329
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis);
331-
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
330+
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}));
332331
final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() {
333332
@Override
334333
public Block eval(Page page) {
@@ -365,7 +364,7 @@ public void testResumeOnEarlyFinish() throws Exception {
365364
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"));
366365
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
367366
var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
368-
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
367+
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}));
369368
Driver driver = TestDriverFactory.create(driverContext, sourceOperator, List.of(), sinkOperator);
370369
PlainActionFuture<Void> future = new PlainActionFuture<>();
371370
Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.util.Iterator;
4040
import java.util.List;
4141
import java.util.Set;
42-
import java.util.function.Function;
4342
import java.util.stream.Collectors;
4443
import java.util.stream.IntStream;
4544
import java.util.stream.Stream;
@@ -242,7 +241,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
242241
simpleWithMode(AggregatorMode.INTERMEDIATE).get(driver1Context),
243242
intermediateOperatorItr.next()
244243
),
245-
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}), Function.identity())
244+
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}))
246245
)
247246
);
248247
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
import java.util.concurrent.TimeUnit;
6666
import java.util.concurrent.atomic.AtomicBoolean;
6767
import java.util.concurrent.atomic.AtomicInteger;
68-
import java.util.function.Function;
6968
import java.util.function.Supplier;
7069
import java.util.stream.Collectors;
7170
import java.util.stream.IntStream;
@@ -305,7 +304,7 @@ Set<Integer> runConcurrentTest(
305304
"sink-" + i,
306305
dc,
307306
seqNoGenerator.get(dc),
308-
new ExchangeSinkOperator(exchangeSink.get(), Function.identity())
307+
new ExchangeSinkOperator(exchangeSink.get())
309308
);
310309
drivers.add(d);
311310
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExec.java

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class ExchangeSinkExec extends UnaryExec {
2727
);
2828

2929
private final List<Attribute> output;
30+
// TODO: remove this flag
3031
private final boolean intermediateAgg;
3132

3233
public ExchangeSinkExec(Source source, List<Attribute> output, boolean intermediateAgg, PhysicalPlan child) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -385,14 +385,8 @@ private PhysicalOperation planExchange(ExchangeExec exchangeExec, LocalExecution
385385
private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalExecutionPlannerContext context) {
386386
Objects.requireNonNull(exchangeSinkSupplier, "ExchangeSinkHandler wasn't provided");
387387
var child = exchangeSink.child();
388-
389388
PhysicalOperation source = plan(child, context);
390-
391-
Function<Page, Page> transformer = exchangeSink.isIntermediateAgg()
392-
? Function.identity()
393-
: alignPageToAttributes(exchangeSink.output(), source.layout);
394-
395-
return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier, transformer), source.layout);
389+
return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier), source.layout);
396390
}
397391

398392
private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, LocalExecutionPlannerContext context) {

0 commit comments

Comments
 (0)