Skip to content

Commit 156c76e

Browse files
subscribe-test: remove dependency on Node's EventEmitter (#2796)
1 parent 9305c04 commit 156c76e

File tree

4 files changed

+151
-170
lines changed

4 files changed

+151
-170
lines changed

src/subscription/__tests__/eventEmitterAsyncIterator.js

-64
This file was deleted.

src/subscription/__tests__/eventEmitterAsyncIterator-test.js src/subscription/__tests__/simplePubSub-test.js

+10-13
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
1-
import EventEmitter from 'events';
2-
31
import { expect } from 'chai';
42
import { describe, it } from 'mocha';
53

6-
import eventEmitterAsyncIterator from './eventEmitterAsyncIterator';
4+
import SimplePubSub from './simplePubSub';
75

8-
describe('eventEmitterAsyncIterator', () => {
6+
describe('SimplePubSub', () => {
97
it('subscribe async-iterator mock', async () => {
10-
// Create an AsyncGenerator from an EventEmitter
11-
const emitter = new EventEmitter();
12-
const iterator = eventEmitterAsyncIterator(emitter, 'publish');
8+
const pubsub = new SimplePubSub();
9+
const iterator = pubsub.getSubscriber();
1310

1411
// Queue up publishes
15-
expect(emitter.emit('publish', 'Apple')).to.equal(true);
16-
expect(emitter.emit('publish', 'Banana')).to.equal(true);
12+
expect(pubsub.emit('Apple')).to.equal(true);
13+
expect(pubsub.emit('Banana')).to.equal(true);
1714

1815
// Read payloads
1916
expect(await iterator.next()).to.deep.equal({
@@ -30,8 +27,8 @@ describe('eventEmitterAsyncIterator', () => {
3027
const i4 = iterator.next().then((x) => x);
3128

3229
// Publish
33-
expect(emitter.emit('publish', 'Coconut')).to.equal(true);
34-
expect(emitter.emit('publish', 'Durian')).to.equal(true);
30+
expect(pubsub.emit('Coconut')).to.equal(true);
31+
expect(pubsub.emit('Durian')).to.equal(true);
3532

3633
// Await out of order to get correct results
3734
expect(await i4).to.deep.equal({ done: false, value: 'Durian' });
@@ -40,11 +37,11 @@ describe('eventEmitterAsyncIterator', () => {
4037
// Read ahead
4138
const i5 = iterator.next().then((x) => x);
4239

43-
// Terminate emitter
40+
// Terminate queue
4441
await iterator.return();
4542

4643
// Publish is not caught after terminate
47-
expect(emitter.emit('publish', 'Fig')).to.equal(false);
44+
expect(pubsub.emit('Fig')).to.equal(false);
4845

4946
// Find that cancelled read-ahead got a "done" result
5047
expect(await i5).to.deep.equal({ done: true, value: undefined });
+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Create an AsyncIterator from an EventEmitter. Useful for mocking a
3+
* PubSub system for tests.
4+
*/
5+
export default class SimplePubSub<T> {
6+
_subscribers: Set<(T) => void>;
7+
8+
constructor() {
9+
this._subscribers = new Set();
10+
}
11+
12+
emit(event: T): boolean {
13+
for (const subscriber of this._subscribers) {
14+
subscriber(event);
15+
}
16+
return this._subscribers.size > 0;
17+
}
18+
19+
getSubscriber<R>(transform?: (T) => R): AsyncGenerator<R, void, void> {
20+
const pullQueue = [];
21+
const pushQueue = [];
22+
let listening = true;
23+
this._subscribers.add(pushValue);
24+
25+
const emptyQueue = () => {
26+
listening = false;
27+
this._subscribers.delete(pushValue);
28+
for (const resolve of pullQueue) {
29+
resolve({ value: undefined, done: true });
30+
}
31+
pullQueue.length = 0;
32+
pushQueue.length = 0;
33+
};
34+
35+
/* TODO: Flow doesn't support symbols as keys:
36+
https://github.com/facebook/flow/issues/3258 */
37+
return ({
38+
next() {
39+
if (!listening) {
40+
return Promise.resolve({ value: undefined, done: true });
41+
}
42+
43+
if (pushQueue.length > 0) {
44+
return Promise.resolve({ value: pushQueue.shift(), done: false });
45+
}
46+
return new Promise((resolve) => pullQueue.push(resolve));
47+
},
48+
return() {
49+
emptyQueue();
50+
return Promise.resolve({ value: undefined, done: true });
51+
},
52+
throw(error: mixed) {
53+
emptyQueue();
54+
return Promise.reject(error);
55+
},
56+
[Symbol.asyncIterator]() {
57+
return this;
58+
},
59+
}: any);
60+
61+
function pushValue(event: T): void {
62+
const value = transform != null ? transform(event) : event;
63+
if (pullQueue.length > 0) {
64+
pullQueue.shift()({ value, done: false });
65+
} else {
66+
pushQueue.push(value);
67+
}
68+
}
69+
}
70+
}

0 commit comments

Comments
 (0)