Skip to content
This repository was archived by the owner on Jan 5, 2024. It is now read-only.

Commit eb13932

Browse files
committedMar 16, 2015
add test
1 parent d41eefd commit eb13932

13 files changed

+318
-165
lines changed
 

‎Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
project := tchannel
22

33
flake8 := flake8
4-
pytest := py.test -s --tb short --cov-config .coveragerc --cov \
4+
pytest := py.test -sv --tb short --cov-config .coveragerc --cov \
55
$(project) tests
66

77
html_report := --cov-report html

‎requirements-test.txt

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Testing and coverage
22
pytest
33
pytest-cov
4+
pytest-tornado
45

56
# Test all the pythons
67
tox

‎tchannel/tchannel.py

+8-5
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88
import tornado.ioloop
99
import tornado.iostream
1010

11-
from .tornado.connection import TornadoConnection
1211
from .exceptions import InvalidMessageException
1312
from .messages import CallRequestMessage
14-
from .timeout import timeout
13+
from .tornado.connection import TornadoConnection
14+
from .tornado.timeout import timeout
1515

1616

1717
log = logging.getLogger('tchannel')
@@ -31,15 +31,15 @@ def __init__(self, process_name=None):
3131
@tornado.gen.coroutine
3232
def add_peer(self, hostport):
3333
if hostport in self.peers:
34-
return
34+
raise tornado.gen.Return(self.peers[hostport])
3535

3636
peer = yield self.make_out_connection(hostport)
3737
self.peers[hostport] = peer
3838
raise tornado.gen.Return(peer)
3939

4040
def remove_peer(self, hostport):
4141
# TODO: Connection cleanup
42-
self.peers.pop(hostport)
42+
return self.peers.pop(hostport)
4343

4444
@tornado.gen.coroutine
4545
def get_peer(self, hostport):
@@ -50,10 +50,13 @@ def get_peer(self, hostport):
5050
raise tornado.gen.Return(peer)
5151

5252
@tornado.gen.coroutine
53-
def make_out_connection(self, hostport):
53+
def make_out_connection(self, hostport, sock=None):
5454
host, port = hostport.rsplit(":", 1)
5555

5656
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
57+
58+
# TODO: change this to tornado.tcpclient.TCPClient to do async DNS
59+
# lookups.
5760
stream = tornado.iostream.IOStream(sock)
5861

5962
log.debug("connecting to hostport %s", hostport)

‎tchannel/tcurl.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ def tcurl(host, endpoint, headers, body):
4040
print " arg2:", getattr(response, 'arg_2', None)
4141
print " arg3:", getattr(response, 'arg_3', None)
4242

43+
raise tornado.gen.Return(response)
44+
4345

4446
def parse_args():
4547
parser = argparse.ArgumentParser()
@@ -104,5 +106,5 @@ def main():
104106
)
105107

106108

107-
if __name__ == '__main__':
109+
if __name__ == '__main__': # pragma: no cover
108110
main()

‎tchannel/timeout.py

-19
This file was deleted.

‎tchannel/tornado/connection.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@ def write(self, data, callback=None):
2626
"""
2727
return self._stream.write(bytes(data), callback=callback)
2828

29-
def close(self):
30-
self._stream.close()
31-
3229

3330
class TornadoConnection(Connection):
3431
"""Handle speaking TChannel over a Tornado connection."""
@@ -51,3 +48,6 @@ def await(self, callback=None):
5148

5249
def handle_calls(self, handler):
5350
return self.await(callback=self.wrap(handler))
51+
52+
def close(self):
53+
return self._connection._stream.close()

‎tchannel/tornado/timeout.py

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from __future__ import absolute_import
2+
3+
import contextlib2
4+
import tornado.ioloop
5+
6+
7+
@contextlib2.contextmanager
8+
def timeout(connection, seconds=2):
9+
10+
handle = tornado.ioloop.IOLoop.instance().call_later(
11+
seconds,
12+
connection.close,
13+
)
14+
15+
yield handle
16+
17+
tornado.ioloop.IOLoop.instance().remove_timeout(handle)

‎tests/integration/__init__.py

Whitespace-only changes.

‎tests/integration/conftest.py

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from __future__ import absolute_import
2+
3+
import socket
4+
5+
import pytest
6+
7+
from tests.integration.server_manager import ServerManager
8+
9+
10+
@pytest.yield_fixture
11+
def server_manager(random_open_port):
12+
with ServerManager(random_open_port) as manager:
13+
yield manager
14+
15+
16+
@pytest.fixture
17+
def random_open_port():
18+
"""Find and return a random open TCP port."""
19+
sock = socket.socket(socket.AF_INET)
20+
try:
21+
sock.bind(('', 0))
22+
return sock.getsockname()[1]
23+
finally:
24+
sock.close()

‎tests/integration/server_manager.py

+132
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
from __future__ import absolute_import
2+
3+
import socket
4+
import threading
5+
try:
6+
import SocketServer
7+
except ImportError:
8+
import socketserver as SocketServer
9+
from contextlib import contextmanager
10+
11+
import tchannel.socket as tchannel
12+
import tchannel.messages as tmessage
13+
14+
15+
class Expectation(object):
16+
"""Represents an expectation for the ServerManager."""
17+
def __init__(self, matcher):
18+
assert matcher is not None
19+
20+
# expectation.matches(req) accepts a Message and returns True or
21+
# False.
22+
self.matches = matcher
23+
24+
# expectation.respond(context, connection) accepts the context and the
25+
# connection and writes output to the connection.
26+
self._respond = None
27+
28+
@classmethod
29+
def messageType(cls, msg_typ):
30+
"""Build an expectation that expects a mesasge with the given type."""
31+
return cls(lambda msg: msg.message_type == msg_typ)
32+
33+
@property
34+
def respond(self):
35+
# Do nothing if an action setter wasn't called.
36+
if self._respond:
37+
return self._respond
38+
else:
39+
return (lambda ctx, conn: None)
40+
41+
def and_return(self, resp):
42+
"""Write the given Message as a response."""
43+
def respond(ctx, conn):
44+
return conn.frame_and_write(resp)
45+
self._respond = respond
46+
47+
48+
class ServerManager(object):
49+
"""Provides a dynamically configurable TChannel server."""
50+
TIMEOUT = 0.15
51+
52+
def __init__(self, port, timeout=None):
53+
manager = self
54+
self.port = port
55+
self.timeout = timeout or self.TIMEOUT
56+
57+
class Handler(SocketServer.BaseRequestHandler):
58+
def setup(self):
59+
self.request.settimeout(manager.timeout)
60+
self.tchan_conn = tchannel.SocketConnection(self.request)
61+
62+
def handle(self):
63+
(host, port) = self.request.getsockname()
64+
self.tchan_conn.await_handshake(headers={
65+
'host_port': '%s:%s' % (host, port),
66+
'process_name': 'tchannel_server-%s' % port
67+
})
68+
self.tchan_conn.handle_calls(manager.handle_call)
69+
70+
self.server = SocketServer.TCPServer(("", port), Handler)
71+
self.thread = None
72+
self._expectations = []
73+
74+
def expect_ping(self):
75+
"""Expect a Ping request.
76+
77+
Returns an Expectation to allow setting the response behavior."""
78+
exp = Expectation.messageType(
79+
tmessage.PingRequestMessage.message_type
80+
)
81+
82+
self._expectations.append(exp)
83+
return exp
84+
85+
def expect_call_request(self, endpoint):
86+
def matcher(message):
87+
return (
88+
message.message_type == tmessage.CallRequestMessage.message_type and
89+
message.arg_1 == endpoint
90+
)
91+
exp = Expectation(matcher)
92+
self._expectations.append(exp)
93+
return exp
94+
95+
def handle_call(self, context, connection):
96+
for exp in self._expectations:
97+
if exp.matches(context.message):
98+
exp.respond(context, connection)
99+
100+
@contextmanager
101+
def client_connection(self):
102+
"""Get an initiated Connection to this TChannel server."""
103+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
104+
sock.settimeout(self.timeout)
105+
try:
106+
(host, port) = ('localhost', self.port)
107+
sock.connect((host, port))
108+
109+
conn = tchannel.SocketConnection(sock)
110+
conn.initiate_handshake(headers={
111+
'host_port': '%s:%s' % (host, port),
112+
'process_name': 'tchannel_client-%s' % port
113+
})
114+
conn.await_handshake_reply()
115+
yield conn
116+
finally:
117+
sock.close()
118+
119+
def start(self):
120+
assert self.thread is None, 'server already started'
121+
self.thread = threading.Thread(target=self.server.serve_forever)
122+
self.thread.start()
123+
124+
def stop(self):
125+
self.server.shutdown()
126+
127+
def __enter__(self):
128+
self.start()
129+
return self
130+
131+
def __exit__(self, *args):
132+
self.stop()
+52-136
Original file line numberDiff line numberDiff line change
@@ -1,151 +1,67 @@
11
from __future__ import absolute_import
22

3-
import socket
4-
import threading
5-
try:
6-
import SocketServer
7-
except ImportError:
8-
import socketserver as SocketServer
9-
from contextlib import contextmanager
10-
113
import pytest
124

13-
import tchannel.socket as tchannel
145
import tchannel.messages as tmessage
6+
from tchannel.tchannel import TChannel
7+
from tchannel.tcurl import tcurl
158

169

1710
@pytest.fixture
18-
def random_open_port():
19-
"""Find and return a random open TCP port."""
20-
sock = socket.socket(socket.AF_INET)
21-
try:
22-
sock.bind(('', 0))
23-
return sock.getsockname()[1]
24-
finally:
25-
sock.close()
26-
27-
28-
class Expectation(object):
29-
"""Represents an expectation for the ServerManager."""
30-
def __init__(self, matcher):
31-
assert matcher is not None
32-
33-
# expectation.matches(req) accepts a Message and returns True or
34-
# False.
35-
self.matches = matcher
36-
37-
# expectation.respond(context, connection) accepts the context and the
38-
# connection and writes output to the connection.
39-
self._respond = None
40-
41-
@classmethod
42-
def messageType(cls, msg_typ):
43-
"""Build an expectation that expects a mesasge with the given type."""
44-
return cls(lambda msg: msg.message_type == msg_typ)
45-
46-
@property
47-
def respond(self):
48-
# Do nothing if an action setter wasn't called.
49-
if self._respond:
50-
return self._respond
51-
else:
52-
return (lambda ctx, conn: None)
53-
54-
def and_return(self, resp):
55-
"""Write the given Message as a response."""
56-
def respond(ctx, conn):
57-
return conn.frame_and_write(resp)
58-
self._respond = respond
59-
60-
61-
class ServerManager(object):
62-
"""Provides a dynamically configurable TChannel server."""
63-
TIMEOUT = 0.15
64-
65-
def __init__(self, port, timeout=None):
66-
manager = self
67-
self.port = port
68-
self.timeout = timeout or self.TIMEOUT
69-
70-
class Handler(SocketServer.BaseRequestHandler):
71-
def setup(self):
72-
self.request.settimeout(manager.timeout)
73-
self.tchan_conn = tchannel.SocketConnection(self.request)
74-
75-
def handle(self):
76-
(host, port) = self.request.getsockname()
77-
self.tchan_conn.await_handshake(headers={
78-
'host_port': '%s:%s' % (host, port),
79-
'process_name': 'tchannel_server-%s' % port
80-
})
81-
self.tchan_conn.handle_calls(manager.handle_call)
82-
83-
self.server = SocketServer.TCPServer(("", port), Handler)
84-
self.thread = None
85-
self._expectations = []
86-
87-
def expect_ping(self):
88-
"""Expect a Ping request.
89-
90-
Returns an Expectation to allow setting the response behavior."""
91-
exp = Expectation.messageType(
92-
tmessage.PingRequestMessage.message_type
93-
)
94-
95-
self._expectations.append(exp)
96-
return exp
97-
98-
def handle_call(self, context, connection):
99-
for exp in self._expectations:
100-
if exp.matches(context.message):
101-
exp.respond(context, connection)
102-
103-
@contextmanager
104-
def client_connection(self):
105-
"""Get an initiated Connection to this TChannel server."""
106-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
107-
sock.settimeout(self.timeout)
108-
try:
109-
(host, port) = ('localhost', self.port)
110-
sock.connect((host, port))
111-
112-
conn = tchannel.SocketConnection(sock)
113-
conn.initiate_handshake(headers={
114-
'host_port': '%s:%s' % (host, port),
115-
'process_name': 'tchannel_client-%s' % port
116-
})
117-
conn.await_handshake_reply()
118-
yield conn
119-
finally:
120-
sock.close()
121-
122-
def start(self):
123-
assert self.thread is None, 'server already started'
124-
self.thread = threading.Thread(target=self.server.serve_forever)
125-
self.thread.start()
126-
127-
def stop(self):
128-
self.server.shutdown()
129-
130-
def __enter__(self):
131-
self.start()
132-
return self
133-
134-
def __exit__(self, *args):
135-
self.stop()
136-
137-
138-
@pytest.yield_fixture
139-
def server_manager(random_open_port):
140-
with ServerManager(random_open_port) as manager:
141-
yield manager
142-
143-
144-
def test_ping_pong(server_manager):
11+
def call_response():
12+
resp = tmessage.CallResponseMessage()
13+
resp.flags = 0
14+
resp.code = 0
15+
resp.span_id = 0
16+
resp.parent_id = 0
17+
resp.trace_id = 0
18+
resp.traceflags = 0
19+
resp.headers = {}
20+
resp.checksum_type = 0
21+
resp.checksum = 0
22+
resp.arg_1 = 'hello'
23+
resp.arg_2 = ''
24+
resp.arg_3 = 'world'
25+
return resp
26+
27+
28+
def test_tcp_ping_pong(server_manager):
14529
with server_manager.client_connection() as conn:
14630
resp = tmessage.PingResponseMessage()
14731
server_manager.expect_ping().and_return(resp)
14832

14933
for i in range(1000):
15034
conn.ping()
15135
assert resp == next(conn).message
36+
37+
38+
@pytest.mark.gen_test
39+
def test_tchannel_call_request(server_manager, call_response):
40+
endpoint = 'tchannelpeertest'
41+
call_response.arg_1 = endpoint
42+
43+
server_manager.expect_call_request(endpoint).and_return(call_response)
44+
45+
tchannel = TChannel()
46+
47+
hostport = 'localhost:%d' % (server_manager.port)
48+
49+
response = yield tchannel.request(hostport).send(endpoint, '', '')
50+
51+
assert response.arg_1 == call_response.arg_1
52+
assert response.arg_3 == call_response.arg_3
53+
54+
55+
@pytest.mark.gen_test
56+
def test_tcurl(server_manager, call_response):
57+
endpoint = 'tcurltest'
58+
call_response.arg_1 = endpoint
59+
60+
server_manager.expect_call_request(endpoint).and_return(call_response)
61+
62+
hostport = 'localhost:%d' % (server_manager.port)
63+
64+
response = yield tcurl(hostport, endpoint, '', '')
65+
66+
assert response.arg_1 == call_response.arg_1
67+
assert response.arg_3 == call_response.arg_3

‎tests/test_tchannel.py

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
from __future__ import absolute_import
2+
3+
import socket
4+
5+
import pytest
6+
import tornado.gen
7+
import tornado.ioloop
8+
import tornado.testing
9+
10+
from tchannel.tchannel import TChannel
11+
from tchannel.tornado.connection import TornadoConnection
12+
13+
14+
@pytest.mark.gen_test
15+
def test_add_peer_caching():
16+
"Connections are long-lived and should not be recreated."""
17+
tchannel = TChannel()
18+
tchannel.peers = {'foo': 'bar'}
19+
result = yield tchannel.add_peer('foo')
20+
assert result == 'bar'
21+
22+
23+
def test_remove_peer():
24+
tchannel = TChannel()
25+
tchannel.peers = {'foo': 'bar'}
26+
assert tchannel.remove_peer('foo') == 'bar'
27+
28+
29+
@pytest.mark.gen_test
30+
def test_get_peer_with_caching():
31+
tchannel = TChannel()
32+
tchannel.peers = {'foo': 'bar'}
33+
result = yield tchannel.get_peer('foo')
34+
assert result == 'bar'
35+
36+
37+
#@pytest.mark.gen_test
38+
#def test_tchannel_make_out_connection():
39+
#server_sock, client_sock = socket.socketpair()
40+
41+
#server_stream = tornado.iostream.IOStream(server_sock)
42+
#client_stream = tornado.iostream.IOStream(client_sock)
43+
44+
#server_conn = TornadoConnection(server_stream)
45+
#client_conn = TornadoConnection(client_stream)
46+
47+
#hostname = server_sock.getsockname()
48+
49+
#tchannel = TChannel()
50+
#conn = yield tchannel.make_out_connection(':', sock=server_sock)

‎tests/tornado/test_timeout.py

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from __future__ import absolute_import
2+
3+
import pytest
4+
import tornado.gen
5+
import tornado.ioloop
6+
import tornado.testing
7+
8+
from tchannel.tornado.timeout import timeout
9+
10+
11+
class TimeoutTestCase(tornado.testing.AsyncTestCase):
12+
13+
@pytest.fixture(autouse=True)
14+
def make_server_client(self, tornado_pair):
15+
self.server, self.client = tornado_pair
16+
17+
def get_new_ioloop(self):
18+
return tornado.ioloop.IOLoop.instance()
19+
20+
@tornado.testing.gen_test
21+
def test_server_timeout(self):
22+
with timeout(self.client, seconds=0.001):
23+
future = self.client.initiate_handshake(headers={})
24+
yield tornado.gen.sleep(0.001)
25+
yield future
26+
27+
assert self.client.closed

0 commit comments

Comments
 (0)
This repository has been archived.