Skip to content

Commit bac40c6

Browse files
authored
Add ObjectStore::get_opts (#2241) (#4212)
* Add ObjectStore::get_opts (#2241) * Cleanup error handling * Review feedback
1 parent 0a8913a commit bac40c6

18 files changed

+470
-314
lines changed

object_store/src/aws/client.rs

+11-25
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,25 @@
1717

1818
use crate::aws::checksum::Checksum;
1919
use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
20-
use crate::aws::STRICT_PATH_ENCODE_SET;
20+
use crate::aws::{STORE, STRICT_PATH_ENCODE_SET};
2121
use crate::client::pagination::stream_paginated;
2222
use crate::client::retry::RetryExt;
23+
use crate::client::GetOptionsExt;
2324
use crate::multipart::UploadPart;
2425
use crate::path::DELIMITER;
25-
use crate::util::{format_http_range, format_prefix};
26+
use crate::util::format_prefix;
2627
use crate::{
27-
BoxStream, ClientOptions, ListResult, MultipartId, ObjectMeta, Path, Result,
28-
RetryConfig, StreamExt,
28+
BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, ObjectMeta, Path,
29+
Result, RetryConfig, StreamExt,
2930
};
3031
use base64::prelude::BASE64_STANDARD;
3132
use base64::Engine;
3233
use bytes::{Buf, Bytes};
3334
use chrono::{DateTime, Utc};
3435
use percent_encoding::{utf8_percent_encode, PercentEncode};
35-
use reqwest::{
36-
header::CONTENT_TYPE, Client as ReqwestClient, Method, Response, StatusCode,
37-
};
36+
use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, Response};
3837
use serde::{Deserialize, Serialize};
3938
use snafu::{ResultExt, Snafu};
40-
use std::ops::Range;
4139
use std::sync::Arc;
4240

4341
/// A specialized `Error` for object store-related errors
@@ -102,16 +100,9 @@ impl From<Error> for crate::Error {
102100
Error::GetRequest { source, path }
103101
| Error::DeleteRequest { source, path }
104102
| Error::CopyRequest { source, path }
105-
| Error::PutRequest { source, path }
106-
if matches!(source.status(), Some(StatusCode::NOT_FOUND)) =>
107-
{
108-
Self::NotFound {
109-
path,
110-
source: Box::new(source),
111-
}
112-
}
103+
| Error::PutRequest { source, path } => source.error(STORE, path),
113104
_ => Self::Generic {
114-
store: "S3",
105+
store: STORE,
115106
source: Box::new(err),
116107
},
117108
}
@@ -245,25 +236,20 @@ impl S3Client {
245236
pub async fn get_request(
246237
&self,
247238
path: &Path,
248-
range: Option<Range<usize>>,
239+
options: GetOptions,
249240
head: bool,
250241
) -> Result<Response> {
251-
use reqwest::header::RANGE;
252-
253242
let credential = self.get_credential().await?;
254243
let url = self.config.path_url(path);
255244
let method = match head {
256245
true => Method::HEAD,
257246
false => Method::GET,
258247
};
259248

260-
let mut builder = self.client.request(method, url);
261-
262-
if let Some(range) = range {
263-
builder = builder.header(RANGE, format_http_range(range));
264-
}
249+
let builder = self.client.request(method, url);
265250

266251
let response = builder
252+
.with_get_options(options)
267253
.with_aws_sigv4(
268254
credential.as_ref(),
269255
&self.config.region,

object_store/src/aws/credential.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::aws::STRICT_ENCODE_SET;
18+
use crate::aws::{STORE, STRICT_ENCODE_SET};
1919
use crate::client::retry::RetryExt;
2020
use crate::client::token::{TemporaryToken, TokenCache};
2121
use crate::util::hmac_sha256;
@@ -330,7 +330,7 @@ impl CredentialProvider for InstanceCredentialProvider {
330330
self.imdsv1_fallback,
331331
)
332332
.map_err(|source| crate::Error::Generic {
333-
store: "S3",
333+
store: STORE,
334334
source,
335335
})
336336
}))
@@ -363,7 +363,7 @@ impl CredentialProvider for WebIdentityProvider {
363363
&self.endpoint,
364364
)
365365
.map_err(|source| crate::Error::Generic {
366-
store: "S3",
366+
store: STORE,
367367
source,
368368
})
369369
}))
@@ -552,7 +552,7 @@ mod profile {
552552
.provide_credentials()
553553
.await
554554
.map_err(|source| crate::Error::Generic {
555-
store: "S3",
555+
store: STORE,
556556
source: Box::new(source),
557557
})?;
558558
let t_now = SystemTime::now();

object_store/src/aws/mod.rs

+14-25
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ use itertools::Itertools;
4040
use serde::{Deserialize, Serialize};
4141
use snafu::{ensure, OptionExt, ResultExt, Snafu};
4242
use std::collections::BTreeSet;
43-
use std::ops::Range;
4443
use std::str::FromStr;
4544
use std::sync::Arc;
4645
use tokio::io::AsyncWrite;
@@ -57,8 +56,8 @@ use crate::client::ClientConfigKey;
5756
use crate::config::ConfigValue;
5857
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
5958
use crate::{
60-
ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path,
61-
Result, RetryConfig, StreamExt,
59+
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
60+
ObjectStore, Path, Result, RetryConfig, StreamExt,
6261
};
6362

6463
mod checksum;
@@ -79,6 +78,8 @@ pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet =
7978
/// This struct is used to maintain the URI path encoding
8079
const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');
8180

81+
const STORE: &str = "S3";
82+
8283
/// Default metadata endpoint
8384
static METADATA_ENDPOINT: &str = "http://169.254.169.254";
8485

@@ -160,10 +161,10 @@ impl From<Error> for super::Error {
160161
fn from(source: Error) -> Self {
161162
match source {
162163
Error::UnknownConfigurationKey { key } => {
163-
Self::UnknownConfigurationKey { store: "S3", key }
164+
Self::UnknownConfigurationKey { store: STORE, key }
164165
}
165166
_ => Self::Generic {
166-
store: "S3",
167+
store: STORE,
167168
source: Box::new(source),
168169
},
169170
}
@@ -246,39 +247,26 @@ impl ObjectStore for AmazonS3 {
246247
.await
247248
}
248249

249-
async fn get(&self, location: &Path) -> Result<GetResult> {
250-
let response = self.client.get_request(location, None, false).await?;
250+
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
251+
let response = self.client.get_request(location, options, false).await?;
251252
let stream = response
252253
.bytes_stream()
253254
.map_err(|source| crate::Error::Generic {
254-
store: "S3",
255+
store: STORE,
255256
source: Box::new(source),
256257
})
257258
.boxed();
258259

259260
Ok(GetResult::Stream(stream))
260261
}
261262

262-
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
263-
let bytes = self
264-
.client
265-
.get_request(location, Some(range), false)
266-
.await?
267-
.bytes()
268-
.await
269-
.map_err(|source| client::Error::GetResponseBody {
270-
source,
271-
path: location.to_string(),
272-
})?;
273-
Ok(bytes)
274-
}
275-
276263
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
277264
use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
278265

266+
let options = GetOptions::default();
279267
// Extract meta from headers
280268
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
281-
let response = self.client.get_request(location, None, true).await?;
269+
let response = self.client.get_request(location, options, true).await?;
282270
let headers = response.headers();
283271

284272
let last_modified = headers
@@ -1169,8 +1157,8 @@ fn profile_credentials(
11691157
mod tests {
11701158
use super::*;
11711159
use crate::tests::{
1172-
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
1173-
put_get_delete_list_opts, rename_and_copy, stream_get,
1160+
get_nonexistent_object, get_opts, list_uses_directories_correctly,
1161+
list_with_delimiter, put_get_delete_list_opts, rename_and_copy, stream_get,
11741162
};
11751163
use bytes::Bytes;
11761164
use std::collections::HashMap;
@@ -1417,6 +1405,7 @@ mod tests {
14171405

14181406
// Localstack doesn't support listing with spaces https://github.com/localstack/localstack/issues/6328
14191407
put_get_delete_list_opts(&integration, is_local).await;
1408+
get_opts(&integration).await;
14201409
list_uses_directories_correctly(&integration).await;
14211410
list_with_delimiter(&integration).await;
14221411
rename_and_copy(&integration).await;

object_store/src/azure/client.rs

+19-39
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
use super::credential::{AzureCredential, CredentialProvider};
1919
use crate::azure::credential::*;
20+
use crate::azure::STORE;
2021
use crate::client::pagination::stream_paginated;
2122
use crate::client::retry::RetryExt;
23+
use crate::client::GetOptionsExt;
2224
use crate::path::DELIMITER;
23-
use crate::util::{deserialize_rfc1123, format_http_range, format_prefix};
25+
use crate::util::{deserialize_rfc1123, format_prefix};
2426
use crate::{
25-
BoxStream, ClientOptions, ListResult, ObjectMeta, Path, Result, RetryConfig,
26-
StreamExt,
27+
BoxStream, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, Result,
28+
RetryConfig, StreamExt,
2729
};
2830
use base64::prelude::BASE64_STANDARD;
2931
use base64::Engine;
@@ -32,13 +34,12 @@ use chrono::{DateTime, Utc};
3234
use itertools::Itertools;
3335
use reqwest::header::CONTENT_TYPE;
3436
use reqwest::{
35-
header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH, RANGE},
37+
header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH},
3638
Client as ReqwestClient, Method, Response, StatusCode,
3739
};
3840
use serde::{Deserialize, Serialize};
3941
use snafu::{ResultExt, Snafu};
4042
use std::collections::HashMap;
41-
use std::ops::Range;
4243
use url::Url;
4344

4445
/// A specialized `Error` for object store-related errors
@@ -69,12 +70,6 @@ pub(crate) enum Error {
6970
path: String,
7071
},
7172

72-
#[snafu(display("Error performing copy request {}: {}", path, source))]
73-
CopyRequest {
74-
source: crate::client::retry::Error,
75-
path: String,
76-
},
77-
7873
#[snafu(display("Error performing list request: {}", source))]
7974
ListRequest { source: crate::client::retry::Error },
8075

@@ -95,25 +90,9 @@ impl From<Error> for crate::Error {
9590
match err {
9691
Error::GetRequest { source, path }
9792
| Error::DeleteRequest { source, path }
98-
| Error::CopyRequest { source, path }
99-
| Error::PutRequest { source, path }
100-
if matches!(source.status(), Some(StatusCode::NOT_FOUND)) =>
101-
{
102-
Self::NotFound {
103-
path,
104-
source: Box::new(source),
105-
}
106-
}
107-
Error::CopyRequest { source, path }
108-
if matches!(source.status(), Some(StatusCode::CONFLICT)) =>
109-
{
110-
Self::AlreadyExists {
111-
path,
112-
source: Box::new(source),
113-
}
114-
}
93+
| Error::PutRequest { source, path } => source.error(STORE, path),
11594
_ => Self::Generic {
116-
store: "MicrosoftAzure",
95+
store: STORE,
11796
source: Box::new(err),
11897
},
11998
}
@@ -175,7 +154,7 @@ impl AzureClient {
175154
// and we want to use it in an infallible function
176155
HeaderValue::from_str(&format!("Bearer {token}")).map_err(|err| {
177156
crate::Error::Generic {
178-
store: "MicrosoftAzure",
157+
store: STORE,
179158
source: Box::new(err),
180159
}
181160
})?,
@@ -193,7 +172,7 @@ impl AzureClient {
193172
// and we want to use it in an infallible function
194173
HeaderValue::from_str(&format!("Bearer {token}")).map_err(|err| {
195174
crate::Error::Generic {
196-
store: "MicrosoftAzure",
175+
store: STORE,
197176
source: Box::new(err),
198177
}
199178
})?,
@@ -253,7 +232,7 @@ impl AzureClient {
253232
pub async fn get_request(
254233
&self,
255234
path: &Path,
256-
range: Option<Range<usize>>,
235+
options: GetOptions,
257236
head: bool,
258237
) -> Result<Response> {
259238
let credential = self.get_credential().await?;
@@ -263,17 +242,14 @@ impl AzureClient {
263242
false => Method::GET,
264243
};
265244

266-
let mut builder = self
245+
let builder = self
267246
.client
268247
.request(method, url)
269248
.header(CONTENT_LENGTH, HeaderValue::from_static("0"))
270249
.body(Bytes::new());
271250

272-
if let Some(range) = range {
273-
builder = builder.header(RANGE, format_http_range(range));
274-
}
275-
276251
let response = builder
252+
.with_get_options(options)
277253
.with_azure_authorization(&credential, &self.config.account)
278254
.send_retry(&self.config.retry_config)
279255
.await
@@ -338,8 +314,12 @@ impl AzureClient {
338314
.with_azure_authorization(&credential, &self.config.account)
339315
.send_retry(&self.config.retry_config)
340316
.await
341-
.context(CopyRequestSnafu {
342-
path: from.as_ref(),
317+
.map_err(|err| match err.status() {
318+
Some(StatusCode::CONFLICT) => crate::Error::AlreadyExists {
319+
source: Box::new(err),
320+
path: to.to_string(),
321+
},
322+
_ => err.error(STORE, from.to_string()),
343323
})?;
344324

345325
Ok(())

0 commit comments

Comments
 (0)