Skip to content

Commit cc72b39

Browse files
committedJul 6, 2020
Add some test coverage for TSFFutures
1 parent 902406e commit cc72b39

11 files changed

+930
-0
lines changed
 

‎Package.swift

+6
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ let package = Package(
3232
"NIO", "SwiftToolsSupport-auto",
3333
]
3434
),
35+
.testTarget(
36+
name: "TSFFuturesTests",
37+
dependencies: [
38+
"TSFFutures",
39+
]
40+
),
3541
.target(
3642
name: "TSFUtility",
3743
dependencies: [

‎Sources/TSFFutures/Canceller.swift

+1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public final class LLBCanceller {
8383

8484
guard finalReason_ == nil else {
8585
// Already cancelled or abandoned.
86+
mutex_.unlock()
8687
return
8788
}
8889

‎Sources/TSFFutures/FutureDeduplicator.swift

+88
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,92 @@ public class LLBFutureDeduplicator<Key: Hashable, Value> {
148148

149149
return promise.futureResult
150150
}
151+
152+
@inlinable
153+
public func values(for keys: [Key], with resolver: ([Key]) -> LLBFuture<[Value]>) -> [LLBFuture<Value>] {
154+
let (rv, created, promises): ([LLBFuture<Value>], [Key], [LLBPromise<Value>]) = lock.withLock {
155+
let now = DispatchTime.now()
156+
157+
var rv = [LLBFuture<Value>]()
158+
var created = [Key]()
159+
var promises = [LLBPromise<Value>]()
160+
let maxResultCount = keys.count
161+
rv.reserveCapacity(maxResultCount)
162+
created.reserveCapacity(maxResultCount)
163+
promises.reserveCapacity(maxResultCount)
164+
165+
for key in keys {
166+
if let valueFuture = lockedCacheGet(key: key) {
167+
rv.append(valueFuture)
168+
continue
169+
}
170+
171+
if let (resultPromise, expires) = inFlightRequests[key] {
172+
if expires >= now {
173+
rv.append(resultPromise.futureResult)
174+
continue
175+
}
176+
}
177+
178+
// Stop deduplicating requests after a long-ish timeout,
179+
// just in case there is a particularly long CAS lag.
180+
let expires = now + partialResultExpiration
181+
182+
// Cache for future use.
183+
let promise = self.group.next().makePromise(of: Value.self)
184+
self.inFlightRequests[key] = (promise, expires)
185+
rv.append(promise.futureResult)
186+
created.append(key)
187+
promises.append(promise)
188+
}
189+
190+
return (rv, created, promises)
191+
}
192+
193+
// If the promise was created, fetch from the database.
194+
guard created.count > 0 else {
195+
return rv
196+
}
197+
198+
resolver(created).map { results in
199+
self.lock.withLockVoid {
200+
for idx in 0..<created.count {
201+
let key = created[idx]
202+
let promise = promises[idx]
203+
guard self.inFlightRequests[key]?.result.futureResult === promise.futureResult else {
204+
continue
205+
}
206+
// Resolved, done.
207+
self.inFlightRequests[key] = nil
208+
}
209+
for (key, promise) in zip(created, promises) {
210+
self.lockedCacheSet(key, promise.futureResult)
211+
}
212+
}
213+
for (promise, result) in zip(promises, results) {
214+
promise.succeed(result)
215+
}
216+
}.whenFailure { error in
217+
let now = DispatchTime.now()
218+
let expires: DispatchTimeInterval? = self.expirationInterval(error)
219+
self.lock.withLockVoid {
220+
for (key, promise) in zip(created, promises) {
221+
guard self.inFlightRequests[key]?.result.futureResult === promise.futureResult else {
222+
continue
223+
}
224+
if let interval = expires {
225+
self.inFlightRequests[key] = (promise, now + interval)
226+
} else {
227+
self.inFlightRequests[key] = nil
228+
}
229+
}
230+
}
231+
232+
for promise in promises {
233+
promise.fail(error)
234+
}
235+
}
236+
237+
return rv
238+
}
151239
}

‎Sources/TSFFutures/OrderManager.swift

+181
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// This source file is part of the Swift.org open source project
2+
//
3+
// Copyright (c) 2020 Apple Inc. and the Swift project authors
4+
// Licensed under Apache License v2.0 with Runtime Library Exception
5+
//
6+
// See http://swift.org/LICENSE.txt for license information
7+
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
8+
9+
import Dispatch
10+
11+
import NIO
12+
import NIOConcurrencyHelpers
13+
14+
15+
/// The `OrderManager` allows explicitly specify dependencies between
16+
/// various callbacks. This is necessary to avoid or induce race conditions
17+
/// in otherwise timing-dependent code, making such code deterministic.
18+
/// The semantics is as follows: the `OrderManager` invokes callbacks
19+
/// specified as arguments to the `order(_:_)` functions starting with 1.
20+
/// The callbacks of order `n` are guaranteed to run to completion before
21+
/// callbacks of order `n+1` are run. If there's a gap in the callbacks order
22+
/// sequence, the callbacks are suspended until the missing callback is
23+
/// registered, `reset()` is called, or global timeout occurs.
24+
/// In addition to that, `reset()` restarts the global timeout.
25+
///
26+
/// Example:
27+
///
28+
/// let manager = OrderManager(on: ...)
29+
/// manager.order(3, { print("3") })
30+
/// manager.order(1, { print("1") })
31+
/// manager.order(2, { print("2") })
32+
/// try manager.order(4).wait()
33+
///
34+
/// The following will be printed out:
35+
///
36+
/// 1
37+
/// 2
38+
/// 3
39+
///
40+
41+
public class LLBOrderManager {
42+
43+
// A safety timer, not to be exceeded.
44+
private let cancelTimer = DispatchSource.makeTimerSource()
45+
private let timeout: DispatchTimeInterval
46+
47+
private typealias WaitListElement = (order: Int, promise: LLBPromise<Void>, file: String, line: Int)
48+
private let lock = NIOConcurrencyHelpers.Lock()
49+
private var waitlist = [WaitListElement]()
50+
private var nextToRun = 1
51+
52+
private var eventLoop: EventLoop {
53+
lock.withLock {
54+
switch groupDesignator {
55+
case let .managedGroup(group):
56+
return group.next()
57+
case let .externallySuppliedGroup(group):
58+
return group.next()
59+
}
60+
}
61+
}
62+
63+
private enum GroupDesignator {
64+
case managedGroup(LLBFuturesDispatchGroup)
65+
case externallySuppliedGroup(LLBFuturesDispatchGroup)
66+
}
67+
private var groupDesignator: GroupDesignator
68+
69+
public enum Error: Swift.Error {
70+
case orderManagerReset(file: String, line: Int)
71+
}
72+
73+
public init(timeout: DispatchTimeInterval = .seconds(60)) {
74+
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
75+
self.groupDesignator = GroupDesignator.managedGroup(group)
76+
self.timeout = timeout
77+
restartInactivityTimer()
78+
cancelTimer.setEventHandler { [weak self] in
79+
guard let self = self else { return }
80+
_ = self.reset()
81+
self.cancelTimer.cancel()
82+
}
83+
cancelTimer.resume()
84+
}
85+
86+
public init(on loop: EventLoop, timeout: DispatchTimeInterval = .seconds(60)) {
87+
self.groupDesignator = GroupDesignator.externallySuppliedGroup(loop)
88+
self.timeout = timeout
89+
restartInactivityTimer()
90+
cancelTimer.setEventHandler { [weak self] in
91+
guard let self = self else { return }
92+
_ = self.reset()
93+
self.cancelTimer.cancel()
94+
}
95+
cancelTimer.resume()
96+
}
97+
98+
private func restartInactivityTimer() {
99+
cancelTimer.schedule(deadline: DispatchTime.now() + timeout, repeating: .never)
100+
101+
}
102+
103+
/// Run a specified callback in a particular order.
104+
@discardableResult
105+
public func order<T>(_ n: Int, file: String = #file, line: Int = #line, _ callback: @escaping () throws -> T) -> EventLoopFuture<T> {
106+
let promise = eventLoop.makePromise(of: Void.self)
107+
108+
lock.withLockVoid {
109+
waitlist.append((order: n, promise: promise, file: file, line: line))
110+
}
111+
112+
let future = promise.futureResult.flatMapThrowing {
113+
try callback()
114+
}
115+
116+
future.whenComplete { _ in
117+
self.lock.withLockVoid {
118+
if n == self.nextToRun {
119+
self.nextToRun += 1
120+
}
121+
}
122+
self.unblockWaiters()
123+
}
124+
125+
unblockWaiters()
126+
return future
127+
}
128+
129+
@discardableResult
130+
public func order(_ n: Int, file: String = #file, line: Int = #line) -> EventLoopFuture<Void> {
131+
return order(n, file: file, line: line, {})
132+
}
133+
134+
private func unblockWaiters() {
135+
let wakeup: [EventLoopPromise<Void>] = lock.withLock {
136+
let wakeupPromises = waitlist
137+
.filter({$0.order <= nextToRun})
138+
.map({$0.promise})
139+
waitlist = waitlist.filter({$0.order > nextToRun})
140+
return wakeupPromises
141+
}
142+
wakeup.forEach { $0.succeed(()) }
143+
}
144+
145+
/// Fail all ordered callbacks. Not calling the callback functions
146+
/// specified as argument to order(_:_), but failing the outcome.
147+
public func reset(file: String = #file, line: Int = #line) -> EventLoopFuture<Void> {
148+
restartInactivityTimer()
149+
let lock = self.lock
150+
let toCancel: [WaitListElement] = lock.withLock {
151+
let cancelList = waitlist
152+
waitlist = []
153+
nextToRun = Int.max
154+
return cancelList
155+
}
156+
let error = Error.orderManagerReset(file: file, line: line)
157+
let futures: [EventLoopFuture<Void>] = toCancel.sorted(by: {$0.order < $1.order}).map {
158+
$0.promise.fail(error)
159+
return $0.promise.futureResult.flatMapErrorThrowing { _ in () }
160+
}
161+
162+
return EventLoopFuture.whenAllSucceed(futures, on: eventLoop).map { [weak self] _ in
163+
guard let self = self else { return }
164+
lock.withLockVoid {
165+
assert(self.waitlist.isEmpty)
166+
self.nextToRun = 1
167+
}
168+
}
169+
}
170+
171+
deinit {
172+
let designator = groupDesignator
173+
reset().whenSuccess {
174+
guard case let .managedGroup(group) = designator else { return }
175+
let q = DispatchQueue(label: "tsf.OrderManager")
176+
q.async { try! group.syncShutdownGracefully() }
177+
}
178+
cancelTimer.setEventHandler { }
179+
cancelTimer.cancel()
180+
}
181+
}

0 commit comments

Comments
 (0)
Please sign in to comment.