@@ -18,6 +18,8 @@ import type { Logger } from '@libp2p/logger'
18
18
import type { Duplex } from 'it-stream-types'
19
19
import type { PeerInfo } from '@libp2p/interfaces/peer-info'
20
20
import { Components , Initializable } from '@libp2p/interfaces/components'
21
+ import type { Stream } from '@libp2p/interfaces/connection'
22
+ import { abortableDuplex } from 'abortable-iterator'
21
23
22
24
export interface NetworkInit {
23
25
protocol : string
@@ -87,15 +89,17 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable, I
87
89
}
88
90
89
91
this . log ( 'sending %s to %p' , msg . type , to )
92
+ yield dialingPeerEvent ( { peer : to } )
93
+ yield sendingQueryEvent ( { to, type : msg . type } )
90
94
91
- try {
92
- yield dialingPeerEvent ( { peer : to } )
93
-
94
- const { stream } = await this . components . getDialer ( ) . dialProtocol ( to , this . protocol , options )
95
+ let stream : Stream | undefined
95
96
96
- yield sendingQueryEvent ( { to, type : msg . type } )
97
+ try {
98
+ const connection = await this . components . getConnectionManager ( ) . openConnection ( to , options )
99
+ const streamData = await connection . newStream ( this . protocol )
100
+ stream = streamData . stream
97
101
98
- const response = await this . _writeReadMessage ( stream , msg . serialize ( ) )
102
+ const response = await this . _writeReadMessage ( stream , msg . serialize ( ) , options )
99
103
100
104
yield peerResponseEvent ( {
101
105
from : to ,
@@ -106,6 +110,10 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable, I
106
110
} )
107
111
} catch ( err : any ) {
108
112
yield queryErrorEvent ( { from : to , error : err } )
113
+ } finally {
114
+ if ( stream != null ) {
115
+ stream . close ( )
116
+ }
109
117
}
110
118
}
111
119
@@ -118,26 +126,36 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable, I
118
126
}
119
127
120
128
this . log ( 'sending %s to %p' , msg . type , to )
121
-
122
129
yield dialingPeerEvent ( { peer : to } )
123
-
124
- const { stream } = await this . components . getDialer ( ) . dialProtocol ( to , this . protocol , options )
125
-
126
130
yield sendingQueryEvent ( { to, type : msg . type } )
127
131
132
+ let stream : Stream | undefined
133
+
128
134
try {
129
- await this . _writeMessage ( stream , msg . serialize ( ) )
135
+ const connection = await this . components . getConnectionManager ( ) . openConnection ( to , options )
136
+ const data = await connection . newStream ( this . protocol )
137
+ stream = data . stream
138
+
139
+ await this . _writeMessage ( stream , msg . serialize ( ) , options )
130
140
131
141
yield peerResponseEvent ( { from : to , messageType : msg . type } )
132
142
} catch ( err : any ) {
133
143
yield queryErrorEvent ( { from : to , error : err } )
144
+ } finally {
145
+ if ( stream != null ) {
146
+ stream . close ( )
147
+ }
134
148
}
135
149
}
136
150
137
151
/**
138
152
* Write a message to the given stream
139
153
*/
140
- async _writeMessage ( stream : Duplex < Uint8Array > , msg : Uint8Array ) {
154
+ async _writeMessage ( stream : Duplex < Uint8Array > , msg : Uint8Array , options : AbortOptions ) {
155
+ if ( options . signal != null ) {
156
+ stream = abortableDuplex ( stream , options . signal )
157
+ }
158
+
141
159
await pipe (
142
160
[ msg ] ,
143
161
lp . encode ( ) ,
@@ -151,7 +169,11 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable, I
151
169
* If no response is received after the specified timeout
152
170
* this will error out.
153
171
*/
154
- async _writeReadMessage ( stream : Duplex < Uint8Array > , msg : Uint8Array ) {
172
+ async _writeReadMessage ( stream : Duplex < Uint8Array > , msg : Uint8Array , options : AbortOptions ) {
173
+ if ( options . signal != null ) {
174
+ stream = abortableDuplex ( stream , options . signal )
175
+ }
176
+
155
177
const res = await pipe (
156
178
[ msg ] ,
157
179
lp . encode ( ) ,
0 commit comments