Skip to content

Commit c1c032e

Browse files
committed
bugfix: socketchannel order mode may blocked. (used by redis driver)
1 parent 7486e53 commit c1c032e

File tree

4 files changed

+45
-39
lines changed

4 files changed

+45
-39
lines changed

lualib/dns.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ local function suspend(tid, name, qtype)
265265
co = coroutine.running(),
266266
}
267267
request_pool[tid] = req
268-
skynet.wait()
268+
skynet.wait(req.co)
269269
local answers = request_pool[tid].answers
270270
request_pool[tid] = nil
271271
assert(answers, "no ip")

lualib/skynet.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,10 +275,10 @@ function skynet.yield()
275275
return skynet.sleep(0)
276276
end
277277

278-
function skynet.wait()
278+
function skynet.wait(co)
279279
local session = c.genid()
280280
local ret, msg = coroutine_yield("SLEEP", session)
281-
local co = coroutine.running()
281+
co = co or coroutine.running()
282282
sleep_session[co] = nil
283283
session_id_coroutine[session] = nil
284284
end

lualib/socket.lua

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ end
3030
local function suspend(s)
3131
assert(not s.co)
3232
s.co = coroutine.running()
33-
skynet.wait()
33+
skynet.wait(s.co)
3434
-- wakeup closing corouting every time suspend,
3535
-- because socket.close() will wait last socket buffer operation before clear the buffer.
3636
if s.closing then
@@ -232,7 +232,7 @@ function socket.close(id)
232232
-- wait reading coroutine read the buffer.
233233
assert(not s.closing)
234234
s.closing = coroutine.running()
235-
skynet.wait()
235+
skynet.wait(s.closing)
236236
else
237237
suspend(s)
238238
end
@@ -361,7 +361,7 @@ function socket.lock(id)
361361
else
362362
local co = coroutine.running()
363363
table.insert(lock_set, co)
364-
skynet.wait()
364+
skynet.wait(co)
365365
end
366366
end
367367

lualib/socketchannel.lua

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,17 @@ local function dispatch_by_session(self)
115115
end
116116
end
117117

118+
local wait_response
119+
118120
local function pop_response(self)
119-
return table.remove(self.__request, 1), table.remove(self.__thread, 1)
121+
while true do
122+
local func,co = table.remove(self.__request, 1), table.remove(self.__thread, 1)
123+
if func then
124+
return func, co
125+
end
126+
wait_response = coroutine.running()
127+
skynet.wait(wait_response)
128+
end
120129
end
121130

122131
local function push_response(self, response, co)
@@ -127,46 +136,43 @@ local function push_response(self, response, co)
127136
-- response is a function, push it to __request
128137
table.insert(self.__request, response)
129138
table.insert(self.__thread, co)
139+
if wait_response then
140+
skynet.wakeup(wait_response)
141+
wait_response = nil
142+
end
130143
end
131144
end
132145

133146
local function dispatch_by_order(self)
134147
while self.__sock do
135148
local func, co = pop_response(self)
136-
if func == nil then
137-
if not socket.block(self.__sock[1]) then
138-
close_channel_socket(self)
139-
wakeup_all(self)
140-
end
141-
else
142-
local ok, result_ok, result_data, padding = pcall(func, self.__sock)
143-
if ok then
144-
if padding and result_ok then
145-
-- if padding is true, wait for next result_data
146-
-- self.__result_data[co] is a table
147-
local result = self.__result_data[co] or {}
148-
self.__result_data[co] = result
149-
table.insert(result, result_data)
150-
else
151-
self.__result[co] = result_ok
152-
if result_ok and self.__result_data[co] then
153-
table.insert(self.__result_data[co], result_data)
154-
else
155-
self.__result_data[co] = result_data
156-
end
157-
skynet.wakeup(co)
158-
end
149+
local ok, result_ok, result_data, padding = pcall(func, self.__sock)
150+
if ok then
151+
if padding and result_ok then
152+
-- if padding is true, wait for next result_data
153+
-- self.__result_data[co] is a table
154+
local result = self.__result_data[co] or {}
155+
self.__result_data[co] = result
156+
table.insert(result, result_data)
159157
else
160-
close_channel_socket(self)
161-
local errmsg
162-
if result_ok ~= socket_error then
163-
errmsg = result_ok
158+
self.__result[co] = result_ok
159+
if result_ok and self.__result_data[co] then
160+
table.insert(self.__result_data[co], result_data)
161+
else
162+
self.__result_data[co] = result_data
164163
end
165-
self.__result[co] = socket_error
166-
self.__result_data[co] = errmsg
167164
skynet.wakeup(co)
168-
wakeup_all(self, errmsg)
169165
end
166+
else
167+
close_channel_socket(self)
168+
local errmsg
169+
if result_ok ~= socket_error then
170+
errmsg = result_ok
171+
end
172+
self.__result[co] = socket_error
173+
self.__result_data[co] = errmsg
174+
skynet.wakeup(co)
175+
wakeup_all(self, errmsg)
170176
end
171177
end
172178
end
@@ -288,7 +294,7 @@ local function block_connect(self, once)
288294
-- connecting in other coroutine
289295
local co = coroutine.running()
290296
table.insert(self.__connecting, co)
291-
skynet.wait()
297+
skynet.wait(co)
292298
else
293299
self.__connecting[1] = true
294300
try_connect(self, once)
@@ -319,7 +325,7 @@ end
319325
local function wait_for_response(self, response)
320326
local co = coroutine.running()
321327
push_response(self, response, co)
322-
skynet.wait()
328+
skynet.wait(co)
323329

324330
local result = self.__result[co]
325331
self.__result[co] = nil

0 commit comments

Comments
 (0)