Skip to content
This repository was archived by the owner on Jan 5, 2024. It is now read-only.

Commit 568298d

Browse files
author
Junchao Wu
committed
add event hook into tchannel
add zipkin tracing
1 parent ac9f528 commit 568298d

35 files changed

+1731
-68
lines changed

examples/handler.py

+7-6
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131

3232
@tornado.gen.coroutine
33-
def say_hi(request, response, opts):
33+
def say_hi(request, response, proxy):
3434
arg2 = yield request.arg2()
3535
arg3 = yield request.arg3()
3636
response.argstreams = [
@@ -41,7 +41,7 @@ def say_hi(request, response, opts):
4141

4242

4343
@tornado.gen.coroutine
44-
def say_ok(request, response, opts):
44+
def say_ok(request, response, proxy):
4545
yield print_arg(request, 1)
4646
yield print_arg(request, 2)
4747

@@ -52,9 +52,10 @@ def say_ok(request, response, opts):
5252

5353

5454
@tornado.gen.coroutine
55-
def echo(request, response, opts):
55+
def echo(request, response, proxy):
56+
print "echo"
57+
yield tornado.gen.sleep(1)
5658
# stream args right back to request side
57-
print "streaming"
5859
response.argstreams = [
5960
InMemStream(request.endpoint),
6061
request.argstreams[1],
@@ -63,7 +64,7 @@ def echo(request, response, opts):
6364

6465

6566
@tornado.gen.coroutine
66-
def slow(request, response, opts):
67+
def slow(request, response, proxy):
6768
yield tornado.gen.sleep(random.random())
6869
response.argstreams = [
6970
InMemStream(),
@@ -80,7 +81,7 @@ def get_example_handler():
8081
dispatcher.register("slow", slow)
8182

8283
@dispatcher.route("bye")
83-
def say_bye(request, response, opts):
84+
def say_bye(request, response, proxy):
8485
yield print_arg(request, 1)
8586
yield print_arg(request, 2)
8687

examples/stream_client.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020

2121
import os
2222
import sys
23-
from options import get_args
2423
import tornado
2524
import tornado.ioloop
25+
26+
from options import get_args
2627
from tchannel.tornado import TChannel
2728
from tchannel.tornado.stream import InMemStream, PipeStream
2829
from tchannel.tornado.util import print_arg
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

tchannel/event.py

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
# Copyright (c) 2015 Uber Technologies, Inc.
2+
#
3+
# Permission is hereby granted, free of charge, to any person obtaining a copy
4+
# of this software and associated documentation files (the "Software"), to deal
5+
# in the Software without restriction, including without limitation the rights
6+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
# copies of the Software, and to permit persons to whom the Software is
8+
# furnished to do so, subject to the following conditions:
9+
#
10+
# The above copyright notice and this permission notice shall be included in
11+
# all copies or substantial portions of the Software.
12+
#
13+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
# THE SOFTWARE.
20+
21+
from __future__ import absolute_import
22+
from enum import IntEnum
23+
import logging
24+
25+
log = logging.getLogger('tchannel')
26+
27+
28+
class EventType(IntEnum):
29+
"""Types to represent system events
30+
31+
Events:
32+
33+
send_request: before client sends request
34+
35+
send_response: after server sends response
36+
37+
receive_request: after server receive request
38+
39+
receive_response: after client receive response
40+
41+
"""
42+
send_request = 0x00,
43+
send_response = 0x01,
44+
receive_request = 0x02,
45+
receive_response = 0x03
46+
47+
48+
class EventHook(object):
49+
"""provide all event hook interfaces
50+
51+
Customized Hook should should inherit from EventHook class and implement
52+
the events' hooks that it wants to listen.
53+
54+
Example::
55+
56+
TraceHook(EventHook):
57+
def send_request(self, context):
58+
....
59+
60+
"""
61+
62+
def send_request(self, context):
63+
"""Event hook for sending request
64+
65+
:param context:
66+
request object to send
67+
"""
68+
pass
69+
70+
def send_response(self, context):
71+
"""Event hook for sending response
72+
73+
:param context:
74+
response object sent
75+
"""
76+
pass
77+
78+
def receive_request(self, context):
79+
"""Event hook for receiving request
80+
81+
:param context:
82+
request object received
83+
"""
84+
pass
85+
86+
def receive_response(self, context):
87+
"""Event hook for receiving response
88+
89+
:param context:
90+
response object received
91+
"""
92+
pass
93+
94+
95+
class EventEmitter(object):
96+
def __init__(self):
97+
self.hooks = []
98+
99+
def register_hook(self, hook):
100+
self.hooks.append(hook)
101+
102+
def fire(self, event, *args, **kwargs):
103+
# TODO find proper hook name
104+
event_hook_name = event.name
105+
for hook in self.hooks:
106+
try:
107+
getattr(hook, event_hook_name)(*args, **kwargs)
108+
except Exception as e:
109+
log.error(e.message)

tchannel/tornado/connection.py

+53-8
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import os
2525
import socket
2626
import sys
27+
from tchannel.event import EventType
2728

2829
import tornado.gen
2930
import tornado.iostream
@@ -75,13 +76,24 @@ class TornadoConnection(object):
7576
CALL_REQ_TYPES = frozenset([Types.CALL_REQ, Types.CALL_REQ_CONTINUE])
7677
CALL_RES_TYPES = frozenset([Types.CALL_RES, Types.CALL_RES_CONTINUE])
7778

78-
def __init__(self, connection):
79+
def __init__(self, connection, tchannel=None):
7980
assert connection, "connection is required"
8081

8182
self.closed = False
8283
self.connection = connection
8384

84-
self.remote_host = None
85+
sockname = connection.socket.getsockname()
86+
if len(sockname) == 2:
87+
(self.remote_host,
88+
self.remote_host_port) = sockname
89+
elif len(sockname) == 1:
90+
self.remote_host = sockname[0]
91+
self.remote_host_port = 0
92+
else:
93+
self.remote_host = "0.0.0.0"
94+
self.remote_host_port = 0
95+
96+
self.remote_host_port = int(self.remote_host_port)
8597
self.remote_process_name = None
8698
self.requested_version = PROTOCOL_VERSION
8799

@@ -90,8 +102,10 @@ def __init__(self, connection):
90102

91103
# We need to use two separate message factories to avoid message ID
92104
# collision while assembling fragmented messages.
93-
self.request_message_factory = MessageFactory()
94-
self.response_message_factory = MessageFactory()
105+
self.request_message_factory = MessageFactory(self.remote_host,
106+
self.remote_host_port)
107+
self.response_message_factory = MessageFactory(self.remote_host,
108+
self.remote_host_port)
95109

96110
# Queue of unprocessed incoming calls.
97111
self._messages = queues.Queue()
@@ -103,6 +117,8 @@ def __init__(self, connection):
103117
# handshake has been performed.
104118
self._loop_running = False
105119

120+
self.tchannel = tchannel
121+
106122
connection.set_close_callback(self._on_close)
107123

108124
def next_message_id(self):
@@ -344,14 +360,16 @@ def _extract_handshake_headers(self, message):
344360
'Missing required header: process_name'
345361
)
346362

347-
self.remote_host = message.host_port
363+
(self.remote_host,
364+
self.remote_host_port) = message.host_port.rsplit(':', 1)
365+
self.remote_host_port = int(self.remote_host_port)
348366
self.remote_process_name = message.process_name
349367
self.requested_version = message.version
350368

351369
@classmethod
352370
@tornado.gen.coroutine
353371
def outgoing(cls, hostport, process_name=None, serve_hostport=None,
354-
handler=None):
372+
handler=None, tchannel=None):
355373
"""Initiate a new connection to the given host.
356374
357375
:param hostport:
@@ -384,7 +402,7 @@ def outgoing(cls, hostport, process_name=None, serve_hostport=None,
384402
"Couldn't connect to %s" % hostport, e
385403
)
386404

387-
connection = cls(stream)
405+
connection = cls(stream, tchannel)
388406
log.debug("Performing handshake with %s", hostport)
389407
yield connection.initiate_handshake(headers={
390408
'host_port': serve_hostport,
@@ -521,12 +539,22 @@ def post_response(self, response):
521539
try:
522540
response.close_argstreams()
523541
yield self._stream(response, self.response_message_factory)
542+
543+
# event: send_response
544+
if self.tchannel:
545+
self.tchannel.event_emitter.fire(
546+
EventType.send_response, response)
524547
finally:
525548
response.close_argstreams(force=True)
526549

527550
@tornado.gen.coroutine
528551
def post_request(self, request):
529552
"""send the given request and response is not required"""
553+
554+
# event: send_request
555+
if self.tchannel:
556+
self.tchannel.event_emitter.fire(EventType.send_request, request)
557+
530558
try:
531559
request.close_argstreams()
532560
yield self._stream(request, self.request_message_factory)
@@ -554,4 +582,21 @@ def send_request(self, request):
554582
future = tornado.gen.Future()
555583
self._outstanding[request.id] = future
556584
self.post_request(request)
557-
return future
585+
586+
# the actual future that caller will yield
587+
res_future = tornado.gen.Future()
588+
589+
def adapt_tracing(f):
590+
# fetch the request tracing for response
591+
f.result().tracing = request.tracing
592+
res_future.set_result(f.result())
593+
# event: receive_response
594+
if self.tchannel:
595+
self.tchannel.event_emitter.fire(
596+
EventType.receive_response, f.result())
597+
598+
tornado.ioloop.IOLoop.current().add_future(
599+
future,
600+
adapt_tracing)
601+
602+
return res_future

0 commit comments

Comments
 (0)