Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.

Commit 0f60832

Browse files
authored
feat: add onStreamEnd, muxer.streams and timeline (#56)
BREAKING CHANGE: This adds new validations to the stream muxer, which will cause existing tests to fail.
1 parent d908f8a commit 0f60832

File tree

2 files changed

+73
-6
lines changed

2 files changed

+73
-6
lines changed

README.md

+29
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ pipe(conn, muxer, conn) // conn is duplex connection to another peer
105105
```js
106106
new Mplex(stream => { /* ... */ })
107107
```
108+
* `onStreamEnd` - A function called when a stream ends.
109+
```js
110+
// Get notified when a stream has ended
111+
const onStreamEnd = stream => {
112+
// Manage any tracking changes, etc
113+
}
114+
const muxer = new Muxer({ onStreamEnd, ... })
115+
```
108116
* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of it's multiplexed connections. e.g.
109117
```js
110118
const controller = new AbortController()
@@ -126,6 +134,16 @@ const muxer = new Muxer()
126134
muxer.onStream = stream => { /* ... */ }
127135
```
128136

137+
#### `muxer.onStreamEnd`
138+
139+
Use this property as an alternative to passing `onStreamEnd` as an option to the `Muxer` constructor.
140+
141+
```js
142+
const muxer = new Muxer()
143+
// ...later
144+
muxer.onStreamEnd = stream => { /* ... */ }
145+
```
146+
129147
#### `const stream = muxer.newStream([options])`
130148

131149
Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it).
@@ -140,6 +158,17 @@ const stream = muxer.newStream()
140158
pipe([1, 2, 3], stream, consume)
141159
```
142160

161+
#### `const streams = muxer.streams`
162+
163+
The streams property returns an array of streams the muxer currently has open. Closed streams will not be returned.
164+
165+
```js
166+
muxer.streams.map(stream => {
167+
// Log out the stream's id
168+
console.log(stream.id)
169+
})
170+
```
171+
143172
### Go
144173

145174
#### Attach muxer to a Connection

src/base-test.js

+44-6
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,28 @@ const pair = require('it-pair/duplex')
88
const pipe = require('it-pipe')
99
const { collect, map, consume } = require('streaming-iterables')
1010

11+
function close (stream) {
12+
return pipe([], stream, consume)
13+
}
14+
1115
async function closeAndWait (stream) {
12-
await pipe([], stream, consume)
16+
await close(stream)
1317
expect(true).to.be.true.mark()
1418
}
1519

20+
/**
21+
* A tick is considered valid if it happened between now
22+
* and `ms` milliseconds ago
23+
* @param {number} date Time in ticks
24+
* @param {number} ms max milliseconds that should have expired
25+
* @returns {boolean}
26+
*/
27+
function isValidTick (date, ms = 5000) {
28+
const now = Date.now()
29+
if (date > now - ms && date <= now) return true
30+
return false
31+
}
32+
1633
module.exports = (common) => {
1734
describe('base', () => {
1835
let Muxer
@@ -25,25 +42,44 @@ module.exports = (common) => {
2542
const p = pair()
2643
const dialer = new Muxer()
2744

28-
const listener = new Muxer(stream => {
29-
expect(stream).to.exist.mark()
30-
closeAndWait(stream)
45+
const listener = new Muxer({
46+
onStream: stream => {
47+
expect(stream).to.exist.mark() // 1st check
48+
expect(isValidTick(stream.timeline.open)).to.equal(true)
49+
// Make sure the stream is being tracked
50+
expect(listener.streams).to.include(stream)
51+
close(stream)
52+
},
53+
onStreamEnd: stream => {
54+
expect(stream).to.exist.mark() // 2nd check
55+
expect(listener.streams).to.not.include(stream)
56+
// Make sure the stream is removed from tracking
57+
expect(isValidTick(stream.timeline.close)).to.equal(true)
58+
}
3159
})
3260

3361
pipe(p[0], dialer, p[0])
3462
pipe(p[1], listener, p[1])
3563

36-
expect(3).checks(done)
64+
expect(3).checks(() => {
65+
// ensure we have no streams left
66+
expect(dialer.streams).to.have.length(0)
67+
expect(listener.streams).to.have.length(0)
68+
done()
69+
})
3770

3871
const conn = dialer.newStream()
72+
expect(dialer.streams).to.include(conn)
73+
expect(isValidTick(conn.timeline.open)).to.equal(true)
3974

40-
closeAndWait(conn)
75+
closeAndWait(conn) // 3rd check
4176
})
4277

4378
it('Open a stream from the listener', (done) => {
4479
const p = pair()
4580
const dialer = new Muxer(stream => {
4681
expect(stream).to.exist.mark()
82+
expect(isValidTick(stream.timeline.open)).to.equal(true)
4783
closeAndWait(stream)
4884
})
4985
const listener = new Muxer()
@@ -54,6 +90,8 @@ module.exports = (common) => {
5490
expect(3).check(done)
5591

5692
const conn = listener.newStream()
93+
expect(listener.streams).to.include(conn)
94+
expect(isValidTick(conn.timeline.open)).to.equal(true)
5795

5896
closeAndWait(conn)
5997
})

0 commit comments

Comments
 (0)