
Commit f8aff26

[WIP] Cluster resource limits (dask#266)
* [WIP] Cluster resource limits

  This adds resource limits *per cluster*. Currently we support:

  - Max cores per cluster
  - Max memory per cluster
  - Max workers per cluster

  At runtime these are normalized into a single `cluster_max_workers` field,
  which is used to check incoming scale and adapt requests. If a request from
  a user exceeds the limit, it is trimmed to be within bounds, and a warning
  is raised in the user's terminal notifying them of the limit. Still needs
  tests and docs.

* Update, add tests

* Add docs
1 parent 652ed95 commit f8aff26

File tree: 9 files changed, +326 -8 lines
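As a worked example of the normalization described in the commit message, here is a hedged configuration sketch. The worker_* and scheduler_* traits already exist on ClusterConfig; the cluster_max_* traits are the ones added by this commit, and all numbers are hypothetical.

# dask_gateway_config.py -- hypothetical values, for illustration only
c.ClusterConfig.scheduler_cores = 1
c.ClusterConfig.scheduler_memory = "2 G"
c.ClusterConfig.worker_cores = 2
c.ClusterConfig.worker_memory = "8 G"

# Limits added by this commit
c.ClusterConfig.cluster_max_cores = 40        # at most 40 cores per cluster
c.ClusterConfig.cluster_max_memory = "100 G"  # at most 100 GiB per cluster

# Normalization (see _worker_limit_from_resources in base.py below):
#   cores  allow  (40 - 1) // 2  = 19 workers
#   memory allows (100 - 2) // 8 = 12 workers
# so scale and adapt requests are capped at cluster_max_workers = 12.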

dask-gateway-server/dask_gateway_server/backends/base.py (+132 -2)

@@ -1,13 +1,23 @@
 import asyncio
 
 import aiohttp
-from traitlets import Instance, Integer, Float, Dict, Union, Unicode, default
+from traitlets import (
+    Instance,
+    Integer,
+    Float,
+    Dict,
+    Union,
+    Unicode,
+    default,
+    validate,
+    observe,
+)
 from traitlets.config import LoggingConfigurable, Configurable
 
 from .. import models
 from ..options import Options
 from ..traitlets import MemoryLimit, Type, Callable, Command
-from ..utils import awaitable
+from ..utils import awaitable, format_bytes
 
 
 __all__ = ("Backend", "ClusterConfig")
@@ -320,6 +330,126 @@ class ClusterConfig(Configurable):
         config=True,
     )
 
+    cluster_max_memory = MemoryLimit(
+        None,
+        help="""
+        The maximum amount of memory (in bytes) available to this cluster.
+        Allows the following suffixes:
+
+        - K -> Kibibytes
+        - M -> Mebibytes
+        - G -> Gibibytes
+        - T -> Tebibytes
+
+        Set to ``None`` for no memory limit (default).
+        """,
+        min=0,
+        allow_none=True,
+        config=True,
+    )
+
+    cluster_max_cores = Float(
+        None,
+        help="""
+        The maximum number of cores available to this cluster.
+
+        Set to ``None`` for no cores limit (default).
+        """,
+        min=0.0,
+        allow_none=True,
+        config=True,
+    )
+
+    cluster_max_workers = Integer(
+        help="""
+        The maximum number of workers available to this cluster.
+
+        Note that this will be combined with ``cluster_max_cores`` and
+        ``cluster_max_memory`` at runtime to determine the actual maximum
+        number of workers available to this cluster.
+        """,
+        allow_none=True,
+        min=0,
+        config=True,
+    )
+
+    def _check_scheduler_memory(self, scheduler_memory, cluster_max_memory):
+        if cluster_max_memory is not None and scheduler_memory > cluster_max_memory:
+            memory = format_bytes(scheduler_memory)
+            limit = format_bytes(cluster_max_memory)
+            raise ValueError(
+                f"Scheduler memory request of {memory} exceeds cluster memory "
+                f"limit of {limit}"
+            )
+
+    def _check_scheduler_cores(self, scheduler_cores, cluster_max_cores):
+        if cluster_max_cores is not None and scheduler_cores > cluster_max_cores:
+            raise ValueError(
+                f"Scheduler cores request of {scheduler_cores} exceeds cluster "
+                f"cores limit of {cluster_max_cores}"
+            )
+
+    def _worker_limit_from_resources(self):
+        inf = max_workers = float("inf")
+        if self.cluster_max_memory is not None:
+            max_workers = min(
+                (self.cluster_max_memory - self.scheduler_memory) // self.worker_memory,
+                max_workers,
+            )
+        if self.cluster_max_cores is not None:
+            max_workers = min(
+                (self.cluster_max_cores - self.scheduler_cores) // self.worker_cores,
+                max_workers,
+            )
+
+        if max_workers == inf:
+            return None
+        return max(0, int(max_workers))
+
+    @validate("scheduler_memory")
+    def _validate_scheduler_memory(self, proposal):
+        self._check_scheduler_memory(proposal.value, self.cluster_max_memory)
+        return proposal.value
+
+    @validate("scheduler_cores")
+    def _validate_scheduler_cores(self, proposal):
+        self._check_scheduler_cores(proposal.value, self.cluster_max_cores)
+        return proposal.value
+
+    @validate("cluster_max_memory")
+    def _validate_cluster_max_memory(self, proposal):
+        self._check_scheduler_memory(self.scheduler_memory, proposal.value)
+        return proposal.value
+
+    @validate("cluster_max_cores")
+    def _validate_cluster_max_cores(self, proposal):
+        self._check_scheduler_cores(self.scheduler_cores, proposal.value)
+        return proposal.value
+
+    @validate("cluster_max_workers")
+    def _validate_cluster_max_workers(self, proposal):
+        lim = self._worker_limit_from_resources()
+        if lim is None:
+            return proposal.value
+        if proposal.value is None:
+            return lim
+        return min(proposal.value, lim)
+
+    @observe("cluster_max_workers")
+    def _observe_cluster_max_workers(self, change):
+        # This shouldn't be needed, but traitlet validators don't run
+        # if a value is `None` and `allow_none` is true, so we need to
+        # add an observer to handle the event of an *explicit* `None`
+        # set for `cluster_max_workers`
+        if change.new is None:
+            lim = self._worker_limit_from_resources()
+            if lim is not None:
+                self.cluster_max_workers = lim
+
+    @default("cluster_max_workers")
+    def _default_cluster_max_workers(self):
+        return self._worker_limit_from_resources()
+
     def to_dict(self):
         return {
             k: getattr(self, k)
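One consequence of the validators above: a limit lower than the scheduler's own request fails at configuration time rather than at scale time. A minimal sketch, assuming ClusterConfig can be instantiated directly with trait keyword arguments (values hypothetical):

from dask_gateway_server.backends.base import ClusterConfig

# The scheduler alone requests more cores than the cluster-wide limit allows.
cfg = ClusterConfig(scheduler_cores=2, cluster_max_cores=1)
# Expected to raise roughly:
#   ValueError: Scheduler cores request of 2 exceeds cluster cores limit of 1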

dask-gateway-server/dask_gateway_server/backends/db_base.py (+13)

@@ -207,6 +207,7 @@ def to_model(self):
             username=self.username,
             token=self.token,
             options=self.options,
+            config=self.config,
             status=self.model_status,
             scheduler_address=self.scheduler_address,
             dashboard_address=self.dashboard_address,
@@ -956,6 +957,18 @@ async def on_cluster_heartbeat(self, cluster_name, msg):
             len(closed_workers),
         )
 
+        max_workers = cluster.config.get("cluster_max_workers")
+        if max_workers is not None and count > max_workers:
+            # This shouldn't happen under normal operation, but could if the
+            # user does something malicious (or there's a bug).
+            self.log.info(
+                "Cluster %s heartbeat requested %d workers, exceeding limit of %s.",
+                cluster_name,
+                count,
+                max_workers,
+            )
+            count = max_workers
+
         if count != cluster.count:
             cluster_update["count"] = count

dask-gateway-server/dask_gateway_server/backends/kubernetes/backend.py (+16)

@@ -456,6 +456,21 @@ async def on_cluster_heartbeat(self, cluster_name, msg):
             len(msg["closed_workers"]),
         )
 
+        cluster = self.clusters.get(cluster_name)
+        if cluster is None:
+            return
+        max_workers = cluster.config.get("cluster_max_workers")
+        if max_workers is not None and count > max_workers:
+            # This shouldn't happen under normal operation, but could if the
+            # user does something malicious (or there's a bug).
+            self.log.info(
+                "Cluster %s heartbeat requested %d workers, exceeding limit of %s.",
+                cluster_name,
+                count,
+                max_workers,
+            )
+            count = max_workers
+
         try:
             await self.custom_client.patch_namespaced_custom_object(
                 "gateway.dask.org",
@@ -567,6 +582,7 @@ async def sync_cluster(self, cluster_name):
             name=cluster_name,
             username=obj["spec"].get("username", ""),
             options=obj["spec"].get("options") or {},
+            config=obj["spec"].get("config") or {},
             token="",
             scheduler_address=scheduler_address,
             dashboard_address=dashboard_address,

dask-gateway-server/dask_gateway_server/models.py (+4)

@@ -78,6 +78,8 @@ class Cluster(object):
         The normalized set of configuration options provided when starting this
         cluster. These values are user-facing, and don't necessarily correspond
         with the ``ClusterConfig`` options on the backend.
+    config : dict
+        The serialized version of ``ClusterConfig`` for this cluster.
     status : ClusterStatus
         The status of the cluster.
     scheduler_address : str
@@ -103,6 +105,7 @@ def __init__(
         username,
         token,
         options,
+        config,
         status,
         scheduler_address="",
         dashboard_address="",
@@ -116,6 +119,7 @@
         self.username = username
         self.token = token
         self.options = options
+        self.config = config
         self.status = status
         self.scheduler_address = scheduler_address
         self.dashboard_address = dashboard_address

dask-gateway-server/dask_gateway_server/routes.py (+35 -3)

@@ -198,13 +198,23 @@ async def scale_cluster(request):
             reason=f"Scale expects a non-negative integer, got {count}"
         )
 
+    max_workers = cluster.config.get("cluster_max_workers")
+    resp_msg = None
+    if max_workers is not None and count > max_workers:
+        resp_msg = (
+            f"Scale request of {count} workers would exceed resource limit of "
+            f"{max_workers} workers. Scaling to {max_workers} instead."
+        )
+        count = max_workers
+
     try:
         await backend.forward_message_to_scheduler(
             cluster, {"op": "scale", "count": count}
         )
     except PublicException as exc:
         raise web.HTTPConflict(reason=str(exc))
-    return web.Response()
+
+    return web.json_response({"ok": not resp_msg, "msg": resp_msg})
 
 
 @default_routes.post("/api/v1/clusters/{cluster_name}/adapt")
@@ -226,14 +236,33 @@ async def adapt_cluster(request):
     maximum = msg.get("maximum", None)
     active = msg.get("active", True)
 
+    max_workers = cluster.config.get("cluster_max_workers")
+    resp_msg = None
+    if max_workers is not None:
+        if maximum is None:
+            maximum = max_workers
+        if minimum is None:
+            minimum = 0
+        if maximum > max_workers or minimum > max_workers:
+            orig_max = maximum
+            orig_min = minimum
+            maximum = min(max_workers, maximum)
+            minimum = min(max_workers, minimum)
+            resp_msg = (
+                f"Adapt with `maximum={orig_max}, minimum={orig_min}` workers "
+                f"would exceed resource limit of {max_workers} workers. Using "
+                f"`maximum={maximum}, minimum={minimum}` instead."
+            )
+
     try:
         await backend.forward_message_to_scheduler(
             cluster,
             {"op": "adapt", "minimum": minimum, "maximum": maximum, "active": active},
         )
     except PublicException as exc:
         raise web.HTTPConflict(reason=str(exc))
-    return web.Response()
+
+    return web.json_response({"ok": not resp_msg, "msg": resp_msg})
 
 
 @default_routes.post("/api/v1/clusters/{cluster_name}/heartbeat")
@@ -242,7 +271,10 @@ async def handle_heartbeat(request):
     backend = request.app["backend"]
     cluster_name = request.match_info["cluster_name"]
     msg = await request.json()
-    await backend.on_cluster_heartbeat(cluster_name, msg)
+    try:
+        await backend.on_cluster_heartbeat(cluster_name, msg)
+    except PublicException as exc:
+        raise web.HTTPConflict(reason=str(exc))
     return web.Response()
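For reference, a hedged sketch of the JSON bodies these handlers now return. The 10-worker limit and the request counts are hypothetical; requests within the limit come back as {"ok": True, "msg": None}.

# Hypothetical responses against a 10-worker cluster_max_workers limit.

# POST /api/v1/clusters/<name>/scale with {"count": 100}:
scale_response = {
    "ok": False,
    "msg": (
        "Scale request of 100 workers would exceed resource limit of "
        "10 workers. Scaling to 10 instead."
    ),
}

# POST /api/v1/clusters/<name>/adapt with {"minimum": 0, "maximum": 100, "active": True}:
adapt_response = {
    "ok": False,
    "msg": (
        "Adapt with `maximum=100, minimum=0` workers would exceed resource "
        "limit of 10 workers. Using `maximum=10, minimum=0` instead."
    ),
}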

dask-gateway/dask_gateway/client.py (+14 -2)

@@ -652,7 +652,13 @@ def stop_cluster(self, cluster_name, **kwargs):
 
     async def _scale_cluster(self, cluster_name, n):
         url = "%s/api/v1/clusters/%s/scale" % (self.address, cluster_name)
-        await self._request("POST", url, json={"count": n})
+        resp = await self._request("POST", url, json={"count": n})
+        try:
+            msg = await resp.json()
+        except Exception:
+            msg = {}
+        if not msg.get("ok", True) and msg.get("msg"):
+            warnings.warn(GatewayWarning(msg["msg"]))
 
     def scale_cluster(self, cluster_name, n, **kwargs):
         """Scale a cluster to n workers.
@@ -669,11 +675,17 @@ def scale_cluster(self, cluster_name, n, **kwargs):
     async def _adapt_cluster(
         self, cluster_name, minimum=None, maximum=None, active=True
     ):
-        await self._request(
+        resp = await self._request(
            "POST",
            "%s/api/v1/clusters/%s/adapt" % (self.address, cluster_name),
            json={"minimum": minimum, "maximum": maximum, "active": active},
        )
+        try:
+            msg = await resp.json()
+        except Exception:
+            msg = {}
+        if not msg.get("ok", True) and msg.get("msg"):
+            warnings.warn(GatewayWarning(msg["msg"]))
 
     def adapt_cluster(
         self, cluster_name, minimum=None, maximum=None, active=True, **kwargs
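And a hedged end-to-end sketch of the client-side warning path, using the existing dask-gateway client API; the gateway address and the 10-worker limit are hypothetical.

from dask_gateway import Gateway

gateway = Gateway("http://gateway.example.com")  # hypothetical address
cluster = gateway.new_cluster()
cluster.scale(100)
# With a 10-worker limit, the call succeeds but warns:
# GatewayWarning: Scale request of 100 workers would exceed resource limit of
# 10 workers. Scaling to 10 instead.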

docs/source/index.rst (+1)

@@ -90,6 +90,7 @@ both the cluster backend and the authentication protocol are pluggable.
    authentication
    security
    cluster-options
+   resource-limits
 
 .. toctree::
    :maxdepth: 1

docs/source/resource-limits.rst (+28)

@@ -0,0 +1,28 @@
+Cluster Resource Limits
+=======================
+
+By default users can create clusters with as many workers and resources as they
+want. In shared environments this may not always be desirable. To remedy this,
+administrators can set per-cluster resource limits.
+
+A few limits are available:
+
+- :data:`c.ClusterConfig.cluster_max_cores`: Maximum number of cores per cluster
+- :data:`c.ClusterConfig.cluster_max_memory`: Maximum amount of memory per cluster
+- :data:`c.ClusterConfig.cluster_max_workers`: Maximum number of workers per cluster
+
+If a cluster is at capacity for any of these limits, requests for new workers
+will warn with an informative message saying they're at capacity.
+
+Example
+-------
+
+Here we limit each cluster to:
+
+- A max of 80 active cores
+- A max of 1 TiB of RAM
+
+.. code-block:: python
+
+    c.ClusterConfig.cluster_max_cores = 80
+    c.ClusterConfig.cluster_max_memory = "1 T"
