Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit c49fa92

Browse files
refactor: async await (#148)
BREAKING CHANGE: Switch to using async/await and async iterators. Co-Authored-By: Jacob Heun <[email protected]>
1 parent 088534b commit c49fa92

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+3111
-3855
lines changed

package.json

+25-17
Original file line numberDiff line numberDiff line change
@@ -32,42 +32,46 @@
3232
"url": "https://github.com/libp2p/js-libp2p-kad-dht/issues"
3333
},
3434
"engines": {
35-
"node": ">=6.0.0",
36-
"npm": ">=3.0.0"
35+
"node": ">=10.0.0",
36+
"npm": ">=6.0.0"
3737
},
3838
"homepage": "https://github.com/libp2p/js-libp2p-kad-dht",
3939
"dependencies": {
4040
"abort-controller": "^3.0.0",
4141
"async": "^2.6.2",
4242
"base32.js": "~0.1.0",
4343
"chai-checkmark": "^1.0.1",
44-
"cids": "~0.7.0",
44+
"cids": "~0.7.1",
4545
"debug": "^4.1.1",
46-
"err-code": "^1.1.2",
46+
"err-code": "^2.0.0",
4747
"hashlru": "^2.3.0",
4848
"heap": "~0.2.6",
49-
"interface-datastore": "~0.7.0",
49+
"interface-datastore": "~0.8.0",
5050
"k-bucket": "^5.0.0",
51-
"libp2p-crypto": "~0.16.1",
52-
"libp2p-record": "~0.6.2",
53-
"multihashes": "~0.4.14",
54-
"multihashing-async": "~0.5.2",
55-
"p-queue": "^6.0.0",
51+
"libp2p-crypto": "~0.17.1",
52+
"libp2p-record": "~0.7.0",
53+
"multihashes": "~0.4.15",
54+
"multihashing-async": "~0.8.0",
55+
"p-filter": "^2.1.0",
56+
"p-map": "^3.0.0",
57+
"p-queue": "^6.2.1",
58+
"p-timeout": "^3.2.0",
5659
"p-times": "^2.1.0",
57-
"peer-id": "~0.12.2",
58-
"peer-info": "~0.15.1",
60+
"peer-id": "~0.13.5",
61+
"peer-info": "~0.17.0",
5962
"promise-to-callback": "^1.0.0",
6063
"promisify-es6": "^1.0.3",
6164
"protons": "^1.0.1",
62-
"pull-length-prefixed": "^1.3.2",
63-
"pull-stream": "^3.6.9",
65+
"pull-length-prefixed": "^1.3.3",
66+
"pull-stream": "^3.6.14",
6467
"varint": "^5.0.0",
6568
"xor-distance": "^2.0.0"
6669
},
6770
"devDependencies": {
68-
"aegir": "^20.0.0",
71+
"aegir": "^20.4.1",
6972
"chai": "^4.2.0",
7073
"datastore-level": "~0.12.1",
74+
"delay": "^4.3.0",
7175
"dirty-chai": "^2.0.1",
7276
"interface-connection": "~0.3.3",
7377
"libp2p-mplex": "~0.8.5",
@@ -76,8 +80,12 @@
7680
"lodash": "^4.17.11",
7781
"lodash.random": "^3.2.0",
7882
"lodash.range": "^3.2.0",
79-
"peer-book": "~0.9.1",
80-
"sinon": "^7.3.1"
83+
"p-defer": "^3.0.0",
84+
"p-each-series": "^2.1.0",
85+
"p-map-series": "^2.1.0",
86+
"p-retry": "^4.2.0",
87+
"peer-book": "~0.9.2",
88+
"sinon": "^7.5.0"
8189
},
8290
"contributors": [
8391
"Alan Shaw <[email protected]>",

src/content-fetching/index.js

+278
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
'use strict'
2+
3+
const errcode = require('err-code')
4+
5+
const pFilter = require('p-filter')
6+
const pTimeout = require('p-timeout')
7+
8+
const libp2pRecord = require('libp2p-record')
9+
10+
const c = require('../constants')
11+
const Query = require('../query')
12+
13+
const utils = require('../utils')
14+
15+
const Record = libp2pRecord.Record
16+
17+
module.exports = (dht) => {
18+
const putLocal = async (key, rec) => { // eslint-disable-line require-await
19+
return dht.datastore.put(utils.bufferToKey(key), rec)
20+
}
21+
22+
/**
23+
* Attempt to retrieve the value for the given key from
24+
* the local datastore.
25+
*
26+
* @param {Buffer} key
27+
* @returns {Promise<Record>}
28+
*
29+
* @private
30+
*/
31+
const getLocal = async (key) => {
32+
dht._log('getLocal %b', key)
33+
34+
const raw = await dht.datastore.get(utils.bufferToKey(key))
35+
dht._log('found %b in local datastore', key)
36+
const rec = Record.deserialize(raw)
37+
38+
await dht._verifyRecordLocally(rec)
39+
return rec
40+
}
41+
42+
/**
43+
* Send the best record found to any peers that have an out of date record.
44+
*
45+
* @param {Buffer} key
46+
* @param {Array<Object>} vals - values retrieved from the DHT
47+
* @param {Object} best - the best record that was found
48+
* @returns {Promise}
49+
*
50+
* @private
51+
*/
52+
const sendCorrectionRecord = async (key, vals, best) => {
53+
const fixupRec = await utils.createPutRecord(key, best)
54+
55+
return Promise.all(vals.map(async (v) => {
56+
// no need to do anything
57+
if (v.val.equals(best)) {
58+
return
59+
}
60+
61+
// correct ourself
62+
if (dht._isSelf(v.from)) {
63+
try {
64+
await dht._putLocal(key, fixupRec)
65+
} catch (err) {
66+
dht._log.error('Failed error correcting self', err)
67+
}
68+
return
69+
}
70+
71+
// send correction
72+
try {
73+
await dht._putValueToPeer(key, fixupRec, v.from)
74+
} catch (err) {
75+
dht._log.error('Failed error correcting entry', err)
76+
}
77+
}))
78+
}
79+
80+
return {
81+
/**
82+
* Store the given key/value pair locally, in the datastore.
83+
* @param {Buffer} key
84+
* @param {Buffer} rec - encoded record
85+
* @returns {Promise<void>}
86+
* @private
87+
*/
88+
async _putLocal (key, rec) { // eslint-disable-line require-await
89+
return putLocal(key, rec)
90+
},
91+
92+
/**
93+
* Store the given key/value pair in the DHT.
94+
*
95+
* @param {Buffer} key
96+
* @param {Buffer} value
97+
* @param {Object} [options] - put options
98+
* @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length)
99+
* @returns {Promise<void>}
100+
*/
101+
async put (key, value, options = {}) {
102+
dht._log('PutValue %b', key)
103+
104+
// create record in the dht format
105+
const record = await utils.createPutRecord(key, value)
106+
107+
// store the record locally
108+
await putLocal(key, record)
109+
110+
// put record to the closest peers
111+
const peers = await dht.getClosestPeers(key, { shallow: true })
112+
const results = await pFilter(peers, async (peer) => {
113+
try {
114+
await dht._putValueToPeer(key, record, peer)
115+
return true
116+
} catch (err) {
117+
dht._log.error('Failed to put to peer (%b): %s', peer.id, err)
118+
return false
119+
}
120+
})
121+
122+
// verify if we were able to put to enough peers
123+
const minPeers = options.minPeers || peers.length // Ensure we have a default `minPeers`
124+
125+
if (minPeers > results.length) {
126+
const error = errcode(new Error(`Failed to put value to enough peers: ${results.length}/${minPeers}`), 'ERR_NOT_ENOUGH_PUT_PEERS')
127+
dht._log.error(error)
128+
throw error
129+
}
130+
},
131+
132+
/**
133+
* Get the value to the given key.
134+
* Times out after 1 minute by default.
135+
*
136+
* @param {Buffer} key
137+
* @param {Object} [options] - get options
138+
* @param {number} [options.timeout] - optional timeout (default: 60000)
139+
* @returns {Promise<{from: PeerId, val: Buffer}>}
140+
*/
141+
async get (key, options = {}) {
142+
options.timeout = options.timeout || c.minute
143+
144+
dht._log('_get %b', key)
145+
146+
const vals = await dht.getMany(key, c.GET_MANY_RECORD_COUNT, options)
147+
const recs = vals.map((v) => v.val)
148+
let i = 0
149+
150+
try {
151+
i = libp2pRecord.selection.bestRecord(dht.selectors, key, recs)
152+
} catch (err) {
153+
// Assume the first record if no selector available
154+
if (err.code !== 'ERR_NO_SELECTOR_FUNCTION_FOR_RECORD_KEY') {
155+
throw err
156+
}
157+
}
158+
159+
const best = recs[i]
160+
dht._log('GetValue %b %s', key, best)
161+
162+
if (!best) {
163+
throw errcode(new Error('best value was not found'), 'ERR_NOT_FOUND')
164+
}
165+
166+
await sendCorrectionRecord(key, vals, best)
167+
168+
return best
169+
},
170+
171+
/**
172+
* Get the `n` values to the given key without sorting.
173+
*
174+
* @param {Buffer} key
175+
* @param {number} nvals
176+
* @param {Object} [options] - get options
177+
* @param {number} [options.timeout] - optional timeout (default: 60000)
178+
* @returns {Promise<Array<{from: PeerId, val: Buffer}>>}
179+
*/
180+
async getMany (key, nvals, options = {}) {
181+
options.timeout = options.timeout || c.minute
182+
183+
dht._log('getMany %b (%s)', key, nvals)
184+
185+
let vals = []
186+
let localRec
187+
188+
try {
189+
localRec = await getLocal(key)
190+
} catch (err) {
191+
if (nvals === 0) {
192+
throw err
193+
}
194+
}
195+
196+
if (localRec) {
197+
vals.push({
198+
val: localRec.value,
199+
from: dht.peerInfo.id
200+
})
201+
}
202+
203+
if (vals.length >= nvals) {
204+
return vals
205+
}
206+
207+
const paths = []
208+
const id = await utils.convertBuffer(key)
209+
const rtp = dht.routingTable.closestPeers(id, this.kBucketSize)
210+
211+
dht._log('peers in rt: %d', rtp.length)
212+
213+
if (rtp.length === 0) {
214+
const errMsg = 'Failed to lookup key! No peers from routing table!'
215+
216+
dht._log.error(errMsg)
217+
throw errcode(new Error(errMsg), 'ERR_NO_PEERS_IN_ROUTING_TABLE')
218+
}
219+
220+
// we have peers, lets do the actual query to them
221+
const query = new Query(dht, key, (pathIndex, numPaths) => {
222+
// This function body runs once per disjoint path
223+
const pathSize = utils.pathSize(nvals - vals.length, numPaths)
224+
const pathVals = []
225+
paths.push(pathVals)
226+
227+
// Here we return the query function to use on this particular disjoint path
228+
return async (peer) => {
229+
let rec, peers, lookupErr
230+
try {
231+
const results = await dht._getValueOrPeers(peer, key)
232+
rec = results.record
233+
peers = results.peers
234+
} catch (err) {
235+
// If we have an invalid record we just want to continue and fetch a new one.
236+
if (err.code !== 'ERR_INVALID_RECORD') {
237+
throw err
238+
}
239+
lookupErr = err
240+
}
241+
242+
const res = { closerPeers: peers }
243+
244+
if ((rec && rec.value) || lookupErr) {
245+
pathVals.push({
246+
val: rec && rec.value,
247+
from: peer
248+
})
249+
}
250+
251+
// enough is enough
252+
if (pathVals.length >= pathSize) {
253+
res.pathComplete = true
254+
}
255+
256+
return res
257+
}
258+
})
259+
260+
let error
261+
try {
262+
await pTimeout(query.run(rtp), options.timeout)
263+
} catch (err) {
264+
error = err
265+
}
266+
query.stop()
267+
268+
// combine vals from each path
269+
vals = [].concat.apply(vals, paths).slice(0, nvals)
270+
271+
if (error && vals.length === 0) {
272+
throw error
273+
}
274+
275+
return vals
276+
}
277+
}
278+
}

0 commit comments

Comments
 (0)