Skip to content
This repository was archived by the owner on Jan 5, 2024. It is now read-only.

Commit 923b9d8

Browse files
committed
queue: much simpler queue implementation.
This implementation does not support terminate() or backpressure, but it's a much simpler implementation and easier to make thread-safe.
1 parent 393d307 commit 923b9d8

File tree

3 files changed

+213
-301
lines changed

3 files changed

+213
-301
lines changed

tchannel/_queue.py

+111-129
Original file line numberDiff line numberDiff line change
@@ -22,174 +22,156 @@
2222
absolute_import, unicode_literals, division, print_function
2323
)
2424

25-
from collections import deque
25+
import threading
26+
from collections import namedtuple
2627

27-
from tornado.gen import TimeoutError
2828
from tornado.ioloop import IOLoop
2929
from tornado.queues import QueueEmpty
3030
from tornado.concurrent import Future
3131

32-
__all__ = ['Queue', 'QueueEmpty', 'TimeoutError']
32+
__all__ = ['Queue', 'QueueEmpty']
3333

3434

35-
class Queue(object):
36-
"""A specialized version of Tornado's Queue class.
35+
Node = namedtuple('Node', 'value next')
3736

38-
This class is an almost drop-in replacement for Tornado's Queue class. It
39-
behaves similar to Tornado's Queue except it provides a ``terminate()``
40-
function to fail all outstanding operations.
4137

42-
:param int maxsize:
43-
If specified, this is the buffer size for the queue. Once the capacity
44-
is reached, we will start applying backpressure on putters. If
45-
unspecified or None, the queue is unbuffered.
46-
"""
38+
class Queue(object):
39+
"""An unbounded, thread-safe asynchronous queue."""
4740

48-
__slots__ = ('_getters', '_putters', '_surplus', 'maxsize')
41+
__slots__ = ('_get', '_put', '_lock')
4942

5043
# How this works:
5144
#
52-
# Reads:
53-
# - Check if we have a value sitting in surplus. If yes, use that.
54-
# Otherwise,
55-
# - Check if we have a putter waiting to provide a value. If yes, use
56-
# that. Otherwise,
57-
# - Store the future in getters for later.
45+
# _get and _put are futures maintaining pointers to a linked list of
46+
# futures. The linked list is implemented as Node objects holding the
47+
# value and the next future.
48+
#
49+
# Node
50+
# +---+---+ +---+---+ E: Empty future
51+
# | 1 | F-|-->| 2 | E | F: Filled future
52+
# +---+---+ +---+---+
53+
# ^ ^
54+
# +---+ | +---+ |
55+
# | F-|-+ | F-|-+
56+
# +---+ +---+
57+
# _get _put
58+
#
59+
# When there's a put, we fill the current empty future with a Node
60+
# containing the value and a pointer to the next, newly created empty
61+
# future.
62+
#
63+
# +---+---+ +---+---+ +---+---+
64+
# | 1 | F-|-->| 2 | F-|-->| 3 | E |
65+
# +---+---+ +---+---+ +---+---+
66+
# ^ ^
67+
# +---+ | +---+ |
68+
# | F-|-+ | F-|-+
69+
# +---+ +---+
70+
# _get _put
5871
#
59-
# Writes:
60-
# - Check if we have a future waiting for a value in getters. If yes, use
61-
# that. Othrewise,
62-
# - Check if we have room in surplus. If yes, use that. Otherwise,
63-
# - Store the value and future in putters for later.
72+
# When there's a get, we read the value from the current Node, and move
73+
# _get to the next future.
6474
#
65-
# Invariants:
66-
# - Either getters is empty or both, surplus and putters are empty.
67-
# - If putters is non-empty, surplus is maxsize (which is more than 0).
75+
# +---+---+ +---+---+
76+
# | 2 | F-|-->| 3 | E |
77+
# +---+---+ +---+---+
78+
# ^ ^
79+
# +---+ | +---+ |
80+
# | F-|-+ | F-|-+
81+
# +---+ +---+
82+
# _get _put
83+
84+
def __init__(self):
85+
self._lock = threading.Lock()
86+
87+
# Space for the next Node.
88+
hole = Future()
89+
90+
# Pointer to the Future that will contain the next Node.
91+
self._get = Future()
92+
self._get.set_result(hole)
93+
94+
# Pointer to the next empty Future that should be filled with a Node.
95+
self._put = Future()
96+
self._put.set_result(hole)
97+
98+
def put(self, value):
99+
"""Puts an item into the queue.
68100
69-
def __init__(self, maxsize=None):
70-
if maxsize is None:
71-
maxsize = 0
72-
self.maxsize = maxsize
101+
Returns a Future that resolves to None once the value has been
102+
accepted by the queue.
103+
"""
104+
io_loop = IOLoop.current()
105+
new_hole = Future()
73106

74-
# collection of futures waiting for values
75-
self._getters = deque()
107+
new_put = Future()
108+
new_put.set_result(new_hole)
76109

77-
# collection of (value, future) pairs waiting to put values.
78-
self._putters = deque()
110+
with self._lock:
111+
self._put, put = new_put, self._put
79112

80-
# collection of values that have not yet been consumed
81-
self._surplus = deque()
113+
answer = Future()
82114

83-
def terminate(self, exc):
84-
"""Terminate all outstanding get requests with the given exception.
115+
def _on_put(future):
116+
if future.exception(): # pragma: no cover (never happens)
117+
return answer.set_exc_info(future.exc_info())
85118

86-
:param exc:
87-
An exception or an exc_info triple.
88-
"""
89-
if isinstance(exc, tuple):
90-
fail = (lambda f: f.set_exc_info(exc))
91-
else:
92-
fail = (lambda f: f.set_exception(exc))
93-
94-
while self._putters:
95-
_, future = self._putters.popleft()
96-
if future.running():
97-
fail(future)
98-
99-
while self._getters:
100-
future = self._getters.popleft()
101-
if future.running():
102-
fail(future)
103-
104-
def __receive_put(self):
105-
"""Receive a value from a waiting putter."""
106-
while self._putters:
107-
value, future = self._putters.popleft()
108-
if future.running():
109-
self._surplus.append(value)
110-
future.set_result(None)
111-
return
112-
113-
def get(self, timeout=None):
114-
"""Get the next item from the queue.
115-
116-
Returns a future that resolves to the next item.
117-
118-
:param timeout:
119-
If set, the future will resolve to a TimeoutError if a value is
120-
not received within the given time. The value for ``timeout`` may
121-
be anything accepted by ``IOLoop.add_timeout`` (a ``timedelta`` or
122-
an **absolute** time relative to ``IOLoop.time``).
123-
"""
124-
self.__receive_put()
119+
old_hole = put.result()
120+
old_hole.set_result(Node(value, new_hole))
121+
answer.set_result(None)
125122

126-
answer = Future()
127-
if self._surplus:
128-
answer.set_result(self._surplus.popleft())
129-
return answer
130-
131-
# Wait for a value
132-
if timeout is not None:
133-
_add_timeout(timeout, answer)
134-
self._getters.append(answer)
123+
io_loop.add_future(put, _on_put)
135124
return answer
136125

137126
def get_nowait(self):
138127
"""Returns a value from the queue without waiting.
139128
140129
Raises ``QueueEmpty`` if no values are available right now.
141130
"""
142-
self.__receive_put()
131+
new_get = Future()
143132

144-
if self._surplus:
145-
return self._surplus.popleft()
146-
raise QueueEmpty()
133+
with self._lock:
134+
if not self._get.done():
135+
raise QueueEmpty
136+
get, self._get = self._get, new_get
147137

148-
def put(self, value, timeout=None):
149-
"""Puts an item into the queue.
138+
hole = get.result()
139+
if not hole.done():
140+
# Restore the unfinished hole.
141+
new_get.set_result(hole)
142+
raise QueueEmpty
150143

151-
Returns a future that resolves to None once the value has been
152-
accepted by the queue.
144+
value, new_hole = hole.result()
145+
new_get.set_result(new_hole)
146+
return value
153147

154-
The value is accepted immediately if there is room in the queue or
155-
maxsize was not specified.
148+
def get(self):
149+
"""Gets the next item from the queue.
156150
157-
:param timeout:
158-
If set, the future will resolve to a TimeoutError if a value is
159-
not accepted within the given time. The value for ``timeout`` may
160-
be anything accepted by ``IOLoop.add_timeout`` (a ``timedelta`` or
161-
an **absolute** time relative to ``IOLoop.time``).
151+
Returns a Future that resolves to the next item once it is available.
162152
"""
153+
io_loop = IOLoop.current()
154+
new_get = Future()
155+
156+
with self._lock:
157+
get, self._get = self._get, new_get
163158

164159
answer = Future()
165160

166-
# If there's a getter waiting, send it the result.
167-
while self._getters:
168-
future = self._getters.popleft()
169-
if future.running():
170-
future.set_result(value)
171-
answer.set_result(None)
172-
return answer
173-
174-
# We have room. Put the value into surplus.
175-
if self.maxsize < 1 or len(self._surplus) < self.maxsize:
176-
self._surplus.append(value)
177-
answer.set_result(None)
178-
return answer
161+
def _on_node(future):
162+
if future.exception(): # pragma: no cover (never happens)
163+
return answer.set_exc_info(future.exc_info())
179164

180-
# Wait until there is room.
181-
if timeout is not None:
182-
_add_timeout(timeout, answer)
183-
self._putters.append((value, answer))
184-
return answer
165+
value, new_hole = future.result()
166+
new_get.set_result(new_hole)
167+
answer.set_result(value)
185168

169+
def _on_get(future):
170+
if future.exception(): # pragma: no cover (never happens)
171+
return answer.set_exc_info(future.exc_info())
186172

187-
def _add_timeout(timeout, future):
188-
io_loop = IOLoop.current()
173+
hole = future.result()
174+
io_loop.add_future(hole, _on_node)
189175

190-
def on_timeout():
191-
if future.running():
192-
future.set_exception(TimeoutError("timed out"))
193-
194-
t = io_loop.add_timeout(timeout, on_timeout)
195-
future.add_done_callback(lambda _: io_loop.remove_timeout(t))
176+
io_loop.add_future(get, _on_get)
177+
return answer

tchannel/tornado/connection.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,7 @@ class Reader(object):
645645
def __init__(self, io_stream, capacity=None):
646646
capacity = capacity or 64
647647

648-
self.queue = queues.Queue(capacity)
648+
self.queue = queues.Queue()
649649
self.filling = False
650650
self.io_stream = io_stream
651651

@@ -701,7 +701,7 @@ class Writer(object):
701701
def __init__(self, io_stream, capacity=None):
702702
capacity = capacity or 64
703703

704-
self.queue = queues.Queue(capacity)
704+
self.queue = queues.Queue()
705705
self.draining = False
706706
self.io_stream = io_stream
707707
# Tracks message IDs for this connection.

0 commit comments

Comments
 (0)