Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster pool refresh #242

Merged
merged 5 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions indy-vdr-proxy/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use super::AppState;
use indy_vdr::common::error::prelude::*;
use indy_vdr::ledger::identifiers::{CredentialDefinitionId, RevocationRegistryId, SchemaId};
use indy_vdr::pool::helpers::{perform_get_txn, perform_ledger_request};
use indy_vdr::pool::{LedgerType, Pool, PreparedRequest, RequestResult, TimingResult};
use indy_vdr::pool::{
LedgerType, Pool, PreparedRequest, RequestResult, RequestResultMeta, TimingResult,
};
use indy_vdr::resolver::did::DidUrl;
use indy_vdr::resolver::PoolResolver as Resolver;
use indy_vdr::utils::did::DidValue;
Expand All @@ -34,16 +36,16 @@ enum ResponseType {
Resolver(String),
}

impl<T> From<(RequestResult<T>, Option<TimingResult>)> for ResponseType
impl<T> From<(RequestResult<T>, RequestResultMeta)> for ResponseType
where
T: std::fmt::Display,
{
fn from(result: (RequestResult<T>, Option<TimingResult>)) -> ResponseType {
fn from(result: (RequestResult<T>, RequestResultMeta)) -> ResponseType {
match result {
(RequestResult::Reply(message), timing) => {
ResponseType::RequestReply(message.to_string(), timing)
(RequestResult::Reply(message), meta) => {
ResponseType::RequestReply(message.to_string(), meta.timing)
}
(RequestResult::Failed(err), timing) => ResponseType::RequestFailed(err, timing),
(RequestResult::Failed(err), meta) => ResponseType::RequestFailed(err, meta.timing),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion indy-vdr-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ async fn refresh_pool(
tokio::time::sleep(Duration::from_secs((delay_mins * 60 / n_pools) as u64)).await
}

let (txns, _timing) = perform_refresh(pool).await?;
let (txns, _meta) = perform_refresh(pool).await?;

let cloned_state = state.clone();
let pool_states = &cloned_state.borrow().pool_states;
Expand Down
2 changes: 1 addition & 1 deletion libindy_vdr/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "indy-vdr"
version = "0.4.0"
version = "0.4.1"
authors = [
"Hyperledger Indy Contributors <[email protected]>",
]
Expand Down
6 changes: 3 additions & 3 deletions libindy_vdr/src/config/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use once_cell::sync::Lazy;

use crate::pool::ProtocolVersion;

pub const DEFAULT_ACK_TIMEOUT: i64 = 20;
pub const DEFAULT_REPLY_TIMEOUT: i64 = 60;
pub const DEFAULT_ACK_TIMEOUT: i64 = 5;
pub const DEFAULT_REPLY_TIMEOUT: i64 = 30;
pub const DEFAULT_CONN_ACTIVE_TIMEOUT: i64 = 5;
pub const DEFAULT_CONN_REQUEST_LIMIT: usize = 5;
pub const DEFAULT_CONN_REQUEST_LIMIT: usize = 10;
pub const DEFAULT_REQUEST_READ_NODES: usize = 2;
pub const DEFAULT_FRESHNESS_TIMEOUT: u64 = 300;
pub const DEFAULT_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::Node1_4;
Expand Down
8 changes: 4 additions & 4 deletions libindy_vdr/src/ffi/pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::common::error::prelude::*;
use crate::common::handle::ResourceHandle;
use crate::pool::{
PoolBuilder, PoolRunner, PoolTransactions, RequestMethod, RequestResult, TimingResult,
PoolBuilder, PoolRunner, PoolTransactions, RequestMethod, RequestResult, RequestResultMeta,
};

use std::collections::{btree_map::Entry, BTreeMap, HashMap};
Expand Down Expand Up @@ -107,7 +107,7 @@ pub extern "C" fn indy_vdr_pool_refresh(
pool.refresh(Box::new(
move |result| {
let errcode = match result {
Ok((old_txns, new_txns, _timing)) => {
Ok((old_txns, new_txns, _meta)) => {
if let Some(new_txns) = new_txns {
// We must spawn a new thread here because this callback
// is being run in the PoolRunner's thread, and if we drop
Expand Down Expand Up @@ -227,10 +227,10 @@ pub extern "C" fn indy_vdr_pool_get_verifiers(
}

fn handle_request_result(
result: VdrResult<(RequestResult<String>, Option<TimingResult>)>,
result: VdrResult<(RequestResult<String>, RequestResultMeta)>,
) -> (ErrorCode, String) {
match result {
Ok((reply, _timing)) => match reply {
Ok((reply, _meta)) => match reply {
RequestResult::Reply(body) => (ErrorCode::Success, body),
RequestResult::Failed(err) => {
let code = ErrorCode::from(err.kind());
Expand Down
2 changes: 1 addition & 1 deletion libindy_vdr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
//! // Create a new GET_TXN request and dispatch it
//! let ledger_type = 1; // 1 identifies the Domain ledger, see pool::LedgerType
//! let seq_no = 1; // Transaction sequence number
//! let (result, _timing) = block_on(perform_get_txn(&pool, ledger_type, seq_no)).unwrap();
//! let (result, _meta) = block_on(perform_get_txn(&pool, ledger_type, seq_no)).unwrap();
#![cfg_attr(feature = "fatal_warnings", deny(warnings))]
#![recursion_limit = "1024"] // for select! macro usage
Expand Down
4 changes: 3 additions & 1 deletion libindy_vdr/src/pool/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ impl std::fmt::Debug for PoolTransactions {

impl std::fmt::Display for PoolTransactions {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let vec_json = unwrap_or_return!(self.encode_json(), Err(std::fmt::Error {}));
let Ok(vec_json) = self.encode_json() else {
return Err(std::fmt::Error {});
};
let txns = SJsonValue::from(vec_json);
write!(f, "{}", txns)
}
Expand Down
53 changes: 22 additions & 31 deletions libindy_vdr/src/pool/handlers/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,58 +4,49 @@ use crate::common::error::prelude::*;
use crate::common::merkle_tree::MerkleTree;

use super::types::Message;
use super::{check_cons_proofs, PoolRequest, RequestEvent, RequestResult, TimingResult};
use super::{check_cons_proofs, PoolRequest, RequestEvent, RequestResult, RequestResultMeta};

pub async fn handle_catchup_request<R: PoolRequest>(
request: &mut R,
merkle_tree: MerkleTree,
target_mt_root: Vec<u8>,
target_mt_size: usize,
) -> VdrResult<(RequestResult<Vec<Vec<u8>>>, Option<TimingResult>)> {
) -> VdrResult<(RequestResult<Vec<Vec<u8>>>, RequestResultMeta)> {
trace!("catchup request");
let config = request.pool_config();
let ack_timeout = config.ack_timeout;
request.send_to_any(config.request_read_nodes, ack_timeout)?;
loop {
match request.next().await {
Some(RequestEvent::Received(node_alias, _message, parsed)) => {
match parsed {
Message::CatchupRep(cr) => {
match process_catchup_reply(
&merkle_tree,
&target_mt_root,
target_mt_size,
cr.load_txns()?,
cr.consProof.clone(),
) {
Ok(txns) => {
return Ok((RequestResult::Reply(txns), request.get_timing()))
}
Err(_) => {
request.clean_timeout(node_alias)?;
request.send_to_any(1, ack_timeout)?;
}
Some(RequestEvent::Received(node_alias, _message, parsed)) => match parsed {
Message::CatchupRep(cr) => {
match process_catchup_reply(
&merkle_tree,
&target_mt_root,
target_mt_size,
cr.load_txns()?,
cr.consProof.clone(),
) {
Ok(txns) => return Ok((RequestResult::Reply(txns), request.get_meta())),
Err(_) => {
request.clean_timeout(node_alias)?;
request.send_to_any(1, ack_timeout)?;
}
}
_ => {
// FIXME could be more tolerant of ReqNACK etc
return Ok((
RequestResult::Failed(err_msg(
VdrErrorKind::Connection,
"Unexpected response",
)),
request.get_timing(),
));
}
}
}
_ => {
debug!("Unexpected reply from {}", &node_alias);
request.clean_timeout(node_alias)?;
request.send_to_any(1, ack_timeout)?;
}
},
Some(RequestEvent::Timeout(_node_alias)) => {
request.send_to_any(1, ack_timeout)?;
}
None => {
return Ok((
RequestResult::Failed(VdrErrorKind::PoolTimeout.into()),
request.get_timing(),
request.get_meta(),
))
}
}
Expand Down
71 changes: 38 additions & 33 deletions libindy_vdr/src/pool/handlers/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::utils::base64;
use super::types::Message;
use super::{
min_consensus, ConsensusState, HashableValue, PoolRequest, ReplyState, RequestEvent,
RequestResult, TimingResult,
RequestResult, RequestResultMeta,
};

pub async fn handle_consensus_request<R: PoolRequest>(
Expand All @@ -21,7 +21,7 @@ pub async fn handle_consensus_request<R: PoolRequest>(
state_proof_timestamps: (Option<u64>, Option<u64>),
as_read_request: bool,
custom_state_proof_parser: Option<&BoxedSPParser>,
) -> VdrResult<(RequestResult<String>, Option<TimingResult>)> {
) -> VdrResult<(RequestResult<String>, RequestResultMeta)> {
trace!("consensus request");
let config = request.pool_config();
let node_keys = request.node_keys();
Expand Down Expand Up @@ -72,40 +72,45 @@ pub async fn handle_consensus_request<R: PoolRequest>(
.clone(),
)
};
if cnt > f
|| (request_with_state_proof
&& check_state_proof(
result,
f,
&DEFAULT_GENERATOR,
&node_keys,
&raw_msg,
state_proof_key.as_deref(),
state_proof_timestamps,
last_write_time,
config.freshness_threshold,
custom_state_proof_parser,
))
{
if state_proof_key.is_some() {
if request_with_state_proof {
let sp_result = check_state_proof(
result,
f,
&DEFAULT_GENERATOR,
&node_keys,
&raw_msg,
state_proof_key.as_deref(),
state_proof_timestamps,
last_write_time,
config.freshness_threshold,
custom_state_proof_parser,
);
let verified = sp_result.is_verified();
request.set_state_proof_result(node_alias.clone(), sp_result);
if verified {
debug!(
"State proof verification succeeded for node: {}, sp_key: '{}'",
node_alias,
base64::encode(state_proof_key.as_ref().unwrap()),
);
} else {
debug!(
"State proof verification failed for node: {}, sp_key: '{}'",
node_alias,
base64::encode(state_proof_key.as_ref().unwrap()),
);
}
return Ok((
RequestResult::Reply(if cnt > f { soonest } else { raw_msg }),
request.get_timing(),
));
} else if state_proof_key.is_some() {
debug!(
"State proof verification failed for node: {}, sp_key: '{}'",
node_alias,
base64::encode(state_proof_key.as_ref().unwrap()),
);
request.clean_timeout(node_alias)?;
true
if verified || cnt > f {
return Ok((
RequestResult::Reply(if verified { raw_msg } else { soonest }),
request.get_meta(),
));
} else {
request.clean_timeout(node_alias)?;
true
}
} else if cnt > f {
return Ok((RequestResult::Reply(soonest), request.get_meta()));
} else {
false
}
Expand All @@ -132,7 +137,7 @@ pub async fn handle_consensus_request<R: PoolRequest>(
RequestResult::Failed(
VdrErrorKind::PoolRequestFailed(raw_msg).into(),
),
request.get_timing(),
request.get_meta(),
));
}
}
Expand All @@ -156,14 +161,14 @@ pub async fn handle_consensus_request<R: PoolRequest>(
VdrErrorKind::PoolTimeout,
"Request was interrupted",
)),
request.get_timing(),
request.get_meta(),
))
}
};
let total_replies = replies.len();
if total_replies >= total_nodes_count {
let err = replies.get_error();
return Ok((RequestResult::Failed(err), request.get_timing()));
return Ok((RequestResult::Failed(err), request.get_meta()));
}
if resend {
request.send_to_any(1, config.ack_timeout)?;
Expand Down
8 changes: 4 additions & 4 deletions libindy_vdr/src/pool/handlers/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use futures_util::stream::StreamExt;

use crate::common::error::prelude::*;

use super::types::{Message, NodeReplies, RequestResult, TimingResult};
use super::types::{Message, NodeReplies, RequestResult, RequestResultMeta};
use super::{PoolRequest, ReplyState, RequestEvent};

pub async fn handle_full_request<R: PoolRequest>(
request: &mut R,
nodes_to_send: Option<Vec<String>>,
local_timeout: Option<i64>,
) -> VdrResult<(RequestResult<NodeReplies<String>>, Option<TimingResult>)> {
) -> VdrResult<(RequestResult<NodeReplies<String>>, RequestResultMeta)> {
trace!("full request");
let timeout = local_timeout.unwrap_or(request.pool_config().reply_timeout);
let req_reply_count = if let Some(nodes) = nodes_to_send {
Expand Down Expand Up @@ -46,12 +46,12 @@ pub async fn handle_full_request<R: PoolRequest>(
VdrErrorKind::PoolTimeout,
"Request was interrupted",
)),
request.get_timing(),
request.get_meta(),
))
}
};
if replies.len() == req_reply_count {
return Ok((RequestResult::Reply(replies.result()), request.get_timing()));
return Ok((RequestResult::Reply(replies.result()), request.get_meta()));
}
}
}
11 changes: 4 additions & 7 deletions libindy_vdr/src/pool/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::utils::{base58, ValidationError};
use super::requests::{PoolRequest, RequestEvent};
use super::types::{
self, CatchupReq, LedgerStatus, LedgerType, Message, NodeReplies, ProtocolVersion,
RequestResult, SingleReply, TimingResult,
RequestResult, RequestResultMeta, SingleReply,
};

mod catchup;
Expand Down Expand Up @@ -119,16 +119,13 @@ impl<K: Eq + Hash, T: Eq + Hash> ConsensusState<K, T> {
}
}

fn max_entry(&self) -> Option<(&K, usize)> {
self.inner
.iter()
.map(|(key, set)| (key, set.len()))
.max_by_key(|entry| entry.1)
fn max_entry(&self) -> Option<(&K, &HashSet<T>)> {
self.inner.iter().max_by_key(|entry| entry.1.len())
}

#[allow(dead_code)]
fn max_len(&self) -> usize {
self.max_entry().map(|entry| entry.1).unwrap_or(0)
self.max_entry().map(|entry| entry.1.len()).unwrap_or(0)
}

pub fn insert(&mut self, key: K, reply: T) -> &mut HashSet<T> {
Expand Down
Loading