Skip to content

Commit

Permalink
lighthouse: add heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
d4l3k committed Nov 3, 2024
1 parent 5d2e55f commit dbb5f64
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 25 deletions.
7 changes: 7 additions & 0 deletions proto/torchft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,15 @@ message LighthouseQuorumResponse {
Quorum quorum = 1;
}

message LighthouseHeartbeatRequest {
string replica_id = 1;
}

message LighthouseHeartbeatResponse {}

service LighthouseService {
rpc Quorum (LighthouseQuorumRequest) returns (LighthouseQuorumResponse);
rpc Heartbeat (LighthouseHeartbeatRequest) returns (LighthouseHeartbeatResponse);
}

message ManagerQuorumRequest {
Expand Down
57 changes: 44 additions & 13 deletions src/lighthouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use tonic::{Request, Response, Status};

use crate::torchftpb::{
lighthouse_service_server::{LighthouseService, LighthouseServiceServer},
LighthouseQuorumRequest, LighthouseQuorumResponse, Quorum, QuorumMember,
LighthouseHeartbeatRequest, LighthouseHeartbeatResponse, LighthouseQuorumRequest,
LighthouseQuorumResponse, Quorum, QuorumMember,
};

struct QuorumMemberDetails {
Expand All @@ -34,6 +35,10 @@ struct State {
participants: HashMap<String, QuorumMemberDetails>,
prev_quorum: Option<Quorum>,
quorum_id: i64,

// heartbeat information
// replica_id -> last heartbeat
heartbeats: HashMap<String, Instant>,
}

pub struct Lighthouse {
Expand Down Expand Up @@ -74,6 +79,7 @@ impl Lighthouse {
channel: tx,
prev_quorum: None,
quorum_id: 0,
heartbeats: HashMap::new(),
}),
opt: opt,
})
Expand Down Expand Up @@ -176,15 +182,15 @@ impl Lighthouse {
Ok(())
}

pub async fn _run_quorum(self: Arc<Self>) -> Result<()> {
async fn _run_quorum(self: Arc<Self>) -> Result<()> {
loop {
self.clone()._quorum_tick().await?;

sleep(Duration::from_millis(self.opt.quorum_tick_ms)).await;
}
}

pub async fn _run_grpc(self: Arc<Self>) -> Result<()> {
async fn _run_grpc(self: Arc<Self>) -> Result<()> {
let bind = self.opt.bind.parse()?;
info!("Lighthouse listening on {}", bind);

Expand Down Expand Up @@ -248,6 +254,21 @@ impl LighthouseService for Arc<Lighthouse> {

Ok(Response::new(reply))
}

async fn heartbeat(
&self,
request: Request<LighthouseHeartbeatRequest>,
) -> Result<Response<LighthouseHeartbeatResponse>, Status> {
let replica_id = request.into_inner().replica_id;

{
let mut state = self.state.lock().await;
state.heartbeats.insert(replica_id, Instant::now());
}

let reply = LighthouseHeartbeatResponse {};
Ok(Response::new(reply))
}
}

#[cfg(test)]
Expand Down Expand Up @@ -364,18 +385,28 @@ mod tests {
.await
.unwrap();

let request = tonic::Request::new(LighthouseQuorumRequest {
requester: Some(QuorumMember {
{
let request = tonic::Request::new(LighthouseHeartbeatRequest {
replica_id: "foo".to_string(),
address: "".to_string(),
store_address: "".to_string(),
step: 10,
}),
});
});

let response = client.heartbeat(request).await.unwrap();
}

{
let request = tonic::Request::new(LighthouseQuorumRequest {
requester: Some(QuorumMember {
replica_id: "foo".to_string(),
address: "".to_string(),
store_address: "".to_string(),
step: 10,
}),
});

let response = client.quorum(request).await.unwrap();
let quorum = response.into_inner().quorum.unwrap();
assert_eq!(quorum.participants.len(), 1);
let response = client.quorum(request).await.unwrap();
let quorum = response.into_inner().quorum.unwrap();
assert_eq!(quorum.participants.len(), 1);
}

lighthouse_task.abort();
}
Expand Down
45 changes: 33 additions & 12 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use std::time::Duration;
use anyhow::Result;
use tokio::sync::broadcast;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tonic::transport::Server;
use tonic::{Request, Response, Status};

Expand All @@ -21,9 +23,9 @@ use crate::torchftpb::lighthouse_service_client::LighthouseServiceClient;
use crate::torchftpb::manager_service_client::ManagerServiceClient;
use crate::torchftpb::{
manager_service_server::{ManagerService, ManagerServiceServer},
CheckpointAddressRequest, CheckpointAddressResponse, LighthouseQuorumRequest,
ManagerQuorumRequest, ManagerQuorumResponse, Quorum, QuorumMember, ShouldCommitRequest,
ShouldCommitResponse,
CheckpointAddressRequest, CheckpointAddressResponse, LighthouseHeartbeatRequest,
LighthouseQuorumRequest, ManagerQuorumRequest, ManagerQuorumResponse, Quorum, QuorumMember,
ShouldCommitRequest, ShouldCommitResponse,
};

#[cfg(not(test))]
Expand Down Expand Up @@ -99,6 +101,19 @@ impl Manager {
}

pub async fn run(self: Arc<Self>) -> Result<()> {
let mut set = JoinSet::new();

set.spawn(self.clone()._run_heartbeat());

set.spawn(self.clone()._run_grpc());

while let Some(res) = set.join_next().await {
res??;
}
Ok(())
}

async fn _run_grpc(self: Arc<Self>) -> Result<()> {
let bind = self.bind.parse()?;
info!("Manager {} listening on {}", self.replica_id, bind);

Expand All @@ -109,6 +124,19 @@ impl Manager {
.map_err(|e| e.into())
}

async fn _run_heartbeat(self: Arc<Self>) -> Result<()> {
let mut client = self.lighthouse_client_new().await?;
loop {
let request = tonic::Request::new(LighthouseHeartbeatRequest {
replica_id: self.replica_id.clone(),
});

let response = client.heartbeat(request).await;

sleep(Duration::from_millis(100)).await;
}
}

async fn lighthouse_client_new(&self) -> Result<LighthouseServiceClient<Channel>> {
info!(
"Manager: connecting to lighthouse at {}",
Expand Down Expand Up @@ -333,17 +361,14 @@ mod tests {
#[tokio::test]
async fn test_should_commit() -> Result<()> {
let manager = Manager::new(
"repid".to_string(),
"rep_id".to_string(),
"lighthouse".to_string(),
"addr".to_string(),
"0.0.0.0:29531".to_string(),
"store_addr".to_string(),
2,
);
println!("manager spawn");
let manager_fut = tokio::spawn(manager.run());

println!("should_commit1");
let manager_fut = tokio::spawn(manager._run_grpc());

let fut_a = tokio::spawn(should_commit(0, true));
let fut_b = tokio::spawn(should_commit(1, true));
Expand All @@ -353,8 +378,6 @@ mod tests {
assert!(resp_a.should_commit);
assert!(resp_b.should_commit);

println!("should_commit2");

let fut_a = tokio::spawn(should_commit(0, true));
let fut_b = tokio::spawn(should_commit(1, false));
let resp_a = fut_a.await??;
Expand All @@ -363,8 +386,6 @@ mod tests {
assert!(!resp_a.should_commit);
assert!(!resp_b.should_commit);

println!("aborting");

manager_fut.abort();

Ok(())
Expand Down

0 comments on commit dbb5f64

Please sign in to comment.