
Commit 2297856

fix: limit unprocessed message queue size separately to message size (libp2p#234)
* fix: limit unprocessed message queue size separately to message size

  It's possible to receive lots of small messages in one buffer that can be larger than the max message size, so limit the unprocessed message queue size separately from the max message size.

* chore: add tests

* chore: pass option to decoder

* chore: PR comment

Co-authored-by: Marin Petrunić <[email protected]>
1 parent 31d3938 commit 2297856
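
The reasoning behind the fix in one line of arithmetic: a single socket read can contain many individually valid frames whose combined size exceeds `maxMsgSize`, so checking the whole receive buffer against the per-message limit rejected legitimate traffic. A rough illustration in TypeScript (the numbers below are made up for the example, not taken from the commit):

// Hypothetical numbers, for illustration only.
const maxMsgSize = 1 << 20                    // 1MB: default per-message limit
const frameSize = 16 * 1024                   // 16KB: each frame is well under the limit
const framesInOneRead = 70                    // one large socket read batching many frames
const unprocessedBytes = frameSize * framesInOneRead
// 1,146,880 bytes sit in the decoder's buffer even though no single message is
// anywhere near maxMsgSize. The old check compared this total against maxMsgSize
// (1,048,576) and threw ERR_MSG_TOO_BIG; the new check compares it against
// maxUnprocessedMessageQueueSize (4MB by default), which it passes.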

File tree

6 files changed: +96 -9 lines changed

README.md (+1)

@@ -70,6 +70,7 @@ Creates a factory that can be used to create new muxers.
 `options` is an optional `Object` that may have the following properties:
 
 - `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 1048576 - e.g. 1MB)
+- `maxUnprocessedMessageQueueSize` - a number that limits the size of the unprocessed input buffer (default: 4194304 - e.g. 4MB)
 - `maxInboundStreams` - a number that defines how many incoming streams are allowed per connection (default: 1024)
 - `maxOutboundStreams` - a number that defines how many outgoing streams are allowed per connection (default: 1024)
 - `maxStreamBufferSize` - a number that defines how large the message buffer is allowed to grow (default: 1024 \* 1024 \* 4 - e.g. 4MB)
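
For context, here is how a consumer might set both limits together. This is a sketch only: it assumes the `mplex` factory export from `@libp2p/mplex` that the surrounding README describes, and it simply restates the documented defaults.

// Assumes the `mplex` factory export described in this README.
import { mplex } from '@libp2p/mplex'

const muxerFactory = mplex({
  // per-message limit: larger outgoing messages are split, larger incoming ones are rejected
  maxMsgSize: 1048576,
  // backlog limit: raw bytes waiting to be decoded may grow to this size before the connection is closed
  maxUnprocessedMessageQueueSize: 4194304
})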

package.json (+1)

@@ -172,6 +172,7 @@
     "it-drain": "^2.0.0",
     "it-foreach": "^1.0.0",
     "it-map": "^2.0.0",
+    "it-to-buffer": "^3.0.0",
     "p-defer": "^4.0.0",
     "random-int": "^3.0.0",
     "typescript": "^4.7.4"

src/decode.ts (+18 -6)

@@ -4,6 +4,7 @@ import type { Source } from 'it-stream-types'
 import type { Message } from './message-types.js'
 
 export const MAX_MSG_SIZE = 1 << 20 // 1MB
+export const MAX_MSG_QUEUE_SIZE = 4 << 20 // 4MB
 
 interface MessageHeader {
   id: number
@@ -16,11 +17,13 @@ class Decoder {
   private readonly _buffer: Uint8ArrayList
   private _headerInfo: MessageHeader | null
   private readonly _maxMessageSize: number
+  private readonly _maxUnprocessedMessageQueueSize: number
 
-  constructor (maxMessageSize: number = MAX_MSG_SIZE) {
+  constructor (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
     this._buffer = new Uint8ArrayList()
     this._headerInfo = null
     this._maxMessageSize = maxMessageSize
+    this._maxUnprocessedMessageQueueSize = maxUnprocessedMessageQueueSize
   }
 
   write (chunk: Uint8Array) {
@@ -30,8 +33,8 @@ class Decoder {
 
     this._buffer.append(chunk)
 
-    if (this._buffer.byteLength > this._maxMessageSize) {
-      throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
+    if (this._buffer.byteLength > this._maxUnprocessedMessageQueueSize) {
+      throw Object.assign(new Error('unprocessed message queue size too large!'), { code: 'ERR_MSG_QUEUE_TOO_BIG' })
     }
 
     const msgs: Message[] = []
@@ -40,7 +43,11 @@
       if (this._headerInfo == null) {
         try {
           this._headerInfo = this._decodeHeader(this._buffer)
-        } catch (_) {
+        } catch (err: any) {
+          if (err.code === 'ERR_MSG_TOO_BIG') {
+            throw err
+          }
+
           break // We haven't received enough data yet
         }
       }
@@ -90,6 +97,11 @@
      throw new Error(`Invalid type received: ${type}`)
    }
 
+    // test message type varint + data length
+    if (length > this._maxMessageSize) {
+      throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
+    }
+
    // @ts-expect-error h is a number not a CODE
    return { id: h >> 3, type, offset: offset + end, length }
  }
@@ -128,9 +140,9 @@ function readVarInt (buf: Uint8ArrayList, offset: number = 0) {
 /**
  * Decode a chunk and yield an _array_ of decoded messages
  */
-export function decode (maxMessageSize: number = MAX_MSG_SIZE) {
+export function decode (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
   return async function * decodeMessages (source: Source<Uint8Array>): Source<Message> {
-    const decoder = new Decoder(maxMessageSize)
+    const decoder = new Decoder(maxMessageSize, maxUnprocessedMessageQueueSize)
 
     for await (const chunk of source) {
       const msgs = decoder.write(chunk)
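
A minimal usage sketch of the updated `decode` (not part of the commit), written in the style of the tests below and assuming the same helper modules they use (`it-pipe`, `it-to-buffer`, `it-drain`): three small frames delivered as one chunk now decode cleanly because only the 1024-byte queue limit applies to the batched bytes, while any single frame over 32 bytes would still raise `ERR_MSG_TOO_BIG`.

import { pipe } from 'it-pipe'
import toBuffer from 'it-to-buffer'
import drain from 'it-drain'
import { Uint8ArrayList } from 'uint8arraylist'
import { encode } from '../src/encode.js'
import { decode } from '../src/decode.js'
import type { Message } from '../src/message-types.js'

// Three 16-byte data messages: each encoded frame is ~18 bytes, under the
// 32-byte per-message limit, but together they total ~54 bytes, which the
// old code would have rejected against maxMessageSize.
const messages: Message[] = [
  { id: 0, type: 1, data: new Uint8ArrayList(new Uint8Array(16)) },
  { id: 0, type: 1, data: new Uint8ArrayList(new Uint8Array(16)) },
  { id: 0, type: 1, data: new Uint8ArrayList(new Uint8Array(16)) }
]

await pipe(
  messages,
  encode,
  async function * (source) {
    yield toBuffer(source) // deliver all frames in a single chunk
  },
  decode(32, 1024), // maxMessageSize = 32, maxUnprocessedMessageQueueSize = 1024
  drain             // decodes without ERR_MSG_TOO_BIG or ERR_MSG_QUEUE_TOO_BIG
)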

src/index.ts (+11 -1)

@@ -5,10 +5,20 @@ export interface MplexInit {
   /**
    * The maximum size of message that can be sent in one go in bytes.
    * Messages larger than this will be split into multiple smaller
-   * messages (default: 1MB)
+   * messages. If we receive a message larger than this an error will
+   * be thrown and the connection closed. (default: 1MB)
    */
   maxMsgSize?: number
 
+  /**
+   * Constrains the size of the unprocessed message queue buffer.
+   * Before messages are deserialized, the raw bytes are buffered to ensure
+   * we have the complete message to deserialize. If the queue gets longer
+   * than this value an error will be thrown and the connection closed.
+   * (default: 4MB)
+   */
+  maxUnprocessedMessageQueueSize?: number
+
   /**
    * The maximum number of multiplexed streams that can be open at any
    * one time. A request to open more than this will have a stream
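
Since both limits now live on `MplexInit`, a caller's options object is type-checked end to end. A small sketch under the assumption that `MplexInit` is imported from this module:

// Assumes a module-local import of the exported MplexInit interface.
import type { MplexInit } from './index.js'

const init: MplexInit = {
  maxMsgSize: 1 << 20,                    // reject any single incoming message over 1MB
  maxUnprocessedMessageQueueSize: 4 << 20 // close the connection if 4MB of raw bytes pile up undecoded
}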

src/mplex.ts (+1 -1)

@@ -203,7 +203,7 @@ export class MplexStreamMuxer implements StreamMuxer {
     try {
       await pipe(
         source,
-        decode(this._init.maxMsgSize),
+        decode(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize),
         async source => {
           for await (const msg of source) {
             await this._handleIncoming(msg)

test/restrict-size.spec.ts (+64 -1)

@@ -10,6 +10,7 @@ import { Message, MessageTypes } from '../src/message-types.js'
 import { encode } from '../src/encode.js'
 import { decode } from '../src/decode.js'
 import { Uint8ArrayList } from 'uint8arraylist'
+import toBuffer from 'it-to-buffer'
 
 describe('restrict size', () => {
   it('should throw when size is too big', async () => {
@@ -36,9 +37,10 @@ describe('restrict size', () => {
       )
     } catch (err: any) {
       expect(err).to.have.property('code', 'ERR_MSG_TOO_BIG')
-      expect(output).to.have.length(2)
+      expect(output).to.have.length(3)
       expect(output[0]).to.deep.equal(input[0])
       expect(output[1]).to.deep.equal(input[1])
+      expect(output[2]).to.deep.equal(input[2])
       return
     }
     throw new Error('did not restrict size')
@@ -59,4 +61,65 @@ describe('restrict size', () => {
     )
     expect(output).to.deep.equal(input)
   })
+
+  it('should throw when unprocessed message queue size is too big', async () => {
+    const maxMessageSize = 32
+    const maxUnprocessedMessageQueueSize = 64
+
+    const input: Message[] = [
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) }
+    ]
+
+    const output: Message[] = []
+
+    try {
+      await pipe(
+        input,
+        encode,
+        async function * (source) {
+          // make one big buffer
+          yield toBuffer(source)
+        },
+        decode(maxMessageSize, maxUnprocessedMessageQueueSize),
+        (source) => each(source, chunk => {
+          output.push(chunk)
+        }),
+        async (source) => await drain(source)
+      )
+    } catch (err: any) {
+      expect(err).to.have.property('code', 'ERR_MSG_QUEUE_TOO_BIG')
+      expect(output).to.have.length(0)
+      return
+    }
+    throw new Error('did not restrict size')
+  })
+
+  it('should throw when unprocessed message queue size is too big because of garbage', async () => {
+    const maxMessageSize = 32
+    const maxUnprocessedMessageQueueSize = 64
+    const input = randomBytes(maxUnprocessedMessageQueueSize + 1)
+    const output: Message[] = []
+
+    try {
+      await pipe(
+        [input],
+        decode(maxMessageSize, maxUnprocessedMessageQueueSize),
+        (source) => each(source, chunk => {
+          output.push(chunk)
+        }),
+        async (source) => await drain(source)
+      )
+    } catch (err: any) {
+      expect(err).to.have.property('code', 'ERR_MSG_QUEUE_TOO_BIG')
+      expect(output).to.have.length(0)
+      return
+    }
+    throw new Error('did not restrict size')
+  })
 })
