
Commit d32f060

ericl authored and tamohannes committed
[data] New executor backend [1/n]--- Add basic interfaces (ray-project#31216)
This PR adds the basic interfaces and feature flags; split out from https://github.com/ray-project/ray/pull/30903/files. See REP ray-project/enhancements#18 for more details. Signed-off-by: tmynn <hovhannes.tamoyan@gmail.com>
1 parent 7a5eec5 commit d32f060

File tree

3 files changed: +302 −5 lines changed
python/ray/data/_internal/execution/interfaces.py

Lines changed: 274 additions & 0 deletions
@@ -0,0 +1,274 @@
from dataclasses import dataclass
from typing import Dict, List, Optional, Iterator, Tuple

import ray
from ray.data._internal.stats import DatasetStats, StatsDict
from ray.data.block import Block, BlockMetadata
from ray.types import ObjectRef


@dataclass
class RefBundle:
    """A group of data block references and their metadata.

    Operators take in and produce streams of RefBundles.

    Most commonly a RefBundle consists of a single block object reference.
    In some cases, e.g., due to block splitting, or for a reduce task, there may
    be more than one block.

    Block bundles have ownership semantics, i.e., shared ownership (similar to C++
    shared_ptr: multiple operators share the same block bundle) or unique ownership
    (similar to C++ unique_ptr: only one operator owns the block bundle). This
    allows operators to know whether they can destroy blocks when they don't need
    them. Destroying blocks eagerly is more efficient than waiting for Python GC /
    Ray reference counting to kick in.
    """

    # The size_bytes must be known in the metadata; num_rows is optional.
    blocks: List[Tuple[ObjectRef[Block], BlockMetadata]]

    # Whether we own the blocks (can safely destroy them).
    owns_blocks: bool

    def __post_init__(self):
        for b in self.blocks:
            assert isinstance(b, tuple), b
            assert len(b) == 2, b
            assert isinstance(b[0], ray.ObjectRef), b
            assert isinstance(b[1], BlockMetadata), b
            if b[1].size_bytes is None:
                raise ValueError(
                    "The size in bytes of the block must be known: {}".format(b)
                )

    def num_rows(self) -> Optional[int]:
        """Number of rows present in this bundle, if known."""
        total = 0
        for b in self.blocks:
            if b[1].num_rows is None:
                return None
            else:
                total += b[1].num_rows
        return total

    def size_bytes(self) -> int:
        """Size of the blocks of this bundle in bytes."""
        return sum(b[1].size_bytes for b in self.blocks)

    def destroy_if_owned(self) -> int:
        """Clears the object store memory for these blocks if owned.

        Returns:
            The number of bytes freed.
        """
        raise NotImplementedError
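To make the bundle structure concrete, here is a minimal sketch of wrapping a single in-memory block in a RefBundle. It assumes a running Ray session; the BlockMetadata fields follow the existing ray.data.block API, and the values shown are hypothetical:

import ray
from ray.data.block import BlockMetadata

# Put a small block (a list of rows) into the object store.
block_ref = ray.put([{"id": 0}, {"id": 1}])

# size_bytes must be known (enforced by __post_init__); num_rows is optional.
meta = BlockMetadata(
    num_rows=2,
    size_bytes=128,  # hypothetical size
    schema=None,
    input_files=None,
    exec_stats=None,
)

bundle = RefBundle(blocks=[(block_ref, meta)], owns_blocks=True)
assert bundle.num_rows() == 2
assert bundle.size_bytes() == 128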
@dataclass
class ExecutionOptions:
    """Common options for execution.

    Some options may not be supported on all executors (e.g., parallelism limit).
    """

    # Max number of in-flight tasks. This is a soft limit, and is not supported
    # in bulk execution mode.
    parallelism_limit: Optional[int] = None

    # Example: set to 1GB and the executor will try to limit object store
    # memory usage to 1GB. This is a soft limit, and is not supported in
    # bulk execution mode.
    memory_limit_bytes: Optional[int] = None

    # Set this to prefer running tasks on the same node as the output
    # node (the node driving the execution).
    locality_with_output: bool = False

    # Always preserve ordering of blocks, even if using operators that
    # don't require it.
    preserve_order: bool = True
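As a usage sketch, an executor could be asked to cap in-flight work and object store usage while keeping deterministic block order; per the comments above, both limits are soft and ignored in bulk mode (values are illustrative):

options = ExecutionOptions(
    parallelism_limit=16,  # soft cap on in-flight tasks
    memory_limit_bytes=1024 * 1024 * 1024,  # ~1 GiB object store budget
    preserve_order=True,
)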
class PhysicalOperator:
    """Abstract class for physical operators.

    An operator transforms one or more input streams of RefBundles into a single
    output stream of RefBundles.

    Operators are stateful and non-serializable; they live on the driver side of
    the Dataset execution only.

    Here's a simple example of implementing a basic "Map" operator:

        class Map(PhysicalOperator):
            def __init__(self):
                self.active_tasks = []

            def add_input(self, refs):
                self.active_tasks.append(map_task.remote(refs))

            def has_next(self):
                ready, _ = ray.wait(self.active_tasks, timeout=0)
                return len(ready) > 0

            def get_next(self):
                ready, remaining = ray.wait(self.active_tasks, num_returns=1)
                self.active_tasks = remaining
                return ready[0]

    Note that the above operator fully supports both bulk and streaming execution,
    since `add_input` and `get_next` can be called in any order. In bulk execution,
    all inputs would be added up-front, but in streaming execution the calls could
    be interleaved.
    """

    def __init__(self, name: str, input_dependencies: List["PhysicalOperator"]):
        self._name = name
        self._input_dependencies = input_dependencies
        for x in input_dependencies:
            assert isinstance(x, PhysicalOperator), x

    @property
    def name(self) -> str:
        return self._name

    @property
    def input_dependencies(self) -> List["PhysicalOperator"]:
        """List of operators that provide inputs for this operator."""
        assert hasattr(
            self, "_input_dependencies"
        ), "PhysicalOperator.__init__() was not called."
        return self._input_dependencies

    def get_stats(self) -> StatsDict:
        """Return recorded execution stats for use with DatasetStats."""
        raise NotImplementedError

    def get_metrics(self) -> Dict[str, int]:
        """Returns a dict of metrics reported by this operator.

        These should be instant values that can be queried at any time, e.g.,
        obj_store_mem_allocated, obj_store_mem_freed.
        """
        return {}

    def __reduce__(self):
        raise ValueError("PhysicalOperator is not serializable.")

    def __str__(self):
        if self.input_dependencies:
            out_str = ", ".join([str(x) for x in self.input_dependencies])
            out_str += " -> "
        else:
            out_str = ""
        out_str += f"{self.__class__.__name__}[{self._name}]"
        return out_str

    def num_outputs_total(self) -> Optional[int]:
        """Returns the total number of output bundles of this operator, if known.

        This is useful for reporting progress.
        """
        if len(self.input_dependencies) == 1:
            return self.input_dependencies[0].num_outputs_total()
        return None

    def add_input(self, refs: RefBundle, input_index: int) -> None:
        """Called when an upstream result is available.

        Inputs may be added in any order, and calls to `add_input` may be
        interleaved with calls to `get_next` / `has_next` to implement streaming
        execution.

        Args:
            refs: The ref bundle that should be added as input.
            input_index: The index identifying the input dependency producing the
                input. For most operators, this is always `0` since there is only
                one upstream input operator.
        """
        raise NotImplementedError

    def inputs_done(self, input_index: int) -> None:
        """Called when an upstream operator finishes.

        This is called exactly once per input dependency. After this is called,
        the upstream operator guarantees no more inputs will be added via
        `add_input` for that input index.

        Args:
            input_index: The index identifying the input dependency producing the
                input. For most operators, this is always `0` since there is only
                one upstream input operator.
        """
        pass

    def has_next(self) -> bool:
        """Returns whether a downstream output is available.

        When this returns true, it is safe to call `get_next()`.
        """
        raise NotImplementedError

    def get_next(self) -> RefBundle:
        """Get the next downstream output.

        It is only allowed to call this if `has_next()` has returned True.
        """
        raise NotImplementedError

    def get_work_refs(self) -> List[ray.ObjectRef]:
        """Get a list of object references the executor should wait on.

        When a reference becomes ready, the executor must call
        `notify_work_completed(ref)` to tell this operator of the state change.
        """
        return []

    def notify_work_completed(self, work_ref: ray.ObjectRef) -> None:
        """Executor calls this when the given work is completed and local.

        This must be called as soon as the operator is aware that `work_ref` is
        ready.
        """
        raise NotImplementedError

    def shutdown(self) -> None:
        """Abort execution and release all resources used by this operator.

        This releases any Ray resources acquired by this operator, such as active
        tasks, actors, and objects.
        """
        pass
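The docstring's Map sketch elides the input_index argument, the base-class constructor, and metadata handling. A fuller, runnable variant of the same idea might look as follows; the _map_task body (an identity transform) and the reuse of input metadata are simplifications for illustration, not the shipped operator:

import ray
from typing import Dict, List
from ray.data.block import BlockMetadata


@ray.remote
def _map_task(block):
    # Hypothetical per-block transform; identity here for simplicity.
    return block


class Map(PhysicalOperator):
    """A minimal sketch of a map operator with a single upstream input."""

    def __init__(self, input_op: "PhysicalOperator"):
        super().__init__("Map", [input_op])
        # Maps each in-flight task ref to the metadata of its input block.
        self._active: Dict[ray.ObjectRef, BlockMetadata] = {}

    def add_input(self, refs: RefBundle, input_index: int) -> None:
        assert input_index == 0
        for block_ref, meta in refs.blocks:
            self._active[_map_task.remote(block_ref)] = meta

    def has_next(self) -> bool:
        if not self._active:
            return False
        ready, _ = ray.wait(list(self._active), timeout=0)
        return len(ready) > 0

    def get_next(self) -> RefBundle:
        ready, _ = ray.wait(list(self._active), num_returns=1)
        # Reuses the input block's metadata; a real operator would recompute it.
        meta = self._active.pop(ready[0])
        return RefBundle(blocks=[(ready[0], meta)], owns_blocks=True)

    def get_work_refs(self) -> List[ray.ObjectRef]:
        return list(self._active)

    def notify_work_completed(self, work_ref: ray.ObjectRef) -> None:
        pass  # Readiness is re-derived via ray.wait() in has_next().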
class Executor:
    """Abstract class for executors, which implement physical operator execution.

    Subclasses:
        BulkExecutor
        StreamingExecutor
    """

    def __init__(self, options: ExecutionOptions):
        """Create the executor."""
        self._options = options

    def execute(
        self, dag: PhysicalOperator, initial_stats: Optional[DatasetStats] = None
    ) -> Iterator[RefBundle]:
        """Start execution.

        Args:
            dag: The operator graph to execute.
            initial_stats: The DatasetStats to prepend to the stats returned by
                the executor. These stats represent actions done to compute inputs.
        """
        raise NotImplementedError

    def get_stats(self) -> DatasetStats:
        """Return stats for the execution so far.

        This is generally called after `execute` has completed, but may be called
        while iterating over `execute` results for streaming execution.
        """
        raise NotImplementedError
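To show how the pieces compose, here is a rough sketch of the bulk strategy a BulkExecutor subclass might follow: recursively drain each upstream operator, add all inputs up-front, wait out in-flight work via get_work_refs / notify_work_completed, then collect outputs. This illustrates the calling contract only, not the shipped implementation:

import ray
from typing import List


def _execute_bulk(op: PhysicalOperator) -> List[RefBundle]:
    # Recursively materialize every upstream operator first (bulk semantics).
    for i, upstream in enumerate(op.input_dependencies):
        for bundle in _execute_bulk(upstream):
            op.add_input(bundle, input_index=i)
        op.inputs_done(i)

    # Wait on outstanding work, notifying the operator as refs become ready.
    while op.get_work_refs():
        ready, _ = ray.wait(op.get_work_refs(), num_returns=1)
        op.notify_work_completed(ready[0])

    # Collect all available outputs.
    outputs: List[RefBundle] = []
    while op.has_next():
        outputs.append(op.get_next())
    return outputs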

python/ray/data/_internal/stats.py

Lines changed: 5 additions & 5 deletions
@@ -14,6 +14,8 @@
 STATS_ACTOR_NAME = "datasets_stats_actor"
 STATS_ACTOR_NAMESPACE = "_dataset_stats_actor"
 
+StatsDict = Dict[str, List[BlockMetadata]]
+
 
 def fmt(seconds: float) -> str:
     if seconds > 1:
@@ -57,9 +59,7 @@ def __init__(self, stage_name: str, parent: "DatasetStats"):
         self.parent = parent
         self.start_time = time.perf_counter()
 
-    def build_multistage(
-        self, stages: Dict[str, List[BlockMetadata]]
-    ) -> "DatasetStats":
+    def build_multistage(self, stages: StatsDict) -> "DatasetStats":
         stage_infos = {}
         for i, (k, v) in enumerate(stages.items()):
             if i == 0:
@@ -168,7 +168,7 @@ class DatasetStats:
     def __init__(
         self,
         *,
-        stages: Dict[str, List[BlockMetadata]],
+        stages: StatsDict,
         parent: Union[Optional["DatasetStats"], List["DatasetStats"]],
         needs_stats_actor: bool = False,
         stats_uuid: str = None,
@@ -189,7 +189,7 @@ def __init__(
             base_name: The name of the base operation for a multi-stage operation.
         """
 
-        self.stages: Dict[str, List[BlockMetadata]] = stages
+        self.stages: StatsDict = stages
         if parent is not None and not isinstance(parent, list):
             parent = [parent]
         self.parents: List["DatasetStats"] = parent
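The new StatsDict alias maps a stage name to the metadata of the blocks that stage produced. A hypothetical value, for illustration only:

from ray.data.block import BlockMetadata
from ray.data._internal.stats import StatsDict

# One completed "read" stage that produced a single 100-row, 4 KiB block.
stats: StatsDict = {
    "read": [
        BlockMetadata(
            num_rows=100,
            size_bytes=4096,
            schema=None,
            input_files=None,
            exec_stats=None,
        )
    ],
}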

python/ray/data/context.py

Lines changed: 23 additions & 0 deletions
@@ -66,6 +66,20 @@
 # Whether to use Polars for tabular dataset sorts, groupbys, and aggregations.
 DEFAULT_USE_POLARS = False
 
+# Whether to use the new executor backend.
+DEFAULT_NEW_EXECUTION_BACKEND = bool(
+    int(os.environ.get("RAY_DATASET_NEW_EXECUTION_BACKEND", "1"))
+)
+
+# Whether to eagerly free memory (new backend only).
+DEFAULT_EAGER_FREE = bool(int(os.environ.get("RAY_DATASET_EAGER_FREE", "1")))
+
+# Whether to trace allocations / eager free (new backend only). This adds
+# significant performance overheads and should only be used for debugging.
+DEFAULT_TRACE_ALLOCATIONS = bool(
+    int(os.environ.get("RAY_DATASET_TRACE_ALLOCATIONS", "0"))
+)
+
 # Whether to estimate in-memory decoding data size for data source.
 DEFAULT_DECODING_SIZE_ESTIMATION_ENABLED = True
 
@@ -111,10 +125,13 @@ def __init__(
         pipeline_push_based_shuffle_reduce_tasks: bool,
         scheduling_strategy: SchedulingStrategyT,
         use_polars: bool,
+        new_execution_backend: bool,
+        eager_free: bool,
         decoding_size_estimation: bool,
         min_parallelism: bool,
         enable_tensor_extension_casting: bool,
         enable_auto_log_stats: bool,
+        trace_allocations: bool,
     ):
         """Private constructor (use get_current() instead)."""
         self.block_splitting_enabled = block_splitting_enabled
@@ -133,10 +150,13 @@ def __init__(
         )
         self.scheduling_strategy = scheduling_strategy
         self.use_polars = use_polars
+        self.new_execution_backend = new_execution_backend
+        self.eager_free = eager_free
         self.decoding_size_estimation = decoding_size_estimation
         self.min_parallelism = min_parallelism
         self.enable_tensor_extension_casting = enable_tensor_extension_casting
         self.enable_auto_log_stats = enable_auto_log_stats
+        self.trace_allocations = trace_allocations
 
     @staticmethod
     def get_current() -> "DatasetContext":
@@ -168,12 +188,15 @@ def get_current() -> "DatasetContext":
             pipeline_push_based_shuffle_reduce_tasks=True,
             scheduling_strategy=DEFAULT_SCHEDULING_STRATEGY,
             use_polars=DEFAULT_USE_POLARS,
+            new_execution_backend=DEFAULT_NEW_EXECUTION_BACKEND,
+            eager_free=DEFAULT_EAGER_FREE,
             decoding_size_estimation=DEFAULT_DECODING_SIZE_ESTIMATION_ENABLED,
             min_parallelism=DEFAULT_MIN_PARALLELISM,
             enable_tensor_extension_casting=(
                 DEFAULT_ENABLE_TENSOR_EXTENSION_CASTING
             ),
             enable_auto_log_stats=DEFAULT_AUTO_LOG_STATS,
+            trace_allocations=DEFAULT_TRACE_ALLOCATIONS,
         )
 
         return _default_context
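Since the defaults read environment variables at import time, the same flags can also be flipped per process on the context returned by DatasetContext.get_current(); a brief sketch with illustrative values:

from ray.data.context import DatasetContext

ctx = DatasetContext.get_current()
ctx.new_execution_backend = True  # opt in to the new executor backend
ctx.eager_free = True             # eagerly free blocks (new backend only)
ctx.trace_allocations = False     # leave tracing off; it adds heavy overhead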
