Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cat: make it concurrent #593

Merged
merged 38 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
f81fb67
implement linked list buffer
denizsurmeli Jul 13, 2023
2c0d961
feat: implement linked buffer
denizsurmeli Jul 13, 2023
e7a66e8
fix: remove unused function from the tests
denizsurmeli Jul 13, 2023
cb0805e
feat: make `cat` concurrent
denizsurmeli Jul 13, 2023
019acee
cat: remove unnecessary check
denizsurmeli Jul 14, 2023
da08bd3
chore: add `concurrency` and `part-size` flags to `cat`
denizsurmeli Jul 16, 2023
f035153
chore: update buffer
denizsurmeli Jul 17, 2023
f25fa66
cat(test): add big file test
denizsurmeli Jul 17, 2023
3477025
feat: extend testing
denizsurmeli Jul 17, 2023
a27df74
test: add fuzzy test
denizsurmeli Jul 18, 2023
d2d4e14
chore: remove debug lines
denizsurmeli Jul 18, 2023
fc77a75
chore: update the expected error in the tests
denizsurmeli Jul 18, 2023
4e596cc
chore: put debug messages back
denizsurmeli Jul 18, 2023
a512d3b
fix: copy the content before buffering
denizsurmeli Jul 18, 2023
c4858d0
Merge branch 'peak:master' into linked-list-writer-at
denizsurmeli Jul 20, 2023
425e902
test: add changing slice test and remove the unnecessary tests
denizsurmeli Jul 18, 2023
407d543
testing: add io.Copy test to custom buffer
denizsurmeli Jul 20, 2023
5cf4380
test: make tests parallel
denizsurmeli Jul 20, 2023
a00524d
fix: remove unused dep, remove unused fields
denizsurmeli Jul 21, 2023
5b55085
tests: reduce the test size
denizsurmeli Jul 21, 2023
9575a85
Merge branch 'master' into linked-list-writer-at
denizsurmeli Jul 21, 2023
9f10c1b
a
denizsurmeli Jul 21, 2023
25f1581
optimize test
denizsurmeli Jul 21, 2023
589d40c
make tests parallel
denizsurmeli Jul 21, 2023
3fcb2b3
trigger ci
denizsurmeli Jul 21, 2023
9844dc0
chore: update changelog
denizsurmeli Jul 21, 2023
b2bdb82
Merge branch 'master' into linked-list-writer-at
denizsurmeli Jul 22, 2023
d303d91
chore: reduce test size for actions
denizsurmeli Jul 22, 2023
b0ae89b
Merge branch 'master' into linked-list-writer-at
igungor Jul 24, 2023
e6208fe
Merge branch 'master' into linked-list-writer-at
denizsurmeli Jul 25, 2023
d3ff3f0
refactor: resolve the comments
denizsurmeli Jul 25, 2023
9833971
refactor: name `OrderedBuffer` to `OrderedWriterAt`
denizsurmeli Jul 25, 2023
069a4e5
Merge branch 'master' into linked-list-writer-at
denizsurmeli Jul 25, 2023
8b46bec
Merge branch 'master' into linked-list-writer-at
denizsurmeli Jul 25, 2023
545aff3
refactor: requested changes applied
denizsurmeli Jul 27, 2023
131dc80
refactor: simplify code, update docs
denizsurmeli Jul 27, 2023
f092dfb
refactor: random bytes as a function
denizsurmeli Jul 27, 2023
4cd302c
chore: changelog
denizsurmeli Jul 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

#### Features
- Added `--content-disposition` flag to `cp` command. ([#569](https://github.com/peak/s5cmd/issues/569))
- Added `--show-fullpath` flag to `ls`. (#[596](https://github.com/peak/s5cmd/issues/596))
- Added `--show-fullpath` flag to `ls`. ([#596](https://github.com/peak/s5cmd/issues/596))

#### Improvements
- Implemented concurrent multipart download support for `cat`. ([#593](https://github.com/peak/s5cmd/pull/593))

#### Bugfixes
- Fixed a bug introduced with `external sort` support in `sync` command which prevents `sync` to an empty destination with `--delete` option. ([#576](https://github.com/peak/s5cmd/issues/576))
Expand Down
27 changes: 20 additions & 7 deletions command/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package command
import (
"context"
"fmt"
"io"
"os"

"github.com/urfave/cli/v2"

"github.com/peak/s5cmd/v2/log/stat"
"github.com/peak/s5cmd/v2/orderedwriter"
"github.com/peak/s5cmd/v2/storage"
"github.com/peak/s5cmd/v2/storage/url"
)
Expand Down Expand Up @@ -44,6 +44,18 @@ func NewCatCommand() *cli.Command {
Name: "version-id",
Usage: "use the specified version of an object",
},
&cli.IntFlag{
Name: "concurrency",
Aliases: []string{"c"},
Value: defaultCopyConcurrency,
Usage: "number of concurrent parts transferred between host and remote server",
},
&cli.IntFlag{
Name: "part-size",
Aliases: []string{"p"},
Value: defaultPartSize,
Usage: "size of each part transferred between host and remote server, in MiB",
},
},
CustomHelpTemplate: catHelpTemplate,
Before: func(c *cli.Context) error {
Expand Down Expand Up @@ -72,6 +84,8 @@ func NewCatCommand() *cli.Command {
fullCommand: fullCommand,

storageOpts: NewStorageOpts(c),
concurrency: c.Int("concurrency"),
partSize: c.Int64("part-size") * megabytes,
}.Run(c.Context)
},
}
Expand All @@ -86,6 +100,8 @@ type Cat struct {
fullCommand string

storageOpts storage.Options
concurrency int
partSize int64
}

// Run prints content of given source to standard output.
Expand All @@ -95,20 +111,17 @@ func (c Cat) Run(ctx context.Context) error {
printError(c.fullCommand, c.op, err)
return err
}

rc, err := client.Read(ctx, c.src)
_, err = client.Stat(ctx, c.src)
if err != nil {
printError(c.fullCommand, c.op, err)
return err
}
defer rc.Close()

_, err = io.Copy(os.Stdout, rc)
buf := orderedwriter.New(os.Stdout)
_, err = client.Get(ctx, c.src, buf, c.concurrency, c.partSize)
if err != nil {
printError(c.fullCommand, c.op, err)
return err
}

return nil
}

Expand Down
48 changes: 38 additions & 10 deletions e2e/cat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ import (
"gotest.tools/v3/icmd"
)

const (
kb int64 = 1024
mb = kb * kb
)

func TestCatS3Object(t *testing.T) {
t.Parallel()

const (
filename = "file.txt"
)
contents, expected := getSequentialFileContent()
contents, expected := getSequentialFileContent(4 * mb)

testcases := []struct {
name string
Expand All @@ -42,6 +47,31 @@ func TestCatS3Object(t *testing.T) {
jsonCheck(true),
},
},
{
name: "cat remote object with lower part size and higher concurrency",
cmd: []string{
"cat",
"-p",
"1",
"-c",
"2",
},
expected: expected,
},
{
name: "cat remote object with json flag lower part size and higher concurrency",
cmd: []string{
"--json",
"cat",
"-p",
"1",
"-c",
"2",
}, expected: expected,
assertOps: []assertOp{
jsonCheck(true),
},
},
}
for _, tc := range testcases {
tc := tc
Expand Down Expand Up @@ -86,7 +116,7 @@ func TestCatS3ObjectFail(t *testing.T) {
"cat",
},
expected: map[int]compareFunc{
0: match(`ERROR "cat s3://(.*)/prefix/file\.txt": NoSuchKey:`),
0: match(`ERROR "cat s3://(.*)/prefix/file\.txt":(.*) not found`),
},
},
{
Expand All @@ -97,7 +127,7 @@ func TestCatS3ObjectFail(t *testing.T) {
"cat",
},
expected: map[int]compareFunc{
0: match(`{"operation":"cat","command":"cat s3:\/\/(.*)\/prefix\/file\.txt","error":"NoSuchKey:`),
0: match(`{"operation":"cat","command":"cat s3:\/\/(.*)\/prefix\/file\.txt","error":"(.*) not found`),
},
assertOps: []assertOp{
jsonCheck(true),
Expand Down Expand Up @@ -142,7 +172,6 @@ func TestCatS3ObjectFail(t *testing.T) {
cmd := s5cmd(tc.cmd...)

result := icmd.RunCmd(cmd)

result.Assert(t, icmd.Expected{ExitCode: 1})
assertLines(t, result.Stderr(), tc.expected, tc.assertOps...)
})
Expand Down Expand Up @@ -200,16 +229,16 @@ func TestCatLocalFileFail(t *testing.T) {
}
}

// getSequentialFileContent creates a string with 64666688 in size (~61.670 MB)
func getSequentialFileContent() (string, map[int]compareFunc) {
// getSequentialFileContent creates a string with size bytes in size.
func getSequentialFileContent(size int64) (string, map[int]compareFunc) {
sb := strings.Builder{}
expectedLines := make(map[int]compareFunc)

for i := 0; i < 50000; i++ {
totalBytesWritten := int64(0)
for i := 0; totalBytesWritten < size; i++ {
line := fmt.Sprintf(`{ "line": "%d", "id": "i%d", data: "some event %d" }`, i, i, i)
sb.WriteString(line)
sb.WriteString("\n")

totalBytesWritten += int64(len(line))
expectedLines[i] = equals(line)
}

Expand Down Expand Up @@ -272,7 +301,6 @@ func TestCatByVersionID(t *testing.T) {
cmd = s5cmd("cat", "--version-id", version,
fmt.Sprintf("s3://%v/%v", bucket, filename))
result = icmd.RunCmd(cmd)

if diff := cmp.Diff(contents[i], result.Stdout()); diff != "" {
t.Errorf("(-want +got):\n%v", diff)
}
Expand Down
113 changes: 113 additions & 0 deletions orderedwriter/orderedwriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Package orderedwriter implements an unbounded buffer for ordering concurrent writes for
// non-seekable writers. It keeps an internal linked list that keeps the chunks in order
// and flushes buffered chunks when the expected offset is available.
package orderedwriter

import (
"container/list"
"io"
"sync"
)

// chunk is a contiguous byte range received at a given offset, buffered
// until all bytes preceding it have been written downstream.
type chunk struct {
	offset int64
	value  []byte
}

// OrderedWriterAt adapts a non-seekable io.Writer into an io.WriterAt.
// Out-of-order writes are buffered in an offset-sorted linked list and
// flushed to the underlying writer as soon as the gap before them is
// filled. It is safe for concurrent use by multiple goroutines.
type OrderedWriterAt struct {
	mu      *sync.Mutex
	list    *list.List
	w       io.Writer
	written int64
}

// New returns an OrderedWriterAt that streams bytes to w in offset order,
// starting from offset zero.
func New(w io.Writer) *OrderedWriterAt {
	return &OrderedWriterAt{
		mu:      &sync.Mutex{},
		list:    list.New(),
		w:       w,
		written: 0,
	}
}

// WriteAt implements io.WriterAt. Chunks arriving at the expected offset
// are written through; chunks arriving early are copied and buffered until
// the missing range arrives. It always reports len(p) bytes accepted on
// success; on a downstream write error it returns that error.
func (w *OrderedWriterAt) WriteAt(p []byte, offset int64) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Fast path: nothing is buffered and p starts exactly at the next
	// expected offset, so pass it through without copying.
	if w.list.Front() == nil && w.written == offset {
		n, err := w.w.Write(p)
		if err != nil {
			return n, err
		}
		w.written += int64(n)
		return len(p), nil
	}

	// Copy the chunk: callers may reuse p after WriteAt returns.
	b := make([]byte, len(p))
	copy(b, p)
	c := &chunk{offset: offset, value: b}

	// Keep the list sorted by offset: insert before the first buffered
	// chunk with a larger offset, or append if none exists (this also
	// covers the empty-list case).
	var inserted bool
	for e := w.list.Front(); e != nil; e = e.Next() {
		v := e.Value.(*chunk)
		if offset < v.offset {
			w.list.InsertBefore(c, e)
			inserted = true
			break
		}
	}
	if !inserted {
		w.list.PushBack(c)
	}

	// Flush the longest contiguous run of buffered chunks starting at the
	// expected offset. Each element is removed as soon as it is written so
	// the list stays consistent with `written` even if a downstream write
	// fails midway; the original batched removal left already-written
	// chunks in the list on error, wedging the buffer permanently.
	for e := w.list.Front(); e != nil; {
		v := e.Value.(*chunk)
		if v.offset != w.written {
			break
		}

		n, err := w.w.Write(v.value)
		if err != nil {
			return n, err
		}
		w.written += int64(n)

		next := e.Next()
		w.list.Remove(e)
		e = next
	}

	return len(p), nil
}
Loading