Skip to content
This repository was archived by the owner on Jan 5, 2024. It is now read-only.

Commit 02a82f3

Browse files
author
Junchao Wu
committed
add protocol error object
throw TChannelException when client receives protocol error message
1 parent 431ca98 commit 02a82f3

File tree

6 files changed

+134
-44
lines changed

6 files changed

+134
-44
lines changed

tchannel/event.py

+12-12
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,8 @@
3232

3333
@unique
3434
class EventType(IntEnum):
35-
"""Types to represent system events
35+
"""Types to represent system events"""
3636

37-
Events:
38-
39-
send_request: before client sends request
40-
41-
send_response: after server sends response
42-
43-
receive_request: after server receive request
44-
45-
receive_response: after client receive response
46-
47-
"""
4837
before_send_request = 0x00
4938
after_send_request = 0x01
5039

@@ -57,6 +46,9 @@ class EventType(IntEnum):
5746
before_receive_response = 0x30
5847
after_receive_response = 0x31
5948

49+
after_receive_protocol_error = 0x40
50+
after_send_protocol_error = 0x41
51+
6052

6153
class EventHook(object):
6254
"""provide all event hook interfaces
@@ -103,6 +95,14 @@ def after_receive_response(self, response):
10395
"""Called after a ``CALL_RESP`` message is read."""
10496
pass
10597

98+
def after_receive_protocol_error(self, error):
99+
"""Called after a ''error'' message is read."""
100+
pass
101+
102+
def after_send_protocol_error(self, error):
103+
"""Called after a ''error'' message is sent."""
104+
pass
105+
106106

107107
class EventEmitter(object):
108108
def __init__(self):

tchannel/tornado/connection.py

+42-11
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,20 @@
3636
from ..event import EventType
3737
from ..exceptions import ConnectionClosedException
3838
from ..exceptions import InvalidErrorCodeException
39+
from ..exceptions import TChannelException
3940
from ..io import BytesIO
4041
from ..messages.common import PROTOCOL_VERSION
4142
from ..messages.common import FlagsType
4243
from ..messages.error import ErrorMessage
4344
from ..messages.types import Types
45+
from .data import ProtocolError
4446
from .message_factory import MessageFactory
4547

4648
try:
4749
import tornado.queues as queues # included in 4.2
4850
except ImportError:
4951
import toro as queues
5052

51-
5253
log = logging.getLogger('tchannel')
5354

5455

@@ -222,6 +223,17 @@ def _loop(self):
222223
future = self._outstanding.get(context.message_id)
223224
else:
224225
future = self._outstanding.pop(context.message_id)
226+
227+
if context.message.message_type == Types.ERROR:
228+
protocol_error = (
229+
self.response_message_factory.build_protocol_error(
230+
context.message,
231+
context.message_id,
232+
))
233+
234+
future.set_result(protocol_error)
235+
continue
236+
225237
if response and future.running():
226238
future.set_result(response)
227239
continue
@@ -368,7 +380,7 @@ def _extract_handshake_headers(self, message):
368380
)
369381

370382
(self.remote_host,
371-
self.remote_host_port) = message.host_port.rsplit(':', 1)
383+
self.remote_host_port) = message.host_port.rsplit(':', 1)
372384
self.remote_host_port = int(self.remote_host_port)
373385
self.remote_process_name = message.process_name
374386
self.requested_version = message.version
@@ -593,19 +605,38 @@ def send_request(self, request):
593605
# TODO: fire before_receive_response
594606

595607
def adapt_tracing(f):
596-
# fetch the request tracing for response
597-
f.result().tracing = request.tracing
598-
response_future.set_result(f.result())
599-
# event: receive_response
600-
if self.tchannel:
601-
self.tchannel.event_emitter.fire(
602-
EventType.after_receive_response,
603-
f.result(),
608+
if not f.exception():
609+
# fetch the request tracing for response
610+
f.result().tracing = request.tracing
611+
612+
if isinstance(f.result(), ProtocolError):
613+
protocol_error = f.result()
614+
response_future.set_exception(
615+
TChannelException(protocol_error.message)
616+
)
617+
# event: after_receive_protocol_error
618+
if self.tchannel:
619+
self.tchannel.event_emitter.fire(
620+
EventType.after_receive_protocol_error,
621+
protocol_error,
622+
)
623+
else:
624+
response = f.result()
625+
response_future.set_result(response)
626+
# event: after_receive_response
627+
if self.tchannel:
628+
self.tchannel.event_emitter.fire(
629+
EventType.after_receive_response,
630+
response,
631+
)
632+
else:
633+
# TODO unexpected exception
634+
response_future.set_exception(
635+
f.exception()
604636
)
605637

606638
tornado.ioloop.IOLoop.current().add_future(
607639
future,
608640
adapt_tracing,
609641
)
610-
611642
return response_future

tchannel/tornado/data.py

+19
Original file line numberDiff line numberDiff line change
@@ -316,3 +316,22 @@ def close_argstreams(self, force=False):
316316
for stream in self.argstreams:
317317
if stream.auto_close or force:
318318
stream.close()
319+
320+
321+
class ProtocolError(object):
322+
"""Object to represent protocol error message
323+
324+
Note: Internal use only.
325+
"""
326+
327+
def __init__(
328+
self,
329+
code,
330+
message,
331+
id=None,
332+
tracing=None,
333+
):
334+
self.code = code
335+
self.tracing = tracing
336+
self.id = id
337+
self.message = message

tchannel/tornado/message_factory.py

+26
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@
3434
from ..messages.common import Tracing
3535
from ..messages.common import generate_checksum
3636
from ..messages.common import verify_checksum
37+
from ..messages.error import ErrorMessage
3738
from ..zipkin.annotation import Endpoint
3839
from ..zipkin.trace import Trace
40+
from .data import ProtocolError
3941
from .data import Request
4042
from .data import Response
4143
from .stream import InMemStream
@@ -56,6 +58,30 @@ def __init__(self, remote_host=None, remote_host_port=None):
5658
self.in_checksum = {}
5759
self.out_checksum = {}
5860

61+
def build_raw_error_message(self, err):
62+
"""build protocol level error message based on Error object"""
63+
message = ErrorMessage(
64+
code=err.code,
65+
tracing=err.Tracing(err.tracing.span_id,
66+
err.tracing.parent_span_id,
67+
err.tracing.trace_id,
68+
err.tracing.traceflags),
69+
message=err.message,
70+
)
71+
72+
return message
73+
74+
def build_protocol_error(self, message, message_id=None):
75+
"""build protocol level error message based on Error object"""
76+
77+
error = ProtocolError(
78+
code=message.code,
79+
message=message.message,
80+
id=message_id,
81+
)
82+
83+
return error
84+
5985
def build_raw_request_message(self, request, args, is_completed=False):
6086
"""build protocol level message based on request and args.
6187

tchannel/zipkin/zipkin_trace.py

+30-14
Original file line numberDiff line numberDiff line change
@@ -39,34 +39,50 @@ def __init__(self, tchannel=None, dst=None):
3939
# to dst. By default it writes to stdout
4040
self.tracer = DebugTracer(dst)
4141

42-
def before_send_request(self, context):
43-
if not context.tracing.traceflags:
42+
def before_send_request(self, request):
43+
if not request.tracing.traceflags:
4444
return
4545

4646
ann = annotation.client_send()
47-
context.tracing.annotations.append(ann)
47+
request.tracing.annotations.append(ann)
4848

49-
def before_receive_request(self, context):
50-
if not context.tracing.traceflags:
49+
def before_receive_request(self, request):
50+
if not request.tracing.traceflags:
5151
return
5252

5353
ann = annotation.server_recv()
54-
context.tracing.annotations.append(ann)
54+
request.tracing.annotations.append(ann)
5555

56-
def after_send_response(self, context):
57-
if not context.tracing.traceflags:
56+
def after_send_response(self, response):
57+
if not response.tracing.traceflags:
5858
return
5959

6060
# send out a pair of annotations{server_recv, server_send} to zipkin
6161
ann = annotation.server_send()
62-
context.tracing.annotations.append(ann)
63-
self.tracer.record([(context.tracing, context.tracing.annotations)])
62+
response.tracing.annotations.append(ann)
63+
self.tracer.record([(response.tracing, response.tracing.annotations)])
6464

65-
def after_receive_response(self, context):
66-
if not context.tracing.traceflags:
65+
def after_receive_response(self, response):
66+
if not response.tracing.traceflags:
6767
return
6868

6969
# send out a pair of annotations{client_recv, client_send} to zipkin
7070
ann = annotation.client_recv()
71-
context.tracing.annotations.append(ann)
72-
self.tracer.record([(context.tracing, context.tracing.annotations)])
71+
response.tracing.annotations.append(ann)
72+
self.tracer.record([(response.tracing, response.tracing.annotations)])
73+
74+
def after_receive_protocol_error(self, error):
75+
if not error.tracing.traceflags:
76+
return
77+
78+
ann = annotation.client_recv()
79+
error.tracing.annotations.append(ann)
80+
self.tracer.record([(error.tracing, error.tracing.annotations)])
81+
82+
def after_send_protocol_error(self, error):
83+
if not error.tracing.traceflags:
84+
return
85+
86+
ann = annotation.server_send()
87+
error.tracing.annotations.append(ann)
88+
self.tracer.record([(error.tracing, error.tracing.annotations)])

tests/integration/test_client_server.py

+5-7
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@
2424

2525
from tchannel import tcurl
2626
from tchannel.exceptions import ConnectionClosedException
27-
from tchannel.messages import Types
28-
from tchannel.messages.error import ErrorCode
27+
from tchannel.exceptions import TChannelException
2928
from tchannel.tornado import TChannel
3029
from tchannel.tornado.connection import StreamConnection
3130
from tchannel.tornado.data import Response
@@ -126,8 +125,7 @@ def test_endpoint_not_found(tchannel_server, call_response):
126125

127126
hostport = 'localhost:%d' % (tchannel_server.port)
128127

129-
response = yield tchannel.request(hostport).send(InMemStream(),
130-
InMemStream(),
131-
InMemStream())
132-
assert response.message_type == Types.ERROR
133-
assert response.code == ErrorCode.bad_request
128+
with pytest.raises(TChannelException):
129+
yield tchannel.request(hostport).send(InMemStream(),
130+
InMemStream(),
131+
InMemStream())

0 commit comments

Comments
 (0)