Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit 810be4d

Browse files
authored
fix: no more circular dependency, become a good block of libp2p (#13)
* fix: no more circular dependency, become a good block of libp2p * there were more tests :) fix remaining things, test with new swarm ✔️ * apply cr * fix docs * apply cr
1 parent 9c2e022 commit 810be4d

30 files changed

+329
-462
lines changed

package.json

+3-10
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
"protocol-buffers": "^3.2.1",
5959
"pull-length-prefixed": "^1.3.0",
6060
"pull-stream": "^3.6.0",
61+
"safe-buffer": "^5.1.1",
6162
"varint": "^5.0.0",
6263
"xor-distance": "^1.0.0"
6364
},
@@ -67,17 +68,9 @@
6768
"datastore-level": "^0.4.2",
6869
"dirty-chai": "^2.0.1",
6970
"interface-connection": "^0.3.2",
70-
"left-pad": "^1.1.3",
71-
"libp2p": "^0.10.1",
72-
"libp2p-mdns": "^0.7.1",
7371
"libp2p-multiplex": "^0.4.4",
74-
"libp2p-railing": "^0.5.2",
75-
"libp2p-secio": "^0.6.8",
76-
"libp2p-spdy": "^0.10.6",
77-
"libp2p-swarm": "^0.29.2",
72+
"libp2p-swarm": "^0.30.0",
7873
"libp2p-tcp": "^0.10.1",
79-
"libp2p-webrtc-star": "^0.11.0",
80-
"libp2p-websockets": "^0.10.0",
8174
"lodash": "^4.17.4",
8275
"lodash.random": "^3.2.0",
8376
"lodash.range": "^3.2.0",
@@ -90,4 +83,4 @@
9083
"Friedel Ziegelmayer <[email protected]>",
9184
"Pedro Teixeira <[email protected]>"
9285
]
93-
}
86+
}

src/index.js

+42-49
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const errors = require('./errors')
1818
const privateApi = require('./private')
1919
const Providers = require('./providers')
2020
const Message = require('./message')
21+
const assert = require('assert')
2122

2223
/**
2324
* A DHT implementation modeled after Kademlia with Coral and S/Kademlia modifications.
@@ -28,70 +29,74 @@ class KadDHT {
2829
/**
2930
* Create a new KadDHT.
3031
*
31-
* @param {Libp2p} libp2p
32-
* @param {number} [kBucketSize=20]
33-
* @param {Datastore} [datastore=MemoryDatastore]
32+
* @param {swarm} Swarm
33+
* @param {options} {kBucketSize=20, datastore=MemoryDatastore}
3434
*/
35-
constructor (libp2p, kBucketSize, datastore) {
35+
constructor (swarm, options) {
36+
assert(swarm, 'libp2p-kad-dht requires a instance of swarmt a')
37+
options = options || {}
38+
3639
/**
37-
* Local reference to libp2p.
40+
* Local reference to libp2p-swarm.
3841
*
39-
* @type {Libp2p}
42+
* @type {Swarm}
4043
*/
41-
this.libp2p = libp2p
44+
this.swarm = swarm
4245

4346
/**
4447
* k-bucket size, defaults to 20.
4548
*
4649
* @type {number}
4750
*/
48-
this.kBucketSize = kBucketSize || 20
51+
this.kBucketSize = options.kBucketSize || 20
4952

5053
/**
51-
* Number of closest peers to return on kBucket search
54+
* Number of closest peers to return on kBucket search, default 6
5255
*
5356
* @type {number}
5457
*/
55-
this.ncp = 6
58+
this.ncp = options.ncp || 6
5659

5760
/**
5861
* The routing table.
5962
*
6063
* @type {RoutingTable}
6164
*/
62-
this.routingTable = new RoutingTable(this.self.id, this.kBucketSize)
65+
this.routingTable = new RoutingTable(this.peerInfo.id, this.kBucketSize)
6366

6467
/**
6568
* Reference to the datastore, uses an in-memory store if none given.
6669
*
6770
* @type {Datastore}
6871
*/
69-
this.datastore = datastore || new MemoryStore()
72+
this.datastore = options.datastore || new MemoryStore()
7073

7174
/**
7275
* Provider management
7376
*
7477
* @type {Providers}
7578
*/
76-
this.providers = new Providers(this.datastore, this.self.id)
77-
78-
this.validators = {
79-
pk: libp2pRecord.validator.validators.pk
80-
}
79+
this.providers = new Providers(this.datastore, this.peerInfo.id)
8180

82-
this.selectors = {
83-
pk: libp2pRecord.selection.selectors.pk
84-
}
81+
this.validators = { pk: libp2pRecord.validator.validators.pk }
82+
this.selectors = { pk: libp2pRecord.selection.selectors.pk }
8583

86-
this.network = new Network(this, this.libp2p)
84+
this.network = new Network(this)
8785

88-
this._log = utils.logger(this.self.id)
86+
this._log = utils.logger(this.peerInfo.id)
8987

9088
// Inject private apis so we don't clutter up this file
9189
const pa = privateApi(this)
92-
Object.keys(pa).forEach((name) => {
93-
this[name] = pa[name]
94-
})
90+
Object.keys(pa).forEach((name) => { this[name] = pa[name] })
91+
}
92+
93+
/**
94+
* Is this DHT running.
95+
*
96+
* @type {bool}
97+
*/
98+
get isStarted () {
99+
return this._running
95100
}
96101

97102
/**
@@ -118,29 +123,17 @@ class KadDHT {
118123
this.network.stop(callback)
119124
}
120125

121-
/**
122-
* Alias to the peerbook from libp2p
123-
*/
124-
get peerBook () {
125-
return this.libp2p.peerBook
126-
}
127-
128-
/**
129-
* Is this DHT running.
130-
*
131-
* @type {bool}
132-
*/
133-
get isRunning () {
134-
return this._running
135-
}
136-
137126
/**
138127
* Local peer (yourself)
139128
*
140129
* @type {PeerInfo}
141130
*/
142-
get self () {
143-
return this.libp2p.peerInfo
131+
get peerInfo () {
132+
return this.swarm._peerInfo
133+
}
134+
135+
get peerBook () {
136+
return this.swarm._peerBook
144137
}
145138

146139
/**
@@ -205,7 +198,7 @@ class KadDHT {
205198
}
206199

207200
waterfall([
208-
(cb) => utils.createPutRecord(key, value, this.self.id, sign, cb),
201+
(cb) => utils.createPutRecord(key, value, this.peerInfo.id, sign, cb),
209202
(rec, cb) => waterfall([
210203
(cb) => this._putLocal(key, rec, cb),
211204
(cb) => this.getClosestPeers(key, cb),
@@ -266,7 +259,7 @@ class KadDHT {
266259
if (err == null) {
267260
vals.push({
268261
val: localRec.value,
269-
from: this.self.id
262+
from: this.peerInfo.id
270263
})
271264
}
272265

@@ -342,7 +335,7 @@ class KadDHT {
342335
// local check
343336
let info
344337
if (this.peerBook.has(peer)) {
345-
info = this.libp2p.peerBook.get(peer)
338+
info = this.peerBook.get(peer)
346339

347340
if (info && info.id.pubKey) {
348341
this._log('getPublicKey: found local copy')
@@ -355,7 +348,7 @@ class KadDHT {
355348
this._getPublicKeyFromNode(peer, (err, pk) => {
356349
if (!err) {
357350
info.id = new PeerId(peer.id, null, pk)
358-
this.libp2p.peerBook.put(info)
351+
this.peerBook.put(info)
359352

360353
return callback(null, pk)
361354
}
@@ -369,7 +362,7 @@ class KadDHT {
369362

370363
const pk = crypto.unmarshalPublicKey(value)
371364
info.id = new PeerId(peer, null, pk)
372-
this.libp2p.peerBook.put(info)
365+
this.peerBook.put(info)
373366

374367
callback(null, pk)
375368
})
@@ -389,7 +382,7 @@ class KadDHT {
389382
this._log('provide: %s', key.toBaseEncodedString())
390383

391384
waterfall([
392-
(cb) => this.providers.addProvider(key, this.self.id, cb),
385+
(cb) => this.providers.addProvider(key, this.peerInfo.id, cb),
393386
(cb) => this.getClosestPeers(key.buffer, cb),
394387
(peers, cb) => {
395388
const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0)

src/network.js

+22-22
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,15 @@ class Network {
1717
/**
1818
* Create a new network.
1919
*
20-
* @param {DHT} dht
21-
* @param {Libp2p} libp2p
20+
* @param {KadDHT} self
2221
*/
23-
constructor (dht, libp2p) {
24-
this.dht = dht
25-
this.libp2p = libp2p
22+
constructor (self) {
23+
this.dht = self
2624
this.readMessageTimeout = c.READ_MESSAGE_TIMEOUT
27-
this._log = utils.logger(this.dht.self.id, 'net')
25+
this._log = utils.logger(this.dht.peerInfo.id, 'net')
2826
this._rpc = rpc(this.dht)
2927
this._onPeerConnected = this._onPeerConnected.bind(this)
30-
this._online = false
28+
this._running = false
3129
}
3230

3331
/**
@@ -43,17 +41,18 @@ class Network {
4341
return cb(new Error('Network is already running'))
4442
}
4543

46-
if (!this.dht.isRunning || !this.dht.libp2p.isStarted()) {
44+
// TODO add a way to check if swarm has started or not
45+
if (!this.dht.isStarted) {
4746
return cb(new Error('Can not start network'))
4847
}
4948

50-
this._online = true
49+
this._running = true
5150

5251
// handle incoming connections
53-
this.libp2p.swarm.handle(c.PROTOCOL_DHT, this._rpc)
52+
this.dht.swarm.handle(c.PROTOCOL_DHT, this._rpc)
5453

5554
// handle new connections
56-
this.libp2p.on('peer:connect', this._onPeerConnected)
55+
this.dht.swarm.on('peer-mux-established', this._onPeerConnected)
5756

5857
cb()
5958
}
@@ -67,13 +66,13 @@ class Network {
6766
stop (callback) {
6867
const cb = (err) => setImmediate(() => callback(err))
6968

70-
if (!this.isOnline) {
69+
if (!this.dht.isStarted && !this.isStarted) {
7170
return cb(new Error('Network is already stopped'))
7271
}
73-
this._online = false
74-
this.libp2p.removeListener('peer:connect', this._onPeerConnected)
72+
this._running = false
73+
this.dht.swarm.removeListener('peer-mux-established', this._onPeerConnected)
7574

76-
this.libp2p.swarm.unhandle(c.PROTOCOL_DHT)
75+
this.dht.swarm.unhandle(c.PROTOCOL_DHT)
7776
cb()
7877
}
7978

@@ -82,8 +81,8 @@ class Network {
8281
*
8382
* @type {bool}
8483
*/
85-
get isOnline () {
86-
return this._online
84+
get isStarted () {
85+
return this._running
8786
}
8887

8988
/**
@@ -92,7 +91,8 @@ class Network {
9291
* @type {bool}
9392
*/
9493
get isConnected () {
95-
return this.dht.libp2p.isStarted() && this.dht.isRunning && this.isOnline
94+
// TODO add a way to check if swarm has started or not
95+
return this.dht.isStarted && this.isStarted
9696
}
9797

9898
/**
@@ -107,7 +107,7 @@ class Network {
107107
return this._log.error('Network is offline')
108108
}
109109

110-
this.libp2p.dial(peer, c.PROTOCOL_DHT, (err, conn) => {
110+
this.dht.swarm.dial(peer, c.PROTOCOL_DHT, (err, conn) => {
111111
if (err) {
112112
return this._log('%s does not support protocol: %s', peer.id.toB58String(), c.PROTOCOL_DHT)
113113
}
@@ -140,7 +140,7 @@ class Network {
140140
}
141141

142142
this._log('sending to: %s', to.toB58String())
143-
this.dht.libp2p.dial(to, c.PROTOCOL_DHT, (err, conn) => {
143+
this.dht.swarm.dial(to, c.PROTOCOL_DHT, (err, conn) => {
144144
if (err) {
145145
return callback(err)
146146
}
@@ -159,12 +159,12 @@ class Network {
159159
*/
160160
sendMessage (to, msg, callback) {
161161
if (!this.isConnected) {
162-
return callback(new Error('Network is offline'))
162+
return setImmediate(() => callback(new Error('Network is offline')))
163163
}
164164

165165
this._log('sending to: %s', to.toB58String())
166166

167-
this.dht.libp2p.dial(to, c.PROTOCOL_DHT, (err, conn) => {
167+
this.dht.swarm.dial(to, c.PROTOCOL_DHT, (err, conn) => {
168168
if (err) {
169169
return callback(err)
170170
}

src/private.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ module.exports = (dht) => ({
128128
// 5. check validity
129129

130130
// 5. if: we are the author, all good
131-
if (record.author.isEqual(dht.self.id)) {
131+
if (record.author.isEqual(dht.peerInfo.id)) {
132132
return callback(null, record)
133133
}
134134

@@ -226,7 +226,7 @@ module.exports = (dht) => ({
226226
* @private
227227
*/
228228
_isSelf (other) {
229-
return other && dht.self.id.id.equals(other.id)
229+
return other && dht.peerInfo.id.id.equals(other.id)
230230
},
231231
/**
232232
* Ask peer `peer` if they know where the peer with id `target` is.
@@ -308,7 +308,7 @@ module.exports = (dht) => ({
308308

309309
// Send out correction record
310310
waterfall([
311-
(cb) => utils.createPutRecord(key, best, dht.self.id, true, cb),
311+
(cb) => utils.createPutRecord(key, best, dht.peerInfo.id, true, cb),
312312
(fixupRec, cb) => each(vals, (v, cb) => {
313313
// no need to do anything
314314
if (v.val.equals(best)) {
@@ -523,7 +523,7 @@ module.exports = (dht) => ({
523523
(cb) => dht._findProvidersSingle(peer, key, cb),
524524
(msg, cb) => {
525525
const provs = msg.providerPeers
526-
dht._log('(%s) found %s provider entries', dht.self.id.toB58String(), provs.length)
526+
dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length)
527527

528528
provs.forEach((prov) => {
529529
out.push(dht.peerBook.put(prov))

src/query.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class Query {
2525
this.key = key
2626
this.query = query
2727
this.concurrency = c.ALPHA
28-
this._log = utils.logger(this.dht.self.id, 'query:' + key.toString())
28+
this._log = utils.logger(this.dht.peerInfo.id, 'query:' + key.toString())
2929
}
3030

3131
/**

0 commit comments

Comments
 (0)