forked from aws-amplify/aws-appsync-apollo-extensions-swift
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAppSyncWebSocketClient.swift
289 lines (248 loc) · 10.8 KB
/
AppSyncWebSocketClient.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//
import Apollo
import ApolloAPI
import ApolloWebSocket
import Combine
import Foundation
/// A `URLSessionWebSocketTask`-backed implementation of `ApolloWebSocket.WebSocketClient`.
///
/// Every frame read from the socket is published on `subject` / `publisher` and forwarded to
/// `delegate` on `callbackQueue`. Outgoing subscription start requests are decorated with the
/// payload returned by the `AppSyncAuthorizer` before they are sent.
public class AppSyncWebSocketClient: NSObject, ApolloWebSocket.WebSocketClient, URLSessionDelegate {

    private static let jsonEncoder = JSONEncoder()

    /// Heartbeat timeout used when `connection_ack` carries no integer `connectionTimeoutMs`.
    private static let defaultConnectionTimeoutMs = 300_000

    // MARK: - ApolloWebSocket.WebSocketClient

    /// Request used to open the socket. Protocol, host and authorization headers are written
    /// into it each time a connection is created.
    public var request: URLRequest

    /// Receiver of connection / message / disconnect callbacks.
    /// NOTE(review): this reference is strong; if the owner of this client also sets itself as
    /// the delegate the two keep each other alive — confirm whether it should be `weak`.
    public var delegate: ApolloWebSocket.WebSocketClientDelegate?

    /// Queue on which every `delegate` callback is dispatched.
    public var callbackQueue: DispatchQueue

    // MARK: - Public

    /// Read-only view of the WebSocket event stream.
    public var publisher: AnyPublisher<AppSyncWebSocketEvent, Never> {
        return subject.eraseToAnyPublisher()
    }

    // MARK: - Internal

    /// Queue through which outgoing writes are submitted.
    private let taskQueue: TaskQueue<Void>

    /// The underlying URLSessionWebSocketTask
    private var connection: URLSessionWebSocketTask? {
        willSet {
            // Close the connection being replaced so that two sockets are never left open.
            connection?.cancel(with: .goingAway, reason: nil)
        }
    }

    /// Receives a value for the connection ack and for every keep-alive ("ka") message.
    private let heartBeatsMonitor = PassthroughSubject<Void, Never>()

    /// Internal writable WebSocketEvent data stream
    let subject = PassthroughSubject<AppSyncWebSocketEvent, Never>()

    /// Subscription that routes `subject` events to the delegate and the heartbeat monitor.
    var cancellable: AnyCancellable?

    /// Subscription that disconnects when keep-alive messages stop arriving.
    var heartBeatMonitorCancellable: AnyCancellable?

    /// `true` while the underlying task exists and is in the `.running` state.
    public var isConnected: Bool {
        connection?.state == .running
    }

    /// Interceptor for appending additional info before making the connection
    private var authorizer: AppSyncAuthorizer

    /// Creates a client without a delegate.
    ///
    /// - Parameters:
    ///   - endpointURL: Endpoint URL; it is passed through `appSyncRealTimeEndpoint(_:)`.
    ///   - authorizer: Supplies connection headers and subscription payloads.
    ///   - callbackQueue: Queue for delegate callbacks. Defaults to `.main`.
    public convenience init(
        endpointURL: URL,
        authorizer: AppSyncAuthorizer,
        callbackQueue: DispatchQueue = .main
    ) {
        self.init(
            endpointURL: endpointURL,
            delegate: nil,
            callbackQueue: callbackQueue,
            authorizer: authorizer
        )
    }

    /// Designated initializer.
    ///
    /// - Parameters:
    ///   - endpointURL: Endpoint URL; it is passed through `appSyncRealTimeEndpoint(_:)`.
    ///   - delegate: Optional receiver of WebSocket callbacks.
    ///   - callbackQueue: Queue for delegate callbacks.
    ///   - authorizer: Supplies connection headers and subscription payloads.
    init(
        endpointURL: URL,
        delegate: ApolloWebSocket.WebSocketClientDelegate?,
        callbackQueue: DispatchQueue,
        authorizer: AppSyncAuthorizer
    ) {
        self.request = URLRequest(url: appSyncRealTimeEndpoint(endpointURL))
        self.delegate = delegate
        self.callbackQueue = callbackQueue
        self.authorizer = authorizer
        self.taskQueue = TaskQueue()
    }

    /// Opens a new WebSocket connection unless one is already running.
    ///
    /// The connection is created asynchronously. If creating it fails (for example because the
    /// authorizer throws), the failure is published as an `.error` event so that the delegate
    /// is told about it instead of the error being dropped.
    public func connect() {
        AppSyncApolloLogger.debug("Calling Connect")
        guard connection?.state != .running else {
            AppSyncApolloLogger.debug("[AppSyncWebSocketClient] WebSocket is already in connecting state")
            return
        }

        subscribeToAppSyncResponse()

        Task {
            do {
                AppSyncApolloLogger.debug("[AppSyncWebSocketClient] Creating new connection and starting read")
                let webSocketTask = try await self.createWebSocketConnection()
                self.connection = webSocketTask

                // Read from the WebSocket in a separate task to avoid blocking the execution.
                Task { await self.startReadMessage() }

                webSocketTask.resume()
            } catch {
                AppSyncApolloLogger.debug("[AppSyncWebSocketClient] Failed to create connection, error \(error)")
                self.subject.send(.error(error))
            }
        }
    }

    /// Stops heartbeat monitoring and, if the connection is running, closes it with `.goingAway`.
    ///
    /// - Parameter forceTimeout: Not used by this implementation.
    public func disconnect(forceTimeout: TimeInterval?) {
        AppSyncApolloLogger.debug("Calling Disconnect")
        heartBeatMonitorCancellable?.cancel()
        guard connection?.state == .running else {
            AppSyncApolloLogger.debug("[AppSyncWebSocketClient] client should be in connected state to trigger disconnect")
            return
        }
        connection?.cancel(with: .goingAway, reason: nil)
    }

    /// Not implemented: no ping is sent and `completion` is never invoked.
    public func write(ping: Data, completion: (() -> Void)?) {
        AppSyncApolloLogger.debug("Not called, not implemented.")
    }

    /// Sends `string` over the socket.
    ///
    /// A string that parses as an `AppSyncRealTimeStartRequest` is re-encoded with the
    /// authorizer's subscription payload (plus a `User-Agent` entry) before being sent; any
    /// other string is sent unchanged. Nothing is sent while the client is not connected.
    public func write(string: String) {
        taskQueue.async { [weak self] in
            guard let self else { return }
            guard self.isConnected else {
                AppSyncApolloLogger.debug("[AppSyncWebSocketClient] Attempting to write to a webSocket haven't been connected.")
                return
            }

            // Anything that is not a subscription start request goes out as-is.
            guard let startRequest = AppSyncRealTimeStartRequest(from: string) else {
                try await self.connection?.send(.string(string))
                return
            }

            // The authorizer is handed the request with the subscription data as its body.
            var signingRequest = self.request
            signingRequest.httpBody = Data(startRequest.data.utf8)
            var headers = try await self.authorizer.getWebSocketSubscriptionPayload(request: signingRequest)
            headers.updateValue(await PackageInfo.userAgent, forKey: "User-Agent")

            let interceptedEvent = AppSyncRealTimeStartRequest(
                id: startRequest.id,
                data: startRequest.data,
                auth: headers
            )

            AppSyncApolloLogger.debug("[AppSyncWebSocketClient] Sending subscription message: \(startRequest.data)")

            let encodedEvent: Data
            do {
                encodedEvent = try Self.jsonEncoder.encode(interceptedEvent)
            } catch {
                // Previously dropped silently; log so a lost subscription can be diagnosed.
                AppSyncApolloLogger.debug("[AppSyncWebSocketClient] Failed to encode subscription message, error \(error)")
                return
            }
            guard let jsonString = String(data: encodedEvent, encoding: .utf8) else {
                AppSyncApolloLogger.debug("[AppSyncWebSocketClient] Encoded subscription message is not valid UTF-8")
                return
            }
            try await self.connection?.send(.string(jsonString))
        }
    }

    // MARK: - Deinit

    deinit {
        self.subject.send(completion: .finished)
        self.cancellable?.cancel()
        self.heartBeatMonitorCancellable?.cancel()
    }

    // MARK: - Connect Internals

    /// Builds the (not yet resumed) WebSocket task for `request`.
    ///
    /// Sets the `Sec-WebSocket-Protocol` and `host` headers, then applies the headers returned
    /// by the authorizer.
    ///
    /// - Returns: A new `URLSessionWebSocketTask`; the caller is responsible for resuming it.
    /// - Throws: Whatever `authorizer.getWebsocketConnectionHeaders(endpoint:)` throws.
    private func createWebSocketConnection() async throws -> URLSessionWebSocketTask {
        guard let url = request.url else {
            fatalError("""
            Empty endpoint. This should not happen.
            Please take a look at https://github.com/aws-amplify/aws-appsync-apollo-extensions-swift/issues
            to see if there are any existing issues that match your scenario, and file an issue with
            the details of the bug if there isn't.
            """)
        }

        request.setValue("graphql-ws", forHTTPHeaderField: "Sec-WebSocket-Protocol")
        request.setValue(appSyncApiEndpoint(url).host, forHTTPHeaderField: "host")

        let authHeaders = try await authorizer.getWebsocketConnectionHeaders(endpoint: url)
        for (field, value) in authHeaders {
            request.setValue(value, forHTTPHeaderField: field)
        }

        let urlSession = URLSession(configuration: .default, delegate: self, delegateQueue: nil)
        let webSocketTask = urlSession.webSocketTask(with: request)

        // A URLSession keeps a strong reference to its delegate until it is invalidated.
        // A fresh session is created per connection, so without this call every connect()
        // would leak a session and keep this client alive forever. The task created above
        // is allowed to run to completion before the session is torn down.
        urlSession.finishTasksAndInvalidate()

        return webSocketTask
    }

    /**
     Repeatedly read WebSocket data frames and publish them to the data stream.

     Reading stops once there is no connection or the connection is canceling / completed.
     This is a loop rather than a self-call so that a long-lived connection does not pile up
     one suspended async frame per received message.
     */
    private func startReadMessage() async {
        while true {
            // Re-read the property every pass: it can be replaced by a later connect().
            guard let activeConnection = connection else {
                AppSyncApolloLogger.debug("[AppSyncWebSocketClient] WebSocket connection doesn't exist")
                return
            }

            if activeConnection.state == .canceling || activeConnection.state == .completed {
                AppSyncApolloLogger.debug("[AppSyncWebSocketClient] WebSocket connection state is \(activeConnection.state). Failed to read websocket message")
                return
            }

            do {
                let message = try await activeConnection.receive()
                AppSyncApolloLogger.debug("[AppSyncWebSocketClient] WebSocket received message: \(String(describing: message))")

                switch message {
                case .data(let data):
                    subject.send(.data(data))
                case .string(let string):
                    subject.send(.string(string))
                @unknown default:
                    break
                }
            } catch {
                // Only a failure on a still-running connection is reported as an error;
                // failures while shutting down are expected and merely logged.
                if activeConnection.state == .running {
                    subject.send(.error(error))
                } else {
                    AppSyncApolloLogger.debug("[AppSyncWebSocketClient] read message failed with connection state \(activeConnection.state), error \(error)")
                }
            }
        }
    }

    /// (Re)subscribes to `subject`: every event is forwarded to the delegate, and string
    /// messages are additionally inspected for `connection_ack` / `ka` to drive the heartbeat.
    private func subscribeToAppSyncResponse() {
        self.cancellable = subject
            .handleEvents(receiveOutput: { [weak self] event in
                self?.onReceiveWebSocketEvent(event)
            })
            .sink { [weak self] event in
                guard case .string(let text) = event else { return }
                self?.handleAppSyncMessage(text)
            }
    }

    /// Parses a text frame as a JSON object and reacts to its `type` field.
    /// Frames that are not JSON objects with a string `type` are ignored.
    private func handleAppSyncMessage(_ text: String) {
        guard let data = text.data(using: .utf8),
              let response = try? JSONSerialization.jsonObject(with: data) as? [String: Any],
              let type = response["type"] as? String
        else {
            return
        }

        switch type {
        case "connection_ack":
            AppSyncApolloLogger.debug("[AppSyncWebSocketClient] connection ack, starting heart beat monitoring...")
            if let payload = response["payload"] as? [String: Any] {
                monitorHeartBeat(payload)
            }
        case "ka":
            AppSyncApolloLogger.debug("[AppSyncWebSocketClient] keep alive")
            heartBeatsMonitor.send(())
        default:
            break
        }
    }

    /// Forwards `event` to the matching delegate callback on `callbackQueue`.
    private func onReceiveWebSocketEvent(_ event: AppSyncWebSocketEvent) {
        switch event {
        case .connected:
            callbackQueue.async { [weak self] in
                guard let self else { return }
                self.delegate?.websocketDidConnect(socket: self)
            }
        case .data(let data):
            callbackQueue.async { [weak self] in
                guard let self else { return }
                self.delegate?.websocketDidReceiveData(socket: self, data: data)
            }
        case .string(let string):
            callbackQueue.async { [weak self] in
                guard let self else { return }
                self.delegate?.websocketDidReceiveMessage(socket: self, text: string)
            }
        case .disconnected(let closeCode, let string):
            callbackQueue.async { [weak self] in
                guard let self else { return }
                AppSyncApolloLogger.debug("Disconnected closeCode \(closeCode), string \(String(describing: string))")
                self.delegate?.websocketDidDisconnect(socket: self, error: nil)
            }
        case .error(let error):
            callbackQueue.async { [weak self] in
                guard let self else { return }
                self.delegate?.websocketDidDisconnect(socket: self, error: error)
            }
        }
    }

    /// Starts (or restarts) the keep-alive watchdog.
    ///
    /// The watchdog disconnects once no heartbeat has been seen for `connectionTimeoutMs`
    /// milliseconds, as read from `connectionAck`; the default is used when that key is
    /// missing or is not an integer.
    private func monitorHeartBeat(_ connectionAck: [String: Any]) {
        let connectionTimeOutMs = (connectionAck["connectionTimeoutMs"] as? Int) ?? Self.defaultConnectionTimeoutMs
        AppSyncApolloLogger.debug("[AppSyncWebSocketClient] start monitoring heart beat with interval \(String(describing: connectionTimeOutMs))")
        self.heartBeatMonitorCancellable = heartBeatsMonitor.eraseToAnyPublisher()
            // debounce only emits after a quiet period, i.e. when heartbeats have stopped.
            .debounce(for: .milliseconds(connectionTimeOutMs), scheduler: DispatchQueue.global(qos: .userInitiated))
            .first()
            .sink { [weak self] _ in
                AppSyncApolloLogger.debug("[AppSyncWebSocketClient] Keep alive timed out, disconnecting...")
                self?.disconnect(forceTimeout: nil)
            }
        // Seed the debounce window so the timeout applies even if no "ka" ever arrives.
        self.heartBeatsMonitor.send(())
    }
}