Skip to content

Commit a090899

Browse files
committed
improve cluster
1 parent 3a5de32 commit a090899

File tree

7 files changed

+135
-20
lines changed

7 files changed

+135
-20
lines changed

HISTORY.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ Dev version
55
* Add worker thread weight
66
* Add skynet.queue
77
* Bugfix: socketchannel
8+
* cluster can throw error
9+
* Add readline and writeline to clientsocket lib
10+
* Add cluster.reload to reload config file
811

912
v0.4.1 (2014-7-7)
1013
-----------

examples/client.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ end
3939

4040
while true do
4141
dispatch()
42-
local cmd = socket.readline()
42+
local cmd = socket.readstdin()
4343
if cmd then
4444
local args = {}
4545
string.gsub(cmd, '[^ ]+', function(v) table.insert(args, v) end )

lualib-src/lua-clientsocket.c

Lines changed: 79 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,19 @@ unpack(lua_State *L, uint8_t *buffer, int sz, int n) {
125125
boolean (true: data, false: block, nil: close)
126126
string last
127127
*/
128+
129+
struct socket_buffer {
130+
void * buffer;
131+
int sz;
132+
};
133+
128134
static int
129-
lrecv(lua_State *L) {
135+
recv_socket(lua_State *L, char *tmp, struct socket_buffer *result) {
130136
int fd = luaL_checkinteger(L,1);
131137
size_t sz = 0;
132138
const char * last = lua_tolstring(L,2,&sz);
133139
luaL_checktype(L, 3, LUA_TTABLE);
134140

135-
char tmp[CACHE_SIZE];
136141
char * buffer;
137142
int r = recv(fd, tmp, CACHE_SIZE, 0);
138143
if (r == 0) {
@@ -163,10 +168,64 @@ lrecv(lua_State *L) {
163168
lua_pushnil(L);
164169
lua_rawseti(L, 3, i);
165170
}
171+
result->buffer = buffer;
172+
result->sz = r + sz;
173+
return -1;
174+
}
166175

167-
return unpack(L, (uint8_t *)buffer, r+sz, 0);
176+
static int
177+
lrecv(lua_State *L) {
178+
struct socket_buffer sb;
179+
char tmp[CACHE_SIZE];
180+
int ret = recv_socket(L, tmp, &sb);
181+
if (ret < 0) {
182+
return unpack(L, sb.buffer, sb.sz, 0);
183+
} else {
184+
return ret;
185+
}
168186
}
169187

188+
static int
189+
unpack_line(lua_State *L, uint8_t *buffer, int sz, int n) {
190+
if (sz == 0)
191+
goto _block;
192+
if (buffer[0] == '\n') {
193+
return unpack_line(L, buffer+1, sz-1, n);
194+
}
195+
int i;
196+
for (i=1;i<sz;i++) {
197+
if (buffer[i] == '\n') {
198+
++n;
199+
lua_pushlstring(L, (const char *)buffer, i);
200+
lua_rawseti(L, 3, n);
201+
buffer += i + 1;
202+
sz -= i + 1;
203+
return unpack_line(L, buffer, sz, n);
204+
}
205+
}
206+
_block:
207+
lua_pushboolean(L, n==0 ? 0:1);
208+
if (sz == 0) {
209+
lua_pushnil(L);
210+
} else {
211+
lua_pushlstring(L, (const char *)buffer, sz);
212+
}
213+
return 2;
214+
}
215+
216+
static int
217+
lreadline(lua_State *L) {
218+
struct socket_buffer sb;
219+
char tmp[CACHE_SIZE];
220+
int ret = recv_socket(L, tmp, &sb);
221+
if (ret < 0) {
222+
return unpack_line(L, sb.buffer, sb.sz, 0);
223+
} else {
224+
return ret;
225+
}
226+
}
227+
228+
170229
static int
171230
lusleep(lua_State *L) {
172231
int n = luaL_checknumber(L, 1);
@@ -219,7 +278,7 @@ readline_stdin(void * arg) {
219278
}
220279

221280
static int
222-
lreadline(lua_State *L) {
281+
lreadstdin(lua_State *L) {
223282
struct queue *q = lua_touserdata(L, lua_upvalueindex(1));
224283
LOCK(q);
225284
if (q->head == q->tail) {
@@ -236,6 +295,18 @@ lreadline(lua_State *L) {
236295
return 1;
237296
}
238297

298+
static int
299+
lwriteline(lua_State *L) {
300+
size_t sz = 0;
301+
int fd = luaL_checkinteger(L,1);
302+
const char * msg = luaL_checklstring(L, 2, &sz);
303+
block_send(L, fd, msg, sz);
304+
char nl[1] = { '\n' };
305+
block_send(L, fd, nl, 1);
306+
307+
return 0;
308+
}
309+
239310
int
240311
luaopen_clientsocket(lua_State *L) {
241312
luaL_checkversion(L);
@@ -245,14 +316,16 @@ luaopen_clientsocket(lua_State *L) {
245316
{ "send", lsend },
246317
{ "close", lclose },
247318
{ "usleep", lusleep },
319+
{ "readline", lreadline },
320+
{ "writeline", lwriteline },
248321
{ NULL, NULL },
249322
};
250323
luaL_newlib(L, l);
251324

252325
struct queue * q = lua_newuserdata(L, sizeof(*q));
253326
memset(q, 0, sizeof(*q));
254-
lua_pushcclosure(L, lreadline, 1);
255-
lua_setfield(L, -2, "readline");
327+
lua_pushcclosure(L, lreadstdin, 1);
328+
lua_setfield(L, -2, "readstdin");
256329

257330
pthread_t pid ;
258331
pthread_create(&pid, NULL, readline_stdin, q);

lualib-src/lua-cluster.c

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
uint32_t next_session
1616
*/
1717

18-
#define TEMP_LENGTH 0x10002
18+
#define TEMP_LENGTH 0x10007
1919

2020
static void
2121
fill_uint32(uint8_t * buf, uint32_t n) {
@@ -146,6 +146,7 @@ lunpackrequest(lua_State *L) {
146146

147147
/*
148148
int session
149+
boolean ok
149150
lightuserdata msg
150151
int sz
151152
return string response
@@ -155,15 +156,27 @@ lpackresponse(lua_State *L) {
155156
uint32_t session = luaL_checkunsigned(L,1);
156157
// clusterd.lua:command.socket call lpackresponse,
157158
// and the msg/sz is return by skynet.rawcall , so don't free(msg)
158-
void * msg = lua_touserdata(L,2);
159-
size_t sz = luaL_checkunsigned(L, 3);
159+
int ok = lua_toboolean(L,2);
160+
void * msg;
161+
size_t sz;
162+
163+
if (lua_type(L,3) == LUA_TSTRING) {
164+
msg = (void *)lua_tolstring(L, 3, &sz);
165+
if (sz > 0x1000) {
166+
sz = 0x1000;
167+
}
168+
} else {
169+
msg = lua_touserdata(L,3);
170+
sz = luaL_checkunsigned(L, 4);
171+
}
160172

161173
uint8_t buf[TEMP_LENGTH];
162-
fill_header(L, buf, sz+4, msg);
174+
fill_header(L, buf, sz+5, msg);
163175
fill_uint32(buf+2, session);
164-
memcpy(buf+6,msg,sz);
176+
buf[6] = ok;
177+
memcpy(buf+7,msg,sz);
165178

166-
lua_pushlstring(L, (const char *)buf, sz+6);
179+
lua_pushlstring(L, (const char *)buf, sz+7);
167180

168181
return 1;
169182
}
@@ -178,13 +191,13 @@ static int
178191
lunpackresponse(lua_State *L) {
179192
size_t sz;
180193
const char * buf = luaL_checklstring(L, 1, &sz);
181-
if (sz < 4) {
194+
if (sz < 5) {
182195
return 0;
183196
}
184197
uint32_t session = unpack_uint32((const uint8_t *)buf);
185198
lua_pushunsigned(L, session);
186-
lua_pushboolean(L, 1);
187-
lua_pushlstring(L, buf+4, sz-4);
199+
lua_pushboolean(L, buf[4]);
200+
lua_pushlstring(L, buf+5, sz-5);
188201

189202
return 3;
190203
}

lualib/cluster.lua

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ function cluster.open(port)
1616
end
1717
end
1818

19+
function cluster.reload()
20+
skynet.call(clusterd, "lua", "reload")
21+
end
22+
1923
skynet.init(function()
2024
clusterd = skynet.uniqueservice("clusterd")
2125
end)

lualib/skynet.lua

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,9 @@ end
325325

326326
function skynet.rawcall(addr, typename, msg, sz)
327327
local p = proto[typename]
328+
if watching_service[addr] == false then
329+
error("Service is dead")
330+
end
328331
local session = assert(c.send(addr, p.id , nil , msg, sz), "call to invalid address")
329332
return yield_call(addr, session)
330333
end

service/clusterd.lua

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,14 @@ local cluster = require "cluster.c"
55

66
local config_name = skynet.getenv "cluster"
77
local node_address = {}
8-
assert(loadfile(config_name, "t", node_address))()
8+
9+
local function loadconfig()
10+
local f = assert(io.open(config_name))
11+
local source = f:read "*a"
12+
f:close()
13+
assert(load(source, "@"..config_name, "t", node_address))()
14+
end
15+
916
local node_session = {}
1017
local command = {}
1118

@@ -30,6 +37,11 @@ end
3037

3138
local node_channel = setmetatable({}, { __index = open_channel })
3239

40+
function command.reload()
41+
loadconfig()
42+
skynet.ret(skynet.pack(nil))
43+
end
44+
3345
function command.listen(source, addr, port)
3446
local gate = skynet.newservice("gate")
3547
if port == nil then
@@ -45,6 +57,7 @@ function command.req(source, node, addr, msg, sz)
4557
local session = node_session[node]
4658
-- msg is a local pointer, cluster.packrequest will free it
4759
request, node_session[node] = cluster.packrequest(addr, session , msg, sz)
60+
local ok, result = pcall(c.request, c, request, session)
4861
skynet.ret(c:request(request, session))
4962
end
5063

@@ -53,8 +66,13 @@ local request_fd = {}
5366
function command.socket(source, subcmd, fd, msg)
5467
if subcmd == "data" then
5568
local addr, session, msg = cluster.unpackrequest(msg)
56-
local msg, sz = skynet.rawcall(addr, "lua", msg)
57-
local response = cluster.packresponse(session, msg, sz)
69+
local ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg)
70+
local response
71+
if ok then
72+
response = cluster.packresponse(session, true, msg, sz)
73+
else
74+
response = cluster.packresponse(session, false, msg)
75+
end
5876
socket.write(fd, response)
5977
elseif subcmd == "open" then
6078
skynet.error(string.format("socket accept from %s", msg))
@@ -65,7 +83,8 @@ function command.socket(source, subcmd, fd, msg)
6583
end
6684

6785
skynet.start(function()
68-
skynet.dispatch("lua", function(_, source, cmd, ...)
86+
loadconfig()
87+
skynet.dispatch("lua", function(session , source, cmd, ...)
6988
local f = assert(command[cmd])
7089
f(source, ...)
7190
end)

0 commit comments

Comments
 (0)