Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit 31d3938

Browse files
authored
fix: yield single buffers (#233)
* fix: yield single buffers Messages are serialized to multiple buffers, intsead of yield each buffer one by one, create single buffers that contain the whole serialized message. This greatly improves transport performance as writing one big buffer is a lot faster than writing lots of small buffers to network sockets etc. Before: ``` testing 0.40.x-mplex sender 3276811 messages 17 invocations sender 6553636 bufs 17 b 24197 ms 105 MB in 32 B chunks in 24170ms ``` After: ``` testing 0.40.x-mplex sender 3276811 messages 1638408 invocations 1638411 bufs 68 b 8626 ms 105 MB in 32 B chunks in 8611ms ``` * chore: update comment
1 parent 279ad47 commit 31d3938

File tree

3 files changed

+21
-30
lines changed

3 files changed

+21
-30
lines changed

src/encode.ts

+12-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { Source } from 'it-stream-types'
22
import varint from 'varint'
3+
import { Uint8ArrayList } from 'uint8arraylist'
34
import { allocUnsafe } from './alloc-unsafe.js'
45
import { Message, MessageTypes } from './message-types.js'
56

@@ -15,9 +16,9 @@ class Encoder {
1516
}
1617

1718
/**
18-
* Encodes the given message and returns it and its header
19+
* Encodes the given message and adds it to the passed list
1920
*/
20-
write (msg: Message): Uint8Array[] {
21+
write (msg: Message, list: Uint8ArrayList): void {
2122
const pool = this._pool
2223
let offset = this._poolOffset
2324

@@ -41,16 +42,11 @@ class Encoder {
4142
this._poolOffset = offset
4243
}
4344

45+
list.append(header)
46+
4447
if ((msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) && msg.data != null) {
45-
return [
46-
header,
47-
...(msg.data instanceof Uint8Array ? [msg.data] : msg.data)
48-
]
48+
list.append(msg.data)
4949
}
50-
51-
return [
52-
header
53-
]
5450
}
5551
}
5652

@@ -61,12 +57,16 @@ const encoder = new Encoder()
6157
*/
6258
export async function * encode (source: Source<Message | Message[]>) {
6359
for await (const msg of source) {
60+
const list = new Uint8ArrayList()
61+
6462
if (Array.isArray(msg)) {
6563
for (const m of msg) {
66-
yield * encoder.write(m)
64+
encoder.write(m, list)
6765
}
6866
} else {
69-
yield * encoder.write(msg)
67+
encoder.write(msg, list)
7068
}
69+
70+
yield list.subarray()
7171
}
7272
}

src/mplex.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ export class MplexStreamMuxer implements StreamMuxer {
155155
_newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map<number, MplexStream> }) {
156156
const { id, name, type, registry } = options
157157

158-
log('new %s stream %s %s', type, id)
158+
log('new %s stream %s', type, id)
159159

160160
if (type === 'initiator' && this._streams.initiators.size === (this._init.maxOutboundStreams ?? MAX_STREAMS_OUTBOUND_STREAMS_PER_CONNECTION)) {
161161
throw errCode(new Error('Too many outbound streams open'), 'ERR_TOO_MANY_OUTBOUND_STREAMS')

src/stream.ts

+8-17
Original file line numberDiff line numberDiff line change
@@ -171,24 +171,15 @@ export function createStream (options: Options): MplexStream {
171171
send({ id, type: InitiatorMessageTypes.NEW_STREAM, data: new Uint8ArrayList(uint8ArrayFromString(streamName)) })
172172
}
173173

174-
const uint8ArrayList = new Uint8ArrayList()
175-
176-
for await (const data of source) {
177-
if (data.length <= maxMsgSize) {
178-
send({ id, type: Types.MESSAGE, data: data instanceof Uint8ArrayList ? data : new Uint8ArrayList(data) })
179-
} else {
180-
uint8ArrayList.append(data)
181-
182-
while (uint8ArrayList.length !== 0) {
183-
// eslint-disable-next-line max-depth
184-
if (uint8ArrayList.length <= maxMsgSize) {
185-
send({ id, type: Types.MESSAGE, data: uint8ArrayList.sublist() })
186-
uint8ArrayList.consume(uint8ArrayList.length)
187-
break
188-
}
189-
send({ id, type: Types.MESSAGE, data: uint8ArrayList.sublist(0, maxMsgSize) })
190-
uint8ArrayList.consume(maxMsgSize)
174+
for await (let data of source) {
175+
while (data.length > 0) {
176+
if (data.length <= maxMsgSize) {
177+
send({ id, type: Types.MESSAGE, data: data instanceof Uint8Array ? new Uint8ArrayList(data) : data })
178+
break
191179
}
180+
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data
181+
send({ id, type: Types.MESSAGE, data: data.sublist(0, maxMsgSize) })
182+
data.consume(maxMsgSize)
192183
}
193184
}
194185
} catch (err: any) {

0 commit comments

Comments
 (0)