Skip to content

Commit a40dd44

Browse files
committedAug 26, 2020
Refactor the protocols to provide unified reader API
1 parent ac6c3b7 commit a40dd44

File tree

5 files changed

+191
-181
lines changed

5 files changed

+191
-181
lines changed
 

‎Package.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ let package = Package(
5454
.target(
5555
name: "TSFCASUtilities",
5656
dependencies: [
57-
"TSFCAS",
57+
"TSFCAS", "TSFCASFileTree",
5858
]
5959
),
6060
.testTarget(

‎Sources/TSFCASUtilities/LinkedListStream.swift

+35-81
Original file line numberDiff line numberDiff line change
@@ -8,31 +8,52 @@
88

99
import TSCUtility
1010
import TSFCAS
11+
import TSFCASFileTree
12+
13+
private extension String {
14+
func prepending(_ prefix: String) -> String {
15+
return prefix + self
16+
}
17+
}
1118

1219
/// Basic writer implementation that resembles a linked list where each node contains control data (like the channel)
1320
/// and refs[0] always points to the dataID of the data chunk and refs[1] has the data ID for the next node in the
1421
/// chain, if it's not the last node. This implementation is not thread safe.
15-
public struct LLBLinkedListStreamWriter: LLBCASStreamWriter {
22+
public struct LLBLinkedListStreamWriter {
1623
private let db: LLBCASDatabase
24+
private let ext: String
1725

1826
public private(set) var latestID: LLBFuture<LLBDataID>?
1927

20-
public init(_ db: LLBCASDatabase) {
28+
public init(_ db: LLBCASDatabase, ext: String? = nil) {
2129
self.db = db
30+
self.ext = ext?.prepending(".") ?? ""
2231
}
2332

2433
@discardableResult
2534
public mutating func append(data: LLBByteBuffer, channel: UInt8, _ ctx: Context) -> LLBFuture<LLBDataID> {
2635
let latest = (
2736
// Append on the previously cached node, or use nil as sentinel if this is the first write.
2837
latestID?.map { $0 } ?? db.group.next().makeSucceededFuture(nil)
29-
).flatMap { [db] (currentDataID: LLBDataID?) -> LLBFuture<LLBDataID> in
30-
db.put(data: data, ctx).flatMap { [db] contentID in
31-
// Put the channel as the only byte in the control node.
32-
let controlData = LLBByteBuffer(bytes: [channel])
33-
// compactmap to remove the currentDataID in case it was the nil sentinel.
34-
let refs: [LLBDataID] = [contentID, currentDataID].compactMap { $0 }
35-
return db.put(refs: refs, data: controlData, ctx)
38+
).flatMap { [db, ext] (currentDataID: LLBDataID?) -> LLBFuture<LLBDataID> in
39+
db.put(data: data, ctx).flatMap { [db, ext] contentID in
40+
41+
var entries = [
42+
LLBDirectoryEntryID(
43+
info: .init(name: "\(channel)\(ext)", type: .plainFile, size: data.readableBytes),
44+
id: contentID
45+
),
46+
]
47+
48+
if let currentDataID = currentDataID {
49+
entries.append(
50+
LLBDirectoryEntryID(
51+
info: .init(name: "prev", type: .directory, size: 0),
52+
id: currentDataID
53+
)
54+
)
55+
}
56+
return LLBCASFileTree.create(files: entries, in: db, ctx).map { $0.id }
3657
}
3758
}
3859

@@ -41,77 +62,10 @@ public struct LLBLinkedListStreamWriter: LLBCASStreamWriter {
4162
}
4263
}
4364

44-
public struct LLBLinkedListStreamReader: LLBCASStreamReader {
45-
private let db: LLBCASDatabase
46-
47-
public init(_ db: LLBCASDatabase) {
48-
self.db = db
49-
}
50-
51-
public func read(
52-
id: LLBDataID,
53-
channels: [UInt8]?,
54-
lastReadID: LLBDataID?,
55-
_ ctx: Context,
56-
readerBlock: @escaping (UInt8, LLBByteBuffer) throws -> Bool
57-
) -> LLBFuture<Void> {
58-
return innerRead(
59-
id: id,
60-
channels: channels,
61-
lastReadID: lastReadID,
62-
ctx,
63-
readerBlock: readerBlock
64-
).map { _ in () }
65-
}
66-
67-
private func innerRead(
68-
id: LLBDataID,
69-
channels: [UInt8]? = nil,
70-
lastReadID: LLBDataID? = nil,
71-
_ ctx: Context,
72-
readerBlock: @escaping (UInt8, LLBByteBuffer) throws -> Bool
73-
) -> LLBFuture<Bool> {
74-
if id == lastReadID {
75-
return db.group.next().makeSucceededFuture(true)
76-
}
77-
78-
return db.get(id, ctx).flatMap { [db] node -> LLBFuture<Bool> in
79-
guard let node = node, let channel = node.data.getBytes(at: 0, length: 1)?.first else {
80-
return db.group.next().makeFailedFuture(LLBCASStreamError.invalid)
81-
}
82-
83-
let readChainFuture: LLBFuture<Bool>
84-
// If there are 2 refs, it's an intermediate node in the linked list, so we schedule a read of the next
85-
// node before scheduling a read of the current node.
86-
if node.refs.count == 2 {
87-
readChainFuture = self.innerRead(
88-
id: node.refs[1],
89-
channels: channels,
90-
lastReadID: lastReadID,
91-
ctx,
92-
readerBlock: readerBlock
93-
)
94-
} else {
95-
// If this is the last node, schedule a sentinel read that returns to keep on reading.
96-
readChainFuture = db.group.next().makeSucceededFuture(true)
97-
}
98-
99-
return readChainFuture.flatMap { [db] shouldContinue -> LLBFuture<Bool> in
100-
// If we don't want to continue reading, or if the channel is not requested, close the current chain
101-
// and propagate the desire to keep on reading.
102-
guard shouldContinue && channels?.contains(channel) ?? true else {
103-
return db.group.next().makeSucceededFuture(shouldContinue)
104-
}
105-
106-
// Read the node since it's expected to exist.
107-
return db.get(node.refs[0], ctx).flatMapThrowing { (dataNode: LLBCASObject?) -> Bool in
108-
guard let dataNode = dataNode else {
109-
throw LLBCASStreamError.missing
110-
}
111-
112-
return try readerBlock(channel, dataNode.data)
113-
}
114-
}
115-
}
65+
public extension LLBLinkedListStreamWriter {
66+
@discardableResult
67+
@inlinable
68+
mutating func append(data: LLBByteBuffer, _ ctx: Context) -> LLBFuture<LLBDataID> {
69+
return append(data: data, channel: 0, ctx)
11670
}
11771
}

‎Sources/TSFCASUtilities/StreamProtocol.swift

-89
This file was deleted.
+145
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// This source file is part of the Swift.org open source project
2+
//
3+
// Copyright (c) 2020 Apple Inc. and the Swift project authors
4+
// Licensed under Apache License v2.0 with Runtime Library Exception
5+
//
6+
// See http://swift.org/LICENSE.txt for license information
7+
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
8+
9+
import TSCUtility
10+
import TSFCAS
11+
import TSFCASFileTree
12+
13+
/// Implements the reading logic to read any kind of streaming data storage implemented. Currently it's hardcoded to
14+
/// read the LinkedList stream contents, but could get extended in the future when new storage structures are
15+
/// implemented. This should be the unified API to read streaming content, so that readers do not need to understand
16+
/// which writer was used to store the data.
17+
public struct LLBCASStreamReader {
18+
private let db: LLBCASDatabase
19+
20+
public init(_ db: LLBCASDatabase) {
21+
self.db = db
22+
}
23+
24+
public func read(
25+
id: LLBDataID,
26+
channels: [UInt8]?,
27+
lastReadID: LLBDataID?,
28+
_ ctx: Context,
29+
readerBlock: @escaping (UInt8, LLBByteBufferView) throws -> Bool
30+
) -> LLBFuture<Void> {
31+
return innerRead(
32+
id: id,
33+
channels: channels,
34+
lastReadID: lastReadID,
35+
ctx,
36+
readerBlock: readerBlock
37+
).map { _ in () }
38+
}
39+
40+
private func innerRead(
41+
id: LLBDataID,
42+
channels: [UInt8]? = nil,
43+
lastReadID: LLBDataID? = nil,
44+
_ ctx: Context,
45+
readerBlock: @escaping (UInt8, LLBByteBufferView) throws -> Bool
46+
) -> LLBFuture<Bool> {
47+
if id == lastReadID {
48+
return db.group.next().makeSucceededFuture(true)
49+
}
50+
51+
return LLBCASFSClient(db).load(id, ctx).flatMap { node in
52+
guard let tree = node.tree else {
53+
return db.group.next().makeFailedFuture(LLBCASStreamError.invalid)
54+
}
55+
56+
let readChainFuture: LLBFuture<Bool>
57+
58+
// If there is a "prev" directory, treat this as a linked list implementation. This is an implementation
59+
// detail that could be cleaned up later, but the idea is that there's a single unified reader entity.
60+
if let (id, _) = tree.lookup("prev") {
61+
readChainFuture = self.innerRead(
62+
id: id,
63+
channels: channels,
64+
lastReadID: lastReadID,
65+
ctx,
66+
readerBlock: readerBlock
67+
)
68+
} else {
69+
// If this is the last node, schedule a sentinel read that returns to keep on reading.
70+
readChainFuture = db.group.next().makeSucceededFuture(true)
71+
}
72+
73+
return readChainFuture.flatMap { [db] shouldContinue -> LLBFuture<Bool> in
74+
// If we don't want to continue reading, or if the channel is not requested, close the current chain
75+
// and propagate the desire to keep on reading.
76+
guard shouldContinue else {
77+
return db.group.next().makeSucceededFuture(shouldContinue)
78+
}
79+
80+
let files = tree.files.filter {
81+
$0.type == .plainFile
82+
}
83+
84+
guard files.count == 1, let (contentID, _) = tree.lookup(files[0].name) else {
85+
return db.group.next().makeFailedFuture(LLBCASStreamError.invalid)
86+
}
87+
88+
let channelOpt = files.first.flatMap { $0.name.split(separator: ".").first }.flatMap { UInt8($0) }
89+
90+
guard let channel = channelOpt,
91+
channels?.contains(channel) ?? true else {
92+
return db.group.next().makeSucceededFuture(true)
93+
}
94+
95+
return LLBCASFSClient(db).load(contentID, ctx).flatMap { node in
96+
guard let blob = node.blob else {
97+
return db.group.next().makeFailedFuture(LLBCASStreamError.missing)
98+
}
99+
100+
return blob.read(ctx).flatMapThrowing { byteBufferView in
101+
return try readerBlock(channel, byteBufferView)
102+
}
103+
}
104+
}
105+
}
106+
}
107+
}
108+
109+
// Convenience extension for default parameters.
110+
public extension LLBCASStreamReader {
111+
@inlinable
112+
func read(
113+
id: LLBDataID,
114+
_ ctx: Context,
115+
readerBlock: @escaping (UInt8, LLBByteBufferView) throws -> Bool
116+
) -> LLBFuture<Void> {
117+
return read(id: id, channels: nil, lastReadID: nil, ctx, readerBlock: readerBlock)
118+
}
119+
120+
@inlinable
121+
func read(
122+
id: LLBDataID,
123+
channels: [UInt8],
124+
_ ctx: Context,
125+
readerBlock: @escaping (UInt8, LLBByteBufferView) throws -> Bool
126+
) -> LLBFuture<Void> {
127+
return read(id: id, channels: channels, lastReadID: nil, ctx, readerBlock: readerBlock)
128+
}
129+
130+
@inlinable
131+
func read(
132+
id: LLBDataID,
133+
lastReadID: LLBDataID,
134+
_ ctx: Context,
135+
readerBlock: @escaping (UInt8, LLBByteBufferView) throws -> Bool
136+
) -> LLBFuture<Void> {
137+
return read(id: id, channels: nil, lastReadID: lastReadID, ctx, readerBlock: readerBlock)
138+
}
139+
}
140+
141+
/// Common error types for stream protocol implementations.
142+
public enum LLBCASStreamError: Error {
143+
case invalid
144+
case missing
145+
}

0 commit comments

Comments
 (0)
Please sign in to comment.