Skip to content
This repository was archived by the owner on Jan 5, 2024. It is now read-only.

Commit 4d34a9e

Browse files
author
Junchao Wu
committed
add tchannel streaming functionality
1 parent c88b0da commit 4d34a9e

29 files changed

+1030
-388
lines changed

examples/handler.py

+37-12
Original file line numberDiff line numberDiff line change
@@ -21,34 +21,54 @@
2121
from __future__ import absolute_import
2222

2323
import random
24-
import time
2524

2625
import tornado.gen
2726

2827
from tchannel.tornado.dispatch import TornadoDispatcher
28+
from tchannel.tornado.stream import InMemStream
29+
from tchannel.tornado.util import print_arg, get_arg
2930

3031

32+
@tornado.gen.coroutine
3133
def say_hi(request, response, opts):
32-
response.write(arg3="hi")
34+
arg2 = yield get_arg(request, 1)
35+
arg3 = yield get_arg(request, 2)
36+
response.argstreams = [
37+
InMemStream(request.endpoint),
38+
InMemStream(arg2),
39+
InMemStream(arg3)
40+
]
3341

3442

43+
@tornado.gen.coroutine
3544
def say_ok(request, response, opts):
36-
response.write(arg3="ok")
45+
yield print_arg(request, 1)
46+
yield print_arg(request, 2)
3747

48+
response.argstreams = [
49+
InMemStream(),
50+
InMemStream(),
51+
InMemStream("world")]
3852

53+
54+
@tornado.gen.coroutine
3955
def echo(request, response, opts):
40-
response.write(arg3=request.message.args[2])
56+
# stream args right back to request side
57+
print "streaming"
58+
response.argstreams = [
59+
InMemStream(request.endpoint),
60+
request.argstreams[1],
61+
request.argstreams[2]
62+
]
4163

4264

4365
@tornado.gen.coroutine
4466
def slow(request, response, opts):
4567
yield tornado.gen.sleep(random.random())
46-
response.write(arg3="done")
47-
48-
49-
def blocking(request, response, opts):
50-
time.sleep(random.random())
51-
response.write(arg3="yawn")
68+
response.argstreams = [
69+
InMemStream(),
70+
InMemStream(),
71+
InMemStream("done")]
5272

5373

5474
def get_example_handler():
@@ -58,10 +78,15 @@ def get_example_handler():
5878
dispatcher.register("ok", say_ok)
5979
dispatcher.register("echo", echo)
6080
dispatcher.register("slow", slow)
61-
dispatcher.register("blocking", blocking)
6281

6382
@dispatcher.route("bye")
6483
def say_bye(request, response, opts):
65-
response.write("bye bye!")
84+
yield print_arg(request, 1)
85+
yield print_arg(request, 2)
86+
87+
response.argstreams = [
88+
InMemStream(),
89+
InMemStream(),
90+
InMemStream("world")]
6691

6792
return dispatcher

examples/options.py

+5
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,9 @@ def get_args():
3131
"--host",
3232
dest="host", default="localhost"
3333
)
34+
parser.add_argument(
35+
"--file",
36+
dest="filename"
37+
)
38+
3439
return parser.parse_args()

examples/stream_client.py

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import os
2+
import sys
3+
from options import get_args
4+
import tornado
5+
import tornado.ioloop
6+
from tchannel.tornado import TChannel
7+
from tchannel.tornado.stream import InMemStream, PipeStream
8+
from tchannel.tornado.util import print_arg
9+
10+
11+
@tornado.gen.coroutine
12+
def send_stream(arg1, arg2, arg3, host):
13+
tchannel = TChannel()
14+
response = yield tchannel.request(host).send(
15+
arg1,
16+
arg2,
17+
arg3)
18+
19+
yield print_arg(response, 0)
20+
yield print_arg(response, 1)
21+
yield print_arg(response, 2)
22+
23+
24+
def main():
25+
args = get_args()
26+
27+
arg1 = InMemStream("echo")
28+
arg2 = InMemStream()
29+
arg3 = InMemStream()
30+
31+
ioloop = tornado.ioloop.IOLoop.current()
32+
if args.filename == "stdin":
33+
arg3 = PipeStream(sys.stdin.fileno())
34+
send_stream(arg1, arg2, arg3, args.host)
35+
ioloop.start()
36+
elif args.filename:
37+
f = os.open(args.filename, os.O_RDONLY)
38+
arg3 = PipeStream(f)
39+
ioloop.run_sync(lambda: send_stream(arg1, arg2, arg3, args.host))
40+
else:
41+
raise NotImplementedError()
42+
43+
if __name__ == '__main__': # pragma: no cover
44+
main()

examples/tchannel_server.py

-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import tornado.ioloop
2424

2525
from tchannel.tornado import TChannel
26-
2726
from options import get_args
2827
from handler import get_example_handler
2928

examples/tornado_client.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@
2828
import tornado.ioloop
2929

3030
from options import get_args
31-
from tchannel.tornado.connection import TornadoConnection
31+
from tchannel.tornado.connection import StreamConnection
3232

3333

3434
@tornado.gen.coroutine
3535
def main():
3636

3737
args = get_args()
38-
conn = yield TornadoConnection.outgoing('%s:%d' % (args.host, args.port))
38+
conn = yield StreamConnection.outgoing('%s:%d' % (args.host, args.port))
3939

4040
N = 10000
4141
before = time.time()

tchannel/dispatch.py

-162
This file was deleted.

tchannel/glossary.py

-7
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,3 @@
2222
#
2323
# Message ID 0xffffffff is reserved
2424
MAX_MESSAGE_ID = 0xfffffffe
25-
26-
# Largest payload supported by the system.
27-
#
28-
# 64KB Max frame size
29-
# 16B (size:2 | type:1 | reserved:1 | id:4 | reserved:8)
30-
# 1 2 Bytes can represent 0~2**16-1
31-
MAX_PAYLOAD_SIZE = 0xFFEF # 64*1024 - 16 - 1

tchannel/handler.py

+24-4
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ class BaseRequestHandler(RequestHandler):
5656

5757
_HANDLER_NAMES = {
5858
Types.PING_REQ: 'ping',
59-
Types.CALL_REQ: 'call'
59+
Types.CALL_REQ: 'pre_call',
60+
Types.CALL_REQ_CONTINUE: 'pre_call'
6061
}
6162

6263
def __init__(self):
@@ -76,14 +77,33 @@ def handle(self, context, connection):
7677
handler_name = "handle_" + self._HANDLER_NAMES[message.message_type]
7778
return getattr(self, handler_name)(message_id, message, connection)
7879

80+
def handle_pre_call(self, message_id, message, connection):
81+
"""Handle incoming request message including CallRequestMessage and
82+
CallRequestContinueMessage
83+
84+
This method will build the User friendly request object based on the
85+
incoming messages.
86+
87+
It passes all the messages into the message_factory to build the init
88+
request object. Only when it get a CallRequestMessage and a completed
89+
arg_1=argstream[0], the message_factory will return a request object.
90+
Then it will trigger the async call_handle call.
91+
92+
:param message_id: message id
93+
:param message: CallRequestMessage or CallRequestContinueMessage
94+
:param connection: tornado connection
95+
"""
96+
req = connection.request_message_factory.build(message_id, message)
97+
# call handler only for the call request message not continue message
98+
if req:
99+
self.handle_call(req, connection)
100+
79101
def handle_ping(self, message_id, ping, connection):
80102
return connection.write(PingResponseMessage(), message_id)
81103

82-
def handle_call(self, message_id, call, connection):
104+
def handle_call(self, call, connection):
83105
"""Handle an incoming call.
84106
85-
:param message_id:
86-
Message ID of the request
87107
:param call:
88108
CallRequestMessage containing information about the call
89109
:param connection:

tchannel/io.py

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from __future__ import absolute_import
2222

23+
2324
try:
2425
from cStringIO import StringIO as BytesIO
2526
except ImportError: # pragma: no cover

0 commit comments

Comments
 (0)