Skip to content

Commit b25a972

Browse files
committedApr 26, 2019
import: added test case to ensure the fix in tikv#4566 works
Signed-off-by: kennytm <[email protected]>
1 parent 16f1913 commit b25a972

File tree

2 files changed

+57
-3
lines changed

2 files changed

+57
-3
lines changed
 

‎src/import/import.rs

+38-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ use super::stream::*;
2121
use super::{Config, Error, Result};
2222

2323
const MAX_RETRY_TIMES: u64 = 5;
24-
const RETRY_INTERVAL_SECS: u64 = 3;
24+
#[cfg(not(test))]
25+
const RETRY_INTERVAL: Duration = Duration::from_secs(3);
26+
#[cfg(test)]
27+
const RETRY_INTERVAL: Duration = Duration::from_millis(10);
2528
const STORE_UNAVAILABLE_WAIT_INTERVAL_MILLIS: u64 = 20000;
2629

2730
/// ImportJob is responsible for importing data stored in an engine to a cluster.
@@ -290,7 +293,7 @@ impl<Client: ImportClient> ImportSSTJob<Client> {
290293

291294
for i in 0..MAX_RETRY_TIMES {
292295
if i != 0 {
293-
thread::sleep(Duration::from_secs(RETRY_INTERVAL_SECS));
296+
thread::sleep(RETRY_INTERVAL);
294297
}
295298

296299
let range = self.sst.meta.get_range().clone();
@@ -444,3 +447,36 @@ impl<Client: ImportClient> ImportSSTJob<Client> {
444447
}
445448
}
446449
}
450+
451+
#[cfg(test)]
452+
mod tests {
453+
use super::*;
454+
use crate::import::test_helpers::*;
455+
use kvproto::import_kvpb::{Mutation, Mutation_OP, WriteBatch};
456+
457+
#[test]
458+
fn test_import_failure() {
459+
let (_dir, engine) = new_engine("test_import_failure");
460+
let mut client = MockClient::new();
461+
client.add_region_range(b"a", b"z");
462+
463+
let mut wb = WriteBatch::new();
464+
let mut mutation = Mutation::new();
465+
mutation.set_op(Mutation_OP::Put);
466+
mutation.set_key(b"key".to_vec());
467+
mutation.set_value(b"value".to_vec());
468+
wb.mut_mutations().push(mutation);
469+
wb.set_commit_ts(1);
470+
engine.write(wb).unwrap();
471+
engine.flush(true).unwrap();
472+
473+
let cfg = Config::default();
474+
let mut job = ImportJob::new(cfg, client, engine);
475+
476+
job.client.set_upload_sst_successful(false);
477+
job.run().unwrap_err();
478+
479+
job.client.set_upload_sst_successful(true);
480+
job.run().unwrap();
481+
}
482+
}

‎src/import/test_helpers.rs

+19-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use tikv_util::security::SecurityConfig;
2222

2323
use super::client::*;
2424
use super::common::*;
25-
use super::Result;
25+
use super::{Error, Result};
2626

2727
pub fn calc_data_crc32(data: &[u8]) -> u32 {
2828
let mut digest = crc32::Digest::new(crc32::IEEE);
@@ -82,6 +82,7 @@ pub struct MockClient {
8282
counter: Arc<AtomicUsize>,
8383
regions: Arc<Mutex<HashMap<u64, Region>>>,
8484
scatter_regions: Arc<Mutex<HashMap<u64, Region>>>,
85+
is_upload_sst_successful: bool,
8586
}
8687

8788
impl MockClient {
@@ -90,6 +91,7 @@ impl MockClient {
9091
counter: Arc::new(AtomicUsize::new(1)),
9192
regions: Arc::new(Mutex::new(HashMap::new())),
9293
scatter_regions: Arc::new(Mutex::new(HashMap::new())),
94+
is_upload_sst_successful: true,
9395
}
9496
}
9597

@@ -114,6 +116,10 @@ impl MockClient {
114116
let regions = self.scatter_regions.lock().unwrap();
115117
regions.get(&id).map(|r| RegionInfo::new(r.clone(), None))
116118
}
119+
120+
pub fn set_upload_sst_successful(&mut self, success: bool) {
121+
self.is_upload_sst_successful = success;
122+
}
117123
}
118124

119125
impl ImportClient for MockClient {
@@ -176,4 +182,16 @@ impl ImportClient for MockClient {
176182
fn is_space_enough(&self, _: u64, _: u64) -> Result<bool> {
177183
Ok(true)
178184
}
185+
186+
fn upload_sst(&self, _: u64, _: UploadStream) -> Result<UploadResponse> {
187+
if self.is_upload_sst_successful {
188+
Ok(UploadResponse::new())
189+
} else {
190+
Err(Error::ImportSSTJobFailed("mock failure".to_string()))
191+
}
192+
}
193+
194+
fn ingest_sst(&self, _: u64, _: IngestRequest) -> Result<IngestResponse> {
195+
Ok(IngestResponse::new())
196+
}
179197
}

0 commit comments

Comments
 (0)