Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit df15a83

Browse files
authored
feat: tag kad-close peers (#375)
When peers are added or removed from the kbuckets, if the `kBucketSize` closest peers change, update their peer store tags to ensure we don't close connections to them when the connection manager reaches it's max connections limit.
1 parent 16583be commit df15a83

File tree

4 files changed

+185
-9
lines changed

4 files changed

+185
-9
lines changed

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@
201201
"lodash.random": "^3.2.0",
202202
"lodash.range": "^3.2.0",
203203
"p-retry": "^5.0.0",
204+
"p-wait-for": "^5.0.0",
204205
"protons": "^5.1.0",
205206
"sinon": "^14.0.0",
206207
"ts-sinon": "^2.0.2",

src/routing-table/index.ts

+77-7
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ import type { PeerId } from '@libp2p/interface-peer-id'
88
import type { Startable } from '@libp2p/interfaces/startable'
99
import type { Logger } from '@libp2p/logger'
1010
import { Components, Initializable } from '@libp2p/components'
11+
import { PeerSet } from '@libp2p/peer-collections'
12+
13+
export const KAD_CLOSE_TAG_NAME = 'kad-close'
14+
export const KAD_CLOSE_TAG_VALUE = 50
15+
export const KBUCKET_SIZE = 20
16+
export const PING_TIMEOUT = 10000
17+
export const PING_CONCURRENCY = 10
1118

1219
export interface KBucketPeer {
1320
id: Uint8Array
@@ -22,10 +29,20 @@ export interface KBucket {
2229
right: KBucket
2330
}
2431

32+
interface KBucketTreeEvents {
33+
'ping': (oldContacts: KBucketPeer[], newContact: KBucketPeer) => void
34+
'added': (contact: KBucketPeer) => void
35+
'removed': (contact: KBucketPeer) => void
36+
}
37+
2538
export interface KBucketTree {
2639
root: KBucket
2740
localNodeId: Uint8Array
28-
on: (event: 'ping', callback: (oldContacts: KBucketPeer[], newContact: KBucketPeer) => void) => void
41+
42+
on: <U extends keyof KBucketTreeEvents>(
43+
event: U, listener: KBucketTreeEvents[U]
44+
) => this
45+
2946
closest: (key: Uint8Array, count: number) => KBucketPeer[]
3047
closestPeer: (key: Uint8Array) => KBucketPeer
3148
remove: (key: Uint8Array) => void
@@ -45,6 +62,8 @@ export interface RoutingTableInit {
4562
kBucketSize?: number
4663
pingTimeout?: number
4764
pingConcurrency?: number
65+
tagName?: string
66+
tagValue?: number
4867
}
4968

5069
/**
@@ -63,17 +82,21 @@ export class RoutingTable implements Startable, Initializable {
6382
private readonly pingConcurrency: number
6483
private running: boolean
6584
private readonly protocol: string
85+
private readonly tagName: string
86+
private readonly tagValue: number
6687

6788
constructor (init: RoutingTableInit) {
68-
const { kBucketSize, pingTimeout, lan, pingConcurrency, protocol } = init
89+
const { kBucketSize, pingTimeout, lan, pingConcurrency, protocol, tagName, tagValue } = init
6990

7091
this.log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:routing-table`)
71-
this.kBucketSize = kBucketSize ?? 20
72-
this.pingTimeout = pingTimeout ?? 10000
73-
this.pingConcurrency = pingConcurrency ?? 10
92+
this.kBucketSize = kBucketSize ?? KBUCKET_SIZE
93+
this.pingTimeout = pingTimeout ?? PING_TIMEOUT
94+
this.pingConcurrency = pingConcurrency ?? PING_CONCURRENCY
7495
this.lan = lan
7596
this.running = false
7697
this.protocol = protocol
98+
this.tagName = tagName ?? KAD_CLOSE_TAG_NAME
99+
this.tagValue = tagValue ?? KAD_CLOSE_TAG_VALUE
77100

78101
const updatePingQueueSizeMetric = () => {
79102
this.components.getMetrics()?.updateComponentMetric({
@@ -108,13 +131,18 @@ export class RoutingTable implements Startable, Initializable {
108131
async start () {
109132
this.running = true
110133

111-
const kBuck = new KBuck({
134+
const kBuck: KBucketTree = new KBuck({
112135
localNodeId: await utils.convertPeerId(this.components.getPeerId()),
113136
numberOfNodesPerKBucket: this.kBucketSize,
114137
numberOfNodesToPing: 1
115138
})
116-
kBuck.on('ping', this._onPing)
117139
this.kb = kBuck
140+
141+
// test whether to evict peers
142+
kBuck.on('ping', this._onPing)
143+
144+
// tag kad-close peers
145+
this._tagPeers(kBuck)
118146
}
119147

120148
async stop () {
@@ -123,6 +151,48 @@ export class RoutingTable implements Startable, Initializable {
123151
this.kb = undefined
124152
}
125153

154+
/**
155+
* Keep track of our k-closest peers and tag them in the peer store as such
156+
* - this will lower the chances that connections to them get closed when
157+
* we reach connection limits
158+
*/
159+
_tagPeers (kBuck: KBucketTree) {
160+
let kClosest = new PeerSet()
161+
162+
const updatePeerTags = utils.debounce(() => {
163+
const newClosest = new PeerSet(
164+
kBuck.closest(kBuck.localNodeId, KBUCKET_SIZE).map(contact => contact.peer)
165+
)
166+
const addedPeers = newClosest.difference(kClosest)
167+
const removedPeers = kClosest.difference(newClosest)
168+
169+
Promise.resolve()
170+
.then(async () => {
171+
for (const peer of addedPeers) {
172+
await this.components.getPeerStore().tagPeer(peer, this.tagName, {
173+
value: this.tagValue
174+
})
175+
}
176+
177+
for (const peer of removedPeers) {
178+
await this.components.getPeerStore().unTagPeer(peer, this.tagName)
179+
}
180+
})
181+
.catch(err => {
182+
this.log.error('Could not update peer tags', err)
183+
})
184+
185+
kClosest = newClosest
186+
})
187+
188+
kBuck.on('added', () => {
189+
updatePeerTags()
190+
})
191+
kBuck.on('removed', () => {
192+
updatePeerTags()
193+
})
194+
}
195+
126196
/**
127197
* Called on the `ping` event from `k-bucket` when a bucket is full
128198
* and cannot split.

src/utils.ts

+9
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,12 @@ export function createPutRecord (key: Uint8Array, value: Uint8Array) {
112112

113113
return rec.serialize()
114114
}
115+
116+
export function debounce (callback: () => void, wait: number = 100) {
117+
let timeout: ReturnType<typeof setTimeout>
118+
119+
return () => {
120+
clearTimeout(timeout)
121+
timeout = setTimeout(() => callback(), wait)
122+
}
123+
}

test/routing-table.spec.ts

+98-2
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,21 @@
33
import { expect } from 'aegir/chai'
44
import random from 'lodash.random'
55
import sinon from 'sinon'
6-
import { RoutingTable } from '../src/routing-table/index.js'
6+
import { KAD_CLOSE_TAG_NAME, KAD_CLOSE_TAG_VALUE, KBUCKET_SIZE, RoutingTable } from '../src/routing-table/index.js'
77
import * as kadUtils from '../src/utils.js'
88
import { createPeerId, createPeerIds } from './utils/create-peer-id.js'
99
import { PROTOCOL_DHT } from '../src/constants.js'
1010
import { peerIdFromString } from '@libp2p/peer-id'
1111
import { Components } from '@libp2p/components'
1212
import { mockConnectionManager } from '@libp2p/interface-mocks'
13+
import { PersistentPeerStore } from '@libp2p/peer-store'
14+
import { MemoryDatastore } from 'datastore-core'
15+
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
16+
import { sortClosestPeers } from './utils/sort-closest-peers.js'
17+
import pWaitFor from 'p-wait-for'
18+
import { pipe } from 'it-pipe'
19+
import all from 'it-all'
20+
import { PeerSet } from '@libp2p/peer-collections'
1321

1422
describe('Routing Table', () => {
1523
let table: RoutingTable
@@ -20,7 +28,9 @@ describe('Routing Table', () => {
2028

2129
components = new Components({
2230
peerId: await createPeerId(),
23-
connectionManager: mockConnectionManager()
31+
connectionManager: mockConnectionManager(),
32+
datastore: new MemoryDatastore(),
33+
peerStore: new PersistentPeerStore()
2434
})
2535

2636
table = new RoutingTable({
@@ -207,4 +217,90 @@ describe('Routing Table', () => {
207217
// evicted the old peer
208218
expect(table.kb.get(oldPeer.id)).to.be.null()
209219
})
220+
221+
it('tags newly found kad-close peers', async () => {
222+
const remotePeer = await createEd25519PeerId()
223+
const tagPeerSpy = sinon.spy(components.getPeerStore(), 'tagPeer')
224+
225+
await table.add(remotePeer)
226+
227+
expect(tagPeerSpy.callCount).to.equal(0, 'did not debounce call to peerStore.tagPeer')
228+
229+
await pWaitFor(() => {
230+
return tagPeerSpy.callCount === 1
231+
})
232+
233+
expect(tagPeerSpy.callCount).to.equal(1, 'did not tag kad-close peer')
234+
expect(tagPeerSpy.getCall(0).args[0].toString()).to.equal(remotePeer.toString())
235+
expect(tagPeerSpy.getCall(0).args[1]).to.equal(KAD_CLOSE_TAG_NAME)
236+
expect(tagPeerSpy.getCall(0).args[2]).to.have.property('value', KAD_CLOSE_TAG_VALUE)
237+
})
238+
239+
it('removes tags from kad-close peers when closer peers are found', async () => {
240+
async function getTaggedPeers (): Promise<PeerSet> {
241+
return new PeerSet(await pipe(
242+
await components.getPeerStore().all(),
243+
async function * (source) {
244+
for await (const peer of source) {
245+
const tags = await components.getPeerStore().getTags(peer.id)
246+
const kadCloseTags = tags.filter(tag => tag.name === KAD_CLOSE_TAG_NAME)
247+
248+
if (kadCloseTags.length > 0) {
249+
yield peer.id
250+
}
251+
}
252+
},
253+
async (source) => await all(source)
254+
))
255+
}
256+
257+
const tagPeerSpy = sinon.spy(components.getPeerStore(), 'tagPeer')
258+
const unTagPeerSpy = sinon.spy(components.getPeerStore(), 'unTagPeer')
259+
const localNodeId = await kadUtils.convertPeerId(components.getPeerId())
260+
const sortedPeerList = await sortClosestPeers(
261+
await Promise.all(
262+
new Array(KBUCKET_SIZE + 1).fill(0).map(async () => await createEd25519PeerId())
263+
),
264+
localNodeId
265+
)
266+
267+
// sort list furthest -> closest
268+
sortedPeerList.reverse()
269+
270+
// fill the table up to the first kbucket size
271+
for (let i = 0; i < KBUCKET_SIZE; i++) {
272+
await table.add(sortedPeerList[i])
273+
}
274+
275+
// should have all added contacts in the root kbucket
276+
expect(table.kb?.count()).to.equal(KBUCKET_SIZE, 'did not fill kbuckets')
277+
expect(table.kb?.root.contacts).to.have.lengthOf(KBUCKET_SIZE, 'split root kbucket when we should not have')
278+
expect(table.kb?.root.left).to.be.null('split root kbucket when we should not have')
279+
expect(table.kb?.root.right).to.be.null('split root kbucket when we should not have')
280+
281+
await pWaitFor(() => {
282+
return tagPeerSpy.callCount === KBUCKET_SIZE
283+
})
284+
285+
// make sure we tagged all of the peers as kad-close
286+
const taggedPeers = await getTaggedPeers()
287+
expect(taggedPeers.difference(new PeerSet(sortedPeerList.slice(0, sortedPeerList.length - 1)))).to.have.property('size', 0)
288+
tagPeerSpy.resetHistory()
289+
290+
// add a node that is closer than any added so far
291+
await table.add(sortedPeerList[sortedPeerList.length - 1])
292+
293+
expect(table.kb?.count()).to.equal(KBUCKET_SIZE + 1, 'did not fill kbuckets')
294+
expect(table.kb?.root.left).to.not.be.null('did not split root kbucket when we should have')
295+
expect(table.kb?.root.right).to.not.be.null('did not split root kbucket when we should have')
296+
297+
// wait for tag new peer and untag old peer
298+
await pWaitFor(() => {
299+
return tagPeerSpy.callCount === 1 && unTagPeerSpy.callCount === 1
300+
})
301+
302+
// should have updated list of tagged peers
303+
const finalTaggedPeers = await getTaggedPeers()
304+
expect(finalTaggedPeers.difference(new PeerSet(sortedPeerList.slice(1)))).to.have.property('size', 0)
305+
})
210306
})

0 commit comments

Comments
 (0)