forked from dask/dask-gateway
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_local_backend.py
56 lines (40 loc) · 1.82 KB
/
test_local_backend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import pytest
from .utils_test import (
LocalTestingBackend,
temp_gateway,
wait_for_workers,
with_retries,
)
@pytest.mark.asyncio
async def test_local_cluster_backend():
    """Check the local backend's status hooks across a cluster's lifetime.

    Against a temporary gateway built with ``LocalTestingBackend`` the test:

    1. creates a cluster and expects ``do_check_clusters`` to return
       ``[True]`` for it;
    2. scales to 3 workers, then down to 1, and expects the
       ``do_check_workers`` results for the recorded workers to sum to 1;
    3. submits a trivial task through a client and checks its result;
    4. scales to 0 and expects the ``do_check_workers`` results to sum to 0;
    5. calls ``do_stop_worker`` on one of the recorded workers again;
    6. expects ``do_check_clusters`` to return ``[False]`` for the cluster.

    Status assertions that may lag behind the scale request are wrapped in
    ``with_retries(..., 20)`` rather than asserted once.
    """
    async with temp_gateway(backend_class=LocalTestingBackend) as g:
        async with g.gateway_client() as gateway:
            async with gateway.new_cluster() as cluster:
                backend = g.gateway.backend
                db_cluster = backend.db.get_cluster(cluster.name)

                res = await backend.do_check_clusters([db_cluster])
                assert res == [True]

                await cluster.scale(3)
                await wait_for_workers(cluster, exact=3)
                await cluster.scale(1)
                await wait_for_workers(cluster, exact=1)

                # Snapshot the worker records now; the same list is reused
                # for every later worker check in this test.
                db_workers = list(db_cluster.workers.values())

                async def check_one_worker_left():
                    res = await backend.do_check_workers(db_workers)
                    assert sum(res) == 1

                await with_retries(check_one_worker_left, 20)

                async with cluster.get_client(set_as_default=False) as client:
                    res = await client.submit(lambda x: x + 1, 1)
                    assert res == 2

                await cluster.scale(0)
                await wait_for_workers(cluster, exact=0)

                async def check_no_workers_left():
                    res = await backend.do_check_workers(db_workers)
                    assert sum(res) == 0

                await with_retries(check_no_workers_left, 20)

            # NOTE(review): the original indentation of the remaining
            # statements was lost; they are placed after the cluster context
            # exits because the final check expects the cluster to be
            # reported as not running -- confirm against the upstream file.

            # No-op for shutdown of already shutdown worker
            db_worker = db_workers[0]
            # Only the call completing without raising matters here; the
            # return value is not inspected.
            await backend.do_stop_worker(db_worker)

            async def check_cluster_gone():
                res = await backend.do_check_clusters([db_cluster])
                assert res == [False]

            await with_retries(check_cluster_gone, 20)