Skip to content

Commit c40025a

Browse files
Adds concurrent access support to DataLoader
1 parent e6088e7 commit c40025a

File tree

3 files changed

+80
-34
lines changed

3 files changed

+80
-34
lines changed

Package.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ let package = Package(
1212
.package(url: "https://github.com/apple/swift-nio.git", from: "2.0.0"),
1313
],
1414
targets: [
15-
.target(name: "DataLoader", dependencies: ["NIO"]),
15+
.target(name: "DataLoader", dependencies: ["NIO", "NIOConcurrencyHelpers"]),
1616
.testTarget(name: "DataLoaderTests", dependencies: ["DataLoader"]),
1717
],
1818
swiftLanguageVersions: [.v5]

Sources/DataLoader/DataLoader.swift

+46-33
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import NIO
2+
import NIOConcurrencyHelpers
23

34
public enum DataLoaderFutureValue<T> {
45
case success(T)
@@ -23,6 +24,8 @@ final public class DataLoader<Key: Hashable, Value> {
2324

2425
private var cache = [Key: EventLoopFuture<Value>]()
2526
private var queue = LoaderQueue<Key, Value>()
27+
28+
private let lock = Lock()
2629

2730
public init(options: DataLoaderOptions<Key, Value> = DataLoaderOptions(), batchLoadFunction: @escaping BatchLoadFunction<Key, Value>) {
2831
self.options = options
@@ -32,36 +35,38 @@ final public class DataLoader<Key: Hashable, Value> {
3235
/// Loads a key, returning an `EventLoopFuture` for the value represented by that key.
3336
public func load(key: Key, on eventLoopGroup: EventLoopGroup) throws -> EventLoopFuture<Value> {
3437
let cacheKey = options.cacheKeyFunction?(key) ?? key
38+
39+
return try lock.withLock {
40+
if options.cachingEnabled, let cachedFuture = cache[cacheKey] {
41+
return cachedFuture
42+
}
3543

36-
if options.cachingEnabled, let cachedFuture = cache[cacheKey] {
37-
return cachedFuture
38-
}
39-
40-
let promise: EventLoopPromise<Value> = eventLoopGroup.next().makePromise()
41-
42-
if options.batchingEnabled {
43-
queue.append((key: key, promise: promise))
44-
} else {
45-
_ = try batchLoadFunction([key]).map { results in
46-
if results.isEmpty {
47-
promise.fail(DataLoaderError.noValueForKey("Did not return value for key: \(key)"))
48-
} else {
49-
let result = results[0]
50-
switch result {
51-
case .success(let value): promise.succeed(value)
52-
case .failure(let error): promise.fail(error)
44+
let promise: EventLoopPromise<Value> = eventLoopGroup.next().makePromise()
45+
46+
if options.batchingEnabled {
47+
queue.append((key: key, promise: promise))
48+
} else {
49+
_ = try batchLoadFunction([key]).map { results in
50+
if results.isEmpty {
51+
promise.fail(DataLoaderError.noValueForKey("Did not return value for key: \(key)"))
52+
} else {
53+
let result = results[0]
54+
switch result {
55+
case .success(let value): promise.succeed(value)
56+
case .failure(let error): promise.fail(error)
57+
}
5358
}
5459
}
5560
}
56-
}
5761

58-
let future = promise.futureResult
62+
let future = promise.futureResult
5963

60-
if options.cachingEnabled {
61-
cache[cacheKey] = future
62-
}
64+
if options.cachingEnabled {
65+
cache[cacheKey] = future
66+
}
6367

64-
return future
68+
return future
69+
}
6570
}
6671

6772
/// Loads multiple keys, promising an array of values:
@@ -107,7 +112,9 @@ final public class DataLoader<Key: Hashable, Value> {
107112
@discardableResult
108113
func clear(key: Key) -> DataLoader<Key, Value> {
109114
let cacheKey = options.cacheKeyFunction?(key) ?? key
110-
cache.removeValue(forKey: cacheKey)
115+
lock.withLockVoid {
116+
cache.removeValue(forKey: cacheKey)
117+
}
111118
return self
112119
}
113120

@@ -116,7 +123,9 @@ final public class DataLoader<Key: Hashable, Value> {
116123
/// method chaining.
117124
@discardableResult
118125
func clearAll() -> DataLoader<Key, Value> {
119-
cache.removeAll()
126+
lock.withLockVoid {
127+
cache.removeAll()
128+
}
120129
return self
121130
}
122131

@@ -125,12 +134,14 @@ final public class DataLoader<Key: Hashable, Value> {
125134
@discardableResult
126135
func prime(key: Key, value: Value, on eventLoop: EventLoopGroup) -> DataLoader<Key, Value> {
127136
let cacheKey = options.cacheKeyFunction?(key) ?? key
137+
138+
lock.withLockVoid {
139+
if cache[cacheKey] == nil {
140+
let promise: EventLoopPromise<Value> = eventLoop.next().makePromise()
141+
promise.succeed(value)
128142

129-
if cache[cacheKey] == nil {
130-
let promise: EventLoopPromise<Value> = eventLoop.next().makePromise()
131-
promise.succeed(value)
132-
133-
cache[cacheKey] = promise.futureResult
143+
cache[cacheKey] = promise.futureResult
144+
}
134145
}
135146

136147
return self
@@ -140,9 +151,11 @@ final public class DataLoader<Key: Hashable, Value> {
140151
///
141152
/// The client must run this manually to compete the `EventLoopFutures` of the keys.
142153
public func execute() throws {
143-
// Take the current loader queue, replacing it with an empty queue.
144-
let batch = self.queue
145-
self.queue = []
154+
var batch = LoaderQueue<Key, Value>()
155+
lock.withLockVoid {
156+
batch = self.queue
157+
self.queue = []
158+
}
146159

147160
// If a maxBatchSize was provided and the queue is longer, then segment the
148161
// queue into multiple batches, otherwise treat the queue as a single batch.

Tests/DataLoaderTests/DataLoaderTests.swift

+33
Original file line numberDiff line numberDiff line change
@@ -361,4 +361,37 @@ final class DataLoaderTests: XCTestCase {
361361

362362
XCTAssertTrue(loadCalls == [["B"]])
363363
}
364+
365+
// Caches repeated requests, even if initiated asyncronously
366+
func testCacheConcurrency() throws {
367+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
368+
defer {
369+
XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully())
370+
}
371+
372+
let identityLoader = DataLoader<String, String>(options: DataLoaderOptions()) { keys in
373+
let results = keys.map { DataLoaderFutureValue.success($0) }
374+
375+
return eventLoopGroup.next().makeSucceededFuture(results)
376+
}
377+
378+
// Populate values from two different dispatch queues, running asynchronously
379+
var value1: EventLoopFuture<String> = eventLoopGroup.next().makeSucceededFuture("")
380+
var value2: EventLoopFuture<String> = eventLoopGroup.next().makeSucceededFuture("")
381+
DispatchQueue.init(label: "").async {
382+
value1 = try! identityLoader.load(key: "A", on: eventLoopGroup)
383+
}
384+
DispatchQueue.init(label: "").async {
385+
value2 = try! identityLoader.load(key: "A", on: eventLoopGroup)
386+
}
387+
388+
// Sleep for a few ms ensure that value1 & value2 are populated before continuing
389+
usleep(1000)
390+
391+
XCTAssertNoThrow(try identityLoader.execute())
392+
393+
// Test that the futures themselves are equal (not just the value).
394+
XCTAssertEqual(value1, value2)
395+
}
396+
364397
}

0 commit comments

Comments
 (0)