forked from cloudwu/skynet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclusterd.lua
122 lines (107 loc) · 3.02 KB
/
clusterd.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
local skynet = require "skynet"
local sc = require "socketchannel"
local socket = require "socket"
local cluster = require "cluster.core"
-- Value of the "cluster" environment setting; loadconfig opens it as a file.
local config_name = skynet.getenv("cluster")
-- node name -> address string, filled in by loadconfig
local node_address = {}
-- node name -> session number to use for the next request sent to that node
local node_session = {}
-- handlers for incoming "lua" messages, keyed by command name
local command = {}
--- Read one response packet from a channel socket.
-- Two bytes are read and handed to socket.header, whose result is used as the
-- number of bytes to read for the packet body.
-- @param sock object providing a :read(n) method
-- @return the results of cluster.unpackresponse: session, ok, data
local function read_response(sock)
    local body_size = socket.header(sock:read(2))
    local packet = sock:read(body_size)
    return cluster.unpackresponse(packet)
end
--- Open and cache a channel to a cluster node.
-- Installed below as the __index handler of node_channel, so it runs when
-- node_channel[key] is read and no channel is stored under that key.
-- @param t   table that receives the new channel (node_channel)
-- @param key node name, looked up in node_address
-- @return the connected channel
-- Raises an error naming the node when it is not in node_address, when its
-- address is not of the form "host:port" with a numeric port, or when the
-- connection attempt fails.
local function open_channel(t, key)
    local address = node_address[key]
    if address == nil then
        error("unknown cluster node: " .. tostring(key))
    end

    local host, port = string.match(address, "([^:]+):(.*)$")
    local port_number = host and tonumber(port)
    if not port_number then
        error(string.format(
            "invalid address for cluster node %s: %s (expected host:port)",
            tostring(key), tostring(address)))
    end

    local c = sc.channel {
        host = host,
        port = port_number,
        response = read_response,
        nodelay = true,
    }
    assert(c:connect(true))
    t[key] = c
    -- a fresh channel starts its session numbering over
    node_session[key] = 1
    return c
end

-- node name -> channel; missing entries are created on demand by open_channel
local node_channel = setmetatable({}, { __index = open_channel })
--- Load the cluster config file and merge it into node_address.
-- The file named by config_name is read whole and executed as a text chunk
-- with a fresh table as its environment, so every global it assigns becomes a
-- name -> address entry.
-- All entries are validated before any state is touched, so a bad entry
-- cannot leave the node tables half-updated (pairs order is unspecified).
-- Entries already in node_address but absent from the file are left as is.
-- Raises if the file cannot be opened, fails to compile or run, or contains
-- an address that is not a string.
local function loadconfig()
    local f = assert(io.open(config_name))
    local source = f:read "*a"
    f:close()

    local tmp = {}
    assert(load(source, "@"..config_name, "t", tmp))()

    -- Pass 1: validate only, no side effects.
    for name, address in pairs(tmp) do
        if type(address) ~= "string" then
            error(string.format(
                "%s: address of cluster node %s must be a string, got %s",
                tostring(config_name), tostring(name), type(address)))
        end
    end

    -- Pass 2: apply the changes.
    for name, address in pairs(tmp) do
        if node_address[name] ~= address then
            -- address changed
            if rawget(node_channel, name) then
                -- drop the cached channel so the next lookup reconnects
                node_channel[name] = nil
            end
            node_address[name] = address
        end
    end
end
--- "reload" command: re-read the cluster config, then reply with a packed nil.
function command.reload()
    loadconfig()
    local reply, reply_size = skynet.pack(nil)
    skynet.ret(reply, reply_size)
end
--- "listen" command: start a gate service and open it on an address.
-- @param source sender of the command (unused here)
-- @param addr   listen address, or a node name when port is nil
-- @param port   listen port; when nil, addr is treated as a node name and the
--               address and port are taken from node_address[addr]
-- The node address is resolved and validated before the gate service is
-- created, so an unknown or malformed node does not leave an unused gate
-- service behind. The port parsed from the address is converted with
-- tonumber, as open_channel does for the same address format.
-- Replies with a packed nil once the gate has been opened.
function command.listen(source, addr, port)
    if port == nil then
        local address = node_address[addr]
        if address == nil then
            error("unknown cluster node: " .. tostring(addr))
        end
        local host, node_port = string.match(address, "([^:]+):(.*)$")
        local port_number = host and tonumber(node_port)
        if not port_number then
            error(string.format(
                "invalid address for cluster node %s: %s (expected host:port)",
                tostring(addr), tostring(address)))
        end
        addr, port = host, port_number
    end

    local gate = skynet.newservice("gate")
    skynet.call(gate, "lua", "open", { address = addr, port = port })
    skynet.ret(skynet.pack(nil))
end
--- Pack a request for a remote node and send it over that node's channel.
-- @param source sender of the request (unused here)
-- @param node   node name; key into node_channel and node_session
-- @param addr   target address, passed through to cluster.packrequest
-- @param msg    message pointer, passed through to cluster.packrequest
-- @param sz     message size, passed through to cluster.packrequest
-- @return whatever the channel's request method returns
local function send_request(source, node, addr, msg, sz)
    -- The channel must be looked up before the session is read: a missing
    -- channel is created by open_channel, which sets node_session[node] to 1.
    -- NOTE(review): if that lookup raises, msg never reaches packrequest;
    -- confirm who owns msg on that path.
    local channel = node_channel[node]
    local session = node_session[node]
    -- msg is a local pointer, cluster.packrequest will free it
    local packed, next_session = cluster.packrequest(addr, session, msg, sz)
    node_session[node] = next_session
    return channel:request(packed, session)
end
--- "req" command: forward a request to a remote node and reply to the caller.
-- The arguments are passed unchanged to send_request under pcall. On failure
-- the error is logged and the caller is answered through
-- skynet.response()(false); on success the result and size are returned to
-- the caller with skynet.ret.
function command.req(...)
    local ok, result, size = pcall(send_request, ...)
    if not ok then
        -- here result holds the error value raised by send_request
        skynet.error(result)
        skynet.response()(false)
        return
    end
    skynet.ret(result, size)
end
-- "node.name" -> clusterproxy service created for that pair
local proxy = {}

--- "proxy" command: reply with the proxy service for a remote service.
-- A "clusterproxy" service is started the first time a node/name pair is
-- requested and reused for later requests.
-- @param source sender of the command (unused here)
-- @param node   node name
-- @param name   service name on that node
function command.proxy(source, node, name)
    local fullname = node .. "." .. name
    local service = proxy[fullname]
    if service == nil then
        service = skynet.newservice("clusterproxy", node, name)
        proxy[fullname] = service
    end
    skynet.ret(skynet.pack(service))
end
-- Not referenced anywhere in the visible part of this file.
local request_fd = {}

--- "socket" command: handle a socket event.
-- @param source sender of the event; asked to "accept" newly opened sockets
-- @param subcmd event name: "data", "open", or anything else
-- @param fd     socket the event refers to
-- @param msg    event payload: the request packet for "data", a peer
--               description for "open", otherwise event-specific
-- "open" is logged and accepted. "data" is unpacked, dispatched to the local
-- address it names, and the outcome is written back to fd. Any other event is
-- only logged.
function command.socket(source, subcmd, fd, msg)
    if subcmd == "open" then
        skynet.error(string.format("socket accept from %s", msg))
        skynet.call(source, "lua", "accept", fd)
        return
    end

    if subcmd ~= "data" then
        skynet.error(string.format("socket %s %d : %s", subcmd, fd, msg))
        return
    end

    local addr, session, payload = cluster.unpackrequest(msg)
    -- pcall so that a failing call is still answered, with ok = false
    local ok, result, size = pcall(skynet.rawcall, addr, "lua", payload)
    local response
    if ok then
        response = cluster.packresponse(session, true, result, size)
    else
        -- result is the error value here; no size is passed
        response = cluster.packresponse(session, false, result)
    end
    socket.write(fd, response)
end
-- Service entry point: load the cluster config, then route every "lua"
-- message to the handler in `command` named by its first argument.
skynet.start(function()
    loadconfig()

    -- An unknown command name fails the assert.
    local function dispatch(session, source, cmd, ...)
        local handler = assert(command[cmd])
        handler(source, ...)
    end

    skynet.dispatch("lua", dispatch)
end)