-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathDataLoader.swift
229 lines (188 loc) · 7.2 KB
/
DataLoader.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import Algorithms
import AsyncCollections
/// The outcome of loading a single key within a batch: either the resolved
/// value or a per-key error. A `BatchLoadFunction` returns one of these per
/// key, letting a single batch mix successes and failures.
public enum DataLoaderValue<T: Sendable>: Sendable {
    case success(T)
    case failure(Error)
}
/// A user-supplied function that loads the values for a whole batch of keys
/// in a single call. It must return exactly one `DataLoaderValue` per key,
/// in the same order as `keys` — `DataLoader` pairs results to keys by index
/// and treats a length mismatch as an error.
public typealias BatchLoadFunction<Key: Hashable & Sendable, Value: Sendable> =
    @Sendable (_ keys: [Key]) async throws -> [DataLoaderValue<Value>]
/// Pending load requests awaiting dispatch: each entry pairs a requested key
/// with the channel its caller is suspended on.
private typealias LoaderQueue<Key: Hashable & Sendable, Value: Sendable> = [(
    key: Key,
    channel: Channel<Value, Error>
)]
/// DataLoader creates a public API for loading data from a particular
/// data back-end with unique keys such as the id column of a SQL table
/// or document name in a MongoDB database, given a batch loading function.
///
/// Each DataLoader instance contains a unique memoized cache. Use caution
/// when used in long-lived applications or those which serve many users
/// with different access permissions and consider creating a new instance
/// per data request.
public actor DataLoader<Key: Hashable & Sendable, Value: Sendable> {
    /// Resolves a batch of keys to values in a single round trip.
    private let batchLoadFunction: BatchLoadFunction<Key, Value>
    /// Behavioral configuration: batching, caching, execution period,
    /// maximum batch size, and the cache-key mapping function.
    private let options: DataLoaderOptions<Key, Value>
    /// Memoized results keyed by cache key. A channel is stored as soon as a
    /// load begins, so concurrent loads of the same key await the one
    /// in-flight request instead of issuing duplicates.
    private var cache = [Key: Channel<Value, Error>]()
    /// Requests accumulated since the last dispatch, drained by `execute()`.
    private var queue = LoaderQueue<Key, Value>()
    /// True while a delayed `execute()` task is pending, so at most one
    /// dispatch is scheduled per accumulation window.
    private var dispatchScheduled = false

    public init(
        options: DataLoaderOptions<Key, Value> = DataLoaderOptions(),
        batchLoadFunction: @escaping BatchLoadFunction<Key, Value>
    ) {
        self.options = options
        self.batchLoadFunction = batchLoadFunction
    }

    /// Loads a key, returning the value represented by that key.
    ///
    /// When caching is enabled the first call per cache key wins and later
    /// calls await the same channel. When batching is enabled the request is
    /// queued until `execute()` runs — scheduled automatically after
    /// `options.executionPeriod` if that is set, otherwise `execute()` must be
    /// called manually.
    /// - Parameter key: The key to resolve via the batch load function.
    /// - Throws: Whatever the batch load function throws, or
    ///   `DataLoaderError.noValueForKey` if it returns no value for `key`.
    public func load(key: Key) async throws -> Value {
        // Callers may normalize keys for caching via `cacheKeyFunction`.
        let cacheKey = options.cacheKeyFunction?(key) ?? key
        if options.cachingEnabled, let cached = cache[cacheKey] {
            return try await cached.value
        }
        let channel = Channel<Value, Error>()
        if options.batchingEnabled {
            queue.append((key: key, channel: channel))
            // Schedule at most one delayed dispatch per accumulation window.
            if let executionPeriod = options.executionPeriod, !dispatchScheduled {
                Task.detached {
                    // NOTE(review): if this sleep throws (task cancellation),
                    // `dispatchScheduled` remains true and the queued channels
                    // are never dispatched — confirm cancellation cannot
                    // reach this detached task.
                    try await Task.sleep(nanoseconds: executionPeriod)
                    try await self.execute()
                }
                dispatchScheduled = true
            }
        } else {
            // Batching disabled: dispatch a single-key batch immediately.
            Task.detached {
                do {
                    let results = try await self.batchLoadFunction([key])
                    if results.isEmpty {
                        await channel
                            .fail(
                                DataLoaderError
                                    .noValueForKey("Did not return value for key: \(key)")
                            )
                    } else {
                        let result = results[0]
                        switch result {
                        case let .success(value):
                            await channel.fulfill(value)
                        case let .failure(error):
                            await channel.fail(error)
                        }
                    }
                } catch {
                    await channel.fail(error)
                }
            }
        }
        // Cache the in-flight channel (not the value) so concurrent loads of
        // the same key coalesce while the batch is still outstanding.
        if options.cachingEnabled {
            cache[cacheKey] = channel
        }
        return try await channel.value
    }

    /// Loads multiple keys, promising an array of values:
    ///
    /// ```swift
    /// async let aAndB = try myLoader.loadMany(keys: [ "a", "b" ])
    /// ```
    ///
    /// This is equivalent to the more verbose:
    ///
    /// ```swift
    /// async let aAndB = [
    ///     myLoader.load(key: "a"),
    ///     myLoader.load(key: "b")
    /// ]
    /// ```
    /// or
    /// ```swift
    /// async let a = myLoader.load(key: "a")
    /// async let b = myLoader.load(key: "b")
    /// ```
    /// - Parameter keys: The keys to resolve; an empty array yields `[]`
    ///   without invoking the batch load function.
    /// - Throws: The first error thrown by any of the underlying loads.
    public func loadMany(keys: [Key]) async throws -> [Value] {
        guard !keys.isEmpty else {
            return []
        }
        // Results come back in the same order as `keys`.
        return try await keys.concurrentMap { try await self.load(key: $0) }
    }

    /// Clears the value at `key` from the cache, if it exists. Returns itself for
    /// method chaining.
    @discardableResult
    public func clear(key: Key) -> DataLoader<Key, Value> {
        // Apply the same key normalization used when the entry was cached.
        let cacheKey = options.cacheKeyFunction?(key) ?? key
        cache.removeValue(forKey: cacheKey)
        return self
    }

    /// Clears the entire cache. To be used when some event results in unknown
    /// invalidations across this particular `DataLoader`. Returns itself for
    /// method chaining.
    @discardableResult
    public func clearAll() -> DataLoader<Key, Value> {
        cache.removeAll()
        return self
    }

    /// Adds the provided key and value to the cache. If the key already exists, no
    /// change is made. Returns itself for method chaining.
    ///
    /// NOTE(review): declared `async throws` though the body neither awaits
    /// nor throws — presumably kept for API stability; confirm before relying
    /// on either.
    @discardableResult
    public func prime(key: Key, value: Value) async throws -> DataLoader<Key, Value> {
        let cacheKey = options.cacheKeyFunction?(key) ?? key
        if cache[cacheKey] == nil {
            // Seed a pre-fulfilled channel so future loads resolve instantly.
            let channel = Channel<Value, Error>()
            Task.detached {
                await channel.fulfill(value)
            }
            cache[cacheKey] = channel
        }
        return self
    }

    /// Dispatches all queued load requests to the batch load function.
    ///
    /// Called automatically after `options.executionPeriod` when that is set;
    /// otherwise the owner must call this to resolve pending `load` calls.
    /// - Throws: Errors surfaced while iterating batch chunks; per-key batch
    ///   failures are delivered through the queued channels instead.
    public func execute() async throws {
        // Take the current loader queue, replacing it with an empty queue.
        let batch = queue
        queue = []
        // Allow the next `load` to schedule a fresh delayed dispatch.
        if dispatchScheduled {
            dispatchScheduled = false
        }
        guard !batch.isEmpty else {
            return ()
        }
        // If a maxBatchSize was provided and the queue is longer, then segment the
        // queue into multiple batches, otherwise treat the queue as a single batch.
        if let maxBatchSize = options.maxBatchSize, maxBatchSize > 0, maxBatchSize < batch.count {
            try await batch.chunks(ofCount: maxBatchSize).asyncForEach { slicedBatch in
                try await self.executeBatch(batch: Array(slicedBatch))
            }
        } else {
            try await executeBatch(batch: batch)
        }
    }

    /// Runs the batch load function for one batch and routes each result (or
    /// any thrown error) to the channel awaiting it.
    /// - Parameter batch: Key/channel pairs to resolve; results are matched
    ///   to channels by index, so order matters.
    private func executeBatch(batch: LoaderQueue<Key, Value>) async throws {
        let keys = batch.map { $0.key }
        if keys.isEmpty {
            return
        }
        // Step through the values, resolving or rejecting each Promise in the
        // loaded queue.
        do {
            let values = try await batchLoadFunction(keys)
            // The contract requires one result per key, index-aligned; a
            // mismatch is routed to every channel via the catch below.
            if values.count != keys.count {
                throw DataLoaderError
                    .typeError(
                        "The function did not return an array of the same length as the array of keys. \nKeys count: \(keys.count)\nValues count: \(values.count)"
                    )
            }
            for entry in batch.enumerated() {
                let result = values[entry.offset]
                switch result {
                case let .failure(error):
                    await entry.element.channel.fail(error)
                case let .success(value):
                    await entry.element.channel.fulfill(value)
                }
            }
        } catch {
            await failedExecution(batch: batch, error: error)
        }
    }

    /// Fails every channel in `batch` with `error`, evicting each key from the
    /// cache first so subsequent loads can retry rather than re-await the
    /// failed channel.
    private func failedExecution(batch: LoaderQueue<Key, Value>, error: Error) async {
        for (key, channel) in batch {
            _ = clear(key: key)
            await channel.fail(error)
        }
    }
}