Skip to content
This repository was archived by the owner on Jan 5, 2024. It is now read-only.

Commit b6ddfaa

Browse files
author
Junchao Wu
committed
add streaming logic to python tchannel inbound server
1 parent d6f5975 commit b6ddfaa

15 files changed

+458
-89
lines changed

tchannel/exceptions.py

+3
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ class InvalidChecksumException(TChannelException):
5353
"""Represent invalid checksum type in the message"""
5454
pass
5555

56+
class StreamingException(TChannelException):
57+
"""Represent Streaming Message Exception"""
58+
pass
5659

5760
class InvalidErrorCodeException(TChannelException):
5861
"""Represent Invalid Error Code exception"""

tchannel/handler.py

+18-7
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,14 @@ class TChannelRequest(object):
102102

103103
def __init__(self, context, conn):
104104
self.message = context.message
105-
self.header = getattr(self.message, 'arg_2', None)
106-
self.body = getattr(self.message, 'arg_3', None)
107-
self.method = getattr(self.message, 'arg_1', None)
105+
106+
try:
107+
self.method = self.message.args[0]
108+
self.header = self.message.args[1]
109+
self.body = self.message.args[2]
110+
except:
111+
pass
112+
108113
self.connection = conn
109114
self.context = context
110115
self.id = context.message_id
@@ -116,21 +121,27 @@ class TChannelResponse(object):
116121
"""TChannel Response Wrapper"""
117122

118123
__slots__ = ('_connection', '_request',
119-
'resp_msg', 'id')
124+
'resp_msg', 'id', 'body',
125+
'headers')
120126

121127
def __init__(self, request, conn):
122128
self._connection = conn
123129
self._request = request
124-
self.resp_msg = CallResponseMessage()
130+
self.body = ""
131+
self.headers = ""
125132
self.id = request.id
133+
self.resp_msg = None
126134

127135
def write(self, chunk):
128136
# build response message
129-
self.resp_msg.arg_3 += chunk
137+
self.body += chunk
130138

131139
def finish(self):
140+
# TODO add status code into arg_1 area
141+
self.resp_msg = CallResponseMessage(
142+
args=["", self.headers, self.body]
143+
)
132144
self._connection.finish(self)
133-
self.resp_msg = CallResponseMessage()
134145

135146
def update_resp_id(self):
136147
self.id += 1

tchannel/messages/__init__.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
from .init_response import InitResponseMessage, init_res_rw
1010
from .ping_request import PingRequestMessage, ping_req_rw
1111
from .ping_response import PingResponseMessage, ping_res_rw
12+
from .call_request_continue import call_req_c_rw
13+
from .call_response_continue import call_res_c_rw
1214

1315
RW = {
1416
Types.CALL_REQ: call_req_rw,
@@ -17,15 +19,18 @@
1719
Types.INIT_REQ: init_req_rw,
1820
Types.INIT_RES: init_res_rw,
1921
Types.PING_REQ: ping_req_rw,
20-
Types.PING_RES: ping_res_rw
22+
Types.PING_RES: ping_res_rw,
23+
Types.CALL_REQ_CONTINUE: call_req_c_rw,
24+
Types.CALL_RES_CONTINUE: call_res_c_rw,
2125
}
2226

23-
2427
__all__ = [
2528
"RW",
2629
"ChecksumType",
2730
"CallRequestMessage",
31+
"CallRequestContinueMessage",
2832
"CallResponseMessage",
33+
"CallResponseContinueMessage",
2934
"ErrorMessage",
3035
"InitRequestMessage",
3136
"InitResponseMessage",

tchannel/messages/base.py

+12
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,15 @@ def __str__(self):
2525
str(self.__class__.__name__),
2626
", ".join(attrs)
2727
)
28+
29+
def encode(self):
30+
""" Encode all the strings in the msg using
31+
encode type from common.py
32+
"""
33+
pass
34+
35+
def decode(self):
36+
""" Decode all the strings in the msg using
37+
decode type from common.py
38+
"""
39+
pass

tchannel/messages/call_continue.py

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
from __future__ import absolute_import
2+
3+
from .types import Types
4+
from .. import rw
5+
from . import common, ChecksumType
6+
from .base import BaseMessage
7+
8+
9+
class CallContinueMessage(BaseMessage):
10+
"""Represent a continuation of a call request (across multiple frames)."""
11+
max_args_num = 3
12+
13+
__slots__ = (
14+
'flags',
15+
'checksum',
16+
'args',
17+
)
18+
19+
def __init__(
20+
self,
21+
flags=0,
22+
checksum=None,
23+
args=None,
24+
):
25+
self.flags = flags
26+
if checksum is not None:
27+
checksum = common.ChecksumType.standardize(checksum)
28+
self.checksum = checksum or \
29+
(common.ChecksumType.none, None)
30+
31+
self.args = args or []
32+
33+
def encode(self):
34+
self.args = list(map(
35+
lambda arg: self.encode_str(arg),
36+
self.args))
37+
38+
def decode(self):
39+
self.args = list(map(
40+
lambda arg: self.decode_str(arg),
41+
self.args))
42+
43+
def fragment(self, space_left, fragment_msg):
44+
"""Streaming Message got fragmented based on
45+
payload size. All the data within space_left
46+
will be kept. All the rest will be shifted to
47+
next fragment message.
48+
49+
:param space_left: space left for current frame
50+
:param fragment_msg: the type is either
51+
CallRequestMessage or CallResponseMessage
52+
53+
54+
:return: None if there is space left
55+
or next fragment message
56+
"""
57+
new_args = []
58+
for i, arg in enumerate(self.args):
59+
if space_left >= 2: # 2bytes for size
60+
space_left -= 2
61+
62+
if arg is not None:
63+
arg_length = len(arg)
64+
if space_left < arg_length:
65+
fragment_msg.args.append(arg[space_left+1:])
66+
new_args.append(arg[:space_left])
67+
space_left = 0
68+
else:
69+
new_args.append(arg)
70+
space_left -= arg_length
71+
if space_left <= 2:
72+
# boundary for arg
73+
fragment_msg.args.append("")
74+
else:
75+
new_args.append(arg)
76+
else:
77+
for l in range(i, len(self.args)):
78+
fragment_msg.args.append(self.args[l])
79+
break
80+
81+
self.args = new_args
82+
if space_left >= 0 and len(fragment_msg.args) == 0:
83+
# don't need to fragment any more
84+
self.flags = 0x00
85+
return None
86+
else:
87+
self.flags = 0x01
88+
return fragment_msg
89+

tchannel/messages/call_request.py

+44-29
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,20 @@
11
from __future__ import absolute_import
22

3-
from .base import BaseMessage
3+
from .call_request_continue import CallRequestContinueMessage
44
from .types import Types
55
from .. import rw
6-
from . import common
6+
from . import common, ChecksumType
77

88

9-
class CallRequestMessage(BaseMessage):
9+
class CallRequestMessage(CallRequestContinueMessage):
1010
"""Initiate an RPC call."""
1111
message_type = Types.CALL_REQ
1212

1313
__slots__ = (
14-
'flags',
1514
'ttl',
1615
'tracing',
17-
1816
'service',
1917
'headers',
20-
21-
'checksum',
22-
23-
'arg_1',
24-
'arg_2',
25-
'arg_3',
2618
)
2719

2820
def __init__(
@@ -33,42 +25,65 @@ def __init__(
3325
service=None,
3426
headers=None,
3527
checksum=None,
36-
arg_1=None,
37-
arg_2=None,
38-
arg_3=None,
28+
args=None,
3929
):
40-
self.flags = flags
30+
super(CallRequestMessage, self).__init__(flags, checksum, args)
4131
self.ttl = ttl
4232
self.tracing = tracing or common.Tracing(0, 0, 0, 0)
4333
self.service = service or ''
4434
self.headers = dict(headers) if headers else {}
45-
if checksum is not None:
46-
checksum = common.ChecksumType.standardize(checksum)
47-
self.checksum = checksum or \
48-
(common.ChecksumType.none, None)
4935

50-
self.arg_1 = arg_1 or ''
51-
self.arg_2 = arg_2 or ''
52-
self.arg_3 = arg_3 or ''
36+
def encode(self):
37+
super(CallRequestMessage, self).encode()
38+
self.service = self.service.encode(common.ENCODE_TYPE)
39+
self.headers = dict(map(
40+
lambda (k, v): self.encode_pair(k, v),
41+
self.headers.iteritems()))
42+
43+
def decode(self):
44+
super(CallRequestMessage, self).decode()
45+
self.service = self.service.decode(common.DECODE_TYPE)
46+
self.headers = dict(map(
47+
lambda (k, v): self.decode_pair(k, v),
48+
self.headers.iteritems()))
49+
50+
def get_meta_size(self):
51+
size = 0
52+
size += 1 # flags: 1
53+
size += 4 # ttl: 4
54+
size += 25 # tracing: 24 | traceflags: 1
55+
size += 1 # service~1
56+
size += len(self.service)
57+
size += 1 # nh: 1
58+
for k, v in self.headers:
59+
size += 1 # hk~1
60+
size += len(k)
61+
size += 1 # hv~1
62+
size += len(v)
63+
64+
size += 1 # csumtype: 1
65+
size += 1 if self.checksum[0] != ChecksumType.none else 0
66+
return size
5367

68+
def get_size(self):
69+
size = self.get_meta_size()
70+
71+
for arg in self.args:
72+
size += 2
73+
size += len(arg)
74+
return size
5475

5576
call_req_rw = rw.instance(
5677
CallRequestMessage,
5778
("flags", rw.number(1)), # flags:1
5879
("ttl", rw.number(4)), # ttl:4
59-
6080
("tracing", common.tracing_rw), # tracing:24
6181
# traceflags: 1
62-
6382
("service", rw.len_prefixed_string(rw.number(1))), # service~1
6483
("headers", rw.headers( # nh:1 (hk~1 hv~1){nh}
6584
rw.number(1),
6685
rw.len_prefixed_string(rw.number(1))
6786
)),
68-
6987
("checksum", common.checksum_rw), # csumtype:1 (csum:4){0, 1}
70-
71-
("arg_1", rw.len_prefixed_string(rw.number(2), is_binary=True)), # arg1~2
72-
("arg_2", rw.len_prefixed_string(rw.number(2), is_binary=True)), # arg2~2
73-
("arg_3", rw.len_prefixed_string(rw.number(2), is_binary=True)), # arg3~2
88+
("args", rw.args(rw.number(2))), # [arg1~2, arg2~2, arg3~2]
7489
)
+23-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,31 @@
11
from __future__ import absolute_import
22

3-
from .call_request import CallRequestMessage
43
from .types import Types
4+
from .. import rw
5+
from . import common
6+
from .call_continue import CallContinueMessage
57

68

7-
class CallRequestContinueMessage(CallRequestMessage):
9+
class CallRequestContinueMessage(CallContinueMessage):
810
"""Represent a continuation of a call request (across multiple frames)."""
911
message_type = Types.CALL_REQ_CONTINUE
1012

11-
# TODO
13+
def __init__(
14+
self,
15+
flags=0,
16+
checksum=None,
17+
args=None,
18+
):
19+
super(CallRequestContinueMessage, self).__init__(flags, checksum, args)
20+
21+
def fragment(self, space_left):
22+
fragment_msg = CallRequestContinueMessage()
23+
return super(CallRequestContinueMessage, self).fragment(space_left, fragment_msg)
24+
25+
26+
call_req_c_rw = rw.instance(
27+
CallRequestContinueMessage,
28+
("flags", rw.number(1)), # flags:1
29+
("checksum", common.checksum_rw), # csumtype:1 (csum:4){0, 1}
30+
("args", rw.args(rw.number(2))), # [arg1~2, arg2~2, arg3~2]
31+
)

0 commit comments

Comments
 (0)