Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit 279ad47

Browse files
authored
fix: apply message size limit before decoding message (#231)
* fix: apply message size limit before decoding message If we apply the message size limit after decoding the message it's too late as we've already processed the bad message. Instead, if the buffer full of unprocessed messages grows to be large than the max message size (e.g. we have not recieved a complete message under the size limit), throw an error which will cause the stream to be reset. * fix: add implementation
1 parent 9b120c9 commit 279ad47

7 files changed

+43
-70
lines changed

src/decode.ts

+18-7
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import { Uint8ArrayList } from 'uint8arraylist'
33
import type { Source } from 'it-stream-types'
44
import type { Message } from './message-types.js'
55

6+
export const MAX_MSG_SIZE = 1 << 20 // 1MB
7+
68
interface MessageHeader {
79
id: number
810
type: keyof typeof MessageTypeNames
@@ -13,10 +15,12 @@ interface MessageHeader {
1315
class Decoder {
1416
private readonly _buffer: Uint8ArrayList
1517
private _headerInfo: MessageHeader | null
18+
private readonly _maxMessageSize: number
1619

17-
constructor () {
20+
constructor (maxMessageSize: number = MAX_MSG_SIZE) {
1821
this._buffer = new Uint8ArrayList()
1922
this._headerInfo = null
23+
this._maxMessageSize = maxMessageSize
2024
}
2125

2226
write (chunk: Uint8Array) {
@@ -25,6 +29,11 @@ class Decoder {
2529
}
2630

2731
this._buffer.append(chunk)
32+
33+
if (this._buffer.byteLength > this._maxMessageSize) {
34+
throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
35+
}
36+
2837
const msgs: Message[] = []
2938

3039
while (this._buffer.length !== 0) {
@@ -119,14 +128,16 @@ function readVarInt (buf: Uint8ArrayList, offset: number = 0) {
119128
/**
120129
* Decode a chunk and yield an _array_ of decoded messages
121130
*/
122-
export async function * decode (source: Source<Uint8Array>) {
123-
const decoder = new Decoder()
131+
export function decode (maxMessageSize: number = MAX_MSG_SIZE) {
132+
return async function * decodeMessages (source: Source<Uint8Array>): Source<Message> {
133+
const decoder = new Decoder(maxMessageSize)
124134

125-
for await (const chunk of source) {
126-
const msgs = decoder.write(chunk)
135+
for await (const chunk of source) {
136+
const msgs = decoder.write(chunk)
127137

128-
if (msgs.length > 0) {
129-
yield msgs
138+
if (msgs.length > 0) {
139+
yield * msgs
140+
}
130141
}
131142
}
132143
}

src/mplex.ts

+1-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { pushableV } from 'it-pushable'
33
import { abortableSource } from 'abortable-iterator'
44
import { encode } from './encode.js'
55
import { decode } from './decode.js'
6-
import { restrictSize } from './restrict-size.js'
76
import { MessageTypes, MessageTypeNames, Message } from './message-types.js'
87
import { createStream } from './stream.js'
98
import { toString as uint8ArrayToString } from 'uint8arrays'
@@ -204,8 +203,7 @@ export class MplexStreamMuxer implements StreamMuxer {
204203
try {
205204
await pipe(
206205
source,
207-
decode,
208-
restrictSize(this._init.maxMsgSize),
206+
decode(this._init.maxMsgSize),
209207
async source => {
210208
for await (const msg of source) {
211209
await this._handleIncoming(msg)

src/restrict-size.ts

-36
This file was deleted.

src/stream.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { abortableSource } from 'abortable-iterator'
22
import { pushable } from 'it-pushable'
33
import errCode from 'err-code'
4-
import { MAX_MSG_SIZE } from './restrict-size.js'
4+
import { MAX_MSG_SIZE } from './decode.js'
55
import { anySignal } from 'any-signal'
66
import { InitiatorMessageTypes, ReceiverMessageTypes } from './message-types.js'
77
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'

test/coder.spec.ts

+6-9
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ describe('coder', () => {
2323

2424
it('should decode header', async () => {
2525
const source = [uint8ArrayFromString('8801023137', 'base16')]
26-
for await (const msgs of decode(source)) {
27-
expect(msgs.length).to.equal(1)
28-
29-
expect(messageWithBytes(msgs[0])).to.be.deep.equal({ id: 17, type: 0, data: uint8ArrayFromString('17') })
26+
for await (const msg of decode()(source)) {
27+
expect(messageWithBytes(msg)).to.be.deep.equal({ id: 17, type: 0, data: uint8ArrayFromString('17') })
3028
}
3129
})
3230

@@ -67,8 +65,8 @@ describe('coder', () => {
6765
const source = [uint8ArrayFromString('88010231379801023139a801023231', 'base16')]
6866

6967
const res = []
70-
for await (const msgs of decode(source)) {
71-
res.push(...msgs)
68+
for await (const msg of decode()(source)) {
69+
res.push(msg)
7270
}
7371

7472
expect(res.map(messageWithBytes)).to.deep.equal([
@@ -89,9 +87,8 @@ describe('coder', () => {
8987
it('should decode zero length body msg', async () => {
9088
const source = [uint8ArrayFromString('880100', 'base16')]
9189

92-
for await (const msgs of decode(source)) {
93-
expect(msgs.length).to.equal(1)
94-
expect(messageWithBytes(msgs[0])).to.be.eql({ id: 17, type: 0, data: new Uint8Array(0) })
90+
for await (const msg of decode()(source)) {
91+
expect(messageWithBytes(msg)).to.be.eql({ id: 17, type: 0, data: new Uint8Array(0) })
9592
}
9693
})
9794
})

test/mplex.spec.ts

+8-8
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ describe('mplex', () => {
7676

7777
await muxer.sink(stream)
7878

79-
const messages = await all(decode(bufs))
79+
const messages = await all(decode()(bufs))
8080

81-
expect(messages).to.have.nested.property('[0][0].id', 11, 'Did not specify the correct stream id')
82-
expect(messages).to.have.nested.property('[0][0].type', MessageTypes.RESET_RECEIVER, 'Did not reset the stream that tipped us over the inbound stream limit')
81+
expect(messages).to.have.nested.property('[0].id', 11, 'Did not specify the correct stream id')
82+
expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER, 'Did not reset the stream that tipped us over the inbound stream limit')
8383
})
8484

8585
it('should reset a stream that fills the message buffer', async () => {
@@ -103,7 +103,7 @@ describe('mplex', () => {
103103
const dataMessage: MessageInitiatorMessage = {
104104
id,
105105
type: MessageTypes.MESSAGE_INITIATOR,
106-
data: new Uint8ArrayList(new Uint8Array(1024 * 1024))
106+
data: new Uint8ArrayList(new Uint8Array(1024 * 1000))
107107
}
108108
yield dataMessage
109109

@@ -144,9 +144,9 @@ describe('mplex', () => {
144144

145145
// collect outgoing mplex messages
146146
const muxerFinished = pDefer()
147-
let messages: Message[][] = []
147+
let messages: Message[] = []
148148
void Promise.resolve().then(async () => {
149-
messages = await all(decode(muxer.source))
149+
messages = await all(decode()(muxer.source))
150150
muxerFinished.resolve()
151151
})
152152

@@ -159,7 +159,7 @@ describe('mplex', () => {
159159

160160
// should have sent reset message to peer for this stream
161161
await muxerFinished.promise
162-
expect(messages).to.have.nested.property('[0][0].id', id)
163-
expect(messages).to.have.nested.property('[0][0].type', MessageTypes.RESET_RECEIVER)
162+
expect(messages).to.have.nested.property('[0].id', id)
163+
expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER)
164164
})
165165
})

test/restrict-size.spec.ts

+9-6
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,28 @@ import all from 'it-all'
77
import drain from 'it-drain'
88
import each from 'it-foreach'
99
import { Message, MessageTypes } from '../src/message-types.js'
10-
import { restrictSize } from '../src/restrict-size.js'
10+
import { encode } from '../src/encode.js'
11+
import { decode } from '../src/decode.js'
1112
import { Uint8ArrayList } from 'uint8arraylist'
1213

13-
describe('restrict-size', () => {
14+
describe('restrict size', () => {
1415
it('should throw when size is too big', async () => {
1516
const maxSize = 32
1617

1718
const input: Message[] = [
1819
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(8)) },
20+
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
1921
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(maxSize)) },
20-
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(64)) },
21-
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) }
22+
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(64)) }
2223
]
2324

2425
const output: Message[] = []
2526

2627
try {
2728
await pipe(
2829
input,
29-
restrictSize(maxSize),
30+
encode,
31+
decode(maxSize),
3032
(source) => each(source, chunk => {
3133
output.push(chunk)
3234
}),
@@ -51,7 +53,8 @@ describe('restrict-size', () => {
5153

5254
const output = await pipe(
5355
input,
54-
restrictSize(32),
56+
encode,
57+
decode(32),
5558
async (source) => await all(source)
5659
)
5760
expect(output).to.deep.equal(input)

0 commit comments

Comments
 (0)