Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit 46313a8

Browse files
authored
fix: use events to delay before self-query (#478)
Instead of debouncing and using timeouts to wait for DHT peers before running the initial self-query, instead get the routing table to emit events when peers are added or removed - if the table is empty when we run the self-query, wait for the `peer:add` event before continuing. Improves startup time. Adds tests for the query-self component.
1 parent b70076a commit 46313a8

File tree

6 files changed

+236
-92
lines changed

6 files changed

+236
-92
lines changed

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@
181181
"it-take": "^3.0.1",
182182
"multiformats": "^11.0.0",
183183
"p-defer": "^4.0.0",
184+
"p-event": "^5.0.1",
184185
"p-queue": "^7.3.4",
185186
"private-ip": "^3.0.0",
186187
"progress-events": "^1.0.0",

src/kad-dht.ts

+6-4
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,11 @@ export class DefaultKadDHT extends EventEmitter<PeerDiscoveryEvents> implements
285285
this.queryManager.start(),
286286
this.network.start(),
287287
this.routingTable.start(),
288-
this.topologyListener.start(),
289-
this.querySelf.start()
288+
this.topologyListener.start()
290289
])
291290

291+
this.querySelf.start()
292+
292293
await this.routingTableRefresh.start()
293294
}
294295

@@ -299,14 +300,15 @@ export class DefaultKadDHT extends EventEmitter<PeerDiscoveryEvents> implements
299300
async stop (): Promise<void> {
300301
this.running = false
301302

303+
this.querySelf.stop()
304+
302305
await Promise.all([
303306
this.providers.stop(),
304307
this.queryManager.stop(),
305308
this.network.stop(),
306309
this.routingTable.stop(),
307310
this.routingTableRefresh.stop(),
308-
this.topologyListener.stop(),
309-
this.querySelf.stop()
311+
this.topologyListener.stop()
310312
])
311313
}
312314

src/query-self.ts

+63-85
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import { anySignal } from 'any-signal'
44
import length from 'it-length'
55
import { pipe } from 'it-pipe'
66
import take from 'it-take'
7+
import pDefer from 'p-defer'
8+
import { pEvent } from 'p-event'
79
import { QUERY_SELF_INTERVAL, QUERY_SELF_TIMEOUT, K, QUERY_SELF_INITIAL_INTERVAL } from './constants.js'
8-
import type { KadDHTComponents } from './index.js'
910
import type { PeerRouting } from './peer-routing/index.js'
1011
import type { RoutingTable } from './routing-table/index.js'
12+
import type { PeerId } from '@libp2p/interface-peer-id'
1113
import type { Startable } from '@libp2p/interfaces/startable'
1214
import type { DeferredPromise } from 'p-defer'
1315

@@ -22,44 +24,33 @@ export interface QuerySelfInit {
2224
initialQuerySelfHasRun: DeferredPromise<void>
2325
}
2426

25-
function debounce (func: () => void, wait: number): () => void {
26-
let timeout: ReturnType<typeof setTimeout> | undefined
27-
28-
return function () {
29-
const later = function (): void {
30-
timeout = undefined
31-
func()
32-
}
33-
34-
clearTimeout(timeout)
35-
timeout = setTimeout(later, wait)
36-
}
27+
export interface QuerySelfComponents {
28+
peerId: PeerId
3729
}
3830

3931
/**
4032
* Receives notifications of new peers joining the network that support the DHT protocol
4133
*/
4234
export class QuerySelf implements Startable {
4335
private readonly log: Logger
44-
private readonly components: KadDHTComponents
36+
private readonly components: QuerySelfComponents
4537
private readonly peerRouting: PeerRouting
4638
private readonly routingTable: RoutingTable
4739
private readonly count: number
4840
private readonly interval: number
4941
private readonly initialInterval: number
5042
private readonly queryTimeout: number
5143
private started: boolean
52-
private running: boolean
5344
private timeoutId?: NodeJS.Timer
5445
private controller?: AbortController
5546
private initialQuerySelfHasRun?: DeferredPromise<void>
47+
private querySelfPromise?: DeferredPromise<void>
5648

57-
constructor (components: KadDHTComponents, init: QuerySelfInit) {
49+
constructor (components: QuerySelfComponents, init: QuerySelfInit) {
5850
const { peerRouting, lan, count, interval, queryTimeout, routingTable } = init
5951

6052
this.components = components
6153
this.log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:query-self`)
62-
this.running = false
6354
this.started = false
6455
this.peerRouting = peerRouting
6556
this.routingTable = routingTable
@@ -68,25 +59,28 @@ export class QuerySelf implements Startable {
6859
this.initialInterval = init.initialInterval ?? QUERY_SELF_INITIAL_INTERVAL
6960
this.queryTimeout = queryTimeout ?? QUERY_SELF_TIMEOUT
7061
this.initialQuerySelfHasRun = init.initialQuerySelfHasRun
71-
72-
this.querySelf = debounce(this.querySelf.bind(this), 100)
7362
}
7463

7564
isStarted (): boolean {
7665
return this.started
7766
}
7867

79-
async start (): Promise<void> {
68+
start (): void {
8069
if (this.started) {
8170
return
8271
}
8372

8473
this.started = true
8574
clearTimeout(this.timeoutId)
86-
this.timeoutId = setTimeout(this.querySelf.bind(this), this.initialInterval)
75+
this.timeoutId = setTimeout(() => {
76+
this.querySelf()
77+
.catch(err => {
78+
this.log.error('error running self-query', err)
79+
})
80+
}, this.initialInterval)
8781
}
8882

89-
async stop (): Promise<void> {
83+
stop (): void {
9084
this.started = false
9185

9286
if (this.timeoutId != null) {
@@ -98,84 +92,68 @@ export class QuerySelf implements Startable {
9892
}
9993
}
10094

101-
querySelf (): void {
95+
async querySelf (): Promise<void> {
10296
if (!this.started) {
10397
this.log('skip self-query because we are not started')
10498
return
10599
}
106100

107-
if (this.running) {
108-
this.log('skip self-query because we are already running, will run again in %dms', this.interval)
109-
return
101+
if (this.querySelfPromise != null) {
102+
this.log('joining existing self query')
103+
return this.querySelfPromise.promise
110104
}
111105

112-
if (this.routingTable.size === 0) {
113-
let nextInterval = this.interval
106+
this.querySelfPromise = pDefer()
114107

115-
if (this.initialQuerySelfHasRun != null) {
116-
// if we've not yet run the first self query, shorten the interval until we try again
117-
nextInterval = this.initialInterval
118-
}
119-
120-
this.log('skip self-query because routing table is empty, will run again in %dms', nextInterval)
121-
clearTimeout(this.timeoutId)
122-
this.timeoutId = setTimeout(this.querySelf.bind(this), nextInterval)
123-
return
108+
if (this.routingTable.size === 0) {
109+
// wait to discover at least one DHT peer
110+
await pEvent(this.routingTable, 'peer:add')
124111
}
125112

126-
this.running = true
113+
if (this.started) {
114+
this.controller = new AbortController()
115+
const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)])
127116

128-
Promise.resolve()
129-
.then(async () => {
130-
if (!this.started) {
131-
this.log('not running self-query - node stopped before query started')
132-
return
117+
// this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged
118+
try {
119+
if (setMaxListeners != null) {
120+
setMaxListeners(Infinity, signal)
133121
}
122+
} catch {} // fails on node < 15.4
134123

135-
this.controller = new AbortController()
136-
const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)])
137-
138-
// this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged
139-
try {
140-
if (setMaxListeners != null) {
141-
setMaxListeners(Infinity, signal)
142-
}
143-
} catch {} // fails on node < 15.4
144-
145-
try {
146-
this.log('run self-query, look for %d peers timing out after %dms', this.count, this.queryTimeout)
147-
148-
const found = await pipe(
149-
this.peerRouting.getClosestPeers(this.components.peerId.toBytes(), {
150-
signal,
151-
isSelfQuery: true
152-
}),
153-
(source) => take(source, this.count),
154-
async (source) => length(source)
155-
)
156-
157-
this.log('self-query ran successfully - found %d peers', found)
158-
159-
if (this.initialQuerySelfHasRun != null) {
160-
this.initialQuerySelfHasRun.resolve()
161-
this.initialQuerySelfHasRun = undefined
162-
}
163-
} catch (err: any) {
164-
this.log.error('self-query error', err)
165-
} finally {
166-
signal.clear()
167-
}
168-
}).catch(err => {
169-
this.log('self-query error', err)
170-
}).finally(() => {
171-
this.running = false
124+
try {
125+
this.log('run self-query, look for %d peers timing out after %dms', this.count, this.queryTimeout)
126+
127+
const found = await pipe(
128+
this.peerRouting.getClosestPeers(this.components.peerId.toBytes(), {
129+
signal,
130+
isSelfQuery: true
131+
}),
132+
(source) => take(source, this.count),
133+
async (source) => length(source)
134+
)
172135

173-
clearTimeout(this.timeoutId)
136+
this.log('self-query ran successfully - found %d peers', found)
174137

175-
if (this.started) {
176-
this.log('running self-query again in %dms', this.interval)
177-
this.timeoutId = setTimeout(this.querySelf.bind(this), this.interval)
138+
if (this.initialQuerySelfHasRun != null) {
139+
this.initialQuerySelfHasRun.resolve()
140+
this.initialQuerySelfHasRun = undefined
178141
}
179-
})
142+
} catch (err: any) {
143+
this.log.error('self-query error', err)
144+
} finally {
145+
signal.clear()
146+
}
147+
}
148+
149+
this.querySelfPromise.resolve()
150+
this.querySelfPromise = undefined
151+
152+
this.timeoutId = setTimeout(() => {
153+
this.querySelf()
154+
.catch(err => {
155+
this.log.error('error running self-query', err)
156+
})
157+
}, this.interval)
180158
}
181159
}

src/routing-table/index.ts

+15-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { EventEmitter } from '@libp2p/interfaces/events'
12
import { logger } from '@libp2p/logger'
23
import { PeerSet } from '@libp2p/peer-collections'
34
import Queue from 'p-queue'
@@ -33,11 +34,16 @@ export interface RoutingTableComponents {
3334
metrics?: Metrics
3435
}
3536

37+
export interface RoutingTableEvents {
38+
'peer:add': CustomEvent<PeerId>
39+
'peer:remove': CustomEvent<PeerId>
40+
}
41+
3642
/**
3743
* A wrapper around `k-bucket`, to provide easy store and
3844
* retrieval for peers.
3945
*/
40-
export class RoutingTable implements Startable {
46+
export class RoutingTable extends EventEmitter<RoutingTableEvents> implements Startable {
4147
public kBucketSize: number
4248
public kb?: KBucket
4349
public pingQueue: Queue
@@ -58,6 +64,8 @@ export class RoutingTable implements Startable {
5864
}
5965

6066
constructor (components: RoutingTableComponents, init: RoutingTableInit) {
67+
super()
68+
6169
const { kBucketSize, pingTimeout, lan, pingConcurrency, protocol, tagName, tagValue } = init
6270

6371
this.components = components
@@ -160,11 +168,15 @@ export class RoutingTable implements Startable {
160168
kClosest = newClosest
161169
})
162170

163-
kBuck.addEventListener('added', () => {
171+
kBuck.addEventListener('added', (evt) => {
164172
updatePeerTags()
173+
174+
this.safeDispatchEvent('peer:add', { detail: evt.detail.peer })
165175
})
166-
kBuck.addEventListener('removed', () => {
176+
kBuck.addEventListener('removed', (evt) => {
167177
updatePeerTags()
178+
179+
this.safeDispatchEvent('peer:remove', { detail: evt.detail.peer })
168180
})
169181
}
170182

0 commit comments

Comments
 (0)