3
3
const errcode = require ( 'err-code' )
4
4
const pTimeout = require ( 'p-timeout' )
5
5
const uint8ArrayEquals = require ( 'uint8arrays/equals' )
6
+ const uint8ArrayToString = require ( 'uint8arrays/to-string' )
6
7
const libp2pRecord = require ( 'libp2p-record' )
7
-
8
8
const c = require ( '../constants' )
9
9
const Query = require ( '../query' )
10
-
11
10
const utils = require ( '../utils' )
12
-
13
11
const Record = libp2pRecord . Record
14
12
13
+ /**
14
+ * @typedef {import('peer-id') } PeerId
15
+ * @typedef {import('../query').DHTQueryResult } DHTQueryResult
16
+ */
17
+
18
+ /**
19
+ * @param {import('../') } dht
20
+ */
15
21
module . exports = ( dht ) => {
22
+ /**
23
+ * @param {Uint8Array } key
24
+ * @param {Uint8Array } rec
25
+ */
16
26
const putLocal = async ( key , rec ) => { // eslint-disable-line require-await
17
27
return dht . datastore . put ( utils . bufferToKey ( key ) , rec )
18
28
}
@@ -22,30 +32,26 @@ module.exports = (dht) => {
22
32
* the local datastore.
23
33
*
24
34
* @param {Uint8Array } key
25
- * @returns {Promise<Record> }
26
- *
27
- * @private
28
35
*/
29
36
const getLocal = async ( key ) => {
30
- dht . _log ( ' getLocal %b' , key )
37
+ dht . _log ( ` getLocal ${ uint8ArrayToString ( key , 'base32' ) } ` )
31
38
32
39
const raw = await dht . datastore . get ( utils . bufferToKey ( key ) )
33
- dht . _log ( 'found %b in local datastore' , key )
40
+ dht . _log ( `found ${ uint8ArrayToString ( key , 'base32' ) } in local datastore` )
41
+
34
42
const rec = Record . deserialize ( raw )
35
43
36
44
await dht . _verifyRecordLocally ( rec )
45
+
37
46
return rec
38
47
}
39
48
40
49
/**
41
50
* Send the best record found to any peers that have an out of date record.
42
51
*
43
52
* @param {Uint8Array } key
44
- * @param {Array<Object> } vals - values retrieved from the DHT
45
- * @param {Object } best - the best record that was found
46
- * @returns {Promise }
47
- *
48
- * @private
53
+ * @param {import('../query').DHTQueryValue[] } vals - values retrieved from the DHT
54
+ * @param {Uint8Array } best - the best record that was found
49
55
*/
50
56
const sendCorrectionRecord = async ( key , vals , best ) => {
51
57
const fixupRec = await utils . createPutRecord ( key , best )
@@ -78,10 +84,9 @@ module.exports = (dht) => {
78
84
return {
79
85
/**
80
86
* Store the given key/value pair locally, in the datastore.
87
+ *
81
88
* @param {Uint8Array } key
82
89
* @param {Uint8Array } rec - encoded record
83
- * @returns {Promise<void> }
84
- * @private
85
90
*/
86
91
async _putLocal ( key , rec ) { // eslint-disable-line require-await
87
92
return putLocal ( key , rec )
@@ -92,9 +97,8 @@ module.exports = (dht) => {
92
97
*
93
98
* @param {Uint8Array } key
94
99
* @param {Uint8Array } value
95
- * @param {Object } [options] - put options
100
+ * @param {object } [options] - put options
96
101
* @param {number } [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length)
97
- * @returns {Promise<void> }
98
102
*/
99
103
async put ( key , value , options = { } ) {
100
104
dht . _log ( 'PutValue %b' , key )
@@ -134,9 +138,8 @@ module.exports = (dht) => {
134
138
* Times out after 1 minute by default.
135
139
*
136
140
* @param {Uint8Array } key
137
- * @param {Object } [options] - get options
141
+ * @param {object } [options] - get options
138
142
* @param {number } [options.timeout] - optional timeout (default: 60000)
139
- * @returns {Promise<Uint8Array> }
140
143
*/
141
144
async get ( key , options = { } ) {
142
145
options . timeout = options . timeout || c . minute
@@ -173,16 +176,15 @@ module.exports = (dht) => {
173
176
*
174
177
* @param {Uint8Array } key
175
178
* @param {number } nvals
176
- * @param {Object } [options] - get options
179
+ * @param {object } [options] - get options
177
180
* @param {number } [options.timeout] - optional timeout (default: 60000)
178
- * @returns {Promise<Array<{from: PeerId, val: Uint8Array}>> }
179
181
*/
180
182
async getMany ( key , nvals , options = { } ) {
181
183
options . timeout = options . timeout || c . minute
182
184
183
185
dht . _log ( 'getMany %b (%s)' , key , nvals )
184
186
185
- let vals = [ ]
187
+ const vals = [ ]
186
188
let localRec
187
189
188
190
try {
@@ -204,9 +206,8 @@ module.exports = (dht) => {
204
206
return vals
205
207
}
206
208
207
- const paths = [ ]
208
209
const id = await utils . convertBuffer ( key )
209
- const rtp = dht . routingTable . closestPeers ( id , this . kBucketSize )
210
+ const rtp = dht . routingTable . closestPeers ( id , dht . kBucketSize )
210
211
211
212
dht . _log ( 'peers in rt: %d' , rtp . length )
212
213
@@ -220,15 +221,23 @@ module.exports = (dht) => {
220
221
return vals
221
222
}
222
223
223
- // we have peers, lets do the actual query to them
224
- const query = new Query ( dht , key , ( pathIndex , numPaths ) => {
225
- // This function body runs once per disjoint path
226
- const pathSize = utils . pathSize ( nvals - vals . length , numPaths )
227
- const pathVals = [ ]
228
- paths . push ( pathVals )
224
+ const valsLength = vals . length
229
225
230
- // Here we return the query function to use on this particular disjoint path
231
- return async ( peer ) => {
226
+ /**
227
+ * @param {number } pathIndex
228
+ * @param {number } numPaths
229
+ */
230
+ function createQuery ( pathIndex , numPaths ) {
231
+ // This function body runs once per disjoint path
232
+ const pathSize = utils . pathSize ( nvals - valsLength , numPaths )
233
+ let queryResults = 0
234
+
235
+ /**
236
+ * Here we return the query function to use on this particular disjoint path
237
+ *
238
+ * @param {PeerId } peer
239
+ */
240
+ async function disjointPathQuery ( peer ) {
232
241
let rec , peers , lookupErr
233
242
try {
234
243
const results = await dht . _getValueOrPeers ( peer , key )
@@ -242,37 +251,49 @@ module.exports = (dht) => {
242
251
lookupErr = err
243
252
}
244
253
245
- const res = { closerPeers : peers }
254
+ /** @type {import('../query').QueryResult } */
255
+ const res = {
256
+ closerPeers : peers
257
+ }
258
+
259
+ if ( rec && rec . value ) {
260
+ vals . push ( {
261
+ val : rec . value ,
262
+ from : peer
263
+ } )
246
264
247
- if ( ( rec && rec . value ) || lookupErr ) {
248
- pathVals . push ( {
249
- val : rec && rec . value ,
265
+ queryResults ++
266
+ } else if ( lookupErr ) {
267
+ vals . push ( {
268
+ err : lookupErr ,
250
269
from : peer
251
270
} )
271
+
272
+ queryResults ++
252
273
}
253
274
254
275
// enough is enough
255
- if ( pathVals . length >= pathSize ) {
276
+ if ( queryResults >= pathSize ) {
256
277
res . pathComplete = true
257
278
}
258
279
259
280
return res
260
281
}
261
- } )
262
282
263
- let error
264
- try {
265
- await pTimeout ( query . run ( rtp ) , options . timeout )
266
- } catch ( err ) {
267
- error = err
283
+ return disjointPathQuery
268
284
}
269
- query . stop ( )
270
285
271
- // combine vals from each path
272
- vals = [ ] . concat . apply ( vals , paths ) . slice ( 0 , nvals )
286
+ // we have peers, lets send the actual query to them
287
+ const query = new Query ( dht , key , createQuery )
273
288
274
- if ( error && vals . length === 0 ) {
275
- throw error
289
+ try {
290
+ await pTimeout ( query . run ( rtp ) , options . timeout )
291
+ } catch ( err ) {
292
+ if ( vals . length === 0 ) {
293
+ throw err
294
+ }
295
+ } finally {
296
+ query . stop ( )
276
297
}
277
298
278
299
return vals
0 commit comments