Skip to content
This repository was archived by the owner on Aug 29, 2023. It is now read-only.

Commit 223f79b

Browse files
Saulsaul
Saul
and
saul
authored
feat: Unix domain sockets (#208)
This PR adds the functionality to handle UNIX domain sockets. * This works with plain `/unix` multiaddresses but is unable to handle p2p encapsulated addresses (`/unix/.../p2p/...`) due to ambiguity. Because of this it causes errors when attempting to use `/unix` addresses in js-libp2p because it appends the `/p2p` address. * On Windows this uses named pipes instead. Despite the fact that it does not work with `/unix` addresses in js-libp2p directly, this still remains useful and is needed in other projects like [js-libp2p-daemon](https://github.com/libp2p/js-libp2p-daemon). Refs #132. Co-authored-by: saul <[email protected]>
1 parent 962e399 commit 223f79b

File tree

6 files changed

+49
-37
lines changed

6 files changed

+49
-37
lines changed

src/constants.ts

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// p2p multi-address code
22
export const CODE_P2P = 421
33
export const CODE_CIRCUIT = 290
4+
export const CODE_UNIX = 400
45

56
// Time to wait for a connection to close gracefully before destroying it manually
67
export const CLOSE_TIMEOUT = 2000

src/index.ts

+10-5
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import { toMultiaddrConnection } from './socket-to-conn.js'
66
import { createListener } from './listener.js'
77
import { multiaddrToNetConfig } from './utils.js'
88
import { AbortError } from '@libp2p/interfaces/errors'
9-
import { CODE_CIRCUIT, CODE_P2P } from './constants.js'
9+
import { CODE_CIRCUIT, CODE_P2P, CODE_UNIX } from './constants.js'
1010
import { CreateListenerOptions, DialOptions, symbol, Transport } from '@libp2p/interface-transport'
1111
import type { Multiaddr } from '@multiformats/multiaddr'
12-
import type { Socket } from 'net'
12+
import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
1313
import type { AbortOptions } from '@libp2p/interfaces'
1414
import type { Connection } from '@libp2p/interface-connection'
1515

@@ -75,19 +75,20 @@ export class TCP implements Transport {
7575

7676
return await new Promise<Socket>((resolve, reject) => {
7777
const start = Date.now()
78-
const cOpts = multiaddrToNetConfig(ma)
78+
const cOpts = multiaddrToNetConfig(ma) as (IpcSocketConnectOpts & TcpSocketConnectOpts)
79+
const cOptsStr = cOpts.path ?? `${cOpts.host ?? ''}:${cOpts.port}`
7980

8081
log('dialing %j', cOpts)
8182
const rawSocket = net.connect(cOpts)
8283

8384
const onError = (err: Error) => {
84-
err.message = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}`
85+
err.message = `connection error ${cOptsStr}: ${err.message}`
8586

8687
done(err)
8788
}
8889

8990
const onTimeout = () => {
90-
log('connection timeout %s:%s', cOpts.host, cOpts.port)
91+
log('connection timeout %s', cOptsStr)
9192

9293
const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT')
9394
// Note: this will result in onError() being called
@@ -155,6 +156,10 @@ export class TCP implements Transport {
155156
return false
156157
}
157158

159+
if (ma.protoCodes().includes(CODE_UNIX)) {
160+
return true
161+
}
162+
158163
return mafmt.TCP.matches(ma.decapsulateCode(CODE_P2P))
159164
})
160165
}

src/listener.ts

+12-12
Original file line numberDiff line numberDiff line change
@@ -110,19 +110,19 @@ export function createListener (context: Context) {
110110
}
111111

112112
if (typeof address === 'string') {
113-
throw new Error('Incorrect server address type')
114-
}
115-
116-
try {
117-
// Because TCP will only return the IPv6 version
118-
// we need to capture from the passed multiaddr
119-
if (listeningAddr.toString().startsWith('/ip4')) {
120-
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
121-
} else if (address.family === 'IPv6') {
122-
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
113+
addrs = [listeningAddr]
114+
} else {
115+
try {
116+
// Because TCP will only return the IPv6 version
117+
// we need to capture from the passed multiaddr
118+
if (listeningAddr.toString().startsWith('/ip4')) {
119+
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
120+
} else if (address.family === 'IPv6') {
121+
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
122+
}
123+
} catch (err) {
124+
log.error('could not turn %s:%s into multiaddr', address.address, address.port, err)
123125
}
124-
} catch (err) {
125-
log.error('could not turn %s:%s into multiaddr', address.address, address.port, err)
126126
}
127127

128128
return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma)

src/socket-to-conn.ts

+12-10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { logger } from '@libp2p/logger'
44
import toIterable from 'stream-to-it'
55
import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr'
66
import { CLOSE_TIMEOUT, SOCKET_TIMEOUT } from './constants.js'
7+
import { multiaddrToNetConfig } from './utils.js'
78
import errCode from 'err-code'
89
import type { Socket } from 'net'
910
import type { Multiaddr } from '@multiformats/multiaddr'
@@ -52,13 +53,14 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
5253
remoteAddr = toMultiaddr(socket.remoteAddress, socket.remotePort)
5354
}
5455

55-
const { host, port } = remoteAddr.toOptions()
56+
const lOpts = multiaddrToNetConfig(remoteAddr)
57+
const lOptsStr = lOpts.path ?? `${lOpts.host ?? ''}:${lOpts.port ?? ''}`
5658
const { sink, source } = toIterable.duplex(socket)
5759

5860
// by default there is no timeout
5961
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
6062
socket.setTimeout(inactivityTimeout, () => {
61-
log('%s:%s socket read timeout', host, port)
63+
log('%s socket read timeout', lOptsStr)
6264

6365
// only destroy with an error if the remote has not sent the FIN message
6466
let err: Error | undefined
@@ -72,7 +74,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
7274
})
7375

7476
socket.once('close', () => {
75-
log('%s:%s socket closed', host, port)
77+
log('%s socket read timeout', lOptsStr)
7678

7779
// In instances where `close` was not explicitly called,
7880
// such as an iterable stream ending, ensure we have set the close
@@ -119,36 +121,36 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
119121

120122
async close () {
121123
if (socket.destroyed) {
122-
log('%s:%s socket was already destroyed when trying to close', host, port)
124+
log('%s socket was already destroyed when trying to close', lOptsStr)
123125
return
124126
}
125127

126-
log('%s:%s closing socket', host, port)
128+
log('%s closing socket', lOptsStr)
127129
await new Promise<void>((resolve, reject) => {
128130
const start = Date.now()
129131

130132
// Attempt to end the socket. If it takes longer to close than the
131133
// timeout, destroy it manually.
132134
const timeout = setTimeout(() => {
133135
if (socket.destroyed) {
134-
log('%s:%s is already destroyed', host, port)
136+
log('%s is already destroyed', lOptsStr)
135137
resolve()
136138
} else {
137-
log('%s:%s socket close timeout after %dms, destroying it manually', host, port, Date.now() - start)
139+
log('%s socket close timeout after %dms, destroying it manually', lOptsStr, Date.now() - start)
138140

139141
// will trigger 'error' and 'close' events that resolves promise
140142
socket.destroy(errCode(new Error('Socket close timeout'), 'ERR_SOCKET_CLOSE_TIMEOUT'))
141143
}
142144
}, closeTimeout).unref()
143145

144146
socket.once('close', () => {
145-
log('%s:%s socket closed', host, port)
147+
log('%s socket closed', lOptsStr)
146148
// socket completely closed
147149
clearTimeout(timeout)
148150
resolve()
149151
})
150152
socket.once('error', (err: Error) => {
151-
log('%s:%s socket error', host, port, err)
153+
log('%s socket error', lOptsStr, err)
152154

153155
// error closing socket
154156
if (maConn.timeline.close == null) {
@@ -171,7 +173,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
171173
if (socket.writableLength > 0) {
172174
// there are outgoing bytes waiting to be sent
173175
socket.once('drain', () => {
174-
log('%s:%s socket drained', host, port)
176+
log('%s socket drained', lOptsStr)
175177

176178
// all bytes have been sent we can destroy the socket (maybe) before the timeout
177179
socket.destroy()

src/utils.ts

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
import { Multiaddr } from '@multiformats/multiaddr'
2+
import type { ListenOptions, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
23
import os from 'os'
4+
import path from 'path'
35

46
const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' }
57

6-
export function multiaddrToNetConfig (addr: Multiaddr) {
8+
export function multiaddrToNetConfig (addr: Multiaddr): ListenOptions | (IpcSocketConnectOpts & TcpSocketConnectOpts) {
79
const listenPath = addr.getPath()
810

911
// unix socket listening
1012
if (listenPath != null) {
11-
// TCP should not return unix socket else need to refactor listener which accepts connection options object
12-
throw new Error('Unix Sockets are not supported by the TCP transport')
13+
if (os.platform() === 'win32') {
14+
// Use named pipes on Windows systems.
15+
return { path: path.join('\\\\.\\pipe\\', listenPath) }
16+
} else {
17+
return { path: listenPath }
18+
}
1319
}
1420

1521
// tcp listening

test/listen-dial.spec.ts

+5-7
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,8 @@ describe('listen', () => {
3030
}
3131
})
3232

33-
// TCP doesn't support unix paths
34-
it.skip('listen on path', async () => {
35-
const mh = new Multiaddr(`/unix${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`)
33+
it('listen on path', async () => {
34+
const mh = new Multiaddr(`/unix/${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`)
3635

3736
listener = tcp.createListener({
3837
upgrader
@@ -207,9 +206,8 @@ describe('dial', () => {
207206
await listener.close()
208207
})
209208

210-
// TCP doesn't support unix paths
211-
it.skip('dial on path', async () => {
212-
const ma = new Multiaddr(`/unix${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`)
209+
it('dial on path', async () => {
210+
const ma = new Multiaddr(`/unix/${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`)
213211

214212
const listener = tcp.createListener({
215213
upgrader
@@ -226,7 +224,7 @@ describe('dial', () => {
226224
async (source) => await all(source)
227225
)
228226

229-
expect(values).to.deep.equal(['hey'])
227+
expect(values[0].subarray()).to.equalBytes(uint8ArrayFromString('hey'))
230228
await conn.close()
231229
await listener.close()
232230
})

0 commit comments

Comments
 (0)