Skip to content

Commit fccbfc4

Browse files
gh-129195: use future_add_to_awaited_by/future_discard_from_awaited_by in asyncio.staggered.staggered_race (#129253)
Co-authored-by: Kumar Aditya <kumaraditya@python.org>
1 parent a156b19 commit fccbfc4

File tree

3 files changed

+70
-1
lines changed

3 files changed

+70
-1
lines changed

Lib/asyncio/staggered.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from . import exceptions as exceptions_mod
99
from . import locks
1010
from . import tasks
11+
from . import futures
1112

1213

1314
async def staggered_race(coro_fns, delay, *, loop=None):
@@ -63,6 +64,7 @@ async def staggered_race(coro_fns, delay, *, loop=None):
6364
"""
6465
# TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
6566
loop = loop or events.get_running_loop()
67+
parent_task = tasks.current_task(loop)
6668
enum_coro_fns = enumerate(coro_fns)
6769
winner_result = None
6870
winner_index = None
@@ -73,6 +75,7 @@ async def staggered_race(coro_fns, delay, *, loop=None):
7375

7476
def task_done(task):
7577
running_tasks.discard(task)
78+
futures.future_discard_from_awaited_by(task, parent_task)
7679
if (
7780
on_completed_fut is not None
7881
and not on_completed_fut.done()
@@ -110,6 +113,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
110113
this_failed = locks.Event()
111114
next_ok_to_start = locks.Event()
112115
next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
116+
futures.future_add_to_awaited_by(next_task, parent_task)
113117
running_tasks.add(next_task)
114118
next_task.add_done_callback(task_done)
115119
# next_task has been appended to running_tasks so next_task is ok to
@@ -148,6 +152,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
148152
try:
149153
ok_to_start = locks.Event()
150154
first_task = loop.create_task(run_one_coro(ok_to_start, None))
155+
futures.future_add_to_awaited_by(first_task, parent_task)
151156
running_tasks.add(first_task)
152157
first_task.add_done_callback(task_done)
153158
# first_task has been appended to running_tasks so first_task is ok to start.
@@ -171,4 +176,4 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
171176
raise propagate_cancellation_error
172177
return winner_result, winner_index, exceptions
173178
finally:
174-
del exceptions, propagate_cancellation_error, unhandled_exceptions
179+
del exceptions, propagate_cancellation_error, unhandled_exceptions, parent_task

Lib/test/test_external_inspection.py

+63
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,69 @@ async def main():
286286
]
287287
self.assertEqual(stack_trace, expected_stack_trace)
288288

289+
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
290+
"Test only runs on Linux and MacOS")
291+
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
292+
"Test only runs on Linux with process_vm_readv support")
293+
def test_async_staggered_race_remote_stack_trace(self):
294+
# Spawn a process with some realistic Python code
295+
script = textwrap.dedent("""\
296+
import asyncio.staggered
297+
import time
298+
import sys
299+
300+
async def deep():
301+
await asyncio.sleep(0)
302+
fifo_path = sys.argv[1]
303+
with open(fifo_path, "w") as fifo:
304+
fifo.write("ready")
305+
time.sleep(10000)
306+
307+
async def c1():
308+
await asyncio.sleep(0)
309+
await deep()
310+
311+
async def c2():
312+
await asyncio.sleep(10000)
313+
314+
async def main():
315+
await asyncio.staggered.staggered_race(
316+
[c1, c2],
317+
delay=None,
318+
)
319+
320+
asyncio.run(main())
321+
""")
322+
stack_trace = None
323+
with os_helper.temp_dir() as work_dir:
324+
script_dir = os.path.join(work_dir, "script_pkg")
325+
os.mkdir(script_dir)
326+
fifo = f"{work_dir}/the_fifo"
327+
os.mkfifo(fifo)
328+
script_name = _make_test_script(script_dir, 'script', script)
329+
try:
330+
p = subprocess.Popen([sys.executable, script_name, str(fifo)])
331+
with open(fifo, "r") as fifo_file:
332+
response = fifo_file.read()
333+
self.assertEqual(response, "ready")
334+
stack_trace = get_async_stack_trace(p.pid)
335+
except PermissionError:
336+
self.skipTest(
337+
"Insufficient permissions to read the stack trace")
338+
finally:
339+
os.remove(fifo)
340+
p.kill()
341+
p.terminate()
342+
p.wait(timeout=SHORT_TIMEOUT)
343+
344+
# sets are unordered, so we want to sort "awaited_by"s
345+
stack_trace[2].sort(key=lambda x: x[1])
346+
347+
expected_stack_trace = [
348+
['deep', 'c1', 'run_one_coro'], 'Task-2', [[['main'], 'Task-1', []]]
349+
]
350+
self.assertEqual(stack_trace, expected_stack_trace)
351+
289352
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
290353
"Test only runs on Linux and MacOS")
291354
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Support reporting call graph information from :func:`!asyncio.staggered.staggered_race`.

0 commit comments

Comments
 (0)