Skip to content

Commit f3d24f4

Browse files
committedAug 2, 2024
make a bunch of stuff Sendable & add thread safety
1 parent 3c476af commit f3d24f4

12 files changed

+120
-92
lines changed
 

‎Package.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ let package = Package(
2323
targets: ["TSFCAS", "TSFCASFileTree", "TSFCASUtilities"]),
2424
],
2525
dependencies: [
26-
.package(url: "https://github.com/apple/swift-nio.git", from: "2.32.0"),
26+
.package(url: "https://github.com/apple/swift-nio.git", from: "2.68.0"),
2727
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.20.3"),
2828
.package(url: "https://github.com/apple/swift-tools-support-core.git", "0.2.7" ..< "0.6.0"),
2929
],

‎Sources/TSFCAS/Database.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public struct LLBCASFeatures: Codable {
4444
/// A content-addressable database protocol
4545
///
4646
/// THREAD-SAFETY: The database is expected to be thread-safe.
47-
public protocol LLBCASDatabase: AnyObject {
47+
public protocol LLBCASDatabase: AnyObject & Sendable {
4848
var group: LLBFuturesDispatchGroup { get }
4949

5050
/// Get the supported features of this database implementation
@@ -152,10 +152,10 @@ public extension Context {
152152

153153
var db: LLBCASDatabase {
154154
get {
155-
guard let db = self[ObjectIdentifier(LLBCASDatabase.self)] else {
155+
guard let db = self[ObjectIdentifier(LLBCASDatabase.self), as: LLBCASDatabase.self] else {
156156
fatalError("no CAS database")
157157
}
158-
return db as! LLBCASDatabase
158+
return db
159159
}
160160
set {
161161
self[ObjectIdentifier(LLBCASDatabase.self)] = newValue

‎Sources/TSFCAS/Implementations/InMemoryCASDatabase.swift

+28-22
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,25 @@ import TSFUtility
1717

1818

1919
/// A simple in-memory implementation of the `LLBCASDatabase` protocol.
20-
public final class LLBInMemoryCASDatabase {
21-
/// The content.
22-
private var content = [LLBDataID: LLBCASObject]()
20+
public final class LLBInMemoryCASDatabase: Sendable {
21+
struct State: Sendable {
22+
/// The content.
23+
var content = [LLBDataID: LLBCASObject]()
2324

24-
/// Threads capable of running futures.
25-
public var group: LLBFuturesDispatchGroup
25+
var totalDataBytes: Int = 0
26+
}
27+
28+
private let state: NIOLockedValueBox<State> = NIOLockedValueBox(State())
2629

27-
/// The lock protecting content.
28-
let lock = NIOConcurrencyHelpers.NIOLock()
30+
/// Threads capable of running futures.
31+
public let group: LLBFuturesDispatchGroup
2932

3033
/// The total number of data bytes in the database (this does not include the size of refs).
3134
public var totalDataBytes: Int {
32-
return lock.withLock { _totalDataBytes }
35+
return self.state.withLockedValue { state in
36+
return state.totalDataBytes
37+
}
3338
}
34-
fileprivate var _totalDataBytes: Int = 0
3539

3640
/// Create an in-memory database.
3741
public init(group: LLBFuturesDispatchGroup) {
@@ -41,23 +45,23 @@ public final class LLBInMemoryCASDatabase {
4145
/// Delete the data in the database.
4246
/// Intentionally not exposed via the CASDatabase protocol.
4347
public func delete(_ id: LLBDataID, recursive: Bool) -> LLBFuture<Void> {
44-
lock.withLockVoid {
45-
unsafeDelete(id, recursive: recursive)
48+
self.state.withLockedValue { state in
49+
unsafeDelete(state: &state, id, recursive: recursive)
4650
}
4751
return group.next().makeSucceededFuture(())
4852
}
49-
private func unsafeDelete(_ id: LLBDataID, recursive: Bool) {
50-
guard let object = content[id] else {
53+
private func unsafeDelete(state: inout State, _ id: LLBDataID, recursive: Bool) {
54+
guard let object = state.content[id] else {
5155
return
5256
}
53-
_totalDataBytes -= object.data.readableBytes
57+
state.totalDataBytes -= object.data.readableBytes
5458

5559
guard recursive else {
5660
return
5761
}
5862

5963
for ref in object.refs {
60-
unsafeDelete(ref, recursive: recursive)
64+
unsafeDelete(state: &state, ref, recursive: recursive)
6165
}
6266
}
6367
}
@@ -68,12 +72,14 @@ extension LLBInMemoryCASDatabase: LLBCASDatabase {
6872
}
6973

7074
public func contains(_ id: LLBDataID, _ ctx: Context) -> LLBFuture<Bool> {
71-
let result = lock.withLock { self.content.index(forKey: id) != nil }
75+
let result = self.state.withLockedValue { state in
76+
state.content.index(forKey: id) != nil
77+
}
7278
return group.next().makeSucceededFuture(result)
7379
}
7480

7581
public func get(_ id: LLBDataID, _ ctx: Context) -> LLBFuture<LLBCASObject?> {
76-
let result = lock.withLock { self.content[id] }
82+
let result = self.state.withLockedValue { state in state.content[id] }
7783
return group.next().makeSucceededFuture(result)
7884
}
7985

@@ -86,13 +92,13 @@ extension LLBInMemoryCASDatabase: LLBCASDatabase {
8692
}
8793

8894
public func put(knownID id: LLBDataID, refs: [LLBDataID] = [], data: LLBByteBuffer, _ ctx: Context) -> LLBFuture<LLBDataID> {
89-
lock.withLockVoid {
90-
guard content[id] == nil else {
91-
assert(content[id]?.data == data, "put data for id doesn't match")
95+
self.state.withLockedValue { state in
96+
guard state.content[id] == nil else {
97+
assert(state.content[id]?.data == data, "put data for id doesn't match")
9298
return
9399
}
94-
_totalDataBytes += data.readableBytes
95-
content[id] = LLBCASObject(refs: refs, data: data)
100+
state.totalDataBytes += data.readableBytes
101+
state.content[id] = LLBCASObject(refs: refs, data: data)
96102
}
97103
return group.next().makeSucceededFuture(id)
98104
}

‎Sources/TSFCAS/Object.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import NIOFoundationCompat
1515

1616
// MARK:- CASObject Definition -
1717

18-
public struct LLBCASObject: Equatable {
18+
public struct LLBCASObject: Equatable, Sendable {
1919
/// The list of references.
2020
public let refs: [LLBDataID]
2121

@@ -54,7 +54,7 @@ public protocol LLBCASObjectConstructable {
5454

5555
public extension LLBCASObject {
5656
init(rawBytes: Data) throws {
57-
let pb = try LLBPBCASObject.init(serializedData: rawBytes)
57+
let pb = try LLBPBCASObject(serializedBytes: rawBytes)
5858
var data = LLBByteBufferAllocator().buffer(capacity: pb.data.count)
5959
data.writeBytes(pb.data)
6060
self.init(refs: pb.refs, data: data)
@@ -71,7 +71,7 @@ public extension LLBCASObject {
7171
extension LLBCASObject: LLBSerializable {
7272
public init(from rawBytes: LLBByteBuffer) throws {
7373
let pb = try rawBytes.withUnsafeReadableBytes {
74-
try LLBPBCASObject.init(serializedData: Data(bytesNoCopy: UnsafeMutableRawPointer(mutating: $0.baseAddress!), count: $0.count, deallocator: .none))
74+
try LLBPBCASObject(serializedBytes: Data(bytesNoCopy: UnsafeMutableRawPointer(mutating: $0.baseAddress!), count: $0.count, deallocator: .none))
7575
}
7676
let refs = pb.refs
7777
var data = LLBByteBufferAllocator().buffer(capacity: pb.data.count)

‎Sources/TSFCASFileTree/Context.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public extension Context {
1818

1919
var fileTreeImportOptions: LLBCASFileTree.ImportOptions? {
2020
get {
21-
guard let options = self[ObjectIdentifier(LLBCASFileTree.ImportOptions.self)] as? LLBCASFileTree.ImportOptions else {
21+
guard let options = self[ObjectIdentifier(LLBCASFileTree.ImportOptions.self), as: LLBCASFileTree.ImportOptions.self] else {
2222
return nil
2323
}
2424

@@ -36,7 +36,7 @@ public extension Context {
3636

3737
var fileTreeExportStorageBatcher: LLBBatchingFutureOperationQueue? {
3838
get {
39-
guard let options = self[ObjectIdentifier(Self.fileTreeExportStorageBatcherKey)] as? LLBBatchingFutureOperationQueue else {
39+
guard let options = self[ObjectIdentifier(Self.fileTreeExportStorageBatcherKey), as: LLBBatchingFutureOperationQueue.self] else {
4040
return nil
4141
}
4242

‎Sources/TSFCASFileTree/FileInfo.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ extension LLBFileInfo {
4141
public static func deserialize(from buffer: LLBByteBuffer) throws -> Self {
4242
return try buffer.withUnsafeReadableBytesWithStorageManagement { (buffer, mgr) in
4343
_ = mgr.retain()
44-
return try Self.init(serializedData: Data(
44+
return try Self.init(serializedBytes: Data(
4545
bytesNoCopy: UnsafeMutableRawPointer(mutating: buffer.baseAddress!),
4646
count: buffer.count,
4747
deallocator: .custom({ _,_ in mgr.release() }
@@ -51,7 +51,7 @@ extension LLBFileInfo {
5151

5252
@inlinable
5353
public static func deserialize(from buffer: UnsafeBufferPointer<UInt8>) throws -> Self {
54-
return try Self.init(serializedData: Data(
54+
return try Self.init(serializedBytes: Data(
5555
// NOTE: This doesn't actually mutate, which is why this is safe.
5656
bytesNoCopy: UnsafeMutableRawPointer(mutating: buffer.baseAddress!),
5757
count: buffer.count, deallocator: .none))

‎Sources/TSFCASFileTree/FileTreeImport.swift

+9-7
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public protocol LLBCASFileTreeImportProgressStats: AnyObject {
4141
public extension LLBCASFileTree {
4242

4343
/// Serialization format.
44-
enum WireFormat: String, CaseIterable {
44+
enum WireFormat: String, CaseIterable, Sendable {
4545
/// Binary encoding for directory and file data
4646
case binary
4747
/// Binary encoding with data compression applied.
@@ -56,7 +56,7 @@ public extension LLBCASFileTree {
5656
case compressionFailed(String)
5757
}
5858

59-
enum ImportPhase: Int, Comparable {
59+
enum ImportPhase: Int, Comparable, Sendable {
6060
case AssemblingPaths
6161
case EstimatingSize
6262
case CheckIfUploaded
@@ -76,7 +76,7 @@ public extension LLBCASFileTree {
7676
}
7777
}
7878

79-
struct PreservePosixDetails {
79+
struct PreservePosixDetails: Sendable {
8080
/// Preserve POSIX file permissions.
8181
public var preservePosixMode = false
8282

@@ -91,7 +91,7 @@ public extension LLBCASFileTree {
9191
}
9292

9393
/// Modifiers for the default behavior of the `import` call.
94-
struct ImportOptions {
94+
struct ImportOptions: Sendable {
9595
/// The serialization format for persisting the CASTrees.
9696
public var wireFormat: WireFormat
9797

@@ -126,7 +126,7 @@ public extension LLBCASFileTree {
126126
/// match expectation, it is not recursed into.
127127
/// NB: The filter argument is an absolute path _relative to the
128128
/// import location_. A top level imported directory becomes "/".
129-
public var pathFilter: ((String) -> Bool)?
129+
public var pathFilter: (@Sendable (String) -> Bool)?
130130

131131
/// Shared queues for operations that should be limited by mainly the
132132
/// data drive parallelism, network concurrency parallelism,
@@ -152,7 +152,7 @@ public extension LLBCASFileTree {
152152
}
153153
}
154154

155-
final class ImportProgressStats: LLBCASFileTreeImportProgressStats, CustomDebugStringConvertible {
155+
final class ImportProgressStats: LLBCASFileTreeImportProgressStats, CustomDebugStringConvertible, Sendable {
156156

157157
/// Number of plain files to import (not directories).
158158
let toImportFiles_ = ManagedAtomic<Int>(0)
@@ -334,7 +334,7 @@ public extension LLBCASFileTree {
334334
}
335335

336336

337-
private final class CASTreeImport {
337+
private final class CASTreeImport: Sendable {
338338

339339
let importPath: AbsolutePath
340340
let options: LLBCASFileTree.ImportOptions
@@ -1212,6 +1212,8 @@ private final class CASTreeImport {
12121212
}
12131213

12141214
// NB: does .wait(), therefore only safe on BatchingFutureOperationQueue.
1215+
@available(*, noasync, message: "This method blocks indefinitely, don't use from 'async' or SwiftNIO EventLoops")
1216+
@available(*, deprecated, message: "This method blocks indefinitely and returns a future")
12151217
private func executeWithBackpressure<T>(on queue: LLBFutureOperationQueue, loop: LLBFuturesDispatchLoop, size: Int = 1, default stopValue: T, _ body: @escaping () -> LLBFuture<T>) -> LLBFuture<T> {
12161218
guard finalResultPromise.isCompleted == false else {
12171219
return loop.makeSucceededFuture(stopValue)

‎Sources/TSFFutures/BatchingFutureOperationQueue.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import TSCUtility
1919
/// For some blocking operations (such as file system accesses) executing
2020
/// them on the NIO loops is very expensive since it blocks the event
2121
/// processing machinery. Here we use extra threads for such operations.
22-
public struct LLBBatchingFutureOperationQueue {
22+
public struct LLBBatchingFutureOperationQueue: Sendable {
2323

2424
/// Threads capable of running futures.
2525
public let group: LLBFuturesDispatchGroup

‎Sources/TSFFutures/CancellablePromise.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ case promiseLeaked
2222
/// - The writer access that fulfills the promise or cancels it, getting
2323
/// back indication of whether or not that operation was successful.
2424
/// - The reader access that checks if the promise has been fulfilled.
25-
open class LLBCancellablePromise<T> {
25+
open class LLBCancellablePromise<T>: @unchecked /* because inheritance... */ Sendable {
2626
/// Underlying promise. Private to avoid messing with out outside
2727
/// of CancellablePromise lifecycle protection.
2828
private let promise: LLBPromise<T>

0 commit comments

Comments
 (0)
Please sign in to comment.