Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit 9d5bdb9

Browse files
authored
fix: wait for self-query to have run before running queries (#457)
In order to run a query we need DHT peers. The most common way to find these is to run a self-query which happens after startup. There's no way for the users to know when this has happened so they end up putting arbitrary waits into the code so their queries have a better chance of succeeding. Instead just delay all queries until the self-query has run and found some peers, then continue with the queries.
1 parent 134190d commit 9d5bdb9

File tree

9 files changed

+303
-51
lines changed

9 files changed

+303
-51
lines changed

src/constants.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,11 @@ export const ALPHA = 3
4141
// How often we look for our closest DHT neighbours
4242
export const QUERY_SELF_INTERVAL = Number(5 * minute)
4343

44+
// How often we look for the first set of our closest DHT neighbours
45+
export const QUERY_SELF_INITIAL_INTERVAL = Number(Number(second))
46+
4447
// How long to look for our closest DHT neighbours for
45-
export const QUERY_SELF_TIMEOUT = Number(30 * second)
48+
export const QUERY_SELF_TIMEOUT = Number(5 * second)
4649

4750
// How often we try to find new peers
4851
export const TABLE_REFRESH_INTERVAL = Number(5 * minute)

src/index.ts

+15
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,21 @@ export interface KadDHTInit {
4545
*/
4646
querySelfInterval?: number
4747

48+
/**
49+
* During startup we run the self-query at a shorter interval to ensure
50+
* the containing node can respond to queries quickly. Set this interval
51+
* here in ms (default: 1000)
52+
*/
53+
initialQuerySelfInterval?: number
54+
55+
/**
56+
* After startup by default all queries will be paused until the initial
57+
* self-query has run and there are some peers in the routing table.
58+
*
59+
* Pass true here to disable this behaviour. (default: false)
60+
*/
61+
allowQueryWithZeroPeers?: boolean
62+
4863
/**
4964
* A custom protocol prefix to use (default: '/ipfs')
5065
*/

src/kad-dht.ts

+19-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { validators as recordValidators } from '@libp2p/record/validators'
2525
import { selectors as recordSelectors } from '@libp2p/record/selectors'
2626
import { symbol } from '@libp2p/interface-peer-discovery'
2727
import { PROTOCOL_DHT, PROTOCOL_PREFIX, LAN_PREFIX } from './constants.js'
28+
import pDefer from 'p-defer'
2829

2930
export const DEFAULT_MAX_INBOUND_STREAMS = 32
3031
export const DEFAULT_MAX_OUTBOUND_STREAMS = 64
@@ -117,10 +118,22 @@ export class KadDHT extends EventEmitter<PeerDiscoveryEvents> implements DHT {
117118
protocol: this.protocol,
118119
lan: this.lan
119120
})
121+
122+
// all queries should wait for the initial query-self query to run so we have
123+
// some peers and don't force consumers to use arbitrary timeouts
124+
const initialQuerySelfHasRun = pDefer<any>()
125+
126+
// if the user doesn't want to wait for query peers, resolve the initial
127+
// self-query promise immediately
128+
if (init.allowQueryWithZeroPeers === true) {
129+
initialQuerySelfHasRun.resolve()
130+
}
131+
120132
this.queryManager = new QueryManager(components, {
121133
// Number of disjoint query paths to use - This is set to `kBucketSize/2` per the S/Kademlia paper
122134
disjointPaths: Math.ceil(this.kBucketSize / 2),
123-
lan
135+
lan,
136+
initialQuerySelfHasRun
124137
})
125138

126139
// DHT components
@@ -167,7 +180,10 @@ export class KadDHT extends EventEmitter<PeerDiscoveryEvents> implements DHT {
167180
this.querySelf = new QuerySelf(components, {
168181
peerRouting: this.peerRouting,
169182
interval: querySelfInterval,
170-
lan: this.lan
183+
initialInterval: init.initialQuerySelfInterval,
184+
lan: this.lan,
185+
initialQuerySelfHasRun,
186+
routingTable: this.routingTable
171187
})
172188

173189
// handle peers being discovered during processing of DHT messages
@@ -212,7 +228,7 @@ export class KadDHT extends EventEmitter<PeerDiscoveryEvents> implements DHT {
212228
}
213229

214230
async onPeerConnect (peerData: PeerInfo): Promise<void> {
215-
this.log('peer %p connected with protocols %s', peerData.id, peerData.protocols)
231+
this.log('peer %p connected with protocols', peerData.id, peerData.protocols)
216232

217233
if (this.lan) {
218234
peerData = removePublicAddresses(peerData)

src/peer-routing/index.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ import { Libp2pRecord } from '@libp2p/record'
1313
import { logger } from '@libp2p/logger'
1414
import { keys } from '@libp2p/crypto'
1515
import { peerIdFromKeys } from '@libp2p/peer-id'
16-
import type { DHTRecord, DialingPeerEvent, FinalPeerEvent, QueryEvent, QueryOptions, Validators } from '@libp2p/interface-dht'
16+
import type { DHTRecord, DialingPeerEvent, FinalPeerEvent, QueryEvent, Validators } from '@libp2p/interface-dht'
1717
import type { RoutingTable } from '../routing-table/index.js'
18-
import type { QueryManager } from '../query/manager.js'
18+
import type { QueryManager, QueryOptions } from '../query/manager.js'
1919
import type { Network } from '../network.js'
2020
import type { Logger } from '@libp2p/logger'
2121
import type { AbortOptions } from '@libp2p/interfaces'

src/query-self.ts

+106-36
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,39 @@
11
import { setMaxListeners } from 'events'
22
import take from 'it-take'
33
import length from 'it-length'
4-
import { QUERY_SELF_INTERVAL, QUERY_SELF_TIMEOUT, K } from './constants.js'
4+
import { QUERY_SELF_INTERVAL, QUERY_SELF_TIMEOUT, K, QUERY_SELF_INITIAL_INTERVAL } from './constants.js'
55
import { anySignal } from 'any-signal'
66
import { logger, Logger } from '@libp2p/logger'
77
import type { PeerRouting } from './peer-routing/index.js'
88
import type { Startable } from '@libp2p/interfaces/startable'
99
import { pipe } from 'it-pipe'
1010
import type { KadDHTComponents } from './index.js'
11+
import type { DeferredPromise } from 'p-defer'
12+
import type { RoutingTable } from './routing-table/index.js'
1113

1214
export interface QuerySelfInit {
1315
lan: boolean
1416
peerRouting: PeerRouting
17+
routingTable: RoutingTable
1518
count?: number
1619
interval?: number
20+
initialInterval?: number
1721
queryTimeout?: number
22+
initialQuerySelfHasRun: DeferredPromise<void>
23+
}
24+
25+
function debounce (func: () => void, wait: number): () => void {
26+
let timeout: ReturnType<typeof setTimeout> | undefined
27+
28+
return function () {
29+
const later = function (): void {
30+
timeout = undefined
31+
func()
32+
}
33+
34+
clearTimeout(timeout)
35+
timeout = setTimeout(later, wait)
36+
}
1837
}
1938

2039
/**
@@ -24,40 +43,51 @@ export class QuerySelf implements Startable {
2443
private readonly log: Logger
2544
private readonly components: KadDHTComponents
2645
private readonly peerRouting: PeerRouting
46+
private readonly routingTable: RoutingTable
2747
private readonly count: number
2848
private readonly interval: number
49+
private readonly initialInterval: number
2950
private readonly queryTimeout: number
51+
private started: boolean
3052
private running: boolean
3153
private timeoutId?: NodeJS.Timer
3254
private controller?: AbortController
55+
private initialQuerySelfHasRun?: DeferredPromise<void>
3356

3457
constructor (components: KadDHTComponents, init: QuerySelfInit) {
35-
const { peerRouting, lan, count, interval, queryTimeout } = init
58+
const { peerRouting, lan, count, interval, queryTimeout, routingTable } = init
3659

3760
this.components = components
3861
this.log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:query-self`)
3962
this.running = false
63+
this.started = false
4064
this.peerRouting = peerRouting
65+
this.routingTable = routingTable
4166
this.count = count ?? K
4267
this.interval = interval ?? QUERY_SELF_INTERVAL
68+
this.initialInterval = init.initialInterval ?? QUERY_SELF_INITIAL_INTERVAL
4369
this.queryTimeout = queryTimeout ?? QUERY_SELF_TIMEOUT
70+
this.initialQuerySelfHasRun = init.initialQuerySelfHasRun
71+
72+
this.querySelf = debounce(this.querySelf.bind(this), 100)
4473
}
4574

4675
isStarted (): boolean {
47-
return this.running
76+
return this.started
4877
}
4978

5079
async start (): Promise<void> {
51-
if (this.running) {
80+
if (this.started) {
5281
return
5382
}
5483

55-
this.running = true
56-
this._querySelf()
84+
this.started = true
85+
clearTimeout(this.timeoutId)
86+
this.timeoutId = setTimeout(this.querySelf.bind(this), this.initialInterval)
5787
}
5888

5989
async stop (): Promise<void> {
60-
this.running = false
90+
this.started = false
6191

6292
if (this.timeoutId != null) {
6393
clearTimeout(this.timeoutId)
@@ -68,36 +98,76 @@ export class QuerySelf implements Startable {
6898
}
6999
}
70100

71-
_querySelf (): void {
72-
Promise.resolve().then(async () => {
73-
this.controller = new AbortController()
74-
const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)])
101+
querySelf (): void {
102+
if (!this.started) {
103+
this.log('skip self-query because we are not started')
104+
return
105+
}
75106

76-
// this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged
77-
try {
78-
if (setMaxListeners != null) {
79-
setMaxListeners(Infinity, signal)
80-
}
81-
} catch {} // fails on node < 15.4
82-
83-
try {
84-
const found = await pipe(
85-
this.peerRouting.getClosestPeers(this.components.peerId.toBytes(), {
86-
signal
87-
}),
88-
(source) => take(source, this.count),
89-
async (source) => await length(source)
90-
)
91-
92-
this.log('query ran successfully - found %d peers', found)
93-
} catch (err: any) {
94-
this.log('query error', err)
95-
} finally {
96-
this.timeoutId = setTimeout(this._querySelf.bind(this), this.interval)
97-
signal.clear()
107+
if (this.running) {
108+
this.log('skip self-query because we are already running, will run again in %dms', this.interval)
109+
return
110+
}
111+
112+
if (this.routingTable.size === 0) {
113+
let nextInterval = this.interval
114+
115+
if (this.initialQuerySelfHasRun != null) {
116+
// if we've not yet run the first self query, shorten the interval until we try again
117+
nextInterval = this.initialInterval
98118
}
99-
}).catch(err => {
100-
this.log('query error', err)
101-
})
119+
120+
this.log('skip self-query because routing table is empty, will run again in %dms', nextInterval)
121+
clearTimeout(this.timeoutId)
122+
this.timeoutId = setTimeout(this.querySelf.bind(this), nextInterval)
123+
return
124+
}
125+
126+
this.running = true
127+
128+
Promise.resolve()
129+
.then(async () => {
130+
this.controller = new AbortController()
131+
const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)])
132+
133+
// this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged
134+
try {
135+
if (setMaxListeners != null) {
136+
setMaxListeners(Infinity, signal)
137+
}
138+
} catch {} // fails on node < 15.4
139+
140+
try {
141+
this.log('run self-query, look for %d peers timing out after %dms', this.count, this.queryTimeout)
142+
143+
const found = await pipe(
144+
this.peerRouting.getClosestPeers(this.components.peerId.toBytes(), {
145+
signal,
146+
isSelfQuery: true
147+
}),
148+
(source) => take(source, this.count),
149+
async (source) => await length(source)
150+
)
151+
152+
this.log('self-query ran successfully - found %d peers', found)
153+
154+
if (this.initialQuerySelfHasRun != null) {
155+
this.initialQuerySelfHasRun.resolve()
156+
this.initialQuerySelfHasRun = undefined
157+
}
158+
} catch (err: any) {
159+
this.log.error('self-query error', err)
160+
} finally {
161+
signal.clear()
162+
}
163+
}).catch(err => {
164+
this.log('self-query error', err)
165+
}).finally(() => {
166+
this.running = false
167+
168+
this.log('running self-query again in %dms', this.interval)
169+
clearTimeout(this.timeoutId)
170+
this.timeoutId = setTimeout(this.querySelf.bind(this), this.interval)
171+
})
102172
}
103173
}

src/query/manager.ts

+28-1
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@ import { logger } from '@libp2p/logger'
1111
import type { PeerId } from '@libp2p/interface-peer-id'
1212
import type { Startable } from '@libp2p/interfaces/startable'
1313
import type { QueryFunc } from './types.js'
14-
import type { QueryEvent, QueryOptions } from '@libp2p/interface-dht'
14+
import type { QueryEvent } from '@libp2p/interface-dht'
1515
import { PeerSet } from '@libp2p/peer-collections'
1616
import type { Metric, Metrics } from '@libp2p/interface-metrics'
17+
import type { DeferredPromise } from 'p-defer'
18+
import type { AbortOptions } from '@libp2p/interfaces'
19+
import { AbortError } from '@libp2p/interfaces/errors'
1720

1821
export interface CleanUpEvents {
1922
'cleanup': CustomEvent
@@ -23,13 +26,19 @@ export interface QueryManagerInit {
2326
lan?: boolean
2427
disjointPaths?: number
2528
alpha?: number
29+
initialQuerySelfHasRun: DeferredPromise<void>
2630
}
2731

2832
export interface QueryManagerComponents {
2933
peerId: PeerId
3034
metrics?: Metrics
3135
}
3236

37+
export interface QueryOptions extends AbortOptions {
38+
queryFuncTimeout?: number
39+
isSelfQuery?: boolean
40+
}
41+
3342
/**
3443
* Keeps track of all running queries
3544
*/
@@ -46,6 +55,8 @@ export class QueryManager implements Startable {
4655
queryTime: Metric
4756
}
4857

58+
private initialQuerySelfHasRun?: DeferredPromise<void>
59+
4960
constructor (components: QueryManagerComponents, init: QueryManagerInit) {
5061
const { lan = false, disjointPaths = K, alpha = ALPHA } = init
5162

@@ -55,6 +66,7 @@ export class QueryManager implements Startable {
5566
this.alpha = alpha ?? ALPHA
5667
this.lan = lan
5768
this.queries = 0
69+
this.initialQuerySelfHasRun = init.initialQuerySelfHasRun
5870

5971
// allow us to stop queries on shut down
6072
this.shutDownController = new AbortController()
@@ -131,6 +143,21 @@ export class QueryManager implements Startable {
131143
const cleanUp = new EventEmitter<CleanUpEvents>()
132144

133145
try {
146+
if (options.isSelfQuery !== true && this.initialQuerySelfHasRun != null) {
147+
log('waiting for initial query-self query before continuing')
148+
149+
await Promise.race([
150+
new Promise((resolve, reject) => {
151+
signal.addEventListener('abort', () => {
152+
reject(new AbortError('Query was aborted before self-query ran'))
153+
})
154+
}),
155+
this.initialQuerySelfHasRun.promise
156+
])
157+
158+
this.initialQuerySelfHasRun = undefined
159+
}
160+
134161
log('query:start')
135162
this.queries++
136163
this.metrics?.runningQueries.update(this.queries)

0 commit comments

Comments
 (0)