Skip to content

Commit b130ecb

Browse files
authored
Add get to CLI (#195)
1 parent a0c43cc commit b130ecb

11 files changed

+249
-20
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ resolver = "2"
88
[workspace.dependencies]
99
bytes = "1"
1010
env_logger = "0.11"
11+
futures = "0.3"
1112
log = "0.4"
1213
thiserror = "2"
1314
tokio = "1"

python/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ name = "hdfs_native._internal"
2727
[dependencies]
2828
bytes = { workspace = true }
2929
env_logger = { workspace = true }
30+
futures = { workspace = true }
3031
hdfs-native = { path = "../rust" }
3132
log = { workspace = true }
3233
pyo3 = { version = "0.23", features = ["extension-module", "abi3", "abi3-py39"] }

python/conftest.py

+10-3
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,16 @@ def minidfs():
4343
child.kill()
4444

4545

46-
@pytest.fixture(scope="module")
47-
def client(minidfs: str) -> Client:
48-
return Client(minidfs)
46+
@pytest.fixture
47+
def client(minidfs: str):
48+
client = Client(minidfs)
49+
50+
try:
51+
yield client
52+
finally:
53+
statuses = list(client.list_status("/"))
54+
for status in statuses:
55+
client.delete(status.path, True)
4956

5057

5158
@pytest.fixture(scope="module")

python/hdfs_native/__init__.py

+12
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ def __init__(self, inner: "RawFileReader"):
3939
def __len__(self) -> int:
4040
return self.inner.file_length()
4141

42+
def __iter__(self) -> Iterator[bytes]:
43+
return self.read_range_stream(0, len(self))
44+
4245
def __enter__(self):
4346
# Don't need to do anything special here
4447
return self
@@ -82,6 +85,15 @@ def read_range(self, offset: int, len: int) -> bytes:
8285
"""Read `len` bytes from the file starting at `offset`. Doesn't affect the position in the file"""
8386
return self.inner.read_range(offset, len)
8487

88+
def read_range_stream(self, offset: int, len: int) -> Iterator[bytes]:
89+
"""
90+
Read `len` bytes from the file starting at `offset` as an iterator of bytes. Doesn't affect
91+
the position in the file.
92+
93+
This is the most efficient way to iteratively read a file.
94+
"""
95+
return self.inner.read_range_stream(offset, len)
96+
8597
def close(self) -> None:
8698
pass
8799

python/hdfs_native/_internal.pyi

+6
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ class RawFileReader:
8080
def read_range(self, offset: int, len: int) -> bytes:
8181
"""Read `len` bytes from the file starting at `offset`. Doesn't affect the position in the file"""
8282

83+
def read_range_stream(self, offset: int, len: int) -> Iterator[bytes]:
84+
"""
85+
Read `len` bytes from the file starting at `offset` as an iterator of bytes. Doesn't affect
86+
the position in the file.
87+
"""
88+
8389
class RawFileWriter:
8490
def write(self, buf: Buffer) -> int:
8591
"""Writes `buf` to the file"""

python/hdfs_native/cli.py

+120-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import functools
22
import os
33
import re
4+
import shutil
45
import sys
56
from argparse import ArgumentParser, Namespace
6-
from typing import List, Optional, Sequence
7+
from concurrent.futures import ThreadPoolExecutor, as_completed
8+
from typing import List, Optional, Sequence, Tuple
79
from urllib.parse import urlparse
810

911
from hdfs_native import Client
@@ -50,6 +52,42 @@ def _glob_path(client: Client, glob: str) -> List[str]:
5052
return [glob]
5153

5254

55+
def _download_file(
56+
client: Client,
57+
remote_src: str,
58+
local_dst: str,
59+
force: bool = False,
60+
preserve: bool = False,
61+
) -> None:
62+
if not force and os.path.exists(local_dst):
63+
raise FileExistsError(f"{local_dst} already exists, use --force to overwrite")
64+
65+
with client.read(remote_src) as remote_file:
66+
with open(local_dst, "wb") as local_file:
67+
for chunk in remote_file.read_range_stream(0, len(remote_file)):
68+
local_file.write(chunk)
69+
70+
if preserve:
71+
status = client.get_file_info(remote_src)
72+
os.utime(
73+
local_dst,
74+
(status.access_time / 1000, status.modification_time / 1000),
75+
)
76+
os.chmod(local_dst, status.permission)
77+
78+
79+
def _upload_file(
80+
client: Client,
81+
local_src: str,
82+
remote_dst: str,
83+
force: bool = False,
84+
preserve: bool = False,
85+
) -> None:
86+
with open(local_src, "rb") as local_file:
87+
with client.create(remote_dst) as remote_file:
88+
shutil.copyfileobj(local_file, remote_file)
89+
90+
5391
def cat(args: Namespace):
5492
for src in args.src:
5593
client = _client_for_url(src)
@@ -99,6 +137,48 @@ def chown(args: Namespace):
99137
client.set_owner(path, owner, group)
100138

101139

140+
def get(args: Namespace):
141+
paths: List[Tuple[Client, str]] = []
142+
143+
for url in args.src:
144+
client = _client_for_url(url)
145+
for path in _glob_path(client, _path_for_url(url)):
146+
paths.append((client, path))
147+
148+
dst_is_dir = os.path.isdir(args.localdst)
149+
150+
if len(paths) > 1 and not dst_is_dir:
151+
raise ValueError("Destination must be directory when copying multiple files")
152+
elif not dst_is_dir:
153+
_download_file(
154+
paths[0][0],
155+
paths[0][1],
156+
args.localdst,
157+
force=args.force,
158+
preserve=args.preserve,
159+
)
160+
else:
161+
with ThreadPoolExecutor(args.threads) as executor:
162+
futures = []
163+
for client, path in paths:
164+
filename = os.path.basename(path)
165+
166+
futures.append(
167+
executor.submit(
168+
_download_file,
169+
client,
170+
path,
171+
os.path.join(args.localdst, filename),
172+
force=args.force,
173+
preserve=args.preserve,
174+
)
175+
)
176+
177+
# Iterate to raise any exceptions thrown
178+
for f in as_completed(futures):
179+
f.result()
180+
181+
102182
def mkdir(args: Namespace):
103183
create_parent = args.parent
104184

@@ -193,6 +273,45 @@ def main(in_args: Optional[Sequence[str]] = None):
193273
chown_parser.add_argument("path", nargs="+", help="File pattern to modify")
194274
chown_parser.set_defaults(func=chown)
195275

276+
get_parser = subparsers.add_parser(
277+
"get",
278+
aliases=["copyToLocal"],
279+
help="Copy files to a local destination",
280+
description="""Copy files matching a pattern to a local destination.
281+
When copying multiple files, the destination must be a directory""",
282+
)
283+
get_parser.add_argument(
284+
"-p",
285+
"--preserve",
286+
action="store_true",
287+
default=False,
288+
help="Preserve timestamps and the mode",
289+
)
290+
get_parser.add_argument(
291+
"-f",
292+
"--force",
293+
action="store_true",
294+
default=False,
295+
help="Overwrite the destination if it already exists",
296+
)
297+
get_parser.add_argument(
298+
"-t",
299+
"--threads",
300+
type=int,
301+
help="Number of threads to use",
302+
default=1,
303+
)
304+
get_parser.add_argument(
305+
"src",
306+
nargs="+",
307+
help="Source patterns to copy",
308+
)
309+
get_parser.add_argument(
310+
"localdst",
311+
help="Local destination to write to",
312+
)
313+
get_parser.set_defaults(func=get)
314+
196315
mkdir_parser = subparsers.add_parser(
197316
"mkdir",
198317
help="Create a directory",

python/src/lib.rs

+39-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::borrow::Cow;
22
use std::collections::HashMap;
3-
use std::sync::Arc;
3+
use std::sync::{Arc, Mutex};
44

55
use ::hdfs_native::file::{FileReader, FileWriter};
66
use ::hdfs_native::WriteOptions;
@@ -9,6 +9,8 @@ use ::hdfs_native::{
99
Client,
1010
};
1111
use bytes::Bytes;
12+
use futures::stream::BoxStream;
13+
use futures::StreamExt;
1214
use hdfs_native::acl::{AclEntry, AclStatus};
1315
use hdfs_native::client::ContentSummary;
1416
use pyo3::{exceptions::PyRuntimeError, prelude::*};
@@ -220,6 +222,32 @@ impl PyAclEntry {
220222
}
221223
}
222224

225+
#[pyclass(name = "FileReadStream")]
226+
struct PyFileReadStream {
227+
inner: Arc<Mutex<BoxStream<'static, hdfs_native::Result<Bytes>>>>,
228+
rt: Arc<Runtime>,
229+
}
230+
231+
#[pymethods]
232+
impl PyFileReadStream {
233+
pub fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
234+
slf
235+
}
236+
237+
fn __next__(slf: PyRefMut<'_, Self>) -> PyHdfsResult<Option<Cow<[u8]>>> {
238+
let inner = Arc::clone(&slf.inner);
239+
let rt = Arc::clone(&slf.rt);
240+
if let Some(result) = slf
241+
.py()
242+
.allow_threads(|| rt.block_on(inner.lock().unwrap().next()))
243+
{
244+
Ok(Some(Cow::from(result?.to_vec())))
245+
} else {
246+
Ok(None)
247+
}
248+
}
249+
}
250+
223251
#[pyclass]
224252
struct RawFileReader {
225253
inner: FileReader,
@@ -258,6 +286,16 @@ impl RawFileReader {
258286
.to_vec(),
259287
))
260288
}
289+
290+
pub fn read_range_stream(&self, offset: usize, len: usize) -> PyFileReadStream {
291+
let stream = Arc::new(Mutex::new(
292+
self.inner.read_range_stream(offset, len).boxed(),
293+
));
294+
PyFileReadStream {
295+
inner: stream,
296+
rt: Arc::clone(&self.rt),
297+
}
298+
}
261299
}
262300

263301
#[pyclass(get_all, set_all, name = "WriteOptions")]

0 commit comments

Comments
 (0)