Skip to content

Commit 5641966

Browse files
authored
Add PoolStateMachine.RequestQueue (vapor#424)
1 parent 8babbcf commit 5641966

10 files changed

+383
-4
lines changed

Sources/ConnectionPoolModule/ConnectionRequest.swift

+4-2
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ public struct ConnectionRequest<Connection: PooledConnection>: ConnectionRequest
44

55
public var id: ID
66

7-
private var continuation: CheckedContinuation<Connection, ConnectionPoolError>
7+
@usableFromInline
8+
private(set) var continuation: CheckedContinuation<Connection, any Error>
89

10+
@inlinable
911
init(
1012
id: Int,
11-
continuation: CheckedContinuation<Connection, ConnectionPoolError>
13+
continuation: CheckedContinuation<Connection, any Error>
1214
) {
1315
self.id = id
1416
self.continuation = continuation

Sources/ConnectionPoolModule/OneElementFastSequence.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ struct OneElementFastSequence<Element>: Sequence {
1717
}
1818

1919
@inlinable
20-
init(_ element: Element) {
20+
init(element: Element) {
2121
self.base = .one(element, reserveCapacity: 1)
2222
}
2323

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import DequeModule
2+
3+
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
4+
extension PoolStateMachine {
5+
6+
/// A request queue, which can enqueue requests in O(1), dequeue requests in O(1) and even cancel requests in O(1).
7+
///
8+
/// While enqueueing and dequeueing on O(1) is trivial, cancellation is hard, as it normally requires a removal within the
9+
/// underlying Deque. However thanks to having an additional `requests` dictionary, we can remove the cancelled
10+
/// request from the dictionary and keep it inside the queue. Whenever we pop a request from the deque, we validate
11+
/// that it hasn't been cancelled in the meantime by checking if the popped request is still in the `requests` dictionary.
12+
@usableFromInline
13+
struct RequestQueue {
14+
@usableFromInline
15+
private(set) var queue: Deque<RequestID>
16+
17+
@usableFromInline
18+
private(set) var requests: [RequestID: Request]
19+
20+
@inlinable
21+
var count: Int {
22+
self.requests.count
23+
}
24+
25+
@inlinable
26+
var isEmpty: Bool {
27+
self.count == 0
28+
}
29+
30+
@usableFromInline
31+
init() {
32+
self.queue = .init(minimumCapacity: 256)
33+
self.requests = .init(minimumCapacity: 256)
34+
}
35+
36+
@inlinable
37+
mutating func queue(_ request: Request) {
38+
self.requests[request.id] = request
39+
self.queue.append(request.id)
40+
}
41+
42+
@inlinable
43+
mutating func pop(max: UInt16) -> OneElementFastSequence<Request> {
44+
var result = OneElementFastSequence<Request>()
45+
result.reserveCapacity(Int(max))
46+
var popped = 0
47+
while let requestID = self.queue.popFirst(), popped < max {
48+
if let requestIndex = self.requests.index(forKey: requestID) {
49+
popped += 1
50+
result.append(self.requests.remove(at: requestIndex).value)
51+
}
52+
}
53+
54+
assert(result.count <= max)
55+
return result
56+
}
57+
58+
@inlinable
59+
mutating func remove(_ requestID: RequestID) -> Request? {
60+
self.requests.removeValue(forKey: requestID)
61+
}
62+
63+
@inlinable
64+
mutating func removeAll() -> OneElementFastSequence<Request> {
65+
let result = OneElementFastSequence(self.requests.values)
66+
self.requests.removeAll()
67+
self.queue.removeAll()
68+
return result
69+
}
70+
}
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#if canImport(Darwin)
2+
import Darwin
3+
#else
4+
import Glibc
5+
#endif
6+
7+
@usableFromInline
8+
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
9+
struct PoolConfiguration {
10+
/// The minimum number of connections to preserve in the pool.
11+
///
12+
/// If the pool is mostly idle and the remote servers closes idle connections,
13+
/// the `ConnectionPool` will initiate new outbound connections proactively
14+
/// to avoid the number of available connections dropping below this number.
15+
@usableFromInline
16+
var minimumConnectionCount: Int = 0
17+
18+
/// The maximum number of connections to for this pool, to be preserved.
19+
@usableFromInline
20+
var maximumConnectionSoftLimit: Int = 10
21+
22+
@usableFromInline
23+
var maximumConnectionHardLimit: Int = 10
24+
25+
@usableFromInline
26+
var keepAliveDuration: Duration?
27+
28+
@usableFromInline
29+
var idleTimeoutDuration: Duration = .seconds(30)
30+
}
31+
32+
@usableFromInline
33+
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
34+
struct PoolStateMachine<
35+
Connection: PooledConnection,
36+
ConnectionIDGenerator: ConnectionIDGeneratorProtocol,
37+
ConnectionID: Hashable & Sendable,
38+
Request: ConnectionRequestProtocol,
39+
RequestID,
40+
TimerCancellationToken
41+
> where Connection.ID == ConnectionID, ConnectionIDGenerator.ID == ConnectionID, RequestID == Request.ID {
42+
43+
@usableFromInline
44+
struct Timer: Hashable, Sendable {
45+
@usableFromInline
46+
enum Usecase: Sendable {
47+
case backoff
48+
case idleTimeout
49+
case keepAlive
50+
}
51+
52+
@usableFromInline
53+
var connectionID: ConnectionID
54+
55+
@usableFromInline
56+
var timerID: Int
57+
58+
@usableFromInline
59+
var duration: Duration
60+
61+
@usableFromInline
62+
var usecase: Usecase
63+
64+
@inlinable
65+
init(connectionID: ConnectionID, timerID: Int, duration: Duration, usecase: Usecase) {
66+
self.connectionID = connectionID
67+
self.timerID = timerID
68+
self.duration = duration
69+
self.usecase = usecase
70+
}
71+
}
72+
73+
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
@testable import _ConnectionPoolModule
2+
import XCTest
3+
4+
final class ConnectionRequestTests: XCTestCase {
5+
6+
func testHappyPath() async throws {
7+
let mockConnection = MockConnection(id: 1)
8+
let connection = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<MockConnection, any Error>) in
9+
let request = ConnectionRequest(id: 42, continuation: continuation)
10+
XCTAssertEqual(request.id, 42)
11+
continuation.resume(with: .success(mockConnection))
12+
}
13+
14+
XCTAssert(connection === mockConnection)
15+
}
16+
17+
func testSadPath() async throws {
18+
do {
19+
_ = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<MockConnection, any Error>) in
20+
continuation.resume(with: .failure(ConnectionPoolError.requestCancelled))
21+
}
22+
XCTFail("This point should not be reached")
23+
} catch {
24+
XCTAssertEqual(error as? ConnectionPoolError, .requestCancelled)
25+
}
26+
}
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import _ConnectionPoolModule
2+
3+
final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable {
4+
typealias Connection = MockConnection
5+
6+
struct ID: Hashable {
7+
var objectID: ObjectIdentifier
8+
9+
init(_ request: MockRequest) {
10+
self.objectID = ObjectIdentifier(request)
11+
}
12+
}
13+
14+
var id: ID { ID(self) }
15+
16+
17+
static func ==(lhs: MockRequest, rhs: MockRequest) -> Bool {
18+
lhs.id == rhs.id
19+
}
20+
21+
func hash(into hasher: inout Hasher) {
22+
hasher.combine(self.id)
23+
}
24+
25+
func complete(with: Result<Connection, ConnectionPoolError>) {
26+
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
@testable import _ConnectionPoolModule
2+
3+
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
4+
struct MockTimerCancellationToken: Hashable, Sendable {
5+
var connectionID: MockConnection.ID
6+
var timerID: Int
7+
var duration: Duration
8+
var usecase: TestPoolStateMachine.Timer.Usecase
9+
10+
init(_ timer: TestPoolStateMachine.Timer) {
11+
self.connectionID = timer.connectionID
12+
self.timerID = timer.timerID
13+
self.duration = timer.duration
14+
self.usecase = timer.usecase
15+
}
16+
}

Tests/ConnectionPoolModuleTests/OneElementFastSequence.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ final class OneElementFastSequenceTests: XCTestCase {
3535
}
3636
XCTAssertEqual(array.capacity, 8)
3737

38-
var oneElemSequence = OneElementFastSequence<Int>(1)
38+
var oneElemSequence = OneElementFastSequence<Int>(element: 1)
3939
oneElemSequence.reserveCapacity(8)
4040
oneElemSequence.append(2)
4141
guard case .n(let array) = oneElemSequence.base else {

0 commit comments

Comments
 (0)