
To support linux better, replace operation queue with dispatch queue in BatchingFutureOperationQueue #31

Open · wants to merge 7 commits into base: main
132 changes: 95 additions & 37 deletions Sources/TSFFutures/BatchingFutureOperationQueue.swift
@@ -6,11 +6,12 @@
// See http://swift.org/LICENSE.txt for license information
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors

import Dispatch
import Foundation

import NIO
import NIOConcurrencyHelpers
import TSCUtility


/// Run the given computations on a given array in batches, exercising
/// a specified amount of parallelism.
///
@@ -23,35 +24,67 @@ public struct LLBBatchingFutureOperationQueue {
/// Threads capable of running futures.
public let group: LLBFuturesDispatchGroup

/// Whether the queue is suspended.
@available(*, deprecated, message: "Property 'isSuspended' is deprecated.")
public var isSuspended: Bool {
// Cannot suspend a DispatchQueue
false
}

/// Because `LLBBatchingFutureOperationQueue` is a struct, the compiler
/// will claim that `maxOpCount`'s setter is `mutating`, even though
/// the underlying `ConcurrencyLimiter` is a threadsafe class.
/// This method exists as a workaround to adjust the underlying concurrency
/// limit without unnecessary synchronization.
public func setMaxOpCount(_ maxOpCount: Int) {
concurrencyLimiter.maximumConcurrency = Self.bridged(maxOpCount: maxOpCount)
}

/// Maximum number of operations executed concurrently.
public var maxOpCount: Int {
get { concurrencyLimiter.maximumConcurrency }
set { self.setMaxOpCount(newValue) }
}

/// Return the number of operations currently in flight.
public var opCount: Int { concurrencyLimiter.sharesInUse }

/// Name to be used for dispatch queue
private let name: String

/// QoS passed to DispatchQueue
private let qos: DispatchQoS

/// Lock protecting state.
private let lock = NIOConcurrencyHelpers.Lock()

/// Limits number of concurrent operations being executed
private let concurrencyLimiter: ConcurrencyLimiter

/// The queue of operations to run.
private var workQueue = NIO.CircularBuffer<DispatchWorkItem>()

@available(*, deprecated, message: "'qualityOfService' is deprecated: Use 'dispatchQoS'")
public init(
name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCount: Int,
qualityOfService: QualityOfService
) {
let dispatchQoS: DispatchQoS

switch qualityOfService {
case .userInteractive:
dispatchQoS = .userInteractive
case .userInitiated:
dispatchQoS = .userInitiated
case .utility:
dispatchQoS = .utility
case .background:
dispatchQoS = .background
default:
dispatchQoS = .default
}

self.init(name: name, group: group, maxConcurrentOperationCount: maxOpCount, dispatchQoS: dispatchQoS)
}

///
@@ -60,49 +93,74 @@
/// - group: Threads capable of running futures.
/// - maxConcurrentOperationCount:
/// Operations to execute in parallel.
public init(name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCount: Int) {
self.init(name: name, group: group, maxConcurrentOperationCount: maxOpCount, dispatchQoS: .default)
}

public init(
name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCnt: Int,
dispatchQoS: DispatchQoS
) {
self.group = group
self.name = name
self.qos = dispatchQoS

self.concurrencyLimiter = ConcurrencyLimiter(maximumConcurrency: Self.bridged(maxOpCount: maxOpCnt))
}

public func execute<T>(_ body: @escaping () throws -> T) -> LLBFuture<T> {
return self.concurrencyLimiter.withReplenishableLimit(eventLoop: group.any()) { eventLoop in
let promise = eventLoop.makePromise(of: T.self)

DispatchQueue(label: self.name, qos: self.qos).async {
promise.fulfill(body)
}

return promise.futureResult
}
}

// Deprecated: If we want to limit concurrency on EventLoopThreads, use FutureOperationQueue instead.
// A future-returning body is expected not to block.
@available(*, deprecated, message: "Use FutureOperationQueue instead.")
public func execute<T>(_ body: @escaping () -> LLBFuture<T>) -> LLBFuture<T> {
return self.concurrencyLimiter.withReplenishableLimit(eventLoop: group.any()) { eventLoop in
let promise = eventLoop.makePromise(of: T.self)

DispatchQueue(label: self.name, qos: self.qos).async {
Member:

here you don't need to dispatch off. The body returns a future and therefore will be non-blocking.
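A minimal sketch of the variant being suggested (hypothetical, not code from this PR): with a truly non-blocking body, the future could be chained on the limiter directly, skipping the hop to a DispatchQueue and back.

return self.concurrencyLimiter.withReplenishableLimit(eventLoop: group.any()) { _ in
body()  // assumed non-blocking: returns an LLBFuture immediately
}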

Author:

Yes, ideally that should be the case, but if I understand correctly, a rogue body could still do blocking operations? Executing in a dispatch queue would keep us more aligned with the previous implementation where we do not block the event loop in such a case.

For example:

batchFutOpQ.execute {
  someBlockingCall()
  return elg.makeSucceededFuture(())
}

(Note that this still blocks if there is a blocking call inside a flatMap, like return elg.makeSucceededFuture(()).flatMap { someBlockingCall() }. But this would be the same behavior as in the previous implementation.)

Member:

Yes, that's true. But a rogue body could also just crash the program or worse. We shouldn't write overly defensive code and if we find such a rogue body, we should fix it instead. Every dispatch to another thread (and back) costs us performance so we should avoid it as much as possible.

Author (@OmSaran), Aug 2, 2022:

Understood now, makes sense. While I do agree we need to fix that, I think it is a non-trivial task (please correct me if I'm wrong here) to find and fix all usages. So I think it is better to do that as a separate enhancement task, since the objective of this task is only to get rid of OperationQueue. Does that sound fair?

Member:

Have you seen any code that uses this which is blocking and returns a future? There shouldn't really be any such code (of course there might be, but I'd hope it's not widespread).

Author:

Discussed this on DM

body().cascade(to: promise)
}

return promise.futureResult
}
}

/// Order-preserving parallel execution. Wait for everything to complete.
public func execute<A, T>(_ args: [A], minStride: Int = 1, _ body: @escaping (ArraySlice<A>) throws -> [T])
-> LLBFuture<[T]>
{
let futures: [LLBFuture<[T]>] = executeNoWait(args, minStride: minStride, body)
let loop = futures.first?.eventLoop ?? group.next()
return LLBFuture<[T]>.whenAllSucceed(futures, on: loop).map { $0.flatMap { $0 } }
}

/// Order-preserving parallel execution.
/// Do not wait for all executions to complete, returning individual futures.
public func executeNoWait<A, T>(
_ args: [A], minStride: Int = 1, maxStride: Int = Int.max, _ body: @escaping (ArraySlice<A>) throws -> [T]
) -> [LLBFuture<[T]>] {
let batches: [ArraySlice<A>] = args.tsc_sliceBy(
maxStride: max(minStride, min(maxStride, args.count / maxOpCount)))
return batches.map { arg in execute { try body(arg) } }
}

/// Bridges `OperationQueue.defaultMaxConcurrentOperationCount` (a negative
/// sentinel) to a concrete limit: the machine's core count.
private static func bridged(maxOpCount: Int) -> Int {
if maxOpCount < 0 {
precondition(maxOpCount == OperationQueue.defaultMaxConcurrentOperationCount)
return System.coreCount
}
return maxOpCount
}
}
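
For context, a minimal usage sketch of the reworked queue (the event loop group, queue name, and inputs here are illustrative assumptions, not part of this diff):

import NIO
import TSFFutures

// Hypothetical setup: two event loop threads, at most four concurrent operations.
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 2)
let queue = LLBBatchingFutureOperationQueue(
name: "example-queue", group: elg, maxConcurrentOperationCount: 4)

// Blocking work runs on a DispatchQueue; the future completes on an event loop.
let contents: LLBFuture<String> = queue.execute {
try String(contentsOfFile: "/etc/hosts")  // blocking call, kept off the event loop threads
}

// Order-preserving batched execution: slices of the input are processed in
// parallel, and the results are concatenated in the original order.
let squares: LLBFuture<[Int]> = queue.execute(Array(1...100)) { slice in
slice.map { $0 * $0 }
}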
111 changes: 111 additions & 0 deletions Sources/TSFFutures/ConcurrencyLimiter.swift
@@ -0,0 +1,111 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//

import NIO
import NIOConcurrencyHelpers

internal final class ConcurrencyLimiter {
public var maximumConcurrency: Int {
get { lock.withLock { maximumConcurrency_ } }
set {
var waiters: [Waiter] = []
lock.withLockVoid {
let differenceInCapacity = (newValue - maximumConcurrency_)
unitsLeft += differenceInCapacity
maximumConcurrency_ = newValue
waiters = tryFulfillSomeLocked()
}
waiters.fulfillWaiters()
}
}
public var sharesInUse: Int { lock.withLock { maximumConcurrency_ - unitsLeft } }

private var unitsLeft: Int // protected by `self.lock`
private var waiters: CircularBuffer<Waiter> = [] // protected by `self.lock`
private let lock = Lock()
private var maximumConcurrency_: Int // protected by `self.lock`

public init(maximumConcurrency: Int) {
precondition(maximumConcurrency >= 0)

self.maximumConcurrency_ = maximumConcurrency
self.unitsLeft = maximumConcurrency
}

/// Reserves 1 unit of concurrency, executes `body`, then restores the unit
/// once the returned future completes.
public func withReplenishableLimit<T>(
eventLoop: EventLoop,
_ body: @escaping (EventLoop) -> EventLoopFuture<T>
) -> EventLoopFuture<T> {
return self.withdraw(eventLoop: eventLoop).flatMap { lease in
body(eventLoop).always { _ in
self.replenish(lease)
}
}
}

private func tryFulfillSomeLocked() -> [Waiter] {
var toSucceed: [Waiter] = []
let unitsLeftAtStart = self.unitsLeft

while !self.waiters.isEmpty, self.unitsLeft >= 1 {
let waiter = self.waiters.removeFirst()

self.unitsLeft -= 1
assert(self.unitsLeft >= 0)
toSucceed.append(waiter)
}

assert(unitsLeftAtStart - toSucceed.count == self.unitsLeft)
return toSucceed
}

private func replenish(_ lease: Lease) {
self.lock.withLock { () -> [Waiter] in
self.unitsLeft += 1
assert(self.unitsLeft <= self.maximumConcurrency_)
return self.tryFulfillSomeLocked()
}.fulfillWaiters()
}

/// Reserve 1 unit of the limit if available
private func withdraw(eventLoop: EventLoop) -> EventLoopFuture<Lease> {
let future = self.lock.withLock { () -> EventLoopFuture<Lease> in
if self.waiters.isEmpty && self.unitsLeft >= 1 {
self.unitsLeft -= 1

return eventLoop.makeSucceededFuture(Lease())
}

let promise = eventLoop.makePromise(of: Lease.self)
self.waiters.append(Waiter(promise: promise))

return promise.futureResult
}

return future
}

fileprivate struct Waiter {
var promise: EventLoopPromise<Lease>
}

fileprivate struct Lease {}
}

extension Array where Element == ConcurrencyLimiter.Waiter {
fileprivate func fulfillWaiters() {
self.forEach { waiter in
return waiter.promise.succeed(.init())
}
}
}
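
To illustrate the limiter's contract, a short sketch (assuming the caller can see the class, e.g. code inside the TSFFutures module, since it is declared internal): at most maximumConcurrency bodies run concurrently, later requests queue as waiters, and raising the limit fulfills queued waiters immediately.

import NIO

let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let limiter = ConcurrencyLimiter(maximumConcurrency: 2)

// At most two leases are outstanding; further calls wait for a replenish.
let futures = (0..<10).map { i in
limiter.withReplenishableLimit(eventLoop: elg.next()) { eventLoop in
eventLoop.makeSucceededFuture(i)  // a non-blocking body
}
}

// Raising the limit fulfills queued waiters right away.
limiter.maximumConcurrency = 4
print(limiter.sharesInUse)  // leases currently held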