forked from cloudwu/skynet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqueue.lua
84 lines (73 loc) · 1.76 KB
/
mqueue.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
-- This is a deprecated module, use skynet.queue instead.
local skynet = require "skynet"
local c = require "skynet.core"
local mqueue = {}
local init_once
local thread_id
local message_queue = {}
skynet.register_protocol {
name = "queue",
-- please read skynet.h for magic number 8
id = 8,
pack = skynet.pack,
unpack = skynet.unpack,
dispatch = function(session, from, ...)
table.insert(message_queue, {session = session, addr = from, ... })
if thread_id then
skynet.wakeup(thread_id)
thread_id = nil
end
end
}
local function do_func(f, msg)
return pcall(f, table.unpack(msg))
end
local function message_dispatch(f)
while true do
if #message_queue==0 then
thread_id = coroutine.running()
skynet.wait()
else
local msg = table.remove(message_queue,1)
local session = msg.session
if session == 0 then
local ok, msg = do_func(f, msg)
if ok then
if msg then
skynet.fork(message_dispatch,f)
error(string.format("[:%x] send a message to [:%x] return something", msg.addr, skynet.self()))
end
else
skynet.fork(message_dispatch,f)
error(string.format("[:%x] send a message to [:%x] throw an error : %s", msg.addr, skynet.self(),msg))
end
else
local data, size = skynet.pack(do_func(f,msg))
-- 1 means response
c.send(msg.addr, 1, session, data, size)
end
end
end
end
function mqueue.register(f)
assert(init_once == nil)
init_once = true
skynet.fork(message_dispatch,f)
end
local function catch(succ, ...)
if succ then
return ...
else
error(...)
end
end
function mqueue.call(addr, ...)
return catch(skynet.call(addr, "queue", ...))
end
function mqueue.send(addr, ...)
return skynet.send(addr, "queue", ...)
end
function mqueue.size()
return #message_queue
end
return mqueue