Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.

Commit c06da3b

Browse files
dignifiedquiredaviddias
authored andcommitted
feat(tests): add closing tests, make sure errors are propagated
1 parent 5069679 commit c06da3b

File tree

3 files changed

+169
-2
lines changed

3 files changed

+169
-2
lines changed

package.json

+6-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "interface-stream-muxer",
33
"version": "0.3.1",
44
"description": "A test suite and interface you can use to implement a stream muxer.",
5-
"main": "lib/index.js",
5+
"main": "src/index.js",
66
"jsnext:main": "src/index.js",
77
"scripts": {
88
"test": "exit(0)",
@@ -34,9 +34,13 @@
3434
"async": "^2.0.1",
3535
"chai": "^3.5.0",
3636
"chai-checkmark": "^1.0.1",
37+
"libp2p-tcp": "^0.8.1",
38+
"multiaddr": "^2.0.2",
3739
"pull-generate": "^2.2.0",
3840
"pull-pair": "^1.1.0",
39-
"pull-stream": "^3.4.3"
41+
"pull-stream": "^3.4.3",
42+
"run-parallel": "^1.1.6",
43+
"run-series": "^1.1.4"
4044
},
4145
"devDependencies": {
4246
"aegir": "^6.0.1"

src/close-test.js

+161
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/* eslint-env mocha */
2+
'use strict'
3+
4+
const chai = require('chai')
5+
chai.use(require('chai-checkmark'))
6+
const expect = chai.expect
7+
const pair = require('pull-pair/duplex')
8+
const pull = require('pull-stream')
9+
const parallel = require('run-parallel')
10+
const series = require('run-series')
11+
const Tcp = require('libp2p-tcp')
12+
const multiaddr = require('multiaddr')
13+
14+
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090')
15+
16+
function closeAndWait (stream) {
17+
pull(
18+
pull.empty(),
19+
stream,
20+
pull.onEnd((err) => {
21+
expect(err).to.not.exist.mark()
22+
})
23+
)
24+
}
25+
26+
module.exports = (common) => {
27+
describe.only('close', () => {
28+
let muxer
29+
30+
beforeEach((done) => {
31+
common.setup((err, _muxer) => {
32+
if (err) return done(err)
33+
muxer = _muxer
34+
done()
35+
})
36+
})
37+
38+
it('closing underlying closes streams (tcp)', (done) => {
39+
expect(2).checks(done)
40+
41+
const tcp = new Tcp()
42+
const tcpListener = tcp.createListener((socket) => {
43+
const listener = muxer.listen(socket)
44+
listener.on('stream', (stream) => {
45+
pull(stream, stream)
46+
})
47+
})
48+
49+
tcpListener.listen(mh, () => {
50+
const dialer = muxer.dial(tcp.dial(mh, () => {
51+
tcpListener.close()
52+
}))
53+
54+
const s1 = dialer.newStream(() => {
55+
pull(
56+
s1,
57+
pull.onEnd((err) => {
58+
expect(err).to.exist.mark()
59+
})
60+
)
61+
62+
const s2 = dialer.newStream(() => {
63+
pull(
64+
s2,
65+
pull.onEnd((err) => {
66+
expect(err).to.exist.mark()
67+
})
68+
)
69+
})
70+
})
71+
})
72+
})
73+
74+
it('closing one of the muxed streams doesn\'t close others', (done) => {
75+
const p = pair()
76+
const dialer = muxer.dial(p[0])
77+
const listener = muxer.listen(p[1])
78+
79+
expect(6).checks(done)
80+
81+
const conns = []
82+
83+
listener.on('stream', (stream) => {
84+
expect(stream).to.exist.mark()
85+
pull(stream, stream)
86+
})
87+
88+
for (let i = 0; i < 5; i++) {
89+
conns.push(dialer.newStream())
90+
}
91+
92+
conns.forEach((conn, i) => {
93+
if (i === 2) {
94+
closeAndWait(conn)
95+
} else {
96+
pull(
97+
conn,
98+
pull.onEnd(() => {
99+
throw new Error('should not end')
100+
})
101+
)
102+
}
103+
})
104+
})
105+
106+
it.skip('closing on spdy doesn\'t close until all the streams that are being muxed are closed', (done) => {
107+
const p = pair()
108+
const dialer = muxer.dial(p[0])
109+
const listener = muxer.listen(p[1])
110+
111+
expect(15).checks(done)
112+
113+
const conns = []
114+
const count = []
115+
for (let i = 0; i < 5; i++) {
116+
count.push(i)
117+
}
118+
119+
series(count.map((i) => (cb) => {
120+
parallel([
121+
(cb) => listener.once('stream', (stream) => {
122+
console.log('pipe')
123+
expect(stream).to.exist.mark()
124+
pull(stream, stream)
125+
cb()
126+
}),
127+
(cb) => conns.push(dialer.newStream(cb))
128+
], cb)
129+
}), (err) => {
130+
if (err) return done(err)
131+
132+
conns.forEach((conn, i) => {
133+
pull(
134+
pull.values([Buffer('hello')]),
135+
pull.asyncMap((val, cb) => {
136+
setTimeout(() => {
137+
cb(null, val)
138+
}, i * 10)
139+
}),
140+
pull.through((val) => console.log('send', val)),
141+
conn,
142+
pull.through((val) => console.log('recv', val)),
143+
pull.collect((err, data) => {
144+
console.log('end', i)
145+
expect(err).to.not.exist.mark()
146+
expect(data).to.be.eql([Buffer('hello')]).mark()
147+
})
148+
)
149+
})
150+
151+
listener.on('close', () => {
152+
console.log('closed listener')
153+
})
154+
155+
dialer.end(() => {
156+
console.log('CLOSED')
157+
})
158+
})
159+
})
160+
})
161+
}

src/index.js

+2
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
'use strict'
33

44
const baseTest = require('./base-test')
5+
const closeTest = require('./close-test')
56
const stressTest = require('./stress-test')
67
const megaStressTest = require('./mega-stress-test')
78

89
module.exports = (common) => {
910
describe('interface-stream-muxer', () => {
1011
baseTest(common)
12+
closeTest(common)
1113
stressTest(common)
1214
megaStressTest(common)
1315
})

0 commit comments

Comments
 (0)