Commit 8338936

support follower read (tikv#5051)
* raftstore,server: support follower read
* raftstore: fix condition check of read on replica
* raftstore: follower read waits for apply index to reach read index
* add a test of waiting for read index
* fix test_wait_for_apply_index
* dec pending reads count after follower handles read index cmd
* update comments
* remove unused file
* fix test_wait_for_apply_index
* update comments
* update test_wait_for_apply_index
* update dependency 'kvproto'

Signed-off-by: 5kbpers <[email protected]>
1 parent 7a2b0de commit 8338936
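For clients, the feature is exposed as a single opt-in flag on the request context. A minimal sketch of the client-side usage, distilled from the tests added in this commit (`region` and `follower_peer` are illustrative placeholders):

    // Build a kvrpcpb::Context that targets a follower instead of the leader.
    let mut ctx = Context::new();
    ctx.set_region_id(region.get_id());
    ctx.set_region_epoch(region.get_region_epoch().clone());
    ctx.set_peer(follower_peer.clone()); // a non-leader replica
    // Opt in to follower read; without this flag the follower rejects
    // the request with NotLeader, as before.
    ctx.set_follower_read(true);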

File tree: 8 files changed, +165 −10 lines

Cargo.lock (+1 −1, generated file, diff not rendered)

src/raftstore/store/fsm/apply.rs (+4)

@@ -1097,6 +1097,10 @@ impl ApplyDelegate {
         ctx: &ApplyContext,
         req: &RaftCmdRequest,
     ) -> Result<(RaftCmdResponse, ApplyResult)> {
+        fail_point!("on_apply_write_cmd", self.id() == 3, |_| {
+            unimplemented!();
+        });
+
         let requests = req.get_requests();
         let mut responses = Vec::with_capacity(requests.len());

src/raftstore/store/fsm/peer.rs (+9 −1)

@@ -2254,7 +2254,15 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
         // ReadIndex can be processed on the replicas.
         let is_read_index_request =
             request.len() == 1 && request[0].get_cmd_type() == CmdType::ReadIndex;
-        if !(self.fsm.peer.is_leader() || is_read_index_request) {
+        let mut read_only = true;
+        for r in msg.get_requests() {
+            match r.get_cmd_type() {
+                CmdType::Get | CmdType::Snap | CmdType::ReadIndex => (),
+                _ => read_only = false,
+            }
+        }
+        let allow_follower_read = read_only && msg.get_header().get_follower_read();
+        if !(self.fsm.peer.is_leader() || is_read_index_request || allow_follower_read) {
             self.ctx.raft_metrics.invalid_proposal.not_leader += 1;
             let leader = self.fsm.peer.get_peer_from_cache(leader_id);
             self.fsm.group_state = GroupState::Chaos;
src/raftstore/store/peer.rs (+22 −8)

@@ -1009,6 +1009,14 @@ impl Peer {
             && !self.is_merging()
     }

+    fn ready_to_handle_unsafe_follower_read(&self, read_index: u64) -> bool {
+        // Wait until the follower applies all values before the read. There is
+        // still a problem: if the leader applies fewer values than the follower,
+        // the follower read could get a newer value, and after that, the leader
+        // may read a stale value, which violates linearizability.
+        self.get_store().applied_index() >= read_index && !self.is_splitting() && !self.is_merging()
+    }
+
     #[inline]
     fn is_splitting(&self) -> bool {
         self.last_committed_split_idx > self.get_store().applied_index()

@@ -1354,18 +1362,24 @@ impl Peer {
                 && read.cmds[0].0.get_requests().len() == 1
                 && read.cmds[0].0.get_requests()[0].get_cmd_type() == CmdType::ReadIndex;

-            if !is_read_index_request {
-                let term = self.term();
-                // Only read index requests are valid.
-                for (_, cb) in read.cmds.drain(..) {
-                    apply::notify_stale_req(term, cb);
-                }
-            } else {
+            let term = self.term();
+            if is_read_index_request {
                 for (req, cb) in read.cmds.drain(..) {
                     cb.invoke_read(self.handle_read(ctx, req, true, read.read_index));
                 }
+                self.pending_reads.ready_cnt -= 1;
+            } else if self.ready_to_handle_unsafe_follower_read(read.read_index.unwrap()) {
+                for (req, cb) in read.cmds.drain(..) {
+                    if req.get_header().get_follower_read() {
+                        cb.invoke_read(self.handle_read(ctx, req, true, read.read_index));
+                    } else {
+                        apply::notify_stale_req(term, cb);
+                    }
+                }
+                self.pending_reads.ready_cnt -= 1;
+            } else {
+                self.pending_reads.reads.push_front(read);
             }
-            self.pending_reads.ready_cnt -= 1;
         }
     }
 }
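The safety argument lives in `ready_to_handle_unsafe_follower_read` above: a follower may only serve the read once its applied index has caught up with the read index it obtained via the leader, and no split or merge is in flight. A self-contained sketch of that condition (illustrative free function, not TiKV's actual types):

    /// A follower may serve a read at `read_index` only after its state
    /// machine has applied at least up to that index; it must also defer
    /// reads while a split or merge is moving data between regions.
    fn ready_for_follower_read(
        applied_index: u64,
        read_index: u64,
        splitting: bool,
        merging: bool,
    ) -> bool {
        applied_index >= read_index && !splitting && !merging
    }

When the check fails, the read is pushed back onto `pending_reads` and retried after more entries have been applied.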

src/storage/kv/raftkv.rs (+1)

@@ -171,6 +171,7 @@ impl<S: RaftStoreRouter> RaftKv<S> {
             header.set_term(ctx.get_term());
         }
         header.set_sync_log(ctx.get_sync_log());
+        header.set_follower_read(ctx.get_follower_read());
         header
     }
tests/failpoints/cases/mod.rs (+1)

@@ -3,6 +3,7 @@
 mod test_bootstrap;
 mod test_conf_change;
 mod test_coprocessor;
+mod test_follower_read;
 mod test_merge;
 mod test_pending_peers;
 mod test_snap;
tests/failpoints/cases/test_follower_read.rs (new file, +65)

@@ -0,0 +1,65 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::sync::Arc;
+
+use fail;
+use std::time::Duration;
+use test_raftstore::*;
+use tikv_util::HandyRwLock;
+
+#[test]
+fn test_wait_for_apply_index() {
+    let _guard = crate::setup();
+    let mut cluster = new_server_cluster(0, 3);
+
+    // Increase the election tick to make this test case run reliably.
+    configure_for_lease_read(&mut cluster, Some(50), Some(10_000));
+    let pd_client = Arc::clone(&cluster.pd_client);
+    pd_client.disable_default_operator();
+
+    let r1 = cluster.run_conf_change();
+    cluster.must_put(b"k0", b"v0");
+    let p2 = new_peer(2, 2);
+    cluster.pd_client.must_add_peer(r1, p2.clone());
+    let p3 = new_peer(3, 3);
+    cluster.pd_client.must_add_peer(r1, p3.clone());
+    must_get_equal(&cluster.get_engine(3), b"k0", b"v0");
+
+    let region = cluster.get_region(b"k0");
+    cluster.must_transfer_leader(region.get_id(), p2.clone());
+
+    // Block all write cmds from being applied on peer 3.
+    fail::cfg("on_apply_write_cmd", "sleep(2000)").unwrap();
+    cluster.must_put(b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(2), b"k1", b"v1");
+
+    // Peer 3 has not applied the cmd that puts 'k1' yet, so the follower
+    // read must be blocked.
+    must_get_none(&cluster.get_engine(3), b"k1");
+    let mut request = new_request(
+        region.get_id(),
+        region.get_region_epoch().clone(),
+        vec![new_get_cf_cmd("default", b"k1")],
+        false,
+    );
+    request.mut_header().set_peer(p3.clone());
+    request.mut_header().set_follower_read(true);
+    let (cb, rx) = make_cb(&request);
+    cluster
+        .sim
+        .rl()
+        .async_command_on_node(3, request, cb)
+        .unwrap();
+    // Must time out here.
+    assert!(rx.recv_timeout(Duration::from_millis(500)).is_err());
+    fail::cfg("on_apply_write_cmd", "off").unwrap();
+
+    // After the write cmd is applied, the follower read will be executed.
+    match rx.recv_timeout(Duration::from_secs(3)) {
+        Ok(resp) => {
+            assert_eq!(resp.get_responses().len(), 1);
+            assert_eq!(resp.get_responses()[0].get_get().get_value(), b"v1");
+        }
+        Err(_) => panic!("follower read failed"),
+    }
+}
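This test drives the `fail_point!` hook added to apply.rs: `fail::cfg("on_apply_write_cmd", "sleep(2000)")` stalls every matching apply on peer 3 for two seconds, and `"off"` removes the injection. A minimal sketch of the fail-rs pattern in isolation (assumes the `fail` crate with failpoints enabled; `my_fp` and both functions are made-up names):

    use fail::fail_point;

    fn apply_write() {
        // Does nothing unless a matching `fail::cfg` action is active.
        fail_point!("my_fp");
        // ... real apply logic would go here ...
    }

    #[test]
    fn delays_apply() {
        fail::cfg("my_fp", "sleep(2000)").unwrap(); // stall the hook for 2s
        apply_write();                              // now takes ~2s
        fail::cfg("my_fp", "off").unwrap();         // remove the injection
    }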

tests/integrations/storage/test_raftkv.rs (+62)

@@ -4,6 +4,8 @@ use kvproto::kvrpcpb::Context;

 use engine::IterOption;
 use engine::{CfName, CF_DEFAULT};
+use std::thread;
+use std::time;
 use test_raftstore::*;
 use tikv::storage::kv::*;
 use tikv::storage::{CFStatistics, Key};

@@ -123,6 +125,66 @@ fn test_read_index_on_replica() {
     );
 }

+#[test]
+fn test_read_on_replica() {
+    let count = 3;
+    let mut cluster = new_server_cluster(0, count);
+    cluster.run();
+
+    let k1 = b"k1";
+    let (k2, v2) = (b"k2", b"v2");
+    let (k3, v3) = (b"k3", b"v3");
+    let (k4, v4) = (b"k4", b"v4");
+
+    // Make sure the leader has been elected.
+    assert_eq!(cluster.must_get(k1), None);
+
+    let region = cluster.get_region(b"");
+    let leader = cluster.leader_of_region(region.get_id()).unwrap();
+    let leader_storage = cluster.sim.rl().storages[&leader.get_id()].clone();
+
+    let mut leader_ctx = Context::new();
+    leader_ctx.set_region_id(region.get_id());
+    leader_ctx.set_region_epoch(region.get_region_epoch().clone());
+    leader_ctx.set_peer(leader.clone());
+
+    // Write some data.
+    let peers = region.get_peers();
+    assert_none(&leader_ctx, &leader_storage, k2);
+    must_put(&leader_ctx, &leader_storage, k2, v2);
+
+    // Read on a follower.
+    let mut follower_peer = None;
+    let mut follower_id = 0;
+    for p in peers {
+        if p.get_id() != leader.get_id() {
+            follower_id = p.get_id();
+            follower_peer = Some(p.clone());
+            break;
+        }
+    }
+
+    assert!(follower_peer.is_some());
+    let mut follower_ctx = Context::new();
+    follower_ctx.set_region_id(region.get_id());
+    follower_ctx.set_region_epoch(region.get_region_epoch().clone());
+    follower_ctx.set_peer(follower_peer.as_ref().unwrap().clone());
+    follower_ctx.set_follower_read(true);
+    let follower_storage = cluster.sim.rl().storages[&follower_id].clone();
+    assert_has(&follower_ctx, &follower_storage, k2, v2);
+
+    must_put(&leader_ctx, &leader_storage, k3, v3);
+    assert_has(&follower_ctx, &follower_storage, k3, v3);
+
+    cluster.stop_node(follower_id);
+    must_put(&leader_ctx, &leader_storage, k4, v4);
+    cluster.run_node(follower_id).unwrap();
+    let follower_storage = cluster.sim.rl().storages[&follower_id].clone();
+    // Sleep to ensure the follower has received a heartbeat from the leader.
+    thread::sleep(time::Duration::from_millis(300));
+    assert_has(&follower_ctx, &follower_storage, k4, v4);
+}
+
 fn must_put<E: Engine>(ctx: &Context, engine: &E, key: &[u8], value: &[u8]) {
     engine.put(ctx, Key::from_raw(key), value.to_vec()).unwrap();
 }
