Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions Lib/profiling/sampling/heatmap_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,10 @@ def __init__(self, *args, **kwargs):
self.callers_graph = collections.defaultdict(set)
self.function_definitions = {}

# Map each sampled line to its function for proper caller lookup
# (filename, lineno) -> funcname
self.line_to_function = {}

# Edge counting for call path analysis
self.edge_samples = collections.Counter()

Expand Down Expand Up @@ -596,6 +600,10 @@ def _record_line_sample(self, filename, lineno, funcname, is_leaf=False,
if funcname and (filename, funcname) not in self.function_definitions:
self.function_definitions[(filename, funcname)] = lineno

# Map this line to its function for caller/callee navigation
if funcname:
self.line_to_function[(filename, lineno)] = funcname

def _record_bytecode_sample(self, filename, lineno, opcode,
end_lineno=None, col_offset=None, end_col_offset=None,
weight=1):
Expand Down Expand Up @@ -1150,13 +1158,36 @@ def _format_specialization_color(self, spec_pct: int) -> str:
return f"rgba({r}, {g}, {b}, {alpha})"

def _build_navigation_buttons(self, filename: str, line_num: int) -> str:
"""Build navigation buttons for callers/callees."""
"""Build navigation buttons for callers/callees.

- Callers: All lines in a function show who calls this function
- Callees: Only actual call site lines show what they call
"""
line_key = (filename, line_num)
caller_list = self._deduplicate_by_function(self.callers_graph.get(line_key, set()))

funcname = self.line_to_function.get(line_key)

# Get callers: look up by function definition line, not current line
# This ensures all lines in a function show who calls this function
if funcname:
func_def_line = self.function_definitions.get((filename, funcname), line_num)
func_def_key = (filename, func_def_line)
caller_list = self._deduplicate_by_function(self.callers_graph.get(func_def_key, set()))
else:
caller_list = self._deduplicate_by_function(self.callers_graph.get(line_key, set()))

# Get callees: only show for actual call site lines (not every line in function)
callee_list = self._deduplicate_by_function(self.call_graph.get(line_key, set()))

# Get edge counts for each caller/callee
callers_with_counts = self._get_edge_counts(line_key, caller_list, is_caller=True)
# For callers, use the function definition key for edge lookup
if funcname:
func_def_line = self.function_definitions.get((filename, funcname), line_num)
caller_edge_key = (filename, func_def_line)
else:
caller_edge_key = line_key
callers_with_counts = self._get_edge_counts(caller_edge_key, caller_list, is_caller=True)
# For callees, use the actual line key since that's where the call happens
callees_with_counts = self._get_edge_counts(line_key, callee_list, is_caller=False)

# Build navigation buttons with counts
Expand Down
90 changes: 90 additions & 0 deletions Lib/test/test_profiling/test_heatmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,96 @@ def test_process_frames_with_file_samples_dict(self):
self.assertEqual(collector.file_samples['test.py'][10], 1)


def frame(filename, line, func):
"""Create a frame tuple: (filename, location, funcname, opcode)."""
return (filename, (line, line, -1, -1), func, None)


class TestHeatmapCollectorNavigationButtons(unittest.TestCase):
"""Test navigation button behavior for caller/callee relationships.

For every call stack:
- Root frames (entry points): only DOWN arrow (callees)
- Middle frames: both UP and DOWN arrows
- Leaf frames: only UP arrow (callers)
"""

def collect(self, *stacks):
"""Create collector and process frame stacks."""
collector = HeatmapCollector(sample_interval_usec=100)
for stack in stacks:
collector.process_frames(stack, thread_id=1)
return collector

def test_deep_call_stack_relationships(self):
"""Test root/middle/leaf navigation in a 5-level call stack."""
# Stack: root -> A -> B -> C -> leaf
stack = [
frame('leaf.py', 5, 'leaf'),
frame('c.py', 10, 'func_c'),
frame('b.py', 15, 'func_b'),
frame('a.py', 20, 'func_a'),
frame('root.py', 25, 'root'),
]
c = self.collect(stack)

# Root: only callees (no one calls it)
self.assertIn(('root.py', 25), c.call_graph)
self.assertNotIn(('root.py', 25), c.callers_graph)

# Middle frames: both callers and callees
for key in [('a.py', 20), ('b.py', 15), ('c.py', 10)]:
self.assertIn(key, c.call_graph)
self.assertIn(key, c.callers_graph)

# Leaf: only callers (doesn't call anyone)
self.assertNotIn(('leaf.py', 5), c.call_graph)
self.assertIn(('leaf.py', 5), c.callers_graph)

def test_all_lines_in_function_see_callers(self):
"""Test that interior lines map to their function for caller lookup."""
# Same function sampled at different lines (12, 15, 10)
c = self.collect(
[frame('mod.py', 12, 'my_func'), frame('caller.py', 100, 'caller')],
[frame('mod.py', 15, 'my_func'), frame('caller.py', 100, 'caller')],
[frame('mod.py', 10, 'my_func'), frame('caller.py', 100, 'caller')],
)

# All lines should map to same function
for line in [10, 12, 15]:
self.assertEqual(c.line_to_function[('mod.py', line)], 'my_func')

# Function definition line should have callers
func_def = c.function_definitions[('mod.py', 'my_func')]
self.assertIn(('mod.py', func_def), c.callers_graph)

def test_multiple_callers_and_callees(self):
"""Test multiple callers/callees are recorded correctly."""
# Two callers -> target, and caller -> two callees
c = self.collect(
[frame('target.py', 10, 'target'), frame('caller1.py', 20, 'c1')],
[frame('target.py', 10, 'target'), frame('caller2.py', 30, 'c2')],
[frame('callee1.py', 5, 'f1'), frame('dispatcher.py', 40, 'dispatch')],
[frame('callee2.py', 6, 'f2'), frame('dispatcher.py', 40, 'dispatch')],
)

# Target has 2 callers
callers = c.callers_graph[('target.py', 10)]
self.assertEqual({x[0] for x in callers}, {'caller1.py', 'caller2.py'})

# Dispatcher has 2 callees
callees = c.call_graph[('dispatcher.py', 40)]
self.assertEqual({x[0] for x in callees}, {'callee1.py', 'callee2.py'})

def test_edge_samples_counted(self):
"""Test that repeated calls accumulate edge counts."""
stack = [frame('callee.py', 10, 'callee'), frame('caller.py', 20, 'caller')]
c = self.collect(stack, stack, stack)

edge_key = (('caller.py', 20), ('callee.py', 10))
self.assertEqual(c.edge_samples[edge_key], 3)


class TestHeatmapCollectorExport(unittest.TestCase):
"""Test HeatmapCollector.export() method."""

Expand Down
Loading