Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit 91d2e8c

Browse files
committed
fix: apply message size limit before decoding message
If we apply the message size limit after decoding the message it's too late as we've already processed the bad message. Instead, if the buffer full of unprocessed messages grows to be large than the max message size (e.g. we have not recieved a complete message under the size limit), throw an error which will cause the stream to be reset.
1 parent c813bae commit 91d2e8c

7 files changed

+43
-74
lines changed

src/decode.ts

+16-9
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import { Uint8ArrayList } from 'uint8arraylist'
33
import type { Source } from 'it-stream-types'
44
import type { Message } from './message-types.js'
55

6+
export const MAX_MSG_SIZE = Math.pow(2, 20)
7+
68
interface MessageHeader {
79
id: number
810
type: keyof typeof MessageTypeNames
@@ -13,10 +15,12 @@ interface MessageHeader {
1315
class Decoder {
1416
private readonly _buffer: Uint8ArrayList
1517
private _headerInfo: MessageHeader | null
18+
private readonly _maxMessageSize: number
1619

17-
constructor () {
20+
constructor (maxMessageSize: number) {
1821
this._buffer = new Uint8ArrayList()
1922
this._headerInfo = null
23+
this._maxMessageSize = maxMessageSize
2024
}
2125

2226
write (chunk: Uint8Array) {
@@ -25,6 +29,11 @@ class Decoder {
2529
}
2630

2731
this._buffer.append(chunk)
32+
33+
if (this._buffer.byteLength > this._maxMessageSize) {
34+
throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
35+
}
36+
2837
const msgs: Message[] = []
2938

3039
while (this._buffer.length !== 0) {
@@ -117,16 +126,14 @@ function readVarInt (buf: Uint8ArrayList, offset: number = 0) {
117126
}
118127

119128
/**
120-
* Decode a chunk and yield an _array_ of decoded messages
129+
* Decode a chunk and yield decoded messages
121130
*/
122-
export async function * decode (source: Source<Uint8Array>) {
123-
const decoder = new Decoder()
124-
125-
for await (const chunk of source) {
126-
const msgs = decoder.write(chunk)
131+
export function decode (maxMessageSize: number = MAX_MSG_SIZE) {
132+
return async function * decodeMessages (source: Source<Uint8Array>) {
133+
const decoder = new Decoder(maxMessageSize)
127134

128-
if (msgs.length > 0) {
129-
yield msgs
135+
for await (const chunk of source) {
136+
yield * decoder.write(chunk)
130137
}
131138
}
132139
}

src/mplex.ts

+1-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { pushableV } from 'it-pushable'
33
import { abortableSource } from 'abortable-iterator'
44
import { encode } from './encode.js'
55
import { decode } from './decode.js'
6-
import { restrictSize } from './restrict-size.js'
76
import { MessageTypes, MessageTypeNames, Message } from './message-types.js'
87
import { createStream } from './stream.js'
98
import { toString as uint8ArrayToString } from 'uint8arrays'
@@ -204,8 +203,7 @@ export class MplexStreamMuxer implements StreamMuxer {
204203
try {
205204
await pipe(
206205
source,
207-
decode,
208-
restrictSize(this._init.maxMsgSize),
206+
decode(this._init.maxMsgSize),
209207
async source => {
210208
for await (const msg of source) {
211209
await this._handleIncoming(msg)

src/restrict-size.ts

-36
This file was deleted.

src/stream.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { abortableSource } from 'abortable-iterator'
22
import { pushable } from 'it-pushable'
33
import errCode from 'err-code'
4-
import { MAX_MSG_SIZE } from './restrict-size.js'
4+
import { MAX_MSG_SIZE } from './decode.js'
55
import { anySignal } from 'any-signal'
66
import { InitiatorMessageTypes, ReceiverMessageTypes } from './message-types.js'
77
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'

test/coder.spec.ts

+6-9
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ describe('coder', () => {
2323

2424
it('should decode header', async () => {
2525
const source = [uint8ArrayFromString('8801023137', 'base16')]
26-
for await (const msgs of decode(source)) {
27-
expect(msgs.length).to.equal(1)
28-
29-
expect(messageWithBytes(msgs[0])).to.be.deep.equal({ id: 17, type: 0, data: uint8ArrayFromString('17') })
26+
for await (const msg of decode()(source)) {
27+
expect(messageWithBytes(msg)).to.be.deep.equal({ id: 17, type: 0, data: uint8ArrayFromString('17') })
3028
}
3129
})
3230

@@ -67,8 +65,8 @@ describe('coder', () => {
6765
const source = [uint8ArrayFromString('88010231379801023139a801023231', 'base16')]
6866

6967
const res = []
70-
for await (const msgs of decode(source)) {
71-
res.push(...msgs)
68+
for await (const msg of decode()(source)) {
69+
res.push(msg)
7270
}
7371

7472
expect(res.map(messageWithBytes)).to.deep.equal([
@@ -89,9 +87,8 @@ describe('coder', () => {
8987
it('should decode zero length body msg', async () => {
9088
const source = [uint8ArrayFromString('880100', 'base16')]
9189

92-
for await (const msgs of decode(source)) {
93-
expect(msgs.length).to.equal(1)
94-
expect(messageWithBytes(msgs[0])).to.be.eql({ id: 17, type: 0, data: new Uint8Array(0) })
90+
for await (const msg of decode()(source)) {
91+
expect(messageWithBytes(msg)).to.be.eql({ id: 17, type: 0, data: new Uint8Array(0) })
9592
}
9693
})
9794
})

test/mplex.spec.ts

+8-8
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ describe('mplex', () => {
7676

7777
await muxer.sink(stream)
7878

79-
const messages = await all(decode(bufs))
79+
const messages = await all(decode()(bufs))
8080

81-
expect(messages).to.have.nested.property('[0][0].id', 11, 'Did not specify the correct stream id')
82-
expect(messages).to.have.nested.property('[0][0].type', MessageTypes.RESET_RECEIVER, 'Did not reset the stream that tipped us over the inbound stream limit')
81+
expect(messages).to.have.nested.property('[0].id', 11, 'Did not specify the correct stream id')
82+
expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER, 'Did not reset the stream that tipped us over the inbound stream limit')
8383
})
8484

8585
it('should reset a stream that fills the message buffer', async () => {
@@ -103,7 +103,7 @@ describe('mplex', () => {
103103
const dataMessage: MessageInitiatorMessage = {
104104
id,
105105
type: MessageTypes.MESSAGE_INITIATOR,
106-
data: new Uint8ArrayList(new Uint8Array(1024 * 1024))
106+
data: new Uint8ArrayList(new Uint8Array(1024 * 1000))
107107
}
108108
yield dataMessage
109109

@@ -144,9 +144,9 @@ describe('mplex', () => {
144144

145145
// collect outgoing mplex messages
146146
const muxerFinished = pDefer()
147-
let messages: Message[][] = []
147+
let messages: Message[] = []
148148
void Promise.resolve().then(async () => {
149-
messages = await all(decode(muxer.source))
149+
messages = await all(decode()(muxer.source))
150150
muxerFinished.resolve()
151151
})
152152

@@ -159,7 +159,7 @@ describe('mplex', () => {
159159

160160
// should have sent reset message to peer for this stream
161161
await muxerFinished.promise
162-
expect(messages).to.have.nested.property('[0][0].id', id)
163-
expect(messages).to.have.nested.property('[0][0].type', MessageTypes.RESET_RECEIVER)
162+
expect(messages).to.have.nested.property('[0].id', id)
163+
expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER)
164164
})
165165
})

test/restrict-size.spec.ts

+11-8
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,30 @@ import all from 'it-all'
77
import drain from 'it-drain'
88
import each from 'it-foreach'
99
import { Message, MessageTypes } from '../src/message-types.js'
10-
import { restrictSize } from '../src/restrict-size.js'
1110
import { Uint8ArrayList } from 'uint8arraylist'
11+
import { decode } from '../src/decode.js'
12+
import { encode } from '../src/encode.js'
1213

13-
describe('restrict-size', () => {
14+
describe('restrict size', () => {
1415
it('should throw when size is too big', async () => {
1516
const maxSize = 32
1617

1718
const input: Message[] = [
1819
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(8)) },
20+
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
1921
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(maxSize)) },
20-
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(64)) },
21-
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) }
22+
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(10)) }
2223
]
2324

2425
const output: Message[] = []
2526

2627
try {
2728
await pipe(
2829
input,
29-
restrictSize(maxSize),
30-
(source) => each(source, chunk => {
31-
output.push(chunk)
30+
encode,
31+
decode(maxSize),
32+
(source) => each(source, msg => {
33+
output.push(msg)
3234
}),
3335
async (source) => await drain(source)
3436
)
@@ -51,7 +53,8 @@ describe('restrict-size', () => {
5153

5254
const output = await pipe(
5355
input,
54-
restrictSize(32),
56+
encode,
57+
decode(32),
5558
async (source) => await all(source)
5659
)
5760
expect(output).to.deep.equal(input)

0 commit comments

Comments
 (0)