Skip to content

Commit 7acb763

Browse files
committed Jun 24, 2024
get, set, echo done
0 parents  commit 7acb763

10 files changed

+709
-0
lines changed
 

‎.gitattributes

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
* text=auto

‎.gitignore

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
debug/
2+
target/
3+
4+
**/*.rs.bk
5+
6+
*.pdb
7+
dump.rdb
8+

‎Cargo.lock

+351
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# DON'T EDIT THIS!
2+
#
3+
# Codecrafters relies on this file being intact to run tests successfully. Any changes
4+
# here will not reflect when CodeCrafters tests your code, and might even cause build
5+
# failures.
6+
#
7+
# DON'T EDIT THIS!
8+
[package]
9+
name = "redis-starter-rust"
10+
version = "0.1.0"
11+
authors = ["Codecrafters <hello@codecrafters.io>"]
12+
edition = "2021"
13+
14+
# DON'T EDIT THIS!
15+
#
16+
# Codecrafters relies on this file being intact to run tests successfully. Any changes
17+
# here will not reflect when CodeCrafters tests your code, and might even cause build
18+
# failures.
19+
#
20+
# DON'T EDIT THIS!
21+
[dependencies]
22+
anyhow = "1.0.59" # error handling
23+
bytes = "1.3.0" # helps manage buffers
24+
thiserror = "1.0.32" # error handling
25+
tokio = { version = "1.23.0", features = ["full"] } # async networking

‎README.md

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Minimal Redis Implementation in Rust.

‎concurrent.sh

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#!/bin/bash
2+
3+
redis-cli ping &
4+
redis-cli ping &
5+
redis-cli ping

‎spawn_redis_server.sh

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#!/bin/sh
2+
exec cargo run \
3+
--quiet \
4+
--release \
5+
--target-dir=/tmp/codecrafters-redis-target \
6+
--manifest-path $(dirname $0)/Cargo.toml \
7+
-- "$@"

‎src/lib.rs

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use std::{
2+
sync::{mpsc, Arc, Mutex},
3+
thread,
4+
};
5+
6+
7+
pub struct ThreadPool {
8+
workers: Vec<Worker>,
9+
sender: Option<mpsc::Sender<Job>>,
10+
}
11+
12+
type Job = Box<dyn FnOnce() + Send + 'static>;
13+
14+
impl ThreadPool {
15+
/// Create a new ThreadPool.
16+
///
17+
/// The size is the number of threads in the pool.
18+
///
19+
/// # Panics
20+
///
21+
/// The `new` function will panic if the size is zero.
22+
#[tokio::main]
23+
pub async fn new(size: usize) -> ThreadPool {
24+
assert!(size > 0);
25+
26+
let (sender, receiver) = mpsc::channel();
27+
28+
let receiver = Arc::new(Mutex::new(receiver));
29+
30+
let mut workers = Vec::with_capacity(size);
31+
32+
for id in 0..size {
33+
workers.push(Worker::new(id, Arc::clone(&receiver)));
34+
}
35+
36+
ThreadPool {
37+
workers,
38+
sender: Some(sender),
39+
}
40+
}
41+
#[tokio::main]
42+
pub async fn execute<F>(&self, f: F)
43+
where
44+
F: FnOnce() + Send + 'static,
45+
{
46+
let job = Box::new(f);
47+
48+
self.sender.as_ref().unwrap().send(job).unwrap();
49+
}
50+
51+
pub async fn async_execute<F>(&self, f: F)
52+
where
53+
F: FnOnce() + Send + 'static,
54+
{
55+
let job = Box::new(f);
56+
57+
self.sender.as_ref().unwrap().send(job).unwrap();
58+
}
59+
}
60+
61+
//for each thread to finish the requests they're working on before closing
62+
impl Drop for ThreadPool {
63+
#[tokio::main]
64+
async fn drop(&mut self) {
65+
drop(self.sender.take());
66+
67+
for worker in &mut self.workers {
68+
println!("Shutting down worker {}", worker.id);
69+
70+
if let Some(thread) = worker.thread.take() {
71+
thread.join().unwrap();
72+
}
73+
}
74+
}
75+
}
76+
77+
struct Worker {
78+
id: usize,
79+
thread: Option<thread::JoinHandle<()>>,
80+
}
81+
82+
impl Worker {
83+
#[tokio::main]
84+
async fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
85+
let thread = thread::spawn(move || loop {
86+
let message = receiver.lock().unwrap().recv();
87+
88+
match message {
89+
Ok(job) => {
90+
println!("Worker {id} got a job; executing.");
91+
92+
job();
93+
}
94+
Err(_) => {
95+
println!("Worker {id} disconnected; shutting down.");
96+
break;
97+
}
98+
}
99+
});
100+
101+
Worker {
102+
id,
103+
thread: Some(thread),
104+
}
105+
}
106+
}

‎src/main.rs

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// use std::{
2+
// io::{Read, Write},
3+
// };
4+
use redis_starter_rust::ThreadPool;
5+
use std::collections::HashMap;
6+
use tokio::net::{TcpListener, TcpStream};
7+
use anyhow::Result;
8+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
9+
use std::sync::{Arc, Mutex};
10+
use resp::Value;
11+
12+
mod resp;
13+
14+
#[tokio::main]
15+
async fn main() {
16+
// listening for connections
17+
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
18+
//creating a global k-v store, which gets updated each time a client(prev/new) adds a new (k,v)
19+
//Arc is used
20+
let kv_store: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::<String,String>::new()));
21+
// for stream in listener.incoming() {
22+
//HANDLING CONCURRENT CLIENTS, NEW THREAD FOR EACH CLIENTi/SERVER stream
23+
loop{ //INSTEAD OF USING for stream in listener.incoming() and synchronously iterating over each stream, we are asynchronously iterating over each stream till the data from the buffer ends
24+
let stream = listener.accept().await; // listener.accept().await ASYNCHRONOUSLY WAITS FOR A NEW CONNECTION, INSTEAD OF AN SYNCHRONOUS ITERATOR LIKE listener.incoming() which takes each new connection and puts inside it
25+
let mut kv_store = Arc::clone(&kv_store);
26+
match stream {
27+
Ok((stream, _)) => {
28+
//SPAWNING A NEW THREAD FOR EACH CLIENT REQ->S
29+
//tried using threadpool and pool.execute, turns out each thread in it was unable to handle ASYNC read/write tasks
30+
//the below spawns a new ASYNC THREAD for each new client request to the redis server
31+
tokio::spawn(async move{
32+
handle_conn(stream, &mut kv_store).await;
33+
});
34+
35+
//ECHO print command
36+
}
37+
Err(e) => {
38+
println!("{e}");
39+
}
40+
}
41+
}
42+
}
43+
44+
async fn handle_conn(stream: TcpStream, kv_store: &mut Arc<std::sync::Mutex<HashMap<String, String>>>) {
45+
let mut handler = resp::RespHandler::new(stream);
46+
// let mut kv_store = HashMap::<String,String>::new();
47+
loop{
48+
let value = handler.read_value().await.unwrap(); //ALL PARSING HAPPENS IN THS FUNCTION
49+
50+
let res = if let Some(v) = value {
51+
let (command, args) = extract_command(v).unwrap();
52+
match command.as_str().to_lowercase().as_str() {
53+
"ping" => Value::SimpleString("PONG".to_string()),
54+
"echo" => args.first().unwrap().clone(),
55+
"set" => {store_key_value(unpack_bulk_str(args[0].clone()).unwrap(), unpack_bulk_str(args[1].clone()).unwrap(), kv_store)},
56+
"get" => {get_value_from_key(unpack_bulk_str(args[0].clone()).unwrap(), kv_store)}, //by default, consider a input string as bulk string
57+
c => panic!("Cannot handle command {}", c),
58+
}
59+
} else {
60+
break;
61+
};
62+
handler.write_value(res).await;
63+
}
64+
}
65+
//makes sense to store in a global shared hashmap
66+
67+
fn store_key_value(key: String, value: String, kv_store: &mut Arc<std::sync::Mutex<HashMap<String, String>>>) -> Value{
68+
kv_store.lock().unwrap().insert(key, value);
69+
println!("{:?}", kv_store.lock().unwrap());
70+
Value::SimpleString("OK".to_string())
71+
}
72+
73+
fn get_value_from_key(key: String, kv_store: &mut Arc<std::sync::Mutex<HashMap<String, String>>>) -> Value{
74+
println!("{:?}", kv_store);
75+
match kv_store.lock().unwrap().get(&key) {
76+
Some(v) => Value::BulkString(v.to_string()),
77+
None => Value::SimpleString("(null)".to_string())
78+
}
79+
}
80+
81+
//extracting the command used after redis-cli, along with the args after the command[redis-cli <command> [..args]]
82+
// returning (command, [..args])
83+
fn extract_command(value: Value) -> Result<(String, Vec<Value>)> {
84+
match value {
85+
Value::Array(a) => { //[command, ..arguments]
86+
Ok((
87+
unpack_bulk_str(a.first().unwrap().clone())?, //command
88+
a.into_iter().skip(1).collect(), //[..arguments]
89+
))
90+
},
91+
_ => Err(anyhow::anyhow!("Unexpected command format")),
92+
}
93+
}
94+
fn unpack_bulk_str(value: Value) -> Result<String> {
95+
match value {
96+
Value::BulkString(s) => Ok(s),
97+
_ => Err(anyhow::anyhow!("Expected command to be a bulk string"))
98+
}
99+
}

‎src/resp.rs

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use tokio::{io::{AsyncReadExt, AsyncWriteExt}, net::TcpStream, stream};
2+
use bytes::{Buf, BytesMut};
3+
use anyhow::{Error, Ok, Result};
4+
5+
#[derive(Clone, Debug)]
6+
pub enum Value {
7+
SimpleString(String),
8+
BulkString(String),
9+
Array(Vec<Value>),
10+
}
11+
impl Value {
12+
pub fn serialize(self) -> String {
13+
match self {
14+
Value::SimpleString(s) => format!("+{}\r\n", s),
15+
Value::BulkString(s) => format!("${}\r\n{}\r\n", s.chars().count(), s),
16+
_ => panic!("Unsupported value for serialize"),
17+
}
18+
}
19+
}
20+
21+
pub struct RespHandler {
22+
stream: TcpStream,
23+
buffer: BytesMut,
24+
}
25+
26+
impl RespHandler{
27+
pub fn new(stream: TcpStream) -> Self{
28+
RespHandler {
29+
stream,
30+
buffer: BytesMut::with_capacity(512),
31+
}
32+
}
33+
34+
pub async fn read_value(&mut self) -> Result<Option<Value>>{
35+
let mut buf = [0; 512];
36+
let read_count = self.stream.read_buf(&mut self.buffer).await?; //async READ using tokio
37+
if read_count == 0 {
38+
return Ok(None);
39+
}
40+
// convert [buffer] => [command, ..args]
41+
let (v, _) = parse(self.buffer.split())?;
42+
Ok(Some(v))
43+
}
44+
45+
pub async fn write_value(&mut self, response: Value){
46+
self.stream.write(response.serialize().as_bytes()).await.unwrap();
47+
}
48+
}
49+
50+
fn parse(buffer: BytesMut) -> Result<(Value, usize)> {
51+
match buffer[0] as char {
52+
'+' => parse_simple_string(buffer),
53+
'*' => parse_array(buffer),
54+
'$' => parse_bulk_string(buffer),
55+
_ => Err(anyhow::anyhow!("Not a known value type {:?}", buffer)),
56+
}
57+
}
58+
59+
fn parse_simple_string(buffer: BytesMut) -> Result<(Value, usize)> {
60+
if let Some((line, len)) = read_until_crlf(&buffer[1..]) {
61+
let string = String::from_utf8(line.to_vec()).unwrap();
62+
return Ok((Value::SimpleString(string), len + 1))
63+
}
64+
return Err(anyhow::anyhow!("Invalid string {:?}", buffer));
65+
}
66+
67+
fn parse_array(buffer: BytesMut) -> Result<(Value, usize)> {
68+
let (array_length, mut bytes_consumed) = if let Some((line, len)) = read_until_crlf(&buffer[1..]) {
69+
let array_length = parse_int(line)?;
70+
(array_length, len + 1)
71+
} else {
72+
return Err(anyhow::anyhow!("Invalid array format {:?}", buffer));
73+
};
74+
let mut items = vec![];
75+
for _ in 0..array_length {
76+
let (array_item, len) = parse(BytesMut::from(&buffer[bytes_consumed..]))?;
77+
items.push(array_item);
78+
bytes_consumed += len;
79+
}
80+
return Ok((Value::Array(items), bytes_consumed))
81+
}
82+
83+
fn parse_bulk_string(buffer: BytesMut) -> Result<(Value, usize)> {
84+
let (bulk_str_len, bytes_consumed) = if let Some((line, len)) = read_until_crlf(&buffer[1..]) {
85+
let bulk_str_len = parse_int(line)?;
86+
(bulk_str_len, len + 1)
87+
} else {
88+
return Err(anyhow::anyhow!("Invalid array format {:?}", buffer));
89+
};
90+
let end_of_bulk_str = bytes_consumed + bulk_str_len as usize;
91+
let total_parsed = end_of_bulk_str + 2;
92+
Ok((Value::BulkString(String::from_utf8(buffer[bytes_consumed..end_of_bulk_str].to_vec())?), total_parsed))
93+
}
94+
95+
fn read_until_crlf(buffer: &[u8]) -> Option<(&[u8], usize)> {
96+
for i in 1..buffer.len() {
97+
if buffer[i - 1] == b'\r' && buffer[i] == b'\n' {
98+
return Some((&buffer[0..(i - 1)], i + 1));
99+
}
100+
}
101+
return None;
102+
}
103+
104+
fn parse_int(buffer: &[u8]) -> Result<i64> {
105+
Ok(String::from_utf8(buffer.to_vec())?.parse::<i64>()?)
106+
}

0 commit comments

Comments
 (0)
Please sign in to comment.