Skip to content

Commit a90a692

Browse files
committed
aio: Try repeatedly to give batched IOs to workers.
Previously, if the first of a batch of IOs didn't fit in a batch we'd run all of them synchronously. Andres rightly pointed out that we should really try again between synchronous IOs, since the workers might have made progress. Suggested-by: Andres Freund <andres@anarazel.de>
1 parent 99c9a30 commit a90a692

File tree

1 file changed

+27
-3
lines changed

1 file changed

+27
-3
lines changed

src/backend/storage/aio/method_worker.c

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -280,12 +280,36 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
280280
SetLatch(wakeup);
281281

282282
/* Run whatever is left synchronously. */
283-
if (nsync > 0)
283+
for (int i = 0; i < nsync; ++i)
284284
{
285-
for (int i = 0; i < nsync; ++i)
285+
wakeup = NULL;
286+
287+
/*
288+
* Between synchronous IO operations, try again to enqueue as many as
289+
* we can.
290+
*/
291+
if (i > 0)
286292
{
287-
pgaio_io_perform_synchronously(synchronous_ios[i]);
293+
wakeup = NULL;
294+
295+
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
296+
while (i < nsync &&
297+
pgaio_worker_submission_queue_insert(synchronous_ios[i]))
298+
{
299+
if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0)
300+
wakeup = io_worker_control->workers[worker].latch;
301+
i++;
302+
}
303+
LWLockRelease(AioWorkerSubmissionQueueLock);
304+
305+
if (wakeup)
306+
SetLatch(wakeup);
307+
308+
if (i == nsync)
309+
break;
288310
}
311+
312+
pgaio_io_perform_synchronously(synchronous_ios[i]);
289313
}
290314
}
291315

0 commit comments

Comments
 (0)