This repository was archived by the owner on Dec 16, 2022. It is now read-only.
forked from JuliaLang/julia
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtask.jl
63 lines (57 loc) · 1.38 KB
/
task.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# Print a task to `io` as the fixed text "Task"; no task state is displayed.
function show(io::IO, t::Task)
    return print(io, "Task")
end
# Return the currently running task, as reported by the runtime's
# `jl_get_current_task` entry point. The result is asserted to be a `Task`.
function current_task()
    running = ccall(:jl_get_current_task, Any, ())
    return running::Task
end
# Report whether task `t` has finished, by reading its `done` field.
function istaskdone(t::Task)
    return t.done
end
# Return the storage dictionary of the current task.
# The dictionary is created lazily: a task whose `storage` field is still
# `nothing` gets a fresh `ObjectIdDict` on first access.
function task_local_storage()
    me = current_task()
    is(me.storage, nothing) && (me.storage = ObjectIdDict())
    return (me.storage)::ObjectIdDict
end
# Look up `key` in the current task's storage dictionary.
function task_local_storage(key)
    storage = task_local_storage()
    return storage[key]
end
# Store `val` under `key` in the current task's storage dictionary.
# Returns `val`, the value of the assignment expression.
function task_local_storage(key, val)
    storage = task_local_storage()
    storage[key] = val
    return val
end
# Hand the value `v` to the task recorded in the current task's `last` field.
#
# Before switching, if the current task has a non-empty `consumers` queue, one
# queued task is popped and passed to `enq_work` (defined elsewhere) so that a
# task waiting on us becomes runnable again. Once this task is resumed, its
# `parent` is set to `last`, and `nothing` is returned.
function produce(v)
    me = current_task()
    waiting = me.consumers
    if !is(waiting, nothing) && !isempty(waiting::Array{Any,1})
        # wake one task that is waiting on us
        enq_work(pop!(waiting::Array{Any,1}))
    end
    yieldto(me.last, v)
    me.parent = me.last  # always exit to last consumer
    return nothing
end
# Varargs form: the arguments are collected into a tuple, which is then
# produced as a single value.
function produce(v...)
    return produce(v)
end
# Switch to task `P` and return the value that the switch yields back.
#
# While `P` is neither runnable nor done, the caller yields with a
# `WaitFor(:consume, P)` request. The current task is marked non-runnable for
# the duration of the switch, and its `last` field is restored afterwards.
# If `P` has finished, every task in `P.consumers` is handed to `enq_work`
# (defined elsewhere).
function consume(P::Task)
    while !P.runnable && !P.done
        yield(WaitFor(:consume, P))
    end

    me = current_task()
    saved_last = me.last  # put back after the switch below
    me.runnable = false
    value = yieldto(P)
    me.last = saved_last
    me.runnable = true

    if P.done
        waiting = P.consumers
        if !is(waiting, nothing)
            waiters = waiting::Array{Any,1}
            # P has finished: release all tasks still queued on it
            while !isempty(waiters)
                enq_work(pop!(waiters))
            end
        end
    end
    return value
end
# Iteration over a task carries no separate state: the initial state is `nothing`.
function start(t::Task)
    return nothing
end
# Iteration test for tasks: consume the next value from `t`, store it in
# `t.result` (where `next` reads it), then report whether `t` has finished.
# The state argument `val` is not used.
function done(t::Task, val)
    produced = consume(t)
    t.result = produced
    return istaskdone(t)
end
# Iteration step for tasks: return the value held in `t.result` paired with
# the next state, which is always `nothing`. The state argument `val` is not used.
function next(t::Task, val)
    item = t.result
    return (item, nothing)
end
# Wrap the expression `ex` in a zero-argument closure and build a `Task` from it.
# `ex` is escaped, so it is evaluated in the caller's scope.
macro task(ex)
    body = esc(ex)
    return :(Task(() -> $body))
end