Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(redis): Implement parallel cmd execution of Redis calls #4118

Merged
merged 12 commits into from
Oct 14, 2024
120 changes: 91 additions & 29 deletions relay-redis/src/real.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::error::Error;
use std::fmt;
use std::io::ErrorKind;
use std::time::Duration;
use std::{fmt, io, thread};

use r2d2::{Builder, ManageConnection, Pool, PooledConnection};
pub use redis;
Expand Down Expand Up @@ -34,28 +35,68 @@ enum ConnectionInner<'a> {
Single(&'a mut redis::Connection),
}

impl ConnectionLike for ConnectionInner<'_> {
fn req_packed_command(&mut self, cmd: &[u8]) -> redis::RedisResult<redis::Value> {
impl ConnectionInner<'_> {
fn get_connections(&mut self) -> Vec<&mut (dyn ConnectionLike + Send)> {
match self {
ConnectionInner::Cluster(ref mut con) => con.req_packed_command(cmd),
ConnectionInner::Cluster(con) => vec![con as &mut (dyn ConnectionLike + Send)],
ConnectionInner::MultiWrite {
primary: primary_connection,
secondaries: secondary_connections,
} => {
let primary_result = primary_connection.req_packed_command(cmd);
let mut connections = primary_connection.get_connections();
for secondary_connection in secondary_connections.iter_mut() {
if let Err(error) = secondary_connection.req_packed_command(cmd) {
connections.extend(secondary_connection.get_connections());
}

connections
}
ConnectionInner::Single(con) => vec![con as &mut (dyn ConnectionLike + Send)],
}
}
}

impl ConnectionLike for ConnectionInner<'_> {
fn req_packed_command(&mut self, cmd: &[u8]) -> redis::RedisResult<redis::Value> {
let mut results = thread::scope(|s| {
let handles: Vec<_> = self
.get_connections()
.into_iter()
.map(|connection| s.spawn(move || connection.req_packed_command(cmd)))
.collect();

handles
.into_iter()
.map(|handle| handle.join())
.collect::<Vec<_>>()
})
.into_iter();

let Some(Ok(primary_result)) = results.next() else {
relay_log::error!("sending cmd to primary Redis instance failed");
return Err(io::Error::new(
ErrorKind::Other,
"failed to send cmd to primary Redis instance",
)
.into());
};

for result in results {
match result {
Ok(result) => {
if let Err(error) = result {
relay_log::error!(
error = &error as &dyn Error,
"sending cmd to the secondary Redis instance failed",
);
}
}

primary_result
Err(_) => {
relay_log::error!("sending cmd to secondary Redis instance failed");
}
}
ConnectionInner::Single(ref mut con) => con.req_packed_command(cmd),
}

primary_result
}

fn req_packed_commands(
Expand All @@ -64,59 +105,80 @@ impl ConnectionLike for ConnectionInner<'_> {
offset: usize,
count: usize,
) -> redis::RedisResult<Vec<redis::Value>> {
match self {
ConnectionInner::Cluster(ref mut con) => con.req_packed_commands(cmd, offset, count),
ConnectionInner::MultiWrite {
primary: primary_connection,
secondaries: secondary_connections,
} => {
let primary_result = primary_connection.req_packed_commands(cmd, offset, count);
for secondary_connection in secondary_connections.iter_mut() {
if let Err(error) = secondary_connection.req_packed_commands(cmd, offset, count)
{
let mut results = thread::scope(|s| {
let handles: Vec<_> = self
.get_connections()
.into_iter()
.map(|connection| {
s.spawn(move || connection.req_packed_commands(cmd, offset, count))
})
.collect();

handles
.into_iter()
.map(|handle| handle.join())
.collect::<Vec<_>>()
})
.into_iter();

let Some(Ok(primary_result)) = results.next() else {
relay_log::error!("sending cmds to primary Redis instance failed");
return Err(io::Error::new(
ErrorKind::Other,
"failed to send cmds to primary Redis instance",
)
.into());
};

for result in results {
match result {
Ok(result) => {
if let Err(error) = result {
relay_log::error!(
error = &error as &dyn Error,
"sending cmds to the secondary Redis instance failed",
"sending cmds tos the secondary Redis instance failed",
);
}
}

primary_result
Err(_) => {
relay_log::error!("sending cmds to secondary Redis instance failed");
}
}
ConnectionInner::Single(ref mut con) => con.req_packed_commands(cmd, offset, count),
}

primary_result
}

fn get_db(&self) -> i64 {
match self {
ConnectionInner::Cluster(ref con) => con.get_db(),
ConnectionInner::Cluster(con) => con.get_db(),
ConnectionInner::MultiWrite {
primary: primary_connection,
..
} => primary_connection.get_db(),
ConnectionInner::Single(ref con) => con.get_db(),
ConnectionInner::Single(con) => con.get_db(),
}
}

fn check_connection(&mut self) -> bool {
match self {
ConnectionInner::Cluster(ref mut con) => con.check_connection(),
ConnectionInner::Cluster(con) => con.check_connection(),
ConnectionInner::MultiWrite {
primary: primary_connection,
..
} => primary_connection.check_connection(),
ConnectionInner::Single(ref mut con) => con.check_connection(),
ConnectionInner::Single(con) => con.check_connection(),
}
}

fn is_open(&self) -> bool {
match self {
ConnectionInner::Cluster(ref con) => con.is_open(),
ConnectionInner::Cluster(con) => con.is_open(),
ConnectionInner::MultiWrite {
primary: primary_connection,
..
} => primary_connection.is_open(),
ConnectionInner::Single(ref con) => con.is_open(),
ConnectionInner::Single(con) => con.is_open(),
}
}
}
Expand Down
Loading