-
Notifications
You must be signed in to change notification settings - Fork 14
To support linux better, replace operation queue with dispatch queue in BatchingFutureOperationQueue #31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
OmSaran
wants to merge
7
commits into
apple:main
Choose a base branch
from
OmSaran:batch-dispatchq
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
To support linux better, replace operation queue with dispatch queue in BatchingFutureOperationQueue #31
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
15ca20b
To support linux better, replace operation queue with dispatch queue …
OmSaran 17d4826
Remove OperationQueue implementation altogether and use equivalent di…
OmSaran 7ad076c
Introduce concurrency limiter abstraction, a trimmed down
OmSaran 9fae9d4
Deprecate execute interface that takes a future returning body
OmSaran e577a58
Improve maxOpCount check
OmSaran 6767fa5
Merge branch 'main' into batch-dispatchq
vlm 2c3ca6b
Merge branch 'main' into batch-dispatchq
vlm File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
//===----------------------------------------------------------------------===// | ||
// | ||
// This source file is part of the Swift.org open source project | ||
// | ||
// Copyright (c) 2022 Apple Inc. and the Swift project authors | ||
// Licensed under Apache License v2.0 with Runtime Library Exception | ||
// | ||
// See https://swift.org/LICENSE.txt for license information | ||
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors | ||
// | ||
//===----------------------------------------------------------------------===// | ||
|
||
import NIO | ||
import NIOConcurrencyHelpers | ||
|
||
/// Limits the number of units of work that may be in flight at once.
///
/// Callers reserve one unit via `withReplenishableLimit(eventLoop:_:)`; when
/// no unit is free the caller is queued (FIFO) and resumed as soon as a unit
/// is returned. All mutable state is guarded by `mutex`.
internal final class ConcurrencyLimiter {
    /// The number of units of work allowed to run concurrently.
    ///
    /// Raising the value immediately wakes as many queued waiters as the new
    /// capacity allows; lowering it takes effect as running work completes.
    public var maximumConcurrency: Int {
        get {
            self.mutex.withLock { self.capacity }
        }
        set {
            var resumable: [Waiter] = []
            self.mutex.withLockVoid {
                // Apply the capacity delta to the free-unit pool first so any
                // newly added capacity can be handed to queued waiters below.
                self.available += newValue - self.capacity
                self.capacity = newValue
                resumable = self.dequeueSatisfiableWaitersLocked()
            }
            // Promises are completed outside the lock so waiter callbacks
            // cannot re-enter the limiter while we still hold `mutex`.
            resumable.fulfillWaiters()
        }
    }

    /// The number of units currently reserved.
    public var sharesInUse: Int {
        self.mutex.withLock { self.capacity - self.available }
    }

    private var available: Int                       // free units; guarded by `mutex`
    private var pending: CircularBuffer<Waiter> = [] // FIFO wait queue; guarded by `mutex`
    private let mutex = Lock()
    private var capacity: Int                        // current maximum; guarded by `mutex`

    /// - Parameter maximumConcurrency: Initial capacity; must be non-negative.
    public init(maximumConcurrency: Int) {
        precondition(maximumConcurrency >= 0)

        self.capacity = maximumConcurrency
        self.available = maximumConcurrency
    }

    /// Reserves 1 unit of concurrency, executes `body`, and returns the unit
    /// once the future produced by `body` completes (success or failure).
    public func withReplenishableLimit<T>(
        eventLoop: EventLoop,
        _ body: @escaping (EventLoop) -> EventLoopFuture<T>
    ) -> EventLoopFuture<T> {
        self.withdraw(eventLoop: eventLoop).flatMap { lease in
            body(eventLoop).always { _ in self.replenish(lease) }
        }
    }

    /// Pops as many queued waiters as there are free units, reserving one
    /// unit per popped waiter.
    ///
    /// Must be called while holding `mutex`. The returned waiters' promises
    /// still need to be fulfilled — do that after releasing the lock.
    private func dequeueSatisfiableWaitersLocked() -> [Waiter] {
        var granted: [Waiter] = []
        let availableBefore = self.available

        while self.available >= 1, !self.pending.isEmpty {
            self.available -= 1
            assert(self.available >= 0)
            granted.append(self.pending.removeFirst())
        }

        // Every granted waiter consumed exactly one unit.
        assert(availableBefore - granted.count == self.available)
        return granted
    }

    /// Returns one unit to the pool and wakes any waiters it can now satisfy.
    private func replenish(_ lease: Lease) {
        let resumable: [Waiter] = self.mutex.withLock {
            self.available += 1
            assert(self.available <= self.capacity)
            return self.dequeueSatisfiableWaitersLocked()
        }
        resumable.fulfillWaiters()
    }

    /// Reserves 1 unit if available, otherwise queues the caller.
    private func withdraw(eventLoop: EventLoop) -> EventLoopFuture<Lease> {
        return self.mutex.withLock { () -> EventLoopFuture<Lease> in
            guard !self.pending.isEmpty || self.available < 1 else {
                // Fast path: nobody is queued ahead of us and a unit is free.
                self.available -= 1
                return eventLoop.makeSucceededFuture(Lease())
            }

            // Slow path: queue behind earlier waiters to preserve FIFO order.
            let promise = eventLoop.makePromise(of: Lease.self)
            self.pending.append(Waiter(promise: promise))
            return promise.futureResult
        }
    }

    /// A queued reservation request, fulfilled when a unit frees up.
    fileprivate struct Waiter {
        var promise: EventLoopPromise<Lease>
    }

    /// Token representing one reserved unit of concurrency.
    fileprivate struct Lease {}
}
|
||
extension Array where Element == ConcurrencyLimiter.Waiter {
    /// Succeeds every waiter's promise with a fresh lease.
    ///
    /// Call this only after releasing the limiter's lock, since completing a
    /// promise may run caller code synchronously.
    fileprivate func fulfillWaiters() {
        for waiter in self {
            waiter.promise.succeed(ConcurrencyLimiter.Lease())
        }
    }
}
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here you don't need to dispatch off. The
body
returns a future and therefore will be non-blocking. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, ideally that should be the case, but if I understand correctly, a rogue
body
could still do blocking operations? Executing in a dispatch queue would keep us more aligned with the previous implementation where we do not block the event loop in such a case. For example,
(Note that this still blocks if there is a blocking call inside
flatMap
likereturn elg.makeSucceededFuture(()).flatMap{ someBlockingCall() }
. But this would be the same behavior as in the previous implementation.) There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's true. But a rogue body could also just crash the program or worse. We shouldn't write overly defensive code and if we find such a rogue body, we should fix it instead. Every dispatch to another thread (and back) costs us performance so we should avoid it as much as possible.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Understood now, makes sense. While I do agree we need to fix that, I think it is a non-trivial task (please correct me if I'm wrong here) to find and fix all usages. So I think it is better to do that as an enhancement in a different task, since the objective of this task is only to get rid of using
OperationQueue
. Does that sound fair?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you seen any code that uses this which is blocking and returns a future? There shouldn't really be any such code (of course there might but I'd hope it's not widespread).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed this on DM