1
1
// deno-lint-ignore-file no-explicit-any
2
2
import { chunk } from "@std/collections/chunk" ;
3
3
import { concat } from "@std/bytes/concat" ;
4
- import { writeAll } from "@std/io/write-all" ;
5
- import type { Reader , Writer } from "@std/io/types" ;
6
4
7
5
/**
8
6
* A Redis client for interacting with a Redis server.
@@ -85,13 +83,12 @@ function createRequest(command: Command): Uint8Array {
85
83
return concat ( lines ) ;
86
84
}
87
85
88
- async function * readLines ( reader : Reader ) : AsyncIterableIterator < Uint8Array > {
89
- const buffer = new Uint8Array ( 1024 ) ;
90
- let chunks = new Uint8Array ( ) ;
91
- while ( true ) {
92
- const result = await reader . read ( buffer ) ;
93
- if ( result === null ) break ;
94
- chunks = concat ( [ chunks , buffer . subarray ( 0 , result ) ] ) ;
86
+ async function * readLines (
87
+ readable : ReadableStream < Uint8Array < ArrayBufferLike > > ,
88
+ ) {
89
+ let chunks : Uint8Array < ArrayBufferLike > = new Uint8Array ( new ArrayBuffer ( 0 ) ) ;
90
+ for await ( const chunk of readable ) {
91
+ chunks = concat ( [ chunks , chunk ] ) as Uint8Array < ArrayBufferLike > ;
95
92
let index ;
96
93
while (
97
94
( index = chunks . indexOf ( CRLF_BYTES [ 0 ] ) ) !== - 1 &&
@@ -101,7 +98,6 @@ async function* readLines(reader: Reader): AsyncIterableIterator<Uint8Array> {
101
98
chunks = chunks . subarray ( index + 2 ) ;
102
99
}
103
100
}
104
- yield chunks ;
105
101
}
106
102
107
103
function readNReplies (
@@ -121,9 +117,6 @@ async function readReply(
121
117
raw = false ,
122
118
) : Promise < Reply > {
123
119
const { value } = await iterator . next ( ) ;
124
- if ( value . length === 0 ) {
125
- return Promise . reject ( new TypeError ( "No reply received" ) ) ;
126
- }
127
120
switch ( value [ 0 ] ) {
128
121
case ARRAY_PREFIX :
129
122
case PUSH_PREFIX : {
@@ -376,13 +369,18 @@ async function readReply(
376
369
* ```
377
370
*/
378
371
export class RedisClient {
379
- #conn: Reader & Writer ;
372
+ #writer: WritableStreamDefaultWriter < Uint8Array > ;
380
373
#lines: AsyncIterableIterator < Uint8Array > ;
381
374
#queue: Promise < any > = Promise . resolve ( ) ;
382
375
383
- constructor ( conn : Reader & Writer ) {
384
- this . #conn = conn ;
385
- this . #lines = readLines ( this . #conn) ;
376
+ constructor (
377
+ conn : {
378
+ readable : ReadableStream < Uint8Array > ;
379
+ writable : WritableStream < Uint8Array > ;
380
+ } ,
381
+ ) {
382
+ this . #writer = conn . writable . getWriter ( ) ;
383
+ this . #lines = readLines ( conn . readable ) ;
386
384
}
387
385
388
386
#enqueue< T > ( task : ( ) => Promise < T > ) : Promise < T > {
@@ -410,8 +408,8 @@ export class RedisClient {
410
408
* ```
411
409
*/
412
410
sendCommand ( command : Command , raw = false ) : Promise < Reply > {
413
- return this . #enqueue( async ( ) => {
414
- await writeAll ( this . #conn , createRequest ( command ) ) ;
411
+ return this . #enqueue( ( ) => {
412
+ this . #writer . write ( createRequest ( command ) ) ;
415
413
return readReply ( this . #lines, raw ) ;
416
414
} ) ;
417
415
}
@@ -436,7 +434,7 @@ export class RedisClient {
436
434
* ```
437
435
*/
438
436
writeCommand ( command : Command ) : Promise < void > {
439
- return this . #enqueue( ( ) => writeAll ( this . #conn , createRequest ( command ) ) ) ;
437
+ return this . #enqueue( ( ) => this . #writer . write ( createRequest ( command ) ) ) ;
440
438
}
441
439
442
440
/**
@@ -494,9 +492,9 @@ export class RedisClient {
494
492
* ```
495
493
*/
496
494
pipelineCommands ( commands : Command [ ] , raw = false ) : Promise < Reply [ ] > {
497
- return this . #enqueue( async ( ) => {
495
+ return this . #enqueue( ( ) => {
498
496
const bytes = concat ( commands . map ( createRequest ) ) ;
499
- await writeAll ( this . #conn , bytes ) ;
497
+ this . #writer . write ( bytes ) ;
500
498
return readNReplies ( this . #lines, commands . length , raw ) ;
501
499
} ) ;
502
500
}
0 commit comments