-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathtransport.jl
91 lines (75 loc) · 2.21 KB
/
transport.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# Common supertype of the transports defined in this file (`ZMQTransport` and
# `InProcTransport`). Each concrete transport has its own `sendrecv`, `sendresp`,
# `recvreq` and `close` methods.
abstract type AbstractTransport end
"""Transport layer over ZMQ sockets"""
struct ZMQTransport <: AbstractTransport
    # Fields:
    #   ctx   - context the socket was created on
    #   sock  - socket used by every send and receive on this transport
    #   mode  - socket type that was passed to `Socket(ctx, mode)`
    #   bound - `true` when the socket was bound to the address, `false` when it
    #           was connected to it
    ctx::Context
    sock::Socket
    mode::Int # REQ/REP
    bound::Bool

    # Create a socket of type `mode` on `ctx`, then bind it to `addr` when `bound`
    # is true, or connect it to `addr` otherwise.
    function ZMQTransport(addr::String, mode::Int, bound::Bool, ctx::Context=Context())
        sock = Socket(ctx, mode)
        try
            if bound
                ZMQ.bind(sock, addr)
            else
                ZMQ.connect(sock, addr)
            end
        catch
            # Setup failed: close the socket right away instead of leaving it open,
            # then propagate the original error to the caller.
            Base.close(sock)
            rethrow()
        end
        return new(ctx, sock, mode, bound)
    end

    # Convenience constructor: builds the address "tcp://<ip>:<port>" and delegates
    # to the address-based constructor.
    function ZMQTransport(ip, port, mode::Int, bound::Bool, ctx::Context=Context())
        return ZMQTransport("tcp://$ip:$port", mode, bound, ctx)
    end
end
"""
    sendrecv(conn::ZMQTransport, msgstr)

Send `msgstr` on the socket of `conn`, then receive one message from the same socket
and return it as a string.
"""
function sendrecv(conn::ZMQTransport, msgstr)
    @debug "sending request" msgstr
    request = ZMQ.Message(msgstr)
    ZMQ.send(conn.sock, request)
    reply = ZMQ.recv(conn.sock)
    respstr = unsafe_string(reply)
    @debug "received response" respstr
    return respstr
end
"""
    sendresp(conn::ZMQTransport, msgstr)

Send `msgstr` as a message on the socket of `conn`. Returns whatever `ZMQ.send`
returns.
"""
function sendresp(conn::ZMQTransport, msgstr)
    @debug "sending response" msgstr
    response = ZMQ.Message(msgstr)
    return ZMQ.send(conn.sock, response)
end
"""
    recvreq(conn::ZMQTransport)

Receive one message from the socket of `conn` and return it as a string.
"""
function recvreq(conn::ZMQTransport)
    incoming = ZMQ.recv(conn.sock)
    reqstr = unsafe_string(incoming)
    @debug "received request" reqstr
    return reqstr
end
"""
    close(conn::ZMQTransport)

Close the socket held by `conn`.
"""
function close(conn::ZMQTransport)
    sock = conn.sock
    # Only the socket is closed; `conn.ctx` is left open (a `close(conn.ctx)` call
    # was disabled here). NOTE(review): presumably because the context may be
    # shared with other sockets - confirm.
    return close(sock)
end
"""Transport layer over in-process Channels"""
struct InProcTransport <: AbstractTransport
    # Key of this transport's channel pair in `InProcChannels`.
    name::Symbol

    # Register a pair of `Channel{Any}` (capacity 1 each) under `name` unless a pair
    # is already registered; transports created with the same name therefore share
    # the same channels.
    function InProcTransport(name::Symbol)
        get!(InProcChannels, name) do
            (Channel{Any}(1), Channel{Any}(1))
        end
        return new(name)
    end
end
"""
Registry of in-process channel pairs, keyed by transport name. Each value is a tuple
of two `Channel{Any}`: the first carries responses, the second carries requests.
"""
const InProcChannels = Dict{Symbol,NTuple{2,Channel{Any}}}()
"""
    sendrecv(conn::InProcTransport, msg)

Put `msg` on the request channel registered under `conn.name`, then take one item
from the matching response channel and return it. Throws a `KeyError` if no channel
pair is registered under that name.
"""
function sendrecv(conn::InProcTransport, msg)
    responses, requests = InProcChannels[conn.name]
    @debug "sending request" msg
    put!(requests, msg)
    resp = take!(responses)
    @debug "received response" resp
    return resp
end
"""
    sendresp(conn::InProcTransport, msg)

Put `msg` on the response channel registered under `conn.name`. Returns `nothing`.
"""
function sendresp(conn::InProcTransport, msg)
    # The response channel is the first element of the registered pair.
    responses = first(InProcChannels[conn.name])
    @debug "sending response" msg
    put!(responses, msg)
    return nothing
end
"""
    recvreq(conn::InProcTransport)

Take one item from the request channel registered under `conn.name` and return it.
"""
function recvreq(conn::InProcTransport)
    # The request channel is the second element of the registered pair.
    requests = last(InProcChannels[conn.name])
    req = take!(requests)
    @debug "received request" req
    return req
end
"""
    close(conn::InProcTransport)

Remove the channel pair registered under `conn.name` from `InProcChannels`, if any.
The channels themselves are not closed. Returns `nothing`.
"""
function close(conn::InProcTransport)
    # `delete!` on a `Dict` does nothing when the key is absent.
    delete!(InProcChannels, conn.name)
    return nothing
end