Skip to content

Commit 9ab5da7

Browse files
committed
cancel epoll before closing
1 parent abeff5a commit 9ab5da7

File tree

2 files changed

+38
-20
lines changed

2 files changed

+38
-20
lines changed

FlyingSocks/Sources/SocketPool+ePoll.swift

+38-6
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,14 @@ public extension AsyncSocketPool where Self == SocketPool<ePoll> {
4545
public struct ePoll: EventQueue {
4646

4747
private(set) var file: Socket.FileDescriptor
48+
private(set) var canary: Socket.FileDescriptor
4849
private(set) var existing: [Socket.FileDescriptor: Socket.Events]
4950
private let eventsLimit: Int
5051
private let logger: any Logging
5152

5253
public init(maxEvents limit: Int, logger: some Logging = .disabled) {
5354
self.file = .invalid
55+
self.canary = .invalid
5456
self.existing = [:]
5557
self.eventsLimit = limit
5658
self.logger = logger
@@ -59,19 +61,33 @@ public struct ePoll: EventQueue {
5961
public mutating func open() throws {
6062
existing = [:]
6163
self.file = try Self.makeQueue()
64+
self.canary = try Self.makeEventTrigger()
65+
66+
var event = CSystemLinux.epoll_event()
67+
let options: EPOLLEvents = [EPOLLEvents.edgeTriggered, EPOLLEvents.read]
68+
event.events = options.rawValue
69+
event.data.fd = canary.rawValue
70+
guard epoll_ctl(file.rawValue, EPOLL_CTL_ADD, canary.rawValue, &event) != -1 else {
71+
throw SocketError.makeFailed("epoll_ctl EPOLL_CTL_ADD")
72+
}
6273
}
6374

6475
public mutating func stop() throws {
6576
existing = [:]
66-
guard file != .invalid else {
77+
guard canary != .invalid else {
6778
throw SocketError.disconnected
6879
}
69-
defer { file = .invalid }
70-
try Self.closeQueue(file: file)
80+
eventfd_write(canary.rawValue, 1);
81+
canary = .invalid
82+
7183
}
7284

7385
public mutating func close() throws {
74-
// should really close file here
86+
guard file != .invalid else {
87+
throw SocketError.disconnected
88+
}
89+
defer { file = .invalid }
90+
try Self.closeQueue(file: file)
7591
}
7692

7793
public mutating func addEvents(_ events: Socket.Events, for socket: Socket.FileDescriptor) throws {
@@ -117,22 +133,29 @@ public struct ePoll: EventQueue {
117133
}
118134

119135
public func getNotifications() throws -> [EventNotification] {
136+
guard canary != .invalid else {
137+
throw SocketError.disconnected
138+
}
120139
var events = Array(repeating: epoll_event(), count: eventsLimit)
121140
let status = CSystemLinux.epoll_wait(file.rawValue, &events, Int32(eventsLimit), -1)
122141
guard status > 0 else {
123142
throw SocketError.makeFailed("epoll wait")
124143
}
125144

126-
return events
145+
return try events
127146
.prefix(Int(status))
128147
.map(makeNotification)
129148
}
130149

131-
func makeNotification(from event: epoll_event) -> EventNotification {
150+
func makeNotification(from event: epoll_event) throws -> EventNotification {
132151
var notification = EventNotification.make(from: event)
133152
if notification.events.isEmpty, let existing = existing[notification.file] {
134153
notification.events = existing
135154
}
155+
156+
if event.data.fd == self.canary.rawValue {
157+
throw SocketError.disconnected
158+
}
136159
return notification
137160
}
138161

@@ -144,6 +167,14 @@ public struct ePoll: EventQueue {
144167
return file
145168
}
146169

170+
static func makeEventTrigger(file: Int32 = CSystemLinux.eventfd(0, Int32(EFD_NONBLOCK))) throws -> Socket.FileDescriptor {
171+
let file = Socket.FileDescriptor(rawValue: file)
172+
guard file != .invalid else {
173+
throw SocketError.makeFailed("eventfd")
174+
}
175+
return file
176+
}
177+
147178
static func closeQueue(file: Socket.FileDescriptor) throws {
148179
guard file != .invalid else { return }
149180
guard Socket.close(file.rawValue) >= 0 else {
@@ -194,6 +225,7 @@ private struct EPOLLEvents: OptionSet, Hashable {
194225
static let rdhup = EPOLLEvents(rawValue: EPOLLRDHUP.rawValue)
195226
static let err = EPOLLEvents(rawValue: EPOLLERR.rawValue)
196227
static let pri = EPOLLEvents(rawValue: EPOLLPRI.rawValue)
228+
static let edgeTriggered = EPOLLEvents(rawValue: EPOLLET.rawValue)
197229
}
198230

199231
private extension Socket.Events {

FlyingSocks/Sources/SocketPool.swift

-14
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ public final actor SocketPool<Queue: EventQueue>: AsyncSocketPool {
114114
}
115115
}
116116

117-
#if canImport(Darwin)
118117
private func getNotifications() async throws -> [EventNotification] {
119118
try Task.checkCancellation()
120119
return try await withIdentifiableThrowingContinuation(isolation: self) { continuation in
@@ -128,19 +127,6 @@ public final actor SocketPool<Queue: EventQueue>: AsyncSocketPool {
128127
Task { await self.stopQueue() }
129128
}
130129
}
131-
#else
132-
// EPOLL does not throw error when queue FD is closed while currently waiting for events.
133-
private func getNotifications() async throws -> [EventNotification] {
134-
let continuation = CancellingContinuation<[EventNotification], any Swift.Error>()
135-
dispatchQueue.async { [queue] in
136-
let result = Result {
137-
try queue.getNotifications()
138-
}
139-
continuation.resume(with: result)
140-
}
141-
return try await continuation.value
142-
}
143-
#endif
144130

145131
private func stopQueue() {
146132
try? queue.stop()

0 commit comments

Comments
 (0)