forked from cloudwu/skynet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclusterd.lua
182 lines (164 loc) · 4.41 KB
/
clusterd.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
local skynet = require "skynet"
local sc = require "socketchannel"
local socket = require "socket"
local cluster = require "cluster.core"
local config_name = skynet.getenv "cluster"
local node_address = {}
local node_session = {}
local command = {}
local function read_response(sock)
local sz = socket.header(sock:read(2))
local msg = sock:read(sz)
return cluster.unpackresponse(msg) -- session, ok, data, padding
end
local function open_channel(t, key)
local host, port = string.match(node_address[key], "([^:]+):(.*)$")
local c = sc.channel {
host = host,
port = tonumber(port),
response = read_response,
nodelay = true,
}
assert(c:connect(true))
t[key] = c
return c
end
local node_channel = setmetatable({}, { __index = open_channel })
local function loadconfig()
local f = assert(io.open(config_name))
local source = f:read "*a"
f:close()
local tmp = {}
assert(load(source, "@"..config_name, "t", tmp))()
for name,address in pairs(tmp) do
assert(type(address) == "string")
if node_address[name] ~= address then
-- address changed
if rawget(node_channel, name) then
node_channel[name] = nil -- reset connection
end
node_address[name] = address
end
end
end
function command.reload()
loadconfig()
skynet.ret(skynet.pack(nil))
end
function command.listen(source, addr, port)
local gate = skynet.newservice("gate")
if port == nil then
addr, port = string.match(node_address[addr], "([^:]+):(.*)$")
end
skynet.call(gate, "lua", "open", { address = addr, port = port })
skynet.ret(skynet.pack(nil))
end
local function send_request(source, node, addr, msg, sz)
local session = node_session[node] or 1
-- msg is a local pointer, cluster.packrequest will free it
local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
node_session[node] = new_session
-- node_channel[node] may yield or throw error
local c = node_channel[node]
return c:request(request, session, padding)
end
function command.req(...)
local ok, msg, sz = pcall(send_request, ...)
if ok then
if type(msg) == "table" then
skynet.ret(cluster.concat(msg))
else
skynet.ret(msg)
end
else
skynet.error(msg)
skynet.response()(false)
end
end
local proxy = {}
function command.proxy(source, node, name)
local fullname = node .. "." .. name
if proxy[fullname] == nil then
proxy[fullname] = skynet.newservice("clusterproxy", node, name)
end
skynet.ret(skynet.pack(proxy[fullname]))
end
local register_name = {}
function command.register(source, name, addr)
assert(register_name[name] == nil)
addr = addr or source
local old_name = register_name[addr]
if old_name then
register_name[old_name] = nil
end
register_name[addr] = name
register_name[name] = addr
skynet.ret(nil)
skynet.error(string.format("Register [%s] :%08x", name, addr))
end
local large_request = {}
function command.socket(source, subcmd, fd, msg)
if subcmd == "data" then
local sz
local addr, session, msg, padding = cluster.unpackrequest(msg)
if padding then
local req = large_request[session] or { addr = addr }
large_request[session] = req
table.insert(req, msg)
return
else
local req = large_request[session]
if req then
large_request[session] = nil
table.insert(req, msg)
msg,sz = cluster.concat(req)
addr = req.addr
end
if not msg then
local response = cluster.packresponse(session, false, "Invalid large req")
socket.write(fd, response)
return
end
end
local ok, response
if addr == 0 then
local name = skynet.unpack(msg, sz)
local addr = register_name[name]
if addr then
ok = true
msg, sz = skynet.pack(addr)
else
ok = false
msg = "name not found"
end
else
ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz)
end
if ok then
response = cluster.packresponse(session, true, msg, sz)
if type(response) == "table" then
for _, v in ipairs(response) do
socket.lwrite(fd, v)
end
else
socket.write(fd, response)
end
else
response = cluster.packresponse(session, false, msg)
socket.write(fd, response)
end
elseif subcmd == "open" then
skynet.error(string.format("socket accept from %s", msg))
skynet.call(source, "lua", "accept", fd)
else
large_request = {}
skynet.error(string.format("socket %s %d : %s", subcmd, fd, msg))
end
end
skynet.start(function()
loadconfig()
skynet.dispatch("lua", function(session , source, cmd, ...)
local f = assert(command[cmd])
f(source, ...)
end)
end)