Skip to content

Commit 8599d4a

Browse files
committed
feat(network): support network for simple redis
1 parent 7d42d6b commit 8599d4a

File tree

8 files changed

+609
-12
lines changed

8 files changed

+609
-12
lines changed

Cargo.lock

Lines changed: 446 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,24 @@ anyhow = "^1.0"
1111
bytes = "^1.6.1"
1212
dashmap = "6.0.1"
1313
enum_dispatch = "^0.3.13"
14-
lazy_static = "^1.5.0"
14+
futures = { version = "^0.3.30", default-features = false }
15+
# lazy_static = "^1.5.0"
1516
thiserror = "^1.0.62"
17+
tokio = { version = "^1.37.0", features = [
18+
"rt",
19+
"rt-multi-thread",
20+
"macros",
21+
"net",
22+
] }
23+
tokio-stream = "^0.1.15"
24+
tokio-util = { version = "^0.7.10", features = ["codec"] }
25+
tracing = "^0.1.40"
26+
tracing-subscriber = { version = "^0.3.18", features = ["env-filter"] }
27+
winnow = { version = "^0.6.8", features = ["simd"] }
28+
29+
# [dev-dependencies]
30+
# criterion = { version = "^0.5.1", features = ["html_reports"] }
31+
32+
# [[bench]]
33+
# name = "resp"
34+
# harness = false

src/cmd/map.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::{
22
cmd::{extract_args, validate_command, Get, Set},
3-
CommandError, RespArray, RespFrame, RespNull,
3+
RespArray, RespFrame, RespNull,
44
};
55

6-
use super::{CommandExecutor, RESP_OK};
6+
use super::{CommandError, CommandExecutor, RESP_OK};
77

88
impl CommandExecutor for Get {
99
fn execute(self, backend: &crate::Backend) -> RespFrame {

src/cmd/mod.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use std::sync::LazyLock;
2+
13
use enum_dispatch::enum_dispatch;
2-
use lazy_static::lazy_static;
34
use thiserror::Error;
45

56
use crate::{Backend, RespArray, RespError, RespFrame, SimpleString};
@@ -12,9 +13,13 @@ mod map;
1213
// 1. init in runtime
1314
// 2. thread safe
1415
// 3. improve performance
15-
lazy_static! {
16-
static ref RESP_OK: RespFrame = SimpleString::new("OK").into();
17-
}
16+
// lazy_static! {
17+
// static ref RESP_OK: RespFrame = SimpleString::new("OK").into();
18+
// }
19+
20+
// > Rust 1.80.0
21+
// https://blog.rust-lang.org/2024/07/25/Rust-1.80.0.html
22+
static RESP_OK: LazyLock<RespFrame> = LazyLock::new(|| SimpleString::new("OK").into());
1823

1924
// region: --- Traits
2025
#[enum_dispatch]
@@ -108,6 +113,18 @@ impl TryFrom<RespArray> for Command {
108113
}
109114
}
110115
}
116+
117+
impl TryFrom<RespFrame> for Command {
118+
type Error = CommandError;
119+
fn try_from(v: RespFrame) -> Result<Self, Self::Error> {
120+
match v {
121+
RespFrame::Array(array) => array.try_into(),
122+
_ => Err(CommandError::InvalidCommand(
123+
"Command must be an Array".to_string(),
124+
)),
125+
}
126+
}
127+
}
111128
// endregion: --- impls
112129

113130
// region: --- functions

src/lib.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
mod backend;
2-
mod cmd;
2+
33
mod resp;
44

5+
pub mod cmd;
6+
pub mod network;
7+
58
pub use backend::*;
6-
pub use cmd::*;
9+
// pub use cmd::*;
10+
// pub use network::*;
711
pub use resp::*;

src/main.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,27 @@
1-
fn main() {
2-
println!("Hello, world!");
1+
use anyhow::Result;
2+
use simple_redis::{network, Backend};
3+
use tokio::net::TcpListener;
4+
use tracing::info;
5+
6+
#[tokio::main]
7+
async fn main() -> Result<()> {
8+
tracing_subscriber::fmt::init();
9+
10+
let addr = "0.0.0.0:6389";
11+
let listener = TcpListener::bind(addr).await?;
12+
info!("Simple-Redis-Server is listening on {}", addr);
13+
14+
let backend = Backend::new();
15+
loop {
16+
let (stream, remote_addr) = listener.accept().await?;
17+
info!("Accepted connection from {}", remote_addr);
18+
let cloned_backend = backend.clone();
19+
tokio::spawn(async move {
20+
// handling of stream
21+
match network::stream_handler(stream, cloned_backend).await {
22+
Ok(_) => info!("Connection from {} closed", remote_addr),
23+
Err(e) => info!("Connection from {} closed with error: {}", remote_addr, e),
24+
}
25+
});
26+
}
327
}

src/network.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use anyhow::Result;
2+
use futures::SinkExt;
3+
use tokio::net::TcpStream;
4+
use tokio_stream::StreamExt;
5+
use tokio_util::codec::{Decoder, Encoder, Framed};
6+
use tracing::info;
7+
8+
use crate::{
9+
cmd::{Command, CommandExecutor},
10+
Backend, RespDecode, RespEncode, RespError, RespFrame,
11+
};
12+
13+
#[derive(Debug)]
14+
struct RedisRequest {
15+
frame: RespFrame,
16+
backend: Backend,
17+
}
18+
19+
#[derive(Debug)]
20+
struct RedisResponse {
21+
frame: RespFrame,
22+
}
23+
24+
#[derive(Debug)]
25+
struct RespFrameCodec;
26+
27+
pub async fn stream_handler(stream: TcpStream, backend: Backend) -> Result<()> {
28+
// how to get a frame from the stream?
29+
let mut framed = Framed::new(stream, RespFrameCodec);
30+
loop {
31+
match framed.next().await {
32+
Some(Ok(frame)) => {
33+
info!("Received frame: {:?}", frame);
34+
let request = RedisRequest {
35+
frame,
36+
backend: backend.clone(),
37+
};
38+
let response = request_handler(request).await?;
39+
info!("Sending response: {:?}", response.frame);
40+
framed.send(response.frame).await?;
41+
}
42+
Some(Err(e)) => return Err(e),
43+
None => return Ok(()),
44+
}
45+
}
46+
}
47+
48+
// NOTE: need a backend to process the frame
49+
// async fn request_handler(request: RespFrame) -> Result<RespFrame> {
50+
// todo!()
51+
// }
52+
async fn request_handler(request: RedisRequest) -> Result<RedisResponse> {
53+
let (frame, backend) = (request.frame, request.backend);
54+
let cmd = Command::try_from(frame)?;
55+
info!("Executing command: {:?}", cmd);
56+
let frame = cmd.execute(&backend);
57+
Ok(RedisResponse { frame })
58+
}
59+
60+
impl Encoder<RespFrame> for RespFrameCodec {
61+
type Error = anyhow::Error;
62+
63+
fn encode(&mut self, item: RespFrame, dst: &mut bytes::BytesMut) -> Result<()> {
64+
let encoded = item.encode();
65+
dst.extend_from_slice(&encoded);
66+
Ok(())
67+
}
68+
}
69+
70+
impl Decoder for RespFrameCodec {
71+
type Item = RespFrame;
72+
type Error = anyhow::Error;
73+
74+
fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<RespFrame>> {
75+
match RespFrame::decode(src) {
76+
Ok(frame) => Ok(Some(frame)),
77+
Err(RespError::NotComplete) => Ok(None),
78+
Err(e) => Err(e.into()),
79+
}
80+
}
81+
}

src/resp/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ pub enum RespError {
5858
// fn expect_length(buf: &[u8]) -> Result<usize, RespError>;
5959
// }
6060

61+
// 关于 enum 的知识点
62+
// 枚举变体: 直接包含数据, 结构体类型, 无数据
63+
64+
// 元组变体: 当枚举变体直接包含一组命名未指定的值时-> SimpleString(String) 和 Integer(i64),
65+
// 结构体变体: 枚举的变体被定义为包含具有名称的字段-> StructVariant { name: String, id: i32 }
66+
// 单元变体: RespNull
67+
6168
// 之所以要定义一些新的结构体, 是因为要在实现 trait 的时候, 要区分开这些类型
6269
#[enum_dispatch(RespEncode)]
6370
#[derive(Debug, Clone, PartialEq)]

0 commit comments

Comments
 (0)