Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit 12804e2

Browse files
authored
feat: async peer store (#272)
Refactors interfaces and classes used by `libp2p-interfaces` to use the async peer store from libp2p/js-libp2p#1058 BREAKING CHANGE: peerstore methods are now all async
1 parent 2a5e7d8 commit 12804e2

17 files changed

+120
-115
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,12 @@ import { create } from 'libp2p-kad-dht'
6060
/**
6161
* @param {Libp2p} libp2p
6262
*/
63-
function addDHT(libp2p) {
63+
async function addDHT(libp2p) {
6464
const customDHT = create({
6565
libp2p,
6666
protocolPrefix: '/custom'
6767
})
68-
customDHT.start()
68+
await customDHT.start()
6969

7070
return customDHT
7171
}

package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
"it-take": "^1.0.2",
6767
"k-bucket": "^5.1.0",
6868
"libp2p-crypto": "^0.21.0",
69-
"libp2p-interfaces": "^2.0.1",
69+
"libp2p-interfaces": "^4.0.0",
7070
"libp2p-record": "^0.10.4",
7171
"multiaddr": "^10.0.0",
7272
"multiformats": "^9.4.5",
@@ -92,7 +92,7 @@
9292
"it-filter": "^1.0.3",
9393
"it-last": "^1.0.6",
9494
"it-pair": "^1.0.0",
95-
"libp2p": "^0.35.4",
95+
"libp2p": "libp2p/js-libp2p#feat/async-peerstore",
9696
"lodash.random": "^3.2.0",
9797
"lodash.range": "^3.2.0",
9898
"p-retry": "^4.2.0",

src/content-routing/index.js

+10-5
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class ContentRouting {
2828
* @param {import('../query/manager').QueryManager} params.queryManager
2929
* @param {import('../routing-table').RoutingTable} params.routingTable
3030
* @param {import('../providers').Providers} params.providers
31-
* @param {import('../types').PeerStore} params.peerStore
31+
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
3232
* @param {boolean} params.lan
3333
*/
3434
constructor ({ peerId, network, peerRouting, queryManager, routingTable, providers, peerStore, lan }) {
@@ -137,10 +137,15 @@ class ContentRouting {
137137

138138
// yield values if we have some, also slice because maybe we got lucky and already have too many?
139139
if (provs.length) {
140-
const providers = provs.slice(0, toFind).map(peerId => ({
141-
id: peerId,
142-
multiaddrs: (this._peerStore.addressBook.get(peerId) || []).map(address => address.multiaddr)
143-
}))
140+
/** @type {{ id: PeerId, multiaddrs: Multiaddr[] }[]} */
141+
const providers = []
142+
143+
for (const peerId of provs.slice(0, toFind)) {
144+
providers.push({
145+
id: peerId,
146+
multiaddrs: ((await this._peerStore.addressBook.get(peerId)) || []).map(address => address.multiaddr)
147+
})
148+
}
144149

145150
yield peerResponseEvent({ from: this._peerId, messageType: MessageType.GET_PROVIDERS, providers })
146151
yield providerEvent({ from: this._peerId, providers: providers })

src/dual-kad-dht.js

+7-7
Original file line numberDiff line numberDiff line change
@@ -77,15 +77,15 @@ class DualKadDHT extends EventEmitter {
7777
/**
7878
* Whether we are in client or server mode
7979
*/
80-
enableServerMode () {
81-
this._wan.enableServerMode()
80+
async enableServerMode () {
81+
await this._wan.enableServerMode()
8282
}
8383

8484
/**
8585
* Whether we are in client or server mode
8686
*/
87-
enableClientMode () {
88-
this._wan.enableClientMode()
87+
async enableClientMode () {
88+
await this._wan.enableClientMode()
8989
}
9090

9191
/**
@@ -314,7 +314,7 @@ class DualKadDHT extends EventEmitter {
314314
log('getPublicKey %p', peer)
315315

316316
// local check
317-
const peerData = this._libp2p.peerStore.get(peer)
317+
const peerData = await this._libp2p.peerStore.get(peer)
318318

319319
if (peerData && peerData.id.pubKey) {
320320
log('getPublicKey: found local copy')
@@ -339,8 +339,8 @@ class DualKadDHT extends EventEmitter {
339339

340340
const peerId = new PeerId(peer.id, undefined, pk)
341341
const addrs = ((peerData && peerData.addresses) || []).map((address) => address.multiaddr)
342-
this._libp2p.peerStore.addressBook.add(peerId, addrs)
343-
this._libp2p.peerStore.keyBook.set(peerId, pk)
342+
await this._libp2p.peerStore.addressBook.add(peerId, addrs)
343+
await this._libp2p.peerStore.keyBook.set(peerId, pk)
344344

345345
return pk
346346
}

src/kad-dht.js

+9-7
Original file line numberDiff line numberDiff line change
@@ -280,9 +280,11 @@ class KadDHT extends EventEmitter {
280280

281281
// handle peers being discovered via other peer discovery mechanisms
282282
this._topologyListener.on('peer', async (peerId) => {
283+
const multiaddrs = await this._libp2p.peerStore.addressBook.get(peerId)
284+
283285
const peerData = {
284286
id: peerId,
285-
multiaddrs: (this._libp2p.peerStore.addressBook.get(peerId) || []).map((/** @type {{ multiaddr: Multiaddr }} */ addr) => addr.multiaddr)
287+
multiaddrs: multiaddrs.map(addr => addr.multiaddr)
286288
}
287289

288290
this.onPeerConnect(peerData).catch(err => {
@@ -332,19 +334,19 @@ class KadDHT extends EventEmitter {
332334
/**
333335
* Whether we are in client or server mode
334336
*/
335-
enableServerMode () {
337+
async enableServerMode () {
336338
this._log('enabling server mode')
337339
this._clientMode = false
338-
this._libp2p.handle(this._protocol, this._rpc.onIncomingStream.bind(this._rpc))
340+
await this._libp2p.handle(this._protocol, this._rpc.onIncomingStream.bind(this._rpc))
339341
}
340342

341343
/**
342344
* Whether we are in client or server mode
343345
*/
344-
enableClientMode () {
346+
async enableClientMode () {
345347
this._log('enabling client mode')
346348
this._clientMode = true
347-
this._libp2p.unhandle(this._protocol)
349+
await this._libp2p.unhandle(this._protocol)
348350
}
349351

350352
/**
@@ -355,9 +357,9 @@ class KadDHT extends EventEmitter {
355357

356358
// Only respond to queries when not in client mode
357359
if (this._clientMode) {
358-
this.enableClientMode()
360+
await this.enableClientMode()
359361
} else {
360-
this.enableServerMode()
362+
await this.enableServerMode()
361363
}
362364

363365
await Promise.all([

src/peer-routing/index.js

+25-19
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class PeerRouting {
2424
* @param {object} params
2525
* @param {import('peer-id')} params.peerId
2626
* @param {import('../routing-table').RoutingTable} params.routingTable
27-
* @param {import('../types').PeerStore} params.peerStore
27+
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
2828
* @param {import('../network').Network} params.network
2929
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators
3030
* @param {import('../query/manager').QueryManager} params.queryManager
@@ -52,11 +52,11 @@ class PeerRouting {
5252

5353
if (p) {
5454
this._log('findPeerLocal found %p in routing table', peer)
55-
peerData = this._peerStore.get(p)
55+
peerData = await this._peerStore.get(p)
5656
}
5757

5858
if (!peerData) {
59-
peerData = this._peerStore.get(peer)
59+
peerData = await this._peerStore.get(peer)
6060
}
6161

6262
if (peerData) {
@@ -141,7 +141,7 @@ class PeerRouting {
141141
const match = peers.find((p) => p.equals(id))
142142

143143
if (match) {
144-
const peer = this._peerStore.get(id)
144+
const peer = await this._peerStore.get(id)
145145

146146
if (peer) {
147147
this._log('found in peerStore')
@@ -232,13 +232,15 @@ class PeerRouting {
232232

233233
this._log('found %d peers close to %b', peers.length, key)
234234

235-
yield * peers.peers.map(peer => finalPeerEvent({
236-
from: this._peerId,
237-
peer: {
238-
id: peer,
239-
multiaddrs: (this._peerStore.addressBook.get(peer) || []).map(addr => addr.multiaddr)
240-
}
241-
}))
235+
for (const peer of peers.peers) {
236+
yield finalPeerEvent({
237+
from: this._peerId,
238+
peer: {
239+
id: peer,
240+
multiaddrs: (await (this._peerStore.addressBook.get(peer)) || []).map(addr => addr.multiaddr)
241+
}
242+
})
243+
}
242244
}
243245

244246
/**
@@ -294,16 +296,20 @@ class PeerRouting {
294296
async getCloserPeersOffline (key, closerThan) {
295297
const id = await utils.convertBuffer(key)
296298
const ids = this._routingTable.closestPeers(id)
297-
const output = ids
298-
.map((p) => {
299-
const peer = this._peerStore.get(p)
299+
const output = []
300300

301-
return {
302-
id: p,
303-
multiaddrs: peer ? peer.addresses.map((address) => address.multiaddr) : []
304-
}
301+
for (const peerId of ids) {
302+
if (peerId.equals(closerThan)) {
303+
continue
304+
}
305+
306+
const peer = await this._peerStore.get(peerId)
307+
308+
output.push({
309+
id: peerId,
310+
multiaddrs: peer ? peer.addresses.map((address) => address.multiaddr) : []
305311
})
306-
.filter((closer) => !closer.id.equals(closerThan))
312+
}
307313

308314
if (output.length) {
309315
this._log('getCloserPeersOffline found %d peer(s) closer to %b than %p', output.length, key, closerThan)

src/rpc/handlers/add-provider.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class AddProviderHandler {
1919
* @param {object} params
2020
* @param {PeerId} params.peerId
2121
* @param {import('../../providers').Providers} params.providers
22-
* @param {import('../../types').PeerStore} params.peerStore
22+
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
2323
*/
2424
constructor ({ peerId, providers, peerStore }) {
2525
this._peerId = peerId
@@ -69,7 +69,7 @@ class AddProviderHandler {
6969

7070
if (!this._peerId.equals(pi.id)) {
7171
// Add known address to peer store
72-
this._peerStore.addressBook.add(pi.id, pi.multiaddrs)
72+
await this._peerStore.addressBook.add(pi.id, pi.multiaddrs)
7373
await this._providers.addProvider(cid, pi.id)
7474
}
7575
})

src/rpc/handlers/get-providers.js

+22-13
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const {
1313
/**
1414
* @typedef {import('peer-id')} PeerId
1515
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
16+
* @typedef {import('../../types').PeerData} PeerData
1617
*/
1718

1819
/**
@@ -24,7 +25,7 @@ class GetProvidersHandler {
2425
* @param {PeerId} params.peerId
2526
* @param {import('../../peer-routing').PeerRouting} params.peerRouting
2627
* @param {import('../../providers').Providers} params.providers
27-
* @param {import('../../types').PeerStore} params.peerStore
28+
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
2829
* @param {import('../../types').Addressable} params.addressable
2930
* @param {boolean} [params.lan]
3031
*/
@@ -58,8 +59,8 @@ class GetProvidersHandler {
5859
this._peerRouting.getCloserPeersOffline(msg.key, peerId)
5960
])
6061

61-
const providerPeers = this._getPeers(peers)
62-
const closerPeers = this._getPeers(closer.map(({ id }) => id))
62+
const providerPeers = await this._getPeers(peers)
63+
const closerPeers = await this._getPeers(closer.map(({ id }) => id))
6364
const response = new Message(msg.type, msg.key, msg.clusterLevel)
6465

6566
if (providerPeers.length > 0) {
@@ -77,22 +78,30 @@ class GetProvidersHandler {
7778
/**
7879
* @param {PeerId} peerId
7980
*/
80-
_getAddresses (peerId) {
81-
return this._peerId.equals(peerId) ? this._addressable.multiaddrs : (this._peerStore.addressBook.get(peerId) || []).map(address => address.multiaddr)
81+
async _getAddresses (peerId) {
82+
return this._peerId.equals(peerId) ? this._addressable.multiaddrs : (await (this._peerStore.addressBook.get(peerId)) || []).map(address => address.multiaddr)
8283
}
8384

8485
/**
8586
* @param {PeerId[]} peerIds
86-
* @returns
8787
*/
88-
_getPeers (peerIds) {
89-
return peerIds
90-
.map((peerId) => ({
88+
async _getPeers (peerIds) {
89+
/** @type {PeerData[]} */
90+
const output = []
91+
const addrFilter = this._lan ? removePublicAddresses : removePrivateAddresses
92+
93+
for (const peerId of peerIds) {
94+
const peer = addrFilter({
9195
id: peerId,
92-
multiaddrs: this._getAddresses(peerId)
93-
}))
94-
.map(this._lan ? removePublicAddresses : removePrivateAddresses)
95-
.filter(({ multiaddrs }) => multiaddrs.length)
96+
multiaddrs: await this._getAddresses(peerId)
97+
})
98+
99+
if (peer.multiaddrs.length) {
100+
output.push(peer)
101+
}
102+
}
103+
104+
return output
96105
}
97106
}
98107

src/rpc/handlers/get-value.js

+8-7
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const log = utils.logger('libp2p:kad-dht:rpc:handlers:get-value')
1313
/**
1414
* @typedef {import('peer-id')} PeerId
1515
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
16+
* @typedef {import('libp2p-interfaces/src/keys/types').PublicKey} PublicKey
1617
*/
1718

1819
/**
@@ -22,7 +23,7 @@ class GetValueHandler {
2223
/**
2324
* @param {object} params
2425
* @param {PeerId} params.peerId
25-
* @param {import('../../types').PeerStore} params.peerStore
26+
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
2627
* @param {import('../../peer-routing').PeerRouting} params.peerRouting
2728
* @param {import('interface-datastore').Datastore} params.records
2829
*/
@@ -53,18 +54,18 @@ class GetValueHandler {
5354
if (utils.isPublicKeyKey(key)) {
5455
log('is public key')
5556
const idFromKey = utils.fromPublicKeyKey(key)
56-
let id
57+
/** @type {PublicKey | undefined} */
58+
let pubKey
5759

5860
if (this._peerId.equals(idFromKey)) {
59-
id = this._peerId
61+
pubKey = this._peerId.pubKey
6062
} else {
61-
const peerData = this._peerStore.get(idFromKey)
62-
id = peerData && peerData.id
63+
pubKey = await this._peerStore.keyBook.get(idFromKey)
6364
}
6465

65-
if (id && id.pubKey) {
66+
if (pubKey != null) {
6667
log('returning found public key')
67-
response.record = new Record(key, id.pubKey.bytes)
68+
response.record = new Record(key, pubKey.bytes)
6869
return response
6970
}
7071
}

src/rpc/handlers/index.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ const { PutValueHandler } = require('./put-value')
1616
* @param {object} params
1717
* @param {import('peer-id')} params.peerId
1818
* @param {import('../../providers').Providers} params.providers
19-
* @param {import('../../types').PeerStore} params.peerStore
19+
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
2020
* @param {import('../../types').Addressable} params.addressable
2121
* @param {import('../../peer-routing').PeerRouting} params.peerRouting
2222
* @param {import('interface-datastore').Datastore} params.records

src/rpc/index.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class RPC {
2323
* @param {import('../routing-table').RoutingTable} params.routingTable
2424
* @param {import('peer-id')} params.peerId
2525
* @param {import('../providers').Providers} params.providers
26-
* @param {import('../types').PeerStore} params.peerStore
26+
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
2727
* @param {import('../types').Addressable} params.addressable
2828
* @param {import('../peer-routing').PeerRouting} params.peerRouting
2929
* @param {import('interface-datastore').Datastore} params.records

0 commit comments

Comments
 (0)