Skip to content

Commit

Permalink
cat: make it concurrent (#593)
Browse files Browse the repository at this point in the history
This PR adds a new io.WriterAt adapter for non-seekable writers. It uses an internal linked list to order the incoming chunks. The implementation is independent from the download manager of aws-sdk-go, and because of that it currently cannot bound the memory usage. In order to limit the memory usage, we would have had to write a custom manager instead of using aws-sdk-go's implementation, which seemed infeasible.

The new implementation is about 25% faster than the older implementation for a 9.4 GB file with partSize=50MB and concurrency=20 parameters, with significantly higher memory usage: on average it uses 0.9 GB of memory, and at most 2.1 GB was observed. Obviously, the memory usage and performance are dependent on the partSize-concurrency configuration and the link.

Resolves #245

Co-authored-by: İbrahim Güngör <[email protected]>
  • Loading branch information
denizsurmeli and igungor authored Jul 27, 2023
1 parent bbc13fd commit 4292ecc
Show file tree
Hide file tree
Showing 5 changed files with 492 additions and 18 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

#### Features
- Added `--content-disposition` flag to `cp` command. ([#569](https://github.com/peak/s5cmd/issues/569))
- Added `--show-fullpath` flag to `ls`. (#[596](https://github.com/peak/s5cmd/issues/596))
- Added `--show-fullpath` flag to `ls`. ([#596](https://github.com/peak/s5cmd/issues/596))

#### Improvements
- Implemented concurrent multipart download support for `cat`. ([#245](https://github.com/peak/s5cmd/issues/245))

#### Bugfixes
- Fixed a bug introduced with `external sort` support in `sync` command which prevents `sync` to an empty destination with `--delete` option. ([#576](https://github.com/peak/s5cmd/issues/576))
Expand Down
27 changes: 20 additions & 7 deletions command/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package command
import (
"context"
"fmt"
"io"
"os"

"github.com/urfave/cli/v2"

"github.com/peak/s5cmd/v2/log/stat"
"github.com/peak/s5cmd/v2/orderedwriter"
"github.com/peak/s5cmd/v2/storage"
"github.com/peak/s5cmd/v2/storage/url"
)
Expand Down Expand Up @@ -44,6 +44,18 @@ func NewCatCommand() *cli.Command {
Name: "version-id",
Usage: "use the specified version of an object",
},
&cli.IntFlag{
Name: "concurrency",
Aliases: []string{"c"},
Value: defaultCopyConcurrency,
Usage: "number of concurrent parts transferred between host and remote server",
},
&cli.IntFlag{
Name: "part-size",
Aliases: []string{"p"},
Value: defaultPartSize,
Usage: "size of each part transferred between host and remote server, in MiB",
},
},
CustomHelpTemplate: catHelpTemplate,
Before: func(c *cli.Context) error {
Expand Down Expand Up @@ -72,6 +84,8 @@ func NewCatCommand() *cli.Command {
fullCommand: fullCommand,

storageOpts: NewStorageOpts(c),
concurrency: c.Int("concurrency"),
partSize: c.Int64("part-size") * megabytes,
}.Run(c.Context)
},
}
Expand All @@ -86,6 +100,8 @@ type Cat struct {
fullCommand string

storageOpts storage.Options
concurrency int
partSize int64
}

// Run prints content of given source to standard output.
Expand All @@ -95,20 +111,17 @@ func (c Cat) Run(ctx context.Context) error {
printError(c.fullCommand, c.op, err)
return err
}

rc, err := client.Read(ctx, c.src)
_, err = client.Stat(ctx, c.src)
if err != nil {
printError(c.fullCommand, c.op, err)
return err
}
defer rc.Close()

_, err = io.Copy(os.Stdout, rc)
buf := orderedwriter.New(os.Stdout)
_, err = client.Get(ctx, c.src, buf, c.concurrency, c.partSize)
if err != nil {
printError(c.fullCommand, c.op, err)
return err
}

return nil
}

Expand Down
48 changes: 38 additions & 10 deletions e2e/cat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ import (
"gotest.tools/v3/icmd"
)

const (
kb int64 = 1024
mb = kb * kb
)

func TestCatS3Object(t *testing.T) {
t.Parallel()

const (
filename = "file.txt"
)
contents, expected := getSequentialFileContent()
contents, expected := getSequentialFileContent(4 * mb)

testcases := []struct {
name string
Expand All @@ -42,6 +47,31 @@ func TestCatS3Object(t *testing.T) {
jsonCheck(true),
},
},
{
name: "cat remote object with lower part size and higher concurrency",
cmd: []string{
"cat",
"-p",
"1",
"-c",
"2",
},
expected: expected,
},
{
name: "cat remote object with json flag lower part size and higher concurrency",
cmd: []string{
"--json",
"cat",
"-p",
"1",
"-c",
"2",
}, expected: expected,
assertOps: []assertOp{
jsonCheck(true),
},
},
}
for _, tc := range testcases {
tc := tc
Expand Down Expand Up @@ -86,7 +116,7 @@ func TestCatS3ObjectFail(t *testing.T) {
"cat",
},
expected: map[int]compareFunc{
0: match(`ERROR "cat s3://(.*)/prefix/file\.txt": NoSuchKey:`),
0: match(`ERROR "cat s3://(.*)/prefix/file\.txt":(.*) not found`),
},
},
{
Expand All @@ -97,7 +127,7 @@ func TestCatS3ObjectFail(t *testing.T) {
"cat",
},
expected: map[int]compareFunc{
0: match(`{"operation":"cat","command":"cat s3:\/\/(.*)\/prefix\/file\.txt","error":"NoSuchKey:`),
0: match(`{"operation":"cat","command":"cat s3:\/\/(.*)\/prefix\/file\.txt","error":"(.*) not found`),
},
assertOps: []assertOp{
jsonCheck(true),
Expand Down Expand Up @@ -142,7 +172,6 @@ func TestCatS3ObjectFail(t *testing.T) {
cmd := s5cmd(tc.cmd...)

result := icmd.RunCmd(cmd)

result.Assert(t, icmd.Expected{ExitCode: 1})
assertLines(t, result.Stderr(), tc.expected, tc.assertOps...)
})
Expand Down Expand Up @@ -200,16 +229,16 @@ func TestCatLocalFileFail(t *testing.T) {
}
}

// getSequentialFileContent creates a string with 64666688 in size (~61.670 MB)
func getSequentialFileContent() (string, map[int]compareFunc) {
// getSequentialFileContent creates a string with size bytes in size.
func getSequentialFileContent(size int64) (string, map[int]compareFunc) {
sb := strings.Builder{}
expectedLines := make(map[int]compareFunc)

for i := 0; i < 50000; i++ {
totalBytesWritten := int64(0)
for i := 0; totalBytesWritten < size; i++ {
line := fmt.Sprintf(`{ "line": "%d", "id": "i%d", data: "some event %d" }`, i, i, i)
sb.WriteString(line)
sb.WriteString("\n")

totalBytesWritten += int64(len(line))
expectedLines[i] = equals(line)
}

Expand Down Expand Up @@ -272,7 +301,6 @@ func TestCatByVersionID(t *testing.T) {
cmd = s5cmd("cat", "--version-id", version,
fmt.Sprintf("s3://%v/%v", bucket, filename))
result = icmd.RunCmd(cmd)

if diff := cmp.Diff(contents[i], result.Stdout()); diff != "" {
t.Errorf("(-want +got):\n%v", diff)
}
Expand Down
113 changes: 113 additions & 0 deletions orderedwriter/orderedwriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Package orderedwriter implements an unbounded buffer for ordering concurrent writes for
// non-seekable writers. It keeps an internal linked list that keeps the chunks in order
// and flushes buffered chunks when the expected offset is available.
package orderedwriter

import (
"container/list"
"io"
"sync"
)

// chunk is a buffered write paired with the stream offset it belongs at.
type chunk struct {
	offset int64
	value  []byte
}

// OrderedWriterAt adapts a non-seekable io.Writer to the io.WriterAt
// interface. Writes that arrive ahead of the current stream position are
// buffered in an offset-sorted linked list and flushed to the underlying
// writer as soon as the gap before them has been filled. The buffer is
// unbounded, so memory usage grows with the amount of out-of-order data
// that has to be held back.
type OrderedWriterAt struct {
	mu      *sync.Mutex // guards list and written
	list    *list.List  // pending chunks, sorted by ascending offset
	w       io.Writer   // destination for in-order data
	written int64       // number of bytes already forwarded to w
}

// New returns an OrderedWriterAt that forwards ordered data to w.
func New(w io.Writer) *OrderedWriterAt {
	return &OrderedWriterAt{
		mu:      &sync.Mutex{},
		list:    list.New(),
		w:       w,
		written: 0,
	}
}

// WriteAt buffers or forwards p depending on whether offset matches the
// next expected position in the stream. It reports len(p) on success; a
// short count with an error means the underlying writer failed mid-flush.
func (w *OrderedWriterAt) WriteAt(p []byte, offset int64) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Fast path: nothing is buffered and this chunk continues the stream,
	// so it can go straight to the underlying writer without copying.
	if w.list.Front() == nil && offset == w.written {
		n, err := w.w.Write(p)
		if err != nil {
			return n, err
		}
		w.written += int64(n)
		return len(p), nil
	}

	// The chunk has to wait. Take a private copy, because the caller is
	// free to reuse p as soon as WriteAt returns.
	c := &chunk{
		offset: offset,
		value:  append([]byte(nil), p...),
	}

	// Nothing buffered yet and not directly writable: seed the list with
	// this chunk and return early.
	if w.list.Front() == nil {
		w.list.PushBack(c)
		return len(p), nil
	}

	// Keep the list sorted: insert the chunk before the first buffered
	// element whose offset is larger than ours.
	inserted := false
	for e := w.list.Front(); e != nil; e = e.Next() {
		if c.offset < e.Value.(*chunk).offset {
			w.list.InsertBefore(c, e)
			inserted = true
			break
		}
	}

	// No larger offset found, so this chunk goes at the tail.
	if !inserted {
		w.list.PushBack(c)
	}

	// Drain the head of the list for as long as it lines up with the next
	// expected offset, recording each element that was written out.
	var flushed []*list.Element
	for e := w.list.Front(); e != nil; e = e.Next() {
		buffered := e.Value.(*chunk)
		if buffered.offset != w.written {
			break
		}

		n, err := w.w.Write(buffered.value)
		if err != nil {
			return n, err
		}

		w.written += int64(n)
		flushed = append(flushed, e)
	}

	// Drop everything that made it to the underlying writer.
	for _, e := range flushed {
		w.list.Remove(e)
	}

	return len(p), nil
}
Loading

0 comments on commit 4292ecc

Please sign in to comment.