Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit 534a2d9

Browse files
authored
fix: use utils.mapParallel for parallel processing of peers (#166)
1 parent eacdc6b commit 534a2d9

File tree

3 files changed

+23
-4
lines changed

3 files changed

+23
-4
lines changed

src/content-fetching/index.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -109,15 +109,15 @@ module.exports = (dht) => {
109109
let counterAll = 0
110110
let counterSuccess = 0
111111

112-
for await (const peer of dht.getClosestPeers(key, { shallow: true })) {
112+
await utils.mapParallel(dht.getClosestPeers(key, { shallow: true }), async (peer) => {
113113
try {
114114
counterAll += 1
115115
await dht._putValueToPeer(key, record, peer)
116116
counterSuccess += 1
117117
} catch (err) {
118118
dht._log.error('Failed to put to peer (%b): %s', peer.id, err)
119119
}
120-
}
120+
})
121121

122122
// verify if we were able to put to enough peers
123123
const minPeers = options.minPeers || counterAll // Ensure we have a default `minPeers`

src/content-routing/index.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,14 @@ module.exports = (dht) => {
4444
}]
4545

4646
// Notify closest peers
47-
for await (const peer of dht.getClosestPeers(key.buffer)) {
47+
await utils.mapParallel(dht.getClosestPeers(key.buffer), async (peer) => {
4848
dht._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String())
4949
try {
5050
await dht.network.sendMessage(peer, msg)
5151
} catch (err) {
5252
errors.push(err)
5353
}
54-
}
54+
})
5555

5656
if (errors.length) {
5757
// TODO:

src/utils.js

+19
Original file line numberDiff line numberDiff line change
@@ -206,3 +206,22 @@ exports.withTimeout = (asyncFn, time) => {
206206
])
207207
}
208208
}
209+
210+
/**
211+
* Iterates the given `asyncIterator` and runs each item through the given `asyncFn` in parallel.
212+
* Returns a promise that resolves when all items of the `asyncIterator` have been passed
213+
* through `asyncFn`.
214+
*
215+
* @param {AsyncIterable} [asyncIterator]
216+
* @param {Function} [asyncFn]
217+
* @returns {Array}
218+
*
219+
* @private
220+
*/
221+
exports.mapParallel = async function (asyncIterator, asyncFn) {
222+
const tasks = []
223+
for await (const item of asyncIterator) {
224+
tasks.push(asyncFn(item))
225+
}
226+
return Promise.all(tasks)
227+
}

0 commit comments

Comments
 (0)